Former-commit-id: 5b88af1067d344f121b2d65e34fb40e229722575
malavan 2021-11-02 19:14:47 +00:00
commit 322ec31191
7 changed files with 69 additions and 37 deletions

View File

@@ -1834,3 +1834,14 @@ server-threads 2
# Enable FLASH support? (Enterprise Only)
# storage-provider flash /path/to/flash/db
# KeyDB will attempt to balance clients across threads evenly; however, replica clients
# are usually much more expensive than normal clients, so KeyDB will try to assign
# fewer clients to threads with a replica. The weighting factor below is intended to help tune
# this behavior. A replica weighting factor of 2 means we treat a replica as the equivalent
# of two normal clients. Adjusting this value may improve performance when replication is
# used. The best weighting is workload-specific - e.g., read-heavy workloads should set
# this to 1. Very write-heavy workloads may benefit from higher numbers.
#
# By default KeyDB sets this to 2.
replica-weighting-factor 2
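
A worked example of the weighting, using the thread-selection formula added later in this commit: with replica-weighting-factor 2, a thread holding 10 clients of which 1 is a replica is scored as 10 + 1*(2-1) = 11 clients when choosing where to place a new connection, so new clients will prefer other threads. Setting the factor to 1 makes replicas count the same as normal clients, which suits read-heavy workloads.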

View File

@@ -2803,6 +2803,7 @@ standardConfig configs[] = {
createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 20, INTEGER_CONFIG, NULL, NULL),
createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-weighting-factor", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, g_pserver->replicaIsolationFactor, 2, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 8192, MEMORY_CONFIG, NULL, NULL),
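
The 1..INT_MAX range passed to createIntConfig above is what makes the new test at the bottom of this commit pass. A generic sketch of that bounds check (hypothetical helper, not KeyDB's actual config machinery):

#include <climits>

// Reject out-of-range values before storing them, as CONFIG SET does.
bool setIntConfigChecked(int *target, long long value, long long lo, long long hi) {
    if (value < lo || value > hi)
        return false;    // caller replies with an ERR to the client
    *target = (int)value;
    return true;
}

// e.g. setIntConfigChecked(&factor, 0, 1, INT_MAX) returns false,
// so "config set replica-weighting-factor 0" is refused.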

View File

@@ -1144,6 +1144,8 @@ int chooseBestThreadForAccept()
int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
// Note: It's the replica weighting factor less one because cclients also includes replicas, so we don't want to double count
cclientsThread += (g_pserver->rgthreadvar[iel].cclientsReplica) * (g_pserver->replicaIsolationFactor-1);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel;
if (cclientsThread < cclientsMin)
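
A minimal standalone sketch of the weighted selection above (hypothetical ThreadLoad type; the real loop also returns a thread immediately when it is under min-clients-per-thread and counts accepts still in flight):

#include <climits>
#include <vector>

struct ThreadLoad { int cclients; int cclientsReplica; };

int chooseThread(const std::vector<ThreadLoad> &threads, int weightingFactor) {
    int best = 0, bestLoad = INT_MAX;
    for (size_t i = 0; i < threads.size(); ++i) {
        // a replica already counts once in cclients, so add (factor - 1) more
        int load = threads[i].cclients
                 + threads[i].cclientsReplica * (weightingFactor - 1);
        if (load < bestLoad) { bestLoad = load; best = (int)i; }
    }
    return best;
}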
@@ -1668,6 +1670,7 @@ bool freeClient(client *c) {
ln = listSearchKey(l,c);
serverAssert(ln != NULL);
listDelNode(l,ln);
g_pserver->rgthreadvar[c->iel].cclientsReplica--;
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
@@ -1790,36 +1793,43 @@ int writeToClient(client *c, int handler_installed) {
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
while (clientHasPendingReplies(c)) {
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
* in the event of a wrap around write */
/* normal case with no wrap around */
if (repl_end_idx >= repl_curr_idx){
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
/* wrap around case */
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
if (nwritten == -1)
break;
} else {
nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
if (nwritten2ndStage != -1)
nwritten += nwritten2ndStage;
}
break;
}
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwritten2ndStage == -1) nwritten = -1;
}
} else {
while(clientHasPendingReplies(c)) {
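
The wrap-around logic above is easier to follow in isolation. A minimal standalone sketch, using plain write() in place of connWrite() (an assumed simplification):

#include <sys/types.h>
#include <unistd.h>

// Write backlog[currIdx, endIdx) to fd, where the range may wrap past the end
// of the circular buffer. Returns bytes written, or -1 on error; like the code
// above, it reports -1 if the second stage fails even though the first stage
// wrote some bytes, so the caller treats the connection as broken.
ssize_t writeBacklogRange(int fd, const char *backlog, size_t backlogSize,
                          size_t currIdx, size_t endIdx) {
    if (endIdx >= currIdx)  /* normal case with no wrap around */
        return write(fd, backlog + currIdx, endIdx - currIdx);
    /* wrap-around case: write up to the end of the buffer first */
    ssize_t nwritten = write(fd, backlog + currIdx, backlogSize - currIdx);
    /* only attempt the wrapped part if the first write completed in full */
    if (nwritten != (ssize_t)(backlogSize - currIdx))
        return nwritten;
    ssize_t nwritten2ndStage = write(fd, backlog, endIdx);
    return (nwritten2ndStage == -1) ? -1 : nwritten + nwritten2ndStage;
}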
@@ -2060,7 +2070,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
/* Try to write buffers to the client socket. */
/* Try to write buffers to the client socket, unless it's a replica in multithread mode */
if (writeToClient(c,0) == C_ERR)
{
if (c->flags & CLIENT_CLOSE_ASAP)
@@ -2547,7 +2557,7 @@ void parseClientCommandBuffer(client *c) {
}
/* Prefetch outside the lock for better perf */
if (g_pserver->prefetch_enabled && cserver.cthreads > 1 && cqueriesStart < c->vecqueuedcmd.size() &&
if (g_pserver->prefetch_enabled && (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) && cqueriesStart < c->vecqueuedcmd.size() &&
(g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2) || cserver.cthreads == 1) && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
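
Rewritten as a pure predicate, the gate on the prefetch above looks roughly like this (omitting the prefetch_enabled flag and the queued-command checks; the aeLockContested and GlobalLocksAcquired results are passed in as booleans):

bool shouldPrefetch(int cthreads, bool hasStorageProvider,
                    bool lockContested, bool globalLocksAcquired) {
    if (!(cthreads > 1 || hasStorageProvider))
        return false;   // the change in this commit: a storage provider
                        // now qualifies even with a single thread
    if (globalLocksAcquired)
        return false;   // never prefetch while already holding the locks
    return hasStorageProvider || lockContested || cthreads == 1;
}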
@@ -2694,7 +2704,7 @@ void readQueryFromClient(connection *conn) {
return;
}
if (cserver.cthreads > 1) {
if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) {
parseClientCommandBuffer(c);
serverTL->vecclientsProcess.push_back(c);
} else {

View File

@@ -889,6 +889,7 @@ int masterTryPartialResynchronization(client *c) {
c->repl_ack_time = g_pserver->unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(g_pserver->slaves,c);
g_pserver->rgthreadvar[c->iel].cclientsReplica++;
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@@ -992,6 +993,7 @@ int startBgsaveForReplication(int mincapa) {
replica->replstate = REPL_STATE_NONE;
replica->flags &= ~CLIENT_SLAVE;
listDelNode(g_pserver->slaves,ln);
g_pserver->rgthreadvar[replica->iel].cclientsReplica--;
addReplyError(replica,
"BGSAVE failed, replication can't continue");
replica->flags |= CLIENT_CLOSE_AFTER_REPLY;
@@ -1121,6 +1123,7 @@ void syncCommand(client *c) {
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(g_pserver->slaves,c);
g_pserver->rgthreadvar[c->iel].cclientsReplica++;
/* Create the replication backlog if needed. */
if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) {
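
Each listAddNodeTail(g_pserver->slaves, c) in this file is now paired with a cclientsReplica increment, and each listDelNode with a decrement, so the per-thread count always mirrors list membership. A standalone sketch of that invariant (stand-in types, not KeyDB's):

#include <list>
#include <vector>

struct Client { int iel; };                 // iel: owning event-loop thread

std::list<Client*> slaves;                  // stand-in for g_pserver->slaves
std::vector<int> cclientsReplica(16, 0);    // stand-in per-thread counters

void promoteToReplica(Client *c) {
    slaves.push_back(c);                    // listAddNodeTail(...)
    cclientsReplica[c->iel]++;              // paired increment
}

void dropReplica(Client *c) {
    slaves.remove(c);                       // listDelNode(...)
    cclientsReplica[c->iel]--;              // paired decrement
}

chooseBestThreadForAccept() reads this counter, so missing either half of a pair would skew thread placement.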

View File

@@ -2112,12 +2112,11 @@ void databasesCron(bool fMainThread) {
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
if (serverTL->rehashCtl != nullptr) {
if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) {
break;
} else {
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
}
}
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
}
serverAssert(serverTL->rehashCtl == nullptr);
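
The hardcoded budget of 5 becomes rehashes_per_ms, assumed here to be a rate computed earlier in databasesCron. A standalone sketch of the budgeted pattern:

struct RehashCtl { int bucketsRemaining; };

// Mirrors the shape of dictRehashSomeAsync: do at most `budget` steps and
// return true while more work remains for a later cron tick.
bool rehashSomeAsync(RehashCtl *ctl, int budget) {
    while (budget-- > 0 && ctl->bucketsRemaining > 0)
        ctl->bucketsRemaining--;    // stand-in for migrating one bucket
    return ctl->bucketsRemaining > 0;
}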
@@ -3794,8 +3793,6 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->in_eval = 0;
pvar->in_exec = 0;
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
pvar->current_client = nullptr;
pvar->fRetrySetAofEvent = false;
if (pvar->el == NULL) {
@@ -3804,6 +3801,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
strerror(errno));
exit(1);
}
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
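
The two aeSet*SleepProc calls move below the el == NULL check so that a failed aeCreateEventLoop exits with the error message instead of first installing hooks on a null event loop.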

View File

@@ -2024,6 +2024,7 @@ struct redisServerThreadVars {
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
list *clients_pending_asyncwrite;
int cclients;
int cclientsReplica = 0;
client *current_client; /* Current client */
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
@@ -2217,6 +2218,8 @@ struct redisServer {
int active_expire_enabled; /* Can be disabled for testing purposes. */
int replicaIsolationFactor = 1;
/* Fields used only for stats */
long long stat_numcommands; /* Number of processed commands */
long long stat_numconnections; /* Number of connections received */

View File

@@ -44,6 +44,11 @@ start_server {tags {"introspection"}} {
set e
} {ERR*}
test {replica-weighting-factor does not accept values less than 1} {
catch {r config set replica-weighting-factor 0} e
set e
} {ERR*}
test {CLIENT SETNAME can assign a name to this connection} {
assert_equal [r client setname myname] {OK}
r client list