diff --git a/src/networking.cpp b/src/networking.cpp index 719cfcc1b..f117839b2 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1920,8 +1920,7 @@ void ProcessPendingAsyncWrites() * get it called, and so forth. */ int handleClientsWithPendingWrites(int iel, int aof_state) { std::unique_lock lockf(g_pserver->rgthreadvar[iel].lockPendingWrite); - auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write; - int processed = (int)vec.size(); + int processed = 0; serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; @@ -1936,16 +1935,17 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { ae_flags |= AE_BARRIER; } - while(!vec.empty()) { - client *c = vec.back(); + auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write); + processed += (int)vec.size(); + + for (client *c : vec) { AssertCorrectThread(c); - c->flags &= ~CLIENT_PENDING_WRITE; - vec.pop_back(); + uint64_t flags = c->flags.fetch_and(~CLIENT_PENDING_WRITE, std::memory_order_relaxed); /* If a client is protected, don't do anything, - * that may trigger write error or recreate handler. */ - if (c->flags & CLIENT_PROTECTED) continue; + * that may trigger write error or recreate handler. */ + if (flags & CLIENT_PROTECTED) continue; std::unique_locklock)> lock(c->lock); @@ -1964,7 +1964,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { } /* If after the synchronous writes above we still have data to - * output to the client, we need to install the writable handler. */ + * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) freeClientAsync(c);