Remove fPendingReplicaWrite flag which can instead be calculated on demand

Former-commit-id: ae26afd13f955eb230b5c2cab20ec90db9b714ad
This commit is contained in:
John Sully 2021-06-25 01:54:38 +00:00
parent 4ac475ea20
commit 53d7d09f07
3 changed files with 67 additions and 74 deletions

View File

@ -158,7 +158,6 @@ client *createClient(connection *conn, int iel) {
c->flags = 0;
c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->fPendingReplicaWrite = FALSE;
c->ctime = c->lastinteraction = g_pserver->unixtime;
/* If the default user does not require authentication, the user is
* directly authenticated. */
@ -318,7 +317,7 @@ int prepareClientToWrite(client *c) {
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c);
if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c);
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@ -1132,7 +1131,7 @@ void copyClientOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply));
return (c->bufpos || listLength(c->reply) || c->FPendingReplicaWrite());
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
@ -1785,6 +1784,42 @@ int writeToClient(client *c, int handler_installed) {
std::unique_lock<decltype(c->lock)> lock(c->lock);
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
}
} else {
while(clientHasPendingReplies(c)) {
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
if (c->bufpos > 0) {
@ -1840,47 +1875,6 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
/* If the client's current offset matches the last offset it can read from, there is no pending write */
if (c->repl_curr_off == c->repl_end_off){
c->fPendingReplicaWrite = false;
}
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
}
}
g_pserver->stat_net_output_bytes += totwritten;
@ -1900,7 +1894,7 @@ int writeToClient(client *c, int handler_installed) {
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
}
if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) connSetWriteHandler(c->conn, NULL);
@ -2080,7 +2074,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) {
if (clientHasPendingReplies(c)) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
freeClientAsync(c);
}
@ -3705,7 +3699,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
/* In the case of a replica client, writes to said replica are using data from the replication backlog
* as opposed to it's own internal buffer, this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
return (!(c->flags & CLIENT_SLAVE) || !c->FPendingReplicaWrite() ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
}
/* This function returns the number of bytes that Redis is

View File

@ -722,7 +722,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
/* Force the partial sync to be queued */
prepareClientToWrite(c);
c->fPendingReplicaWrite = true;
return len;
}
@ -4974,11 +4973,8 @@ void flushReplBacklogToClients()
replica->repl_end_off = g_pserver->master_repl_offset;
/* Only if the there isn't already a pending write do we prepare the client to write */
if (!replica->fPendingReplicaWrite){
serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
prepareClientToWrite(replica);
replica->fPendingReplicaWrite = true;
}
}
if (fAsyncWrite)

View File

@ -1594,7 +1594,6 @@ struct client {
* when sending data to this replica. */
long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset
* to prevent needing the global lock */
int fPendingReplicaWrite; /* Is there a write queued for this replica? */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
@ -1657,6 +1656,10 @@ struct client {
robj **argv;
size_t argv_len_sumActive = 0;
bool FPendingReplicaWrite() const {
return repl_curr_off != repl_end_off;
}
// post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const;