diff --git a/src/networking.cpp b/src/networking.cpp index 94f1c21c9..f5698282b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1911,12 +1911,13 @@ void ProcessPendingAsyncWrites() } else { - if (!c->fPendingAsyncWriteHandler) { - c->fPendingAsyncWriteHandler = true; + bool expected = false; + if (c->fPendingAsyncWriteHandler.compare_exchange_strong(expected, true)) { bool fResult = c->postFunction([](client *c) { c->fPendingAsyncWriteHandler = false; - connSetWriteHandler(c->conn, sendReplyToClient, true); - }); + clientInstallWriteHandler(c); + handleClientsWithPendingWrites(c->iel, g_pserver->aof_state); + }, false); if (!fResult) c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset diff --git a/src/server.cpp b/src/server.cpp index 32610a974..3cf0b0189 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3992,13 +3992,13 @@ int processCommand(client *c, int callFlags) { return C_OK; } -bool client::postFunction(std::function fn) { +bool client::postFunction(std::function fn, bool fLock) { this->casyncOpsPending++; return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{ std::lock_guardlock)> lock(this->lock); --casyncOpsPending; fn(this); - }) == AE_OK; + }, false, fLock) == AE_OK; } /*================================== Shutdown =============================== */ diff --git a/src/server.h b/src/server.h index 3e0c48f77..198633f90 100644 --- a/src/server.h +++ b/src/server.h @@ -1083,7 +1083,7 @@ typedef struct client { std::atomic flags; /* Client flags: CLIENT_* macros. */ int casyncOpsPending; int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ - int fPendingAsyncWriteHandler; + std::atomic fPendingAsyncWriteHandler; int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a replica. */ int repl_put_online_on_ack; /* Install replica write handler on ACK. */ @@ -1153,7 +1153,7 @@ typedef struct client { int master_error; // post a function from a non-client thread to run on its client thread - bool postFunction(std::function fn); + bool postFunction(std::function fn, bool fLock = true); } client; struct saveparam {