Latency fixes

Former-commit-id: 6857c4c2085d3b0902d7b2ece66b3fe8828dd805
This commit is contained in:
John Sully 2020-07-01 21:59:38 -04:00
parent d28ef47458
commit 10585e4b3b
2 changed files with 13 additions and 10 deletions

View File

@ -164,7 +164,7 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
int ret = write(conn->fd, data, data_len);
if (ret < 0 && errno != EAGAIN) {
conn->last_errno = errno;
conn->state = CONN_STATE_ERROR;
conn->state.store(CONN_STATE_ERROR, std::memory_order_relaxed);
}
return ret;
@ -173,10 +173,10 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
int ret = read(conn->fd, buf, buf_len);
if (!ret) {
conn->state = CONN_STATE_CLOSED;
conn->state.store(CONN_STATE_CLOSED, std::memory_order_release);
} else if (ret < 0 && errno != EAGAIN) {
conn->last_errno = errno;
conn->state = CONN_STATE_ERROR;
conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
}
return ret;
@ -253,14 +253,14 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
UNUSED(fd);
connection *conn = (connection*)clientData;
if (conn->state == CONN_STATE_CONNECTING &&
if (conn->state.load(std::memory_order_relaxed) == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {
if (connGetSocketError(conn)) {
conn->last_errno = errno;
conn->state = CONN_STATE_ERROR;
conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
} else {
conn->state = CONN_STATE_CONNECTED;
conn->state.store(CONN_STATE_CONNECTED, std::memory_order_release);
}
if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
@ -407,7 +407,7 @@ int connRecvTimeout(connection *conn, long long ms) {
}
int connGetState(connection *conn) {
return conn->state;
return conn->state.load(std::memory_order_relaxed);
}
void connSetThreadAffinity(connection *conn, int cpu) {

View File

@ -1928,9 +1928,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
}
}
AeLocker locker;
locker.arm(nullptr);
ProcessPendingAsyncWrites();
if (listLength(serverTL->clients_pending_asyncwrite))
{
AeLocker locker;
locker.arm(nullptr);
ProcessPendingAsyncWrites();
}
return processed;
}