From 53d7d09f076f13d33acea2515a2439dade1836c6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Jun 2021 01:54:38 +0000 Subject: [PATCH] Remove fPendingReplicaWrite flag which can instead be calculated on demand Former-commit-id: ae26afd13f955eb230b5c2cab20ec90db9b714ad --- src/networking.cpp | 128 +++++++++++++++++++++----------------------- src/replication.cpp | 8 +-- src/server.h | 5 +- 3 files changed, 67 insertions(+), 74 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 767fe9c2b..690b03a51 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -158,7 +158,6 @@ client *createClient(connection *conn, int iel) { c->flags = 0; c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWriteHandler = FALSE; - c->fPendingReplicaWrite = FALSE; c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -318,7 +317,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c); + if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1132,7 +1131,7 @@ void copyClientOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { - return (c->bufpos || listLength(c->reply)); + return (c->bufpos || listLength(c->reply) || c->FPendingReplicaWrite()); } static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; @@ -1785,66 +1784,9 @@ int writeToClient(client *c, int handler_installed) { std::unique_locklock)> lock(c->lock); - while(clientHasPendingReplies(c)) { - serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); - if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } else { - o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); - if (o->used == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - continue; - } - - nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == o->used) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); - } - } - /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a replica or a monitor (otherwise, on high-speed traffic, the - * replication/output buffer will grow indefinitely) */ - if (totwritten > NET_MAX_WRITES_PER_EVENT && - (g_pserver->maxmemory == 0 || - zmalloc_used_memory() < g_pserver->maxmemory) && - !(c->flags & CLIENT_SLAVE)) break; - } - /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. */ if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { - std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); serverAssert(c->repl_curr_off != -1); @@ -1872,15 +1814,67 @@ int writeToClient(client *c, int handler_installed) { totwritten += nwritten; c->repl_curr_off += nwritten; serverAssert(c->repl_curr_off <= c->repl_end_off); - /* If the client's current offset matches the last offset it can read from, there is no pending write */ - if (c->repl_curr_off == c->repl_end_off){ - c->fPendingReplicaWrite = false; - } } /* If the second part of a write didn't go through, we still need to register that */ if (nwritten2ndStage == -1) nwritten = -1; } + } else { + while(clientHasPendingReplies(c)) { + serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); + if (c->bufpos > 0) { + nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; + totwritten += nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); + if (o->used == 0) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + continue; + } + + nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; + totwritten += nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == o->used) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. */ + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } + } + /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from + * super fast link that is always able to accept data (in real world + * scenario think about 'KEYS *' against the loopback interface). + * + * However if we are over the maxmemory limit we ignore that and + * just deliver as much data as it is possible to deliver. + * + * Moreover, we also send as much as possible if the client is + * a replica or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ + if (totwritten > NET_MAX_WRITES_PER_EVENT && + (g_pserver->maxmemory == 0 || + zmalloc_used_memory() < g_pserver->maxmemory) && + !(c->flags & CLIENT_SLAVE)) break; + } } g_pserver->stat_net_output_bytes += totwritten; @@ -1900,7 +1894,7 @@ int writeToClient(client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime; } - if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) { + if (!clientHasPendingReplies(c)) { c->sentlen = 0; if (handler_installed) connSetWriteHandler(c->conn, NULL); @@ -2080,7 +2074,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) { + if (clientHasPendingReplies(c)) { if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); } @@ -3705,7 +3699,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { /* In the case of a replica client, writes to said replica are using data from the replication backlog * as opposed to it's own internal buffer, this number should keep track of that */ unsigned long getClientReplicationBacklogSharedUsage(client *c) { - return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; + return (!(c->flags & CLIENT_SLAVE) || !c->FPendingReplicaWrite() ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; } /* This function returns the number of bytes that Redis is diff --git a/src/replication.cpp b/src/replication.cpp index b9465680e..94b35e314 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -722,7 +722,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) { /* Force the partial sync to be queued */ prepareClientToWrite(c); - c->fPendingReplicaWrite = true; return len; } @@ -4974,11 +4973,8 @@ void flushReplBacklogToClients() replica->repl_end_off = g_pserver->master_repl_offset; /* Only if the there isn't already a pending write do we prepare the client to write */ - if (!replica->fPendingReplicaWrite){ - serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset); - prepareClientToWrite(replica); - replica->fPendingReplicaWrite = true; - } + serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset); + prepareClientToWrite(replica); } if (fAsyncWrite) diff --git a/src/server.h b/src/server.h index 0d6f766ce..07608632e 100644 --- a/src/server.h +++ b/src/server.h @@ -1594,7 +1594,6 @@ struct client { * when sending data to this replica. */ long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset * to prevent needing the global lock */ - int fPendingReplicaWrite; /* Is there a write queued for this replica? */ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ @@ -1657,6 +1656,10 @@ struct client { robj **argv; size_t argv_len_sumActive = 0; + bool FPendingReplicaWrite() const { + return repl_curr_off != repl_end_off; + } + // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const;