diff --git a/src/networking.cpp b/src/networking.cpp index 34dea1452..d32912bd8 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread serverAssert(FCorrectThread(c) || fAsync); - serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + if (FCorrectThread(c)) { + serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + } else { + serverAssert(GlobalLocksAcquired()); + } if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer. // do not install a write handler diff --git a/src/replication.cpp b/src/replication.cpp index 5abfd2514..d2f948567 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -385,7 +385,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_lock<decltype(replica->lock)> lock(replica->lock); + std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(replica)) + lock.lock(); if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) { replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; @@ -434,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *replica = (client*)ln->value; - std::lock_guard<decltype(replica->lock)> ulock(replica->lock); + std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ulock.lock(); if (FMasterHost(replica)) continue; // Active Active case, don't feed back @@ -483,7 +488,10 @@ void 
replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; - std::lock_guard<decltype(monitor->lock)> lock(monitor->lock); + std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(c)) + lock.lock(); addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); @@ -1206,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } else { - aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{ + aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { + // Because the client could have been closed while the lambda waited to run we need to + // verify the replica is still connected + listIter li; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -3379,4 +3401,4 @@ void updateMasterAuth() if (cserver.default_masteruser) mi->masteruser = zstrdup(cserver.default_masteruser); } -} \ No newline at end of file +}