Fix degraded performance during replication by better balancing client load across the threads servicing a replica

Former-commit-id: 496f91d3f169fcfe6d94c2ea69cee402f8eb60ca
John Sully 2021-10-22 03:16:33 +00:00
parent 2d0f54eb8f
commit d41aa34ba3
3 changed files with 41 additions and 28 deletions

src/networking.cpp

@@ -1144,6 +1144,7 @@ int chooseBestThreadForAccept()
         int cclientsThread;
         atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
         cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
+        cclientsThread *= (g_pserver->rgthreadvar[iel].cclientsReplica+1);
         if (cclientsThread < cserver.thread_min_client_threshold)
             return iel;
         if (cclientsThread < cclientsMin)
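
The multiplication is the heart of the fix: a thread's raw client count is scaled by the number of replicas it already serves, so replica-heavy threads look proportionally busier to the accept logic. Below is a minimal self-contained sketch of that heuristic; ThreadLoad, chooseThread, and the numbers are illustrative stand-ins, not KeyDB's actual code.

    #include <array>
    #include <cstddef>

    struct ThreadLoad {
        int clients;   // total clients handled by this thread
        int replicas;  // replicas attached to this thread
    };

    // Pick the thread with the lowest *weighted* load. Multiplying by
    // (replicas + 1) makes a thread that services replicas look busier than
    // its raw client count suggests, steering new connections elsewhere.
    template <std::size_t N>
    std::size_t chooseThread(const std::array<ThreadLoad, N> &loads) {
        std::size_t best = 0;
        long bestScore = -1;
        for (std::size_t i = 0; i < N; ++i) {
            long score = static_cast<long>(loads[i].clients) * (loads[i].replicas + 1);
            if (bestScore < 0 || score < bestScore) {
                bestScore = score;
                best = i;
            }
        }
        return best;
    }

For example, a thread with 10 clients and no replicas scores 10 * (0+1) = 10, while a thread with 6 clients and one replica scores 6 * (1+1) = 12, so the next accept goes to the first thread despite its higher raw client count.
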
@@ -1668,6 +1669,7 @@ bool freeClient(client *c) {
         ln = listSearchKey(l,c);
         serverAssert(ln != NULL);
         listDelNode(l,ln);
+        g_pserver->rgthreadvar[c->iel].cclientsReplica--;
         /* We need to remember the time when we started to have zero
          * attached slaves, as after some time we'll free the replication
          * backlog. */
@@ -1790,6 +1792,8 @@ int writeToClient(client *c, int handler_installed) {
        is a replica, so only attempt to do so if that's the case. */
     if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
         std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
+        while (clientHasPendingReplies(c)) {
             long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
             serverAssert(c->repl_curr_off != -1);
@@ -1820,6 +1824,11 @@ int writeToClient(client *c, int handler_installed) {
                 /* If the second part of a write didn't go through, we still need to register that */
                 if (nwritten2ndStage == -1) nwritten = -1;
 
+                if (nwritten == -1)
+                    break;
+            } else {
+                break;
+            }
         }
     } else {
         while(clientHasPendingReplies(c)) {
@@ -2060,7 +2069,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
         /* Don't write to clients that are going to be closed anyway. */
         if (c->flags & CLIENT_CLOSE_ASAP) continue;
 
-        /* Try to write buffers to the client socket. */
+        /* Try to write buffers to the client socket, unless its a replica in multithread mode */
         if (writeToClient(c,0) == C_ERR)
         {
             if (c->flags & CLIENT_CLOSE_ASAP)
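
The writeToClient hunks above wrap the replica send path in a while (clientHasPendingReplies(c)) loop, so a single call now drains as much queued replication data as it can and breaks out only when a write fails or cannot complete. Here is a self-contained sketch of that drain-loop pattern on a plain non-blocking POSIX socket; drainPending and its parameters are invented for illustration and are not KeyDB's writeToClient.

    #include <cerrno>
    #include <sys/types.h>
    #include <sys/socket.h>

    // Keep writing queued data until the queue is empty or the socket would
    // block; a short or failed write ends the loop so the event loop can
    // retry on the next writable event instead of spinning.
    ssize_t drainPending(int fd, const char *buf, size_t &pending) {
        ssize_t total = 0;
        while (pending > 0) {
            ssize_t n = send(fd, buf + total, pending, 0);
            if (n <= 0) {
                // EAGAIN: socket buffer full, retry on the next writable event
                if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;
                return -1;  // real error: the caller should free the client
            }
            total += n;
            pending -= static_cast<size_t>(n);
        }
        return total;
    }
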

src/replication.cpp

@@ -889,6 +889,7 @@ int masterTryPartialResynchronization(client *c) {
     c->repl_ack_time = g_pserver->unixtime;
     c->repl_put_online_on_ack = 0;
     listAddNodeTail(g_pserver->slaves,c);
+    g_pserver->rgthreadvar[c->iel].cclientsReplica++;
 
     /* We can't use the connection buffers since they are used to accumulate
      * new commands at this stage. But we are sure the socket send buffer is
@@ -992,6 +993,7 @@ int startBgsaveForReplication(int mincapa) {
                 replica->replstate = REPL_STATE_NONE;
                 replica->flags &= ~CLIENT_SLAVE;
                 listDelNode(g_pserver->slaves,ln);
+                g_pserver->rgthreadvar[replica->iel].cclientsReplica--;
                 addReplyError(replica,
                     "BGSAVE failed, replication can't continue");
                 replica->flags |= CLIENT_CLOSE_AFTER_REPLY;
@@ -1121,6 +1123,7 @@ void syncCommand(client *c) {
     c->repldbfd = -1;
     c->flags |= CLIENT_SLAVE;
     listAddNodeTail(g_pserver->slaves,c);
+    g_pserver->rgthreadvar[c->iel].cclientsReplica++;
 
     /* Create the replication backlog if needed. */
     if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) {
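
Each replication.cpp hunk mirrors a list operation: wherever a client is appended to g_pserver->slaves the owning thread's cclientsReplica is incremented, and wherever a replica is removed (including the freeClient path earlier) the counter is decremented, keeping the count the accept heuristic reads in lockstep with the list. A self-contained sketch of that invariant follows; Replica, attachReplica, and detachReplica are stand-ins, not KeyDB code.

    #include <cassert>
    #include <list>
    #include <vector>

    struct Replica { int thread; };  // stand-in for client with its c->iel

    std::vector<int> replicasPerThread(4, 0);  // mirror of cclientsReplica
    std::list<Replica*> slaves;                // mirror of g_pserver->slaves

    void attachReplica(Replica *r) {
        slaves.push_back(r);
        replicasPerThread[r->thread]++;   // counterpart of cclientsReplica++
    }

    void detachReplica(Replica *r) {
        slaves.remove(r);
        replicasPerThread[r->thread]--;   // counterpart of cclientsReplica--
    }

    int main() {
        Replica r{1};
        attachReplica(&r);
        assert(replicasPerThread[1] == 1 && slaves.size() == 1);
        detachReplica(&r);
        assert(replicasPerThread[1] == 0 && slaves.empty());
    }

If any add or remove site missed its counter update, the per-thread count would drift and the accept-time weighting would be computed from stale data.
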

src/server.h

@@ -2022,6 +2022,7 @@ struct redisServerThreadVars {
     list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
     list *clients_pending_asyncwrite;
     int cclients;
+    int cclientsReplica = 0;
     client *current_client; /* Current client */
     long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
     client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
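
The new field uses a C++ default member initializer, so every redisServerThreadVars instance starts with a zero replica count without any change to the thread-setup code. A tiny stand-alone illustration of that language behavior; ThreadVars is a stand-in struct, not the real one.

    #include <iostream>

    struct ThreadVars {
        int cclients;             // left uninitialized unless set up elsewhere
        int cclientsReplica = 0;  // always starts at zero, even in arrays
    };

    int main() {
        ThreadVars vars[4];  // e.g. one per event-loop thread
        std::cout << vars[2].cclientsReplica << "\n";  // prints 0
    }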