diff --git a/src/connection.cpp b/src/connection.cpp index fe987e3f0..72e3f9f4b 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -29,6 +29,7 @@ #include "server.h" #include "connhelpers.h" +#include "aelocker.h" /* The connections module provides a lean abstraction of network connections * to avoid direct socket and async event management across the Redis code base. @@ -206,14 +207,15 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; - int flags = AE_WRITABLE; if (fThreadSafe) - flags |= AE_WRITE_THREADSAFE; + conn->flags |= CONN_FLAG_WRITE_THREADSAFE; + else + conn->flags &= ~CONN_FLAG_WRITE_THREADSAFE; if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); else - if (aeCreateFileEvent(serverTL->el,conn->fd,flags, + if (aeCreateFileEvent(serverTL->el,conn->fd,AE_WRITABLE|AE_WRITE_THREADSAFE, conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } @@ -223,17 +225,18 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu */ static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) { if (func == conn->read_handler) return C_OK; - - int flags = AE_READABLE; + if (fThreadSafe) - flags |= AE_READ_THREADSAFE; + conn->flags |= CONN_FLAG_READ_THREADSAFE; + else + conn->flags &= ~CONN_FLAG_READ_THREADSAFE; conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_READABLE); else if (aeCreateFileEvent(serverTL->el,conn->fd, - flags,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; + AE_READABLE|AE_READ_THREADSAFE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } @@ -281,15 +284,27 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD /* Handle normal I/O flows */ if (!invert && call_read) { + AeLocker lock; + if (!(conn->flags & CONN_FLAG_READ_THREADSAFE)) + lock.arm(nullptr); + if (!callHandler(conn, conn->read_handler)) return; } /* Fire the writable event. */ if (call_write) { + AeLocker lock; + if (!(conn->flags & CONN_FLAG_WRITE_THREADSAFE)) + lock.arm(nullptr); + if (!callHandler(conn, conn->write_handler)) return; } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && call_read) { + AeLocker lock; + if (!(conn->flags & CONN_FLAG_READ_THREADSAFE)) + lock.arm(nullptr); + if (!callHandler(conn, conn->read_handler)) return; } } diff --git a/src/rdb.cpp b/src/rdb.cpp index 8ae4a0651..4e9fc891b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2641,9 +2641,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { g_pserver->rdb_child_pid = childpid; g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { - serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); - } + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); + } + }); } return (childpid == -1) ? C_ERR : C_OK; } diff --git a/src/replication.cpp b/src/replication.cpp index 853aaa4a3..95bbadd9a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1129,6 +1129,8 @@ void putSlaveOnline(client *replica) { } void sendBulkToSlave(connection *conn) { + serverAssert(GlobalLocksAcquired()); + client *replica = (client*)connGetPrivateData(conn); serverAssert(FCorrectThread(replica)); char buf[PROTO_IOBUF_LEN]; @@ -1192,9 +1194,11 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { g_pserver->rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (g_pserver->rdb_pipe_numconns_writing == 0) { - if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { - serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); - } + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); + } + }); } } @@ -1205,6 +1209,12 @@ void rdbPipeWriteHandler(struct connection *conn) { client *slave = (client*)connGetPrivateData(conn); AssertCorrectThread(slave); int nwritten; + + if (slave->flags & CLIENT_CLOSE_ASAP) { + rdbPipeWriteHandlerConnRemoved(conn); + return; + } + if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff, g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1) { @@ -1246,6 +1256,9 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, UNUSED(mask); UNUSED(clientData); UNUSED(eventLoop); + + serverAssert(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el == eventLoop); + int i; if (!g_pserver->rdb_pipe_buff) g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN); @@ -1262,7 +1275,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, if (!conn) continue; client *slave = (client*)connGetPrivateData(conn); - freeClient(slave); + freeClientAsync(slave); g_pserver->rdb_pipe_conns[i] = NULL; } killRDBChild(); @@ -1294,6 +1307,10 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, continue; client *slave = (client*)connGetPrivateData(conn); + serverAssert(slave->conn == conn); + if (slave->flags & CLIENT_CLOSE_ASAP) + continue; + // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will // be sending anything while in this replication state so it is OK if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { @@ -1314,8 +1331,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, * setup write handler (and disable pipe read handler, below) */ if (nwritten != g_pserver->rdb_pipe_bufflen) { g_pserver->rdb_pipe_numconns_writing++; - aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] { - connSetWriteHandler(conn, rdbPipeWriteHandler); + slave->casyncOpsPending++; + aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [slave] { + slave->casyncOpsPending--; + if (slave->flags & CLIENT_CLOSE_ASAP) + return; + connSetWriteHandler(slave->conn, rdbPipeWriteHandler); }); } stillAlive++; diff --git a/src/server.cpp b/src/server.cpp index 722ef467d..a2509c654 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2127,6 +2127,8 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); + updateCachedTime(1); + /* If another threads unblocked one of our clients, and this thread has been idle then beforeSleep won't have a chance to process the unblocking. So we also process them here in the cron job to ensure they don't starve.