diff --git a/keydb.conf b/keydb.conf index 50f9784a1..352f07663 100644 --- a/keydb.conf +++ b/keydb.conf @@ -1834,3 +1834,14 @@ server-threads 2 # Enable FLASH support? (Enterprise Only) # storage-provider flash /path/to/flash/db + +# KeyDB will attempt to balance clients across threads evenly; however, replica clients +# are usually much more expensive than a normal client, and so KeyDB will try to assign +# fewer clients to threads with a replica. The weighting factor below is intended to help tune +# this behavior. A replica weighting factor of 2 means we treat a replica as the equivalent +# of two normal clients. Adjusting this value may improve performance when replication is +# used. The best weighting is workload specific - e.g. read heavy workloads should set +# this to 1. Very write heavy workloads may benefit from higher numbers. +# +# By default KeyDB sets this to 2. +replica-weighting-factor 2 \ No newline at end of file diff --git a/src/config.cpp b/src/config.cpp index 701565e1e..e2028f4cc 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2803,6 +2803,7 @@ standardConfig configs[] = { createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 20, INTEGER_CONFIG, NULL, NULL), createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL), createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL), + createIntConfig("replica-weighting-factor", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, g_pserver->replicaIsolationFactor, 2, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 
8192, MEMORY_CONFIG, NULL, NULL), diff --git a/src/networking.cpp b/src/networking.cpp index d8b3e9a5d..29c4ac206 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1144,6 +1144,8 @@ int chooseBestThreadForAccept() int cclientsThread; atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + // Note: It's the repl factor less one because cclients also includes replicas, so we don't want to double count + cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1); if (cclientsThread < cserver.thread_min_client_threshold) return iel; if (cclientsThread < cclientsMin) @@ -1668,6 +1670,7 @@ bool freeClient(client *c) { ln = listSearchKey(l,c); serverAssert(ln != NULL); listDelNode(l,ln); + g_pserver->rgthreadvar[c->iel].cclientsReplica--; /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. */ @@ -1790,36 +1793,43 @@ int writeToClient(client *c, int handler_installed) { is a replica, so only attempt to do so if that's the case. 
*/ if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); - serverAssert(c->repl_curr_off != -1); - if (c->repl_curr_off != c->repl_end_off){ - long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); - long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog - * in the event of a wrap around write */ - /* normal case with no wrap around */ - if (repl_end_idx >= repl_curr_idx){ - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); - /* wrap around case */ + while (clientHasPendingReplies(c)) { + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + serverAssert(c->repl_curr_off != -1); + + if (c->repl_curr_off != c->repl_end_off){ + long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); + long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog + * in the event of a wrap around write */ + /* normal case with no wrap around */ + if (repl_end_idx >= repl_curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + /* wrap around case */ + } else { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + if (nwritten2ndStage != -1) + nwritten += nwritten2ndStage; + } + } + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + serverAssert(c->repl_curr_off <= c->repl_end_off); + } + + /* If the second part of a write didn't go through, we still need to 
register that */ + if (nwritten2ndStage == -1) nwritten = -1; + if (nwritten == -1) + break; } else { - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); - /* only attempt wrapping if we write the correct number of bytes */ - if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ - nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); - if (nwritten2ndStage != -1) - nwritten += nwritten2ndStage; - } + break; } - - /* only increment bytes if an error didn't occur */ - if (nwritten > 0){ - totwritten += nwritten; - c->repl_curr_off += nwritten; - serverAssert(c->repl_curr_off <= c->repl_end_off); - } - - /* If the second part of a write didn't go through, we still need to register that */ - if (nwritten2ndStage == -1) nwritten = -1; } } else { while(clientHasPendingReplies(c)) { @@ -2060,7 +2070,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; - /* Try to write buffers to the client socket. 
*/ + /* Try to write buffers to the client socket, unless it's a replica in multithread mode */ if (writeToClient(c,0) == C_ERR) { if (c->flags & CLIENT_CLOSE_ASAP) @@ -2547,7 +2557,7 @@ void parseClientCommandBuffer(client *c) { } /* Prefetch outside the lock for better perf */ - if (g_pserver->prefetch_enabled && cserver.cthreads > 1 && cqueriesStart < c->vecqueuedcmd.size() && + if (g_pserver->prefetch_enabled && (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) && cqueriesStart < c->vecqueuedcmd.size() && (g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2) || cserver.cthreads == 1) && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { @@ -2694,7 +2704,7 @@ void readQueryFromClient(connection *conn) { return; } - if (cserver.cthreads > 1) { + if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { parseClientCommandBuffer(c); serverTL->vecclientsProcess.push_back(c); } else { diff --git a/src/replication.cpp b/src/replication.cpp index 2725a9ce4..91036723a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -889,6 +889,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. 
But we are sure the socket send buffer is @@ -992,6 +993,7 @@ int startBgsaveForReplication(int mincapa) { replica->replstate = REPL_STATE_NONE; replica->flags &= ~CLIENT_SLAVE; listDelNode(g_pserver->slaves,ln); + g_pserver->rgthreadvar[replica->iel].cclientsReplica--; addReplyError(replica, "BGSAVE failed, replication can't continue"); replica->flags |= CLIENT_CLOSE_AFTER_REPLY; @@ -1121,6 +1123,7 @@ void syncCommand(client *c) { c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* Create the replication backlog if needed. */ if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) { diff --git a/src/server.cpp b/src/server.cpp index 16a9dc378..4b71a7f8c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2112,12 +2112,11 @@ void databasesCron(bool fMainThread) { if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { if (serverTL->rehashCtl != nullptr) { - if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) { + if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { break; - } else { - dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); - serverTL->rehashCtl = nullptr; - } + } + dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); + serverTL->rehashCtl = nullptr; } serverAssert(serverTL->rehashCtl == nullptr); @@ -3794,8 +3793,6 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->in_eval = 0; pvar->in_exec = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); - aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE); - aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); pvar->current_client = nullptr; pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { @@ -3804,6 +3801,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) strerror(errno)); exit(1); } + aeSetBeforeSleepProc(pvar->el, beforeSleep, 
AE_SLEEP_THREADSAFE); + aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite"); diff --git a/src/server.h b/src/server.h index 890db48b5..5a067090b 100644 --- a/src/server.h +++ b/src/server.h @@ -2024,6 +2024,7 @@ struct redisServerThreadVars { list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; int cclients; + int cclientsReplica = 0; client *current_client; /* Current client */ long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ @@ -2217,6 +2218,8 @@ struct redisServer { int active_expire_enabled; /* Can be disabled for testing purposes. */ + int replicaIsolationFactor = 1; + /* Fields used only for stats */ long long stat_numcommands; /* Number of processed commands */ long long stat_numconnections; /* Number of connections received */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 2a0474ed8..b562530f6 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -44,6 +44,11 @@ start_server {tags {"introspection"}} { set e } {ERR*} + test {replica-weighting-factor does not accept values less than 1} { + catch {r config set replica-weighting-factor 0} e + set e + } {ERR*} + test {CLIENT SETNAME can assign a name to this connection} { assert_equal [r client setname myname] {OK} r client list