From a098681bbf13f382145945d946544ac6d11b8055 Mon Sep 17 00:00:00 2001
From: John Sully
Date: Thu, 14 Nov 2019 20:14:24 -0500
Subject: [PATCH] Cluster multithreading fixes

Former-commit-id: 3dd78a6101df0a980e520dcb55d80651bfc5a3a7
---
 src/cluster.cpp       | 38 ++++++++++++++++++++++++++++++++------
 src/networking.cpp    | 24 +++++++++++++++++-------
 src/server.cpp        | 18 ++++++++++++------
 src/server.h          |  5 +++--
 tests/cluster/run.tcl |  2 ++
 5 files changed, 66 insertions(+), 21 deletions(-)

diff --git a/src/cluster.cpp b/src/cluster.cpp
index 6ba5cfef7..3fd513221 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) {
  * This function will just make sure that the original node associated
  * with this link will have the 'link' field set to NULL. */
 void freeClusterLink(clusterLink *link) {
+    if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN)
+    {
+        // we can't perform this operation on this thread, queue it on the main thread
+        if (link->node)
+            link->node->link = NULL;
+        link->node = nullptr;
+        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
+            freeClusterLink(link);
+        });
+        return;
+    }
     if (link->fd != -1) {
         aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
     }
@@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) {
  * consumed by write(). We don't try to optimize this for speed too much
  * as this is a very low traffic channel. */
 void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+    serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN);
     clusterLink *link = (clusterLink*) privdata;
     ssize_t nwritten;
     UNUSED(el);
     UNUSED(mask);

-    nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
+    // We're about to release the lock, so the link's sndbuf needs to be owned fully by us
+    // allocate a new one in case anyone tries to write while we're waiting
+    sds sndbuf = link->sndbuf;
+    link->sndbuf = sdsempty();
+
+    aeReleaseLock();
+    nwritten = write(fd, sndbuf, sdslen(sndbuf));
+    aeAcquireLock();
+
     if (nwritten <= 0) {
         serverLog(LL_DEBUG,"I/O error writing to node link: %s",
             (nwritten == -1) ? strerror(errno) : "short write");
+        sdsfree(sndbuf);
         handleLinkIOError(link);
         return;
     }
-    sdsrange(link->sndbuf,nwritten,-1);
+    sdsrange(sndbuf,nwritten,-1);
+    // Restore our send buffer, ensuring any unsent data is first
+    sndbuf = sdscat(sndbuf, link->sndbuf);
+    sdsfree(link->sndbuf);
+    link->sndbuf = sndbuf;
     if (sdslen(link->sndbuf) == 0)
-        aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE);
+        aeDeleteFileEvent(el, link->fd, AE_WRITABLE);
 }

 /* Read data. Try to read the first field of the header first to check the
@@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
  * the link to be invalidated, so it is safe to call this function
  * from event handlers that will do stuff with the same link later. */
 void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
+    serverAssert(GlobalLocksAcquired());
     if (sdslen(link->sndbuf) == 0 && msglen != 0)
-        aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
-            clusterWriteHandler,link);
+        aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
+            clusterWriteHandler,link,false);

     link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

@@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
 void resetManualFailover(void) {
     if (g_pserver->cluster->mf_end && clientsArePaused()) {
         g_pserver->clients_pause_end_time = 0;
-        clientsArePaused(); /* Just use the side effect of the function. */
+        unpauseClientsIfNecessary();
     }
     g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
     g_pserver->cluster->mf_can_start = 0;
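
Note on the freeClusterLink() hunk: teardown is only legal on the main event loop, so other threads detach the node pointer immediately and repost the actual free via aePostFunction. A minimal sketch of that hand-off pattern, with a mutex-guarded task queue standing in for KeyDB's ae loop (TaskQueue, g_mainThreadTasks, freeLink, and Link are illustrative names, not from the patch):

    #include <functional>
    #include <mutex>
    #include <queue>

    // Stand-in for aePostFunction: a queue of closures the main thread drains.
    class TaskQueue {
        std::mutex m_lock;
        std::queue<std::function<void()>> m_tasks;
    public:
        void post(std::function<void()> fn) {
            std::lock_guard<std::mutex> guard(m_lock);
            m_tasks.push(std::move(fn));
        }
        // Run pending tasks; called only from the owning (main) thread's loop.
        void drain() {
            for (;;) {
                std::function<void()> fn;
                {
                    std::lock_guard<std::mutex> guard(m_lock);
                    if (m_tasks.empty()) return;
                    fn = std::move(m_tasks.front());
                    m_tasks.pop();
                }
                fn();   // execute outside the queue lock
            }
        }
    };

    TaskQueue g_mainThreadTasks;   // hypothetical analogue of the main event loop

    struct Node;
    struct Link {
        Node *node = nullptr;
        int fd = -1;
    };

    void freeLink(Link *link, bool fMainThread) {
        if (!fMainThread) {
            // Mirror freeClusterLink(): detach shared state right away so nothing
            // observes a half-dead link, then defer the destructive part.
            link->node = nullptr;
            g_mainThreadTasks.post([link] { freeLink(link, true); });
            return;
        }
        // Only the main thread unregisters events and closes the fd in the patch.
        delete link;
    }

Clearing link->node before posting matters: it stops the node from referencing a link that is already logically dead while the free is still queued.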
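Note on the clusterWriteHandler() hunk: it swaps the link's sndbuf for an empty one so the global lock can be dropped around write(2), then splices the unsent prefix back in front of anything other threads queued while the lock was released. A condensed sketch of the same pattern, with std::mutex and std::string standing in for the ae lock and sds (all names illustrative):

    #include <mutex>
    #include <string>
    #include <unistd.h>

    std::mutex g_globalLock;   // stand-in for aeAcquireLock()/aeReleaseLock()
    std::string g_sndbuf;      // stand-in for link->sndbuf, guarded by g_globalLock

    // Called with the lock held, like the write handler in the patch.
    bool flushSendBuffer(int fd, std::unique_lock<std::mutex> &lock) {
        // Take full ownership of the pending bytes; writers appending while
        // the lock is released will fill the fresh, empty buffer instead.
        std::string out;
        out.swap(g_sndbuf);

        lock.unlock();                  // don't hold the global lock across write(2)
        ssize_t n = write(fd, out.data(), out.size());
        lock.lock();

        if (n <= 0) return false;       // caller handles the I/O error

        // Re-prepend whatever failed to send so byte ordering is preserved:
        // unsent tail first, then anything queued while we were unlocked.
        out.erase(0, (size_t)n);
        g_sndbuf = out + g_sndbuf;
        return true;
    }

This is why the error path frees the detached sndbuf explicitly, and why the final aeDeleteFileEvent can use the handler's own el: the assert at the top pins the handler to the main loop.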
diff --git a/src/networking.cpp b/src/networking.cpp
index a14994fbc..ef3ee2683 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -2960,38 +2960,48 @@ void flushSlavesOutputBuffers(void) {
  * than the time left for the previous pause, no change is made to the
  * left duration. */
 void pauseClients(mstime_t end) {
-    if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time)
+    serverAssert(GlobalLocksAcquired());
+    if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time)
         g_pserver->clients_pause_end_time = end;
-    g_pserver->clients_paused = 1;
+
+    for (int iel = 0; iel < cserver.cthreads; ++iel)
+    {
+        g_pserver->rgthreadvar[iel].clients_paused = true;
+    }
 }

 /* Return non-zero if clients are currently paused. As a side effect the
  * function checks if the pause time was reached and clear it. */
 int clientsArePaused(void) {
-    if (g_pserver->clients_paused &&
+    return serverTL->clients_paused;
+}
+
+void unpauseClientsIfNecessary()
+{
+    serverAssert(GlobalLocksAcquired());
+    if (serverTL->clients_paused &&
         g_pserver->clients_pause_end_time < g_pserver->mstime)
     {
-        aeAcquireLock();
         listNode *ln;
         listIter li;
         client *c;

-        g_pserver->clients_paused = 0;
+        serverTL->clients_paused = 0;

         /* Put all the clients in the unblocked clients queue in order to
          * force the re-processing of the input buffer if any. */
         listRewind(g_pserver->clients,&li);
         while ((ln = listNext(&li)) != NULL) {
             c = (client*)listNodeValue(ln);
+            if (!FCorrectThread(c))
+                continue;

             /* Don't touch slaves and blocked clients.
              * The latter pending requests will be processed when unblocked. */
             if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
             queueClientForReprocessing(c);
         }
-        aeReleaseLock();
     }
-    return g_pserver->clients_paused;
 }

 /* This function is called by Redis in order to process a few events from
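
Note on the networking.cpp hunk: the single g_pserver->clients_paused flag becomes one flag per event-loop thread. pauseClients() stamps every thread under the global lock, and each thread later clears only its own flag and requeues only the clients it owns (the FCorrectThread check). A stripped-down sketch of that scheme (thread count and variable names are illustrative):

    #include <cstdint>

    using mstime_t = int64_t;

    constexpr int MAX_THREADS = 16;

    struct ThreadVars { bool clients_paused = false; };

    ThreadVars g_threads[MAX_THREADS];
    int g_cthreads = 3;
    mstime_t g_pause_end_time = 0;
    mstime_t g_now = 0;            // updated by the cron, like g_pserver->mstime

    // Must hold the global lock: stamps every thread, as pauseClients() now does.
    void pauseAll(mstime_t end) {
        if (end > g_pause_end_time) g_pause_end_time = end;
        for (int iel = 0; iel < g_cthreads; ++iel)
            g_threads[iel].clients_paused = true;
    }

    // Each thread's cron clears only its own flag once the deadline passes,
    // mirroring unpauseClientsIfNecessary(): no thread unpauses another.
    void unpauseIfNecessary(int iel) {
        if (g_threads[iel].clients_paused && g_pause_end_time < g_now)
            g_threads[iel].clients_paused = false;
    }

The payoff is that clientsArePaused() becomes a lock-free read of the caller's own thread-local flag, while the old version had to take the global lock inside the check.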
diff --git a/src/server.cpp b/src/server.cpp
index 0f0dfe122..f01c8bb00 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1692,6 +1692,9 @@ void clientsCron(int iel) {
             fastlock_unlock(&c->lock);
         }
     }
+
+    /* Free any pending clients */
+    freeClientsInAsyncFreeQueue(iel);
 }

 /* This function handles 'background' operations we are required to do
@@ -1812,6 +1815,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     /* Update the time cache. */
     updateCachedTime();

+    /* Unpause clients if enough time has elapsed */
+    unpauseClientsIfNecessary();
+
     g_pserver->hz = g_pserver->config_hz;
     /* Adapt the g_pserver->hz value to the number of configured clients. If we have
      * many clients, we want to call serverCron() with an higher frequency. */
@@ -1819,7 +1825,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     while (listLength(g_pserver->clients) / g_pserver->hz >
            MAX_CLIENTS_PER_CLOCK_TICK)
     {
-        g_pserver->hz *= 2;
+        g_pserver->hz += g_pserver->hz; // *= 2
         if (g_pserver->hz > CONFIG_MAX_HZ) {
             g_pserver->hz = CONFIG_MAX_HZ;
             break;
@@ -2019,9 +2025,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             flushAppendOnlyFile(0);
     }

-    /* Clear the paused clients flag if needed. */
-    clientsArePaused(); /* Don't check return value, just use the side effect.*/
-
     /* Replication cron function -- used to reconnect to master,
      * detect transfer failures, start background RDB transfers and so forth. */
     run_with_period(1000) replicationCron();
@@ -2078,6 +2081,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
     {
         processUnblockedClients(iel);
     }
+
+    /* Unpause clients if enough time has elapsed */
+    unpauseClientsIfNecessary();

     ProcessPendingAsyncWrites();    // A bug but leave for now, events should clean up after themselves
     clientsCron(iel);
@@ -2871,6 +2877,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
     pvar->cclients = 0;
     pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
     pvar->current_client = nullptr;
+    pvar->clients_paused = 0;
     if (pvar->el == NULL) {
         serverLog(LL_WARNING,
             "Failed creating the event loop. Error message: '%s'",
@@ -2967,7 +2974,6 @@ void initServer(void) {
     g_pserver->ready_keys = listCreate();
     g_pserver->clients_waiting_acks = listCreate();
     g_pserver->get_ack_from_slaves = 0;
-    g_pserver->clients_paused = 0;
     cserver.system_memory_size = zmalloc_get_memory_size();

     createSharedObjects();
@@ -4088,7 +4094,7 @@ sds genRedisInfoString(const char *section) {
             g_pserver->port,
             (intmax_t)uptime,
             (intmax_t)(uptime/(3600*24)),
-            g_pserver->hz,
+            g_pserver->hz.load(),
             g_pserver->config_hz,
             (unsigned long) lruclock,
             cserver.executable ? cserver.executable : "",
diff --git a/src/server.h b/src/server.h
index 276c939c3..29c2db301 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1426,6 +1426,7 @@ struct redisServerThreadVars {
     aeEventLoop *el;
     int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
     int ipfd_count;             /* Used slots in ipfd[] */
+    int clients_paused;         /* True if clients are currently paused */
     std::vector<client*> clients_pending_write; /* There is to write or install handler. */
     list *unblocked_clients;    /* list of clients to unblock before next loop NOT THREADSAFE */
     list *clients_pending_asyncwrite;
@@ -1518,7 +1519,7 @@ struct redisServer {
     int config_hz;              /* Configured HZ value. May be different than
                                    the actual 'hz' field value if dynamic-hz
                                    is enabled. */
-    int hz;                     /* serverCron() calls frequency in hertz */
+    std::atomic<int> hz;        /* serverCron() calls frequency in hertz */
     redisDb *db;
     dict *commands;             /* Command table */
     dict *orig_commands;        /* Command table before command renaming. */
@@ -1553,7 +1554,6 @@ struct redisServer {
     list *clients_to_close;     /* Clients to close asynchronously */
     list *slaves, *monitors;    /* List of slaves and MONITORs */
     rax *clients_index;         /* Active clients dictionary by client ID. */
-    int clients_paused;         /* True if clients are currently paused */
     mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
     dict *migrate_cached_sockets;/* MIGRATE cached sockets */
     std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
@@ -2042,6 +2042,7 @@ void disconnectSlavesExcept(unsigned char *uuid);
 int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
 void pauseClients(mstime_t duration);
 int clientsArePaused(void);
+void unpauseClientsIfNecessary();
 int processEventsWhileBlocked(int iel);
 int handleClientsWithPendingWrites(int iel);
 int clientHasPendingReplies(client *c);
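
Note on the hz changes: once hz is std::atomic<int> (it is now read from both the main serverCron path and the per-thread serverCronLite path), `g_pserver->hz *= 2` no longer compiles, because std::atomic provides += and -= but no *=; hence the `hz += hz; // *= 2` rewrite, and readers such as genRedisInfoString() go through .load(). A tiny standalone illustration:

    #include <atomic>
    #include <cstdio>

    std::atomic<int> hz{10};

    int main() {
        // hz *= 2;     // does not compile: std::atomic has no operator*=
        hz += hz;       // ok: loads hz, then atomically adds it (fetch_add)
        // The load and the add are two separate atomic operations, not one
        // RMW; that's acceptable in the patch because only the cron writes hz.
        std::printf("hz = %d\n", hz.load());   // prints hz = 20
        return 0;
    }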
diff --git a/tests/cluster/run.tcl b/tests/cluster/run.tcl
index 93603ddc9..3d96e6c41 100644
--- a/tests/cluster/run.tcl
+++ b/tests/cluster/run.tcl
@@ -14,6 +14,8 @@ proc main {} {
     spawn_instance redis $::redis_base_port $::instances_count {
         "cluster-enabled yes"
        "appendonly yes"
+        "testmode yes"
+        "server-threads 3"
     }
     run_tests
     cleanup
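
Note on the test change: spawning every cluster-test instance with "server-threads 3" means the existing suite now exercises the cross-thread paths patched above (link teardown off the main thread, the remote write handler, the per-thread pause flags) rather than running single-threaded. "testmode yes" is set alongside it; its effect is not shown in this patch.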