From d41aa34ba38595d8e6e6c2bafd5e2b0a6d7447b9 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Oct 2021 03:16:33 +0000 Subject: [PATCH] Fix slower performance during replication by better balancing client load on threads servicing a replica Former-commit-id: 496f91d3f169fcfe6d94c2ea69cee402f8eb60ca --- src/networking.cpp | 65 ++++++++++++++++++++++++++------------------- src/replication.cpp | 3 +++ src/server.h | 1 + 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 72b2ec59e..d1ccf28a9 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1144,6 +1144,7 @@ int chooseBestThreadForAccept() int cclientsThread; atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + cclientsThread *= (g_pserver->rgthreadvar[iel].cclientsReplica+1); if (cclientsThread < cserver.thread_min_client_threshold) return iel; if (cclientsThread < cclientsMin) @@ -1668,6 +1669,7 @@ bool freeClient(client *c) { ln = listSearchKey(l,c); serverAssert(ln != NULL); listDelNode(l,ln); + g_pserver->rgthreadvar[c->iel].cclientsReplica--; /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. */ @@ -1790,36 +1792,43 @@ int writeToClient(client *c, int handler_installed) { is a replica, so only attempt to do so if that's the case. */ if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); - serverAssert(c->repl_curr_off != -1); - if (c->repl_curr_off != c->repl_end_off){ - long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); - long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog - * in the event of a wrap around write */ - /* normal case with no wrap around */ - if (repl_end_idx >= repl_curr_idx){ - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); - /* wrap around case */ + while (clientHasPendingReplies(c)) { + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + serverAssert(c->repl_curr_off != -1); + + if (c->repl_curr_off != c->repl_end_off){ + long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); + long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog + * in the event of a wrap around write */ + /* normal case with no wrap around */ + if (repl_end_idx >= repl_curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + /* wrap around case */ + } else { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + if (nwritten2ndStage != -1) + nwritten += nwritten2ndStage; + } + } + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + serverAssert(c->repl_curr_off <= c->repl_end_off); + } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwritten2ndStage == -1) nwritten = -1; + if (nwritten == -1) + break; } else { - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); - /* only attempt wrapping if we write the correct number of bytes */ - if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ - nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); - if (nwritten2ndStage != -1) - nwritten += nwritten2ndStage; - } + break; } - - /* only increment bytes if an error didn't occur */ - if (nwritten > 0){ - totwritten += nwritten; - c->repl_curr_off += nwritten; - serverAssert(c->repl_curr_off <= c->repl_end_off); - } - - /* If the second part of a write didn't go through, we still need to register that */ - if (nwritten2ndStage == -1) nwritten = -1; } } else { while(clientHasPendingReplies(c)) { @@ -2060,7 +2069,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; - /* Try to write buffers to the client socket. */ + /* Try to write buffers to the client socket, unless its a replica in multithread mode */ if (writeToClient(c,0) == C_ERR) { if (c->flags & CLIENT_CLOSE_ASAP) diff --git a/src/replication.cpp b/src/replication.cpp index 2725a9ce4..91036723a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -889,6 +889,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is @@ -992,6 +993,7 @@ int startBgsaveForReplication(int mincapa) { replica->replstate = REPL_STATE_NONE; replica->flags &= ~CLIENT_SLAVE; listDelNode(g_pserver->slaves,ln); + g_pserver->rgthreadvar[replica->iel].cclientsReplica--; addReplyError(replica, "BGSAVE failed, replication can't continue"); replica->flags |= CLIENT_CLOSE_AFTER_REPLY; @@ -1121,6 +1123,7 @@ void syncCommand(client *c) { c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* Create the replication backlog if needed. */ if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) { diff --git a/src/server.h b/src/server.h index 45f44df8f..1b2c02f04 100644 --- a/src/server.h +++ b/src/server.h @@ -2022,6 +2022,7 @@ struct redisServerThreadVars { list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; int cclients; + int cclientsReplica = 0; client *current_client; /* Current client */ long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */