From 10585e4b3b09e637ca46a12174051bfcc9c06454 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 1 Jul 2020 21:59:38 -0400 Subject: [PATCH] Latency fixes Former-commit-id: 6857c4c2085d3b0902d7b2ece66b3fe8828dd805 --- src/connection.cpp | 14 +++++++------- src/networking.cpp | 9 ++++++--- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 05f49f689..defd5eb9d 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -164,7 +164,7 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len) int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_relaxed); } return ret; @@ -173,10 +173,10 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len) static int connSocketRead(connection *conn, void *buf, size_t buf_len) { int ret = read(conn->fd, buf, buf_len); if (!ret) { - conn->state = CONN_STATE_CLOSED; + conn->state.store(CONN_STATE_CLOSED, std::memory_order_release); } else if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_release); } return ret; @@ -253,14 +253,14 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD UNUSED(fd); connection *conn = (connection*)clientData; - if (conn->state == CONN_STATE_CONNECTING && + if (conn->state.load(std::memory_order_relaxed) == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) { if (connGetSocketError(conn)) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_release); } else { - conn->state = CONN_STATE_CONNECTED; + conn->state.store(CONN_STATE_CONNECTED, std::memory_order_release); } if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); @@ -407,7 +407,7 @@ int connRecvTimeout(connection *conn, long long ms) { } int connGetState(connection *conn) { - return conn->state; + return conn->state.load(std::memory_order_relaxed); } void connSetThreadAffinity(connection *conn, int cpu) { diff --git a/src/networking.cpp b/src/networking.cpp index 619db0786..f54870795 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1928,9 +1928,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { } } - AeLocker locker; - locker.arm(nullptr); - ProcessPendingAsyncWrites(); + if (listLength(serverTL->clients_pending_asyncwrite)) + { + AeLocker locker; + locker.arm(nullptr); + ProcessPendingAsyncWrites(); + } return processed; }