diff --git a/src/replication.cpp b/src/replication.cpp index 36a6cb109..6b18d27ac 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1478,7 +1478,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, * setup write handler (and disable pipe read handler, below) */ if (nwritten != g_pserver->rdb_pipe_bufflen) { g_pserver->rdb_pipe_numconns_writing++; - aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] { + slave->postFunction([conn](client *) { connSetWriteHandler(conn, rdbPipeWriteHandler); }); } @@ -1607,21 +1607,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } else { - aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { - // Because the client could have been closed while the lambda waited to run we need to - // verify the replica is still connected - listIter li; - listNode *ln; - listRewind(g_pserver->slaves,&li); - bool fFound = false; - while ((ln = listNext(&li))) { - if (listNodeValue(ln) == replica) { - fFound = true; - break; - } - } - if (!fFound) - return; + replica->postFunction([](client *replica) { connSetWriteHandler(replica->conn,NULL); if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) { freeClient(replica); diff --git a/src/server.cpp b/src/server.cpp index 8d2a10129..ddebe3ac8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3887,6 +3887,15 @@ int processCommand(client *c, int callFlags) { return C_OK; } +bool client::postFunction(std::function fn) { + this->casyncOpsPending++; + return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{ + std::lock_guardlock)> lock(this->lock); + --casyncOpsPending; + fn(this); + }) == AE_OK; +} + /*================================== Shutdown =============================== */ /* Close listening sockets. Also unlink the unix domain socket if diff --git a/src/server.h b/src/server.h index 9274538c5..e3c62796b 100644 --- a/src/server.h +++ b/src/server.h @@ -1335,6 +1335,9 @@ typedef struct client { int iel; /* the event loop index we're registered with */ struct fastlock lock; int master_error; + + // post a function from a non-client thread to run on its client thread + bool postFunction(std::function fn); } client; struct saveparam {