From d57883bf6443b679d22b6942b05af49dab188d22 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 15 Oct 2021 16:22:42 +0000 Subject: [PATCH 1/7] Permit prefetch for FLASH scenarios in single thread mode Former-commit-id: 6d0b90ed43cc9d1196903ddbc7d50cd40e439e42 --- src/networking.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.cpp b/src/networking.cpp index d8b3e9a5d..1638328c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2547,7 +2547,7 @@ void parseClientCommandBuffer(client *c) { } /* Prefetch outside the lock for better perf */ - if (g_pserver->prefetch_enabled && cserver.cthreads > 1 && cqueriesStart < c->vecqueuedcmd.size() && + if (g_pserver->prefetch_enabled && (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) && cqueriesStart < c->vecqueuedcmd.size() && (g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2) || cserver.cthreads == 1) && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { From 43a62493f872874e965b3edbbfaba199c292d675 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Oct 2021 03:13:36 +0000 Subject: [PATCH 2/7] Additional change to ensure FLASH storage goes through the multithread path Former-commit-id: 422ea0723f0b8718f28ef9c1cc4d5f56d374af46 --- src/networking.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.cpp b/src/networking.cpp index 1638328c0..72b2ec59e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2694,7 +2694,7 @@ void readQueryFromClient(connection *conn) { return; } - if (cserver.cthreads > 1) { + if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { parseClientCommandBuffer(c); serverTL->vecclientsProcess.push_back(c); } else { From 7b3337d244bf718f0fb15b0db8a194a3ed9dd974 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Oct 2021 22:46:17 +0000 Subject: [PATCH 3/7] Ensure async rehash completes before we start a new time. Degrad to sync hash if necessary to ensure this Former-commit-id: 0f830facc7c6bc6668af9bb2e10b6e13a13227aa --- src/server.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index ef1039cea..f257883e0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2112,12 +2112,11 @@ void databasesCron(bool fMainThread) { if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { if (serverTL->rehashCtl != nullptr) { - if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) { + if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { break; - } else { - dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); - serverTL->rehashCtl = nullptr; - } + } + dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); + serverTL->rehashCtl = nullptr; } serverAssert(serverTL->rehashCtl == nullptr); From 3f089054080d98e8a87d8bdbf2c5f4deba1595a1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Oct 2021 23:45:46 +0000 Subject: [PATCH 4/7] Do not dereference a nullptr if there are too many files open Former-commit-id: 4674eb29a261e8b046953398c94354fc3e550c2a --- src/server.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index f257883e0..1794f51e8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3793,8 +3793,6 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->in_eval = 0; pvar->in_exec = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); - aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE); - aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); pvar->current_client = nullptr; pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { @@ -3803,6 +3801,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) strerror(errno)); exit(1); } + aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE); + aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite"); From fbe9ff66804cc8d48e3919ab1e18bea9949b6327 Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 21 Oct 2021 23:50:50 +0000 Subject: [PATCH 5/7] null check for delete override Former-commit-id: f5f2f5e200a5ff1b0306998624b758d5a4c10825 --- src/new.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/new.cpp b/src/new.cpp index 4775e207a..4e6b07dfd 100644 --- a/src/new.cpp +++ b/src/new.cpp @@ -27,14 +27,17 @@ void *operator new(std::size_t size, const std::nothrow_t &) noexcept return zmalloc(size, MALLOC_LOCAL); } +//need to do null checks for delete since the compiler can optimize out null checks in zfree void operator delete(void * p) noexcept { - zfree(p); + if (p != nullptr) + zfree(p); } void operator delete(void *p, std::size_t) noexcept { - zfree(p); + if (p != nullptr) + zfree(p); } #endif From d41aa34ba38595d8e6e6c2bafd5e2b0a6d7447b9 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Oct 2021 03:16:33 +0000 Subject: [PATCH 6/7] Fix slower performance during replication by better balancing client load on threads servicing a replica Former-commit-id: 496f91d3f169fcfe6d94c2ea69cee402f8eb60ca --- src/networking.cpp | 65 ++++++++++++++++++++++++++------------------- src/replication.cpp | 3 +++ src/server.h | 1 + 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 72b2ec59e..d1ccf28a9 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1144,6 +1144,7 @@ int chooseBestThreadForAccept() int cclientsThread; atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + cclientsThread *= (g_pserver->rgthreadvar[iel].cclientsReplica+1); if (cclientsThread < cserver.thread_min_client_threshold) return iel; if (cclientsThread < cclientsMin) @@ -1668,6 +1669,7 @@ bool freeClient(client *c) { ln = listSearchKey(l,c); serverAssert(ln != NULL); listDelNode(l,ln); + g_pserver->rgthreadvar[c->iel].cclientsReplica--; /* We need to remember the time when we started to have zero * attached slaves, as after some time we'll free the replication * backlog. */ @@ -1790,36 +1792,43 @@ int writeToClient(client *c, int handler_installed) { is a replica, so only attempt to do so if that's the case. */ if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); - serverAssert(c->repl_curr_off != -1); - if (c->repl_curr_off != c->repl_end_off){ - long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); - long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog - * in the event of a wrap around write */ - /* normal case with no wrap around */ - if (repl_end_idx >= repl_curr_idx){ - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); - /* wrap around case */ + while (clientHasPendingReplies(c)) { + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + serverAssert(c->repl_curr_off != -1); + + if (c->repl_curr_off != c->repl_end_off){ + long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); + long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog + * in the event of a wrap around write */ + /* normal case with no wrap around */ + if (repl_end_idx >= repl_curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + /* wrap around case */ + } else { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + if (nwritten2ndStage != -1) + nwritten += nwritten2ndStage; + } + } + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + serverAssert(c->repl_curr_off <= c->repl_end_off); + } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwritten2ndStage == -1) nwritten = -1; + if (nwritten == -1) + break; } else { - nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); - /* only attempt wrapping if we write the correct number of bytes */ - if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ - nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); - if (nwritten2ndStage != -1) - nwritten += nwritten2ndStage; - } + break; } - - /* only increment bytes if an error didn't occur */ - if (nwritten > 0){ - totwritten += nwritten; - c->repl_curr_off += nwritten; - serverAssert(c->repl_curr_off <= c->repl_end_off); - } - - /* If the second part of a write didn't go through, we still need to register that */ - if (nwritten2ndStage == -1) nwritten = -1; } } else { while(clientHasPendingReplies(c)) { @@ -2060,7 +2069,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; - /* Try to write buffers to the client socket. */ + /* Try to write buffers to the client socket, unless its a replica in multithread mode */ if (writeToClient(c,0) == C_ERR) { if (c->flags & CLIENT_CLOSE_ASAP) diff --git a/src/replication.cpp b/src/replication.cpp index 2725a9ce4..91036723a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -889,6 +889,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is @@ -992,6 +993,7 @@ int startBgsaveForReplication(int mincapa) { replica->replstate = REPL_STATE_NONE; replica->flags &= ~CLIENT_SLAVE; listDelNode(g_pserver->slaves,ln); + g_pserver->rgthreadvar[replica->iel].cclientsReplica--; addReplyError(replica, "BGSAVE failed, replication can't continue"); replica->flags |= CLIENT_CLOSE_AFTER_REPLY; @@ -1121,6 +1123,7 @@ void syncCommand(client *c) { c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(g_pserver->slaves,c); + g_pserver->rgthreadvar[c->iel].cclientsReplica++; /* Create the replication backlog if needed. */ if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) { diff --git a/src/server.h b/src/server.h index 45f44df8f..1b2c02f04 100644 --- a/src/server.h +++ b/src/server.h @@ -2022,6 +2022,7 @@ struct redisServerThreadVars { list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; int cclients; + int cclientsReplica = 0; client *current_client; /* Current client */ long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ From 73215b2eebb38e682caca6eeb1e7d7a1f0114a07 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 29 Oct 2021 17:59:46 +0000 Subject: [PATCH 7/7] Make the replica weighting configurable Former-commit-id: be6a8a7e68acb5cfbe950f13b903e6f7b98c5a39 --- keydb.conf | 11 +++++++++++ src/config.cpp | 1 + src/networking.cpp | 3 ++- src/server.h | 2 ++ tests/unit/introspection.tcl | 5 +++++ 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/keydb.conf b/keydb.conf index 50f9784a1..352f07663 100644 --- a/keydb.conf +++ b/keydb.conf @@ -1834,3 +1834,14 @@ server-threads 2 # Enable FLASH support? (Enterprise Only) # storage-provider flash /path/to/flash/db + +# KeyDB will attempt to balance clients across threads evenly; However, replica clients +# are usually much more expensive than a normal client, and so KeyDB will try to assign +# fewer clients to threads with a replica. The weighting factor below is intented to help tune +# this behavior. A replica weighting factor of 2 means we treat a replica as the equivalent +# of two normal clients. Adjusting this value may improve performance when replication is +# used. The best weighting is workload specific - e.g. read heavy workloads should set +# this to 1. Very write heavy workloads may benefit from higher numbers. +# +# By default KeyDB sets this to 2. +replica-weighting-factor 2 \ No newline at end of file diff --git a/src/config.cpp b/src/config.cpp index 701565e1e..e2028f4cc 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2803,6 +2803,7 @@ standardConfig configs[] = { createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 20, INTEGER_CONFIG, NULL, NULL), createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL), createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL), + createIntConfig("replica-weighting-factor", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, g_pserver->replicaIsolationFactor, 2, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 8192, MEMORY_CONFIG, NULL, NULL), diff --git a/src/networking.cpp b/src/networking.cpp index d1ccf28a9..29c4ac206 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1144,7 +1144,8 @@ int chooseBestThreadForAccept() int cclientsThread; atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); - cclientsThread *= (g_pserver->rgthreadvar[iel].cclientsReplica+1); + // Note: Its repl factor less one because cclients also includes replicas, so we don't want to double count + cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1); if (cclientsThread < cserver.thread_min_client_threshold) return iel; if (cclientsThread < cclientsMin) diff --git a/src/server.h b/src/server.h index 1b2c02f04..8997c3e73 100644 --- a/src/server.h +++ b/src/server.h @@ -2216,6 +2216,8 @@ struct redisServer { int active_expire_enabled; /* Can be disabled for testing purposes. */ + int replicaIsolationFactor = 1; + /* Fields used only for stats */ long long stat_numcommands; /* Number of processed commands */ long long stat_numconnections; /* Number of connections received */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 2a0474ed8..b562530f6 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -44,6 +44,11 @@ start_server {tags {"introspection"}} { set e } {ERR*} + test {replica-weighting-factor does not accept values less than 1} { + catch {r config set replica-weighting-factor 0} e + set e + } {ERR*} + test {CLIENT SETNAME can assign a name to this connection} { assert_equal [r client setname myname] {OK} r client list