From 2194d9147682975e7f82539ce2f119bdccc8969f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 02:44:14 -0400 Subject: [PATCH 01/15] Fix potential race in pubsub Former-commit-id: 427c5999f167256dc3a6a0f49ad28313f700a155 --- src/pubsub.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 46677487f..0c657a588 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -301,9 +301,11 @@ int pubsubPublishMessage(robj *channel, robj *message) { client *c = reinterpret_cast(ln->value); if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored continue; - fastlock_lock(&c->lock); + if (FCorrectThread(c)) + fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); - fastlock_unlock(&c->lock); + if (FCorrectThread(c)) + fastlock_unlock(&c->lock); receivers++; } } @@ -321,10 +323,12 @@ int pubsubPublishMessage(robj *channel, robj *message) { { if (pat->pclient->flags & CLIENT_CLOSE_ASAP) continue; - fastlock_lock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); - fastlock_unlock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_unlock(&pat->pclient->lock); receivers++; } } From 75b22fafc2b74f935b3117cb3baf5360b413941e Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 4 Nov 2019 17:31:29 -0800 Subject: [PATCH 02/15] Fix test failure with tcl8.5 (test only issue) Former-commit-id: 5e40ea6ee7f0f34a9e11fdd7518d81383dd73c41 --- tests/unit/bitops.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/bitops.tcl b/tests/unit/bitops.tcl index f8a5cbe18..a280721f0 100644 --- a/tests/unit/bitops.tcl +++ b/tests/unit/bitops.tcl @@ -238,7 +238,7 @@ start_server {tags {"bitops"}} { r set a "abcdefg" r bitop lshift x a 8 r get x - } "\x00abcdefg" + } "\000abcdefg" test {BITOP lshift char} { r set a "\xAA" From 451c6e81f4c4a83160cb14f5556a334c61eaff26 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:34:13 -0500 Subject: [PATCH 03/15] Improve AE Assert message Former-commit-id: cb0fc7cca2406cf24fc238d6b6e1247c60d86704 --- src/ae.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index 0deec264f..87212f704 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -84,7 +84,7 @@ fastlock g_lock("AE (global)"); #endif thread_local aeEventLoop *g_eventLoopThisThread = NULL; -#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0) +#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0) /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ From 93844525eafa8986f9f20109116cf6df0001bf92 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:49:32 -0500 Subject: [PATCH 04/15] killing clients should take effect ASAP Former-commit-id: d0ccb074d5451cd457fe88efeb007cdb9746cb7f --- src/networking.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index d32912bd8..a14994fbc 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2521,9 +2521,17 @@ NULL close_this_client = 1; } else { if (FCorrectThread(client)) + { freeClient(client); + } else + { + int iel = client->iel; freeClientAsync(client); + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + freeClientsInAsyncFreeQueue(iel); + }); + } } killed++; } From b8cc7e2b9c9223de33c1d6513decb80261998461 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:57:29 -0500 Subject: [PATCH 05/15] Debug sleep should apply to all threads Former-commit-id: 41b678814b2c2ff93935b57e630028aaf2e9ae62 --- src/debug.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/debug.cpp b/src/debug.cpp index 234f197be..9854d0fd4 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -595,9 +595,19 @@ NULL double dtime = strtod(szFromObj(c->argv[2]),NULL); long long utime = dtime*1000000; struct timespec tv; - tv.tv_sec = utime / 1000000; tv.tv_nsec = (utime % 1000000) * 1000; + + // Ensure all threads sleep + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (iel == ielFromEventLoop(serverTL->el)) + continue; // we will sleep ourselves below + aePostFunction(g_pserver->rgthreadvar[iel].el, [tv]{ + nanosleep(&tv, NULL); + }); + } + nanosleep(&tv, NULL); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"set-active-expire") && From a098681bbf13f382145945d946544ac6d11b8055 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 20:14:24 -0500 Subject: [PATCH 06/15] Cluster multithreading fixes Former-commit-id: 3dd78a6101df0a980e520dcb55d80651bfc5a3a7 --- src/cluster.cpp | 38 ++++++++++++++++++++++++++++++++------ src/networking.cpp | 24 +++++++++++++++++------- src/server.cpp | 18 ++++++++++++------ src/server.h | 5 +++-- tests/cluster/run.tcl | 2 ++ 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index 6ba5cfef7..3fd513221 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) { * This function will just make sure that the original node associated * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { + if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN) + { + // we can't perform this operation on this thread, queue it on the main thread + if (link->node) + link->node->link = NULL; + link->node = nullptr; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + freeClusterLink(link); + }); + return; + } if (link->fd != -1) { aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE); } @@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) { * consumed by write(). We don't try to optimize this for speed too much * as this is a very low traffic channel. */ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN); clusterLink *link = (clusterLink*) privdata; ssize_t nwritten; UNUSED(el); UNUSED(mask); - nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf)); + // We're about to release the lock, so the link's sndbuf needs to be owned fully by us + // allocate a new one in case anyone tries to write while we're waiting + sds sndbuf = link->sndbuf; + link->sndbuf = sdsempty(); + + aeReleaseLock(); + nwritten = write(fd, sndbuf, sdslen(sndbuf)); + aeAcquireLock(); + if (nwritten <= 0) { serverLog(LL_DEBUG,"I/O error writing to node link: %s", (nwritten == -1) ? strerror(errno) : "short write"); + sdsfree(sndbuf); handleLinkIOError(link); return; } - sdsrange(link->sndbuf,nwritten,-1); + sdsrange(sndbuf,nwritten,-1); + // Restore our send buffer, ensuring any unsent data is first + sndbuf = sdscat(sndbuf, link->sndbuf); + sdsfree(link->sndbuf); + link->sndbuf = sndbuf; if (sdslen(link->sndbuf) == 0) - aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE); + aeDeleteFileEvent(el, link->fd, AE_WRITABLE); } /* Read data. Try to read the first field of the header first to check the @@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * the link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { + serverAssert(GlobalLocksAcquired()); if (sdslen(link->sndbuf) == 0 && msglen != 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER, - clusterWriteHandler,link); + aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER, + clusterWriteHandler,link,false); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); @@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) { void resetManualFailover(void) { if (g_pserver->cluster->mf_end && clientsArePaused()) { g_pserver->clients_pause_end_time = 0; - clientsArePaused(); /* Just use the side effect of the function. */ + unpauseClientsIfNecessary(); } g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */ g_pserver->cluster->mf_can_start = 0; diff --git a/src/networking.cpp b/src/networking.cpp index a14994fbc..ef3ee2683 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2960,38 +2960,48 @@ void flushSlavesOutputBuffers(void) { * than the time left for the previous pause, no change is made to the * left duration. */ void pauseClients(mstime_t end) { - if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time) + serverAssert(GlobalLocksAcquired()); + if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time) g_pserver->clients_pause_end_time = end; - g_pserver->clients_paused = 1; + + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + g_pserver->rgthreadvar[iel].clients_paused = true; + } } /* Return non-zero if clients are currently paused. As a side effect the * function checks if the pause time was reached and clear it. */ int clientsArePaused(void) { - if (g_pserver->clients_paused && + return serverTL->clients_paused; +} + +void unpauseClientsIfNecessary() +{ + serverAssert(GlobalLocksAcquired()); + if (serverTL->clients_paused && g_pserver->clients_pause_end_time < g_pserver->mstime) { - aeAcquireLock(); listNode *ln; listIter li; client *c; - g_pserver->clients_paused = 0; + serverTL->clients_paused = 0; /* Put all the clients in the unblocked clients queue in order to * force the re-processing of the input buffer if any. */ listRewind(g_pserver->clients,&li); while ((ln = listNext(&li)) != NULL) { c = (client*)listNodeValue(ln); + if (!FCorrectThread(c)) + continue; /* Don't touch slaves and blocked clients. * The latter pending requests will be processed when unblocked. */ if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue; queueClientForReprocessing(c); } - aeReleaseLock(); } - return g_pserver->clients_paused; } /* This function is called by Redis in order to process a few events from diff --git a/src/server.cpp b/src/server.cpp index 0f0dfe122..f01c8bb00 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1692,6 +1692,9 @@ void clientsCron(int iel) { fastlock_unlock(&c->lock); } } + + /* Free any pending clients */ + freeClientsInAsyncFreeQueue(iel); } /* This function handles 'background' operations we are required to do @@ -1812,6 +1815,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Update the time cache. */ updateCachedTime(); + /* Unpause clients if enough time has elapsed */ + unpauseClientsIfNecessary(); + g_pserver->hz = g_pserver->config_hz; /* Adapt the g_pserver->hz value to the number of configured clients. If we have * many clients, we want to call serverCron() with an higher frequency. */ @@ -1819,7 +1825,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { while (listLength(g_pserver->clients) / g_pserver->hz > MAX_CLIENTS_PER_CLOCK_TICK) { - g_pserver->hz *= 2; + g_pserver->hz += g_pserver->hz; // *= 2 if (g_pserver->hz > CONFIG_MAX_HZ) { g_pserver->hz = CONFIG_MAX_HZ; break; @@ -2019,9 +2025,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Clear the paused clients flag if needed. */ - clientsArePaused(); /* Don't check return value, just use the side effect.*/ - /* Replication cron function -- used to reconnect to master, * detect transfer failures, start background RDB transfers and so forth. */ run_with_period(1000) replicationCron(); @@ -2078,6 +2081,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData { processUnblockedClients(iel); } + + /* Unpause clients if enough time has elapsed */ + unpauseClientsIfNecessary(); ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves clientsCron(iel); @@ -2871,6 +2877,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->cclients = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; + pvar->clients_paused = 0; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", @@ -2967,7 +2974,6 @@ void initServer(void) { g_pserver->ready_keys = listCreate(); g_pserver->clients_waiting_acks = listCreate(); g_pserver->get_ack_from_slaves = 0; - g_pserver->clients_paused = 0; cserver.system_memory_size = zmalloc_get_memory_size(); createSharedObjects(); @@ -4088,7 +4094,7 @@ sds genRedisInfoString(const char *section) { g_pserver->port, (intmax_t)uptime, (intmax_t)(uptime/(3600*24)), - g_pserver->hz, + g_pserver->hz.load(), g_pserver->config_hz, (unsigned long) lruclock, cserver.executable ? cserver.executable : "", diff --git a/src/server.h b/src/server.h index 276c939c3..29c2db301 100644 --- a/src/server.h +++ b/src/server.h @@ -1426,6 +1426,7 @@ struct redisServerThreadVars { aeEventLoop *el; int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ + int clients_paused; /* True if clients are currently paused */ std::vector clients_pending_write; /* There is to write or install handler. */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; @@ -1518,7 +1519,7 @@ struct redisServer { int config_hz; /* Configured HZ value. May be different than the actual 'hz' field value if dynamic-hz is enabled. */ - int hz; /* serverCron() calls frequency in hertz */ + std::atomic hz; /* serverCron() calls frequency in hertz */ redisDb *db; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ @@ -1553,7 +1554,6 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ rax *clients_index; /* Active clients dictionary by client ID. */ - int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ std::atomic next_client_id; /* Next client unique ID. Incremental. */ @@ -2042,6 +2042,7 @@ void disconnectSlavesExcept(unsigned char *uuid); int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen); void pauseClients(mstime_t duration); int clientsArePaused(void); +void unpauseClientsIfNecessary(); int processEventsWhileBlocked(int iel); int handleClientsWithPendingWrites(int iel); int clientHasPendingReplies(client *c); diff --git a/tests/cluster/run.tcl b/tests/cluster/run.tcl index 93603ddc9..3d96e6c41 100644 --- a/tests/cluster/run.tcl +++ b/tests/cluster/run.tcl @@ -14,6 +14,8 @@ proc main {} { spawn_instance redis $::redis_base_port $::instances_count { "cluster-enabled yes" "appendonly yes" + "testmode yes" + "server-threads 3" } run_tests cleanup From baffeff5c7bdb5797475c6c33e429e3c0af3cf42 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 14:52:12 -0500 Subject: [PATCH 07/15] Improve perf of reading cluster bitfield Former-commit-id: 9371c005aa7ffc2060b1b787e4268bc25336ca15 --- src/cluster.cpp | 91 ++++++++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index 3fd513221..f6a6e03dc 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4180,57 +4180,76 @@ void clusterReplyMultiBulkSlots(client *c) { dictIterator *di = dictGetSafeIterator(g_pserver->cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = (clusterNode*)dictGetVal(de); - int j = 0, start = -1; + int start = -1; /* Skip slaves (that are iterated when producing the output of their * master) and masters not serving any slot. */ if (!nodeIsMaster(node) || node->numslots == 0) continue; + + static_assert((CLUSTER_SLOTS % (sizeof(uint32_t)*8)) == 0, "code below assumes the bitfield is a multiple of sizeof(unsinged)"); - for (j = 0; j < CLUSTER_SLOTS; j++) { - int bit, i; - - if ((bit = clusterNodeGetSlotBit(node,j)) != 0) { - if (start == -1) start = j; + for (unsigned iw = 0; iw < (CLUSTER_SLOTS/sizeof(uint32_t)/8); ++iw) + { + uint32_t wordCur = reinterpret_cast(node->slots)[iw]; + if (iw != ((CLUSTER_SLOTS/sizeof(uint32_t)/8)-1)) + { + if (start == -1 && wordCur == 0) + continue; + if (start != -1 && (wordCur+1)==0) + continue; } - if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { - int nested_elements = 3; /* slots (2) + master addr (1). */ - void *nested_replylen = addReplyDeferredLen(c); - if (bit && j == CLUSTER_SLOTS-1) j++; - - /* If slot exists in output map, add to it's list. - * else, create a new output map for this slot */ - if (start == j-1) { - addReplyLongLong(c, start); /* only one slot; low==high */ - addReplyLongLong(c, start); - } else { - addReplyLongLong(c, start); /* low */ - addReplyLongLong(c, j-1); /* high */ + unsigned ibitStartLoop = iw*sizeof(uint32_t)*8; + + for (unsigned j = ibitStartLoop; j < (iw+1)*sizeof(uint32_t)*8; j++) { + int i; + int bit = (int)(wordCur & 1); + wordCur >>= 1; + if (bit != 0) { + if (start == -1) start = j; } - start = -1; + if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { + int nested_elements = 3; /* slots (2) + master addr (1). */ + void *nested_replylen = addReplyDeferredLen(c); - /* First node reply position is always the master */ - addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->ip); - addReplyLongLong(c, node->port); - addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + if (bit && j == CLUSTER_SLOTS-1) j++; - /* Remaining nodes in reply are replicas for slot range */ - for (i = 0; i < node->numslaves; i++) { - /* This loop is copy/pasted from clusterGenNodeDescription() - * with modifications for per-slot node aggregation */ - if (nodeFailed(node->slaves[i])) continue; + /* If slot exists in output map, add to it's list. + * else, create a new output map for this slot */ + if (start == j-1) { + addReplyLongLong(c, start); /* only one slot; low==high */ + addReplyLongLong(c, start); + } else { + addReplyLongLong(c, start); /* low */ + addReplyLongLong(c, j-1); /* high */ + } + start = -1; + + /* First node reply position is always the master */ addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->slaves[i]->ip); - addReplyLongLong(c, node->slaves[i]->port); - addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN); - nested_elements++; + addReplyBulkCString(c, node->ip); + addReplyLongLong(c, node->port); + addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + + /* Remaining nodes in reply are replicas for slot range */ + for (i = 0; i < node->numslaves; i++) { + /* This loop is copy/pasted from clusterGenNodeDescription() + * with modifications for per-slot node aggregation */ + if (nodeFailed(node->slaves[i])) continue; + addReplyArrayLen(c, 3); + addReplyBulkCString(c, node->slaves[i]->ip); + addReplyLongLong(c, node->slaves[i]->port); + addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN); + nested_elements++; + } + setDeferredArrayLen(c, nested_replylen, nested_elements); + num_masters++; } - setDeferredArrayLen(c, nested_replylen, nested_elements); - num_masters++; } } + serverAssert(start == -1); } + dictReleaseIterator(di); setDeferredArrayLen(c, slot_replylen, num_masters); } From a27c1c9f0a09b614309c08c67eb238c3e4ebcb26 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 15:39:47 -0500 Subject: [PATCH 08/15] Revert "Debug sleep should apply to all threads" This reverts commit b8cc7e2b9c9223de33c1d6513decb80261998461 [formerly 41b678814b2c2ff93935b57e630028aaf2e9ae62]. Former-commit-id: 3ae75c2d2bd952d0a075b9ba257a08f962fe0739 --- src/debug.cpp | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 9854d0fd4..234f197be 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -595,19 +595,9 @@ NULL double dtime = strtod(szFromObj(c->argv[2]),NULL); long long utime = dtime*1000000; struct timespec tv; + tv.tv_sec = utime / 1000000; tv.tv_nsec = (utime % 1000000) * 1000; - - // Ensure all threads sleep - for (int iel = 0; iel < cserver.cthreads; ++iel) - { - if (iel == ielFromEventLoop(serverTL->el)) - continue; // we will sleep ourselves below - aePostFunction(g_pserver->rgthreadvar[iel].el, [tv]{ - nanosleep(&tv, NULL); - }); - } - nanosleep(&tv, NULL); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"set-active-expire") && From 72bbf16c2f36a9188e8a5a54fd878d60931d0f54 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 16:06:49 -0500 Subject: [PATCH 09/15] Fix failure to wakeup from futex sleep due to fastlock_unlock reading the wrong offset in the asm version. Also fix false sharing in spinlock Former-commit-id: 4c8603815cf525c75dcc360fddeab9ca6fe70ae6 --- src/fastlock.cpp | 5 +++- src/fastlock.h | 10 +++++--- src/fastlock_x64.asm | 54 ++++++++++++++++++++++++-------------------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index c74ca8358..4a64b20fc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -258,7 +258,10 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name) lock->m_depth = 0; lock->m_pidOwner = -1; lock->futex = 0; - lock->szName = name; + int cch = strlen(name); + cch = std::min(cch, sizeof(lock->szName)-1); + memcpy(lock->szName, name, cch); + lock->szName[cch] = '\0'; ANNOTATE_RWLOCK_CREATE(lock); } diff --git a/src/fastlock.h b/src/fastlock.h index 0117049a6..44c09ac17 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -1,5 +1,6 @@ #pragma once #include +#include #ifdef __cplusplus extern "C" { @@ -40,12 +41,13 @@ struct ticket struct fastlock { - volatile struct ticket m_ticket; - volatile int m_pidOwner; volatile int m_depth; + char szName[56]; + /* Volatile data on seperate cache line */ + volatile struct ticket m_ticket; unsigned futex; - const char *szName; + char padding[56]; // ensure ticket and futex are on their own independent cache line #ifdef __cplusplus fastlock(const char *name) @@ -81,3 +83,5 @@ struct fastlock bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only #endif }; + +static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct"); \ No newline at end of file diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 6c9df490e..80791a6de 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -16,10 +16,11 @@ fastlock_lock: .cfi_startproc .cfi_def_cfa rsp, 8 # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail # First get our TID and put it in ecx push rdi # we need our struct pointer (also balance the stack for the call) @@ -29,18 +30,18 @@ fastlock_lock: pop rdi # get our pointer back .cfi_adjust_cfa_offset -8 - cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LLocked # Don't spin in that case xor eax, eax # eliminate partial register dependency inc eax # we want to add one - lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition + lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition # ax now contains the ticket # OK Start the wait loop xor ecx, ecx .ALIGN 16 .LLoop: - mov edx, [rdi] + mov edx, [rdi+64] cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause @@ -72,8 +73,8 @@ fastlock_lock: jmp .LLoop # Get back in the game .ALIGN 16 .LLocked: - mov [rdi+4], esi # lock->m_pidOwner = gettid() - inc dword ptr [rdi+8] # lock->m_depth++ + mov [rdi], esi # lock->m_pidOwner = gettid() + inc dword ptr [rdi+4] # lock->m_depth++ ret .cfi_endproc @@ -82,10 +83,11 @@ fastlock_lock: .type fastlock_trylock,@function fastlock_trylock: # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail # First get our TID and put it in ecx push rdi # we need our struct pointer (also balance the stack for the call) @@ -93,29 +95,29 @@ fastlock_trylock: mov esi, eax # back it up in esi pop rdi # get our pointer back - cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LRecursive # Don't spin in that case - mov eax, [rdi] # get both active and avail counters + mov eax, [rdi+64] # get both active and avail counters mov ecx, eax # duplicate in ecx ror ecx, 16 # swap upper and lower 16-bits cmp eax, ecx # are the upper and lower 16-bits the same? jnz .LAlreadyLocked # If not return failure # at this point we know eax+ecx have [avail][active] and they are both the same - add ecx, 0x10000 # increment avail, ecx is now our wanted value - lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail) - jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing + add ecx, 0x10000 # increment avail, ecx is now our wanted value + lock cmpxchg [rdi+64], ecx # If rdi still contains the value in eax, put in ecx (inc avail) + jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing xor eax, eax - inc eax # return SUCCESS! (eax=1) - mov [rdi+4], esi # lock->m_pidOwner = gettid() - mov dword ptr [rdi+8], eax # lock->m_depth = 1 + inc eax # return SUCCESS! (eax=1) + mov [rdi], esi # lock->m_pidOwner = gettid() + mov dword ptr [rdi+4], eax # lock->m_depth = 1 ret .ALIGN 16 .LRecursive: xor eax, eax inc eax # return SUCCESS! (eax=1) - inc dword ptr [rdi+8] # lock->m_depth++ + inc dword ptr [rdi+4] # lock->m_depth++ ret .ALIGN 16 .LAlreadyLocked: @@ -126,23 +128,25 @@ fastlock_trylock: .global fastlock_unlock fastlock_unlock: # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail push r11 - sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state + sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it - mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi] # get current active (this one) + mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore) + mov ecx, [rdi+64] # get current active (this one) inc ecx # bump it to the next thread - mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) # At this point the lock is removed, however we must wake up any pending futexs mov r9d, 1 # eax is the bitmask for 2 threads rol r9d, cl # place the mask in the right spot for the next 2 threads + add rdi, 64 # rdi now points to the token .ALIGN 16 .LRetryWake: - mov r11d, [rdi+12] # load the futex mask + mov r11d, [rdi+4] # load the futex mask and r11d, r9d # are any threads waiting on a futex? jz .LDone # if not we're done. # we have to wake the futexs From 465b35ec523c15eca69336ab4866ef1e9f030124 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 16:24:23 -0500 Subject: [PATCH 10/15] Update issue templates Former-commit-id: becd3fc5a34d9c7d665531290c017f13d1b03c16 --- .github/ISSUE_TEMPLATE/bug_report.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 000000000..91b34ebe0 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,20 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: '' +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +** Log Files ** +These should be KeyDB logs, not syslogs or logs from your container manager. If you are reporting a crash there will be a line in your log stating: +"=== KEYDB BUG REPORT START: Cut & paste starting from here ===" + +Please copy everything after this line. + +**To Reproduce** +Do you know how to reproduce this? If so please provide repro steps. From 852206db90cbe29b15c0d569f1bc9eb1799757e7 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 16:25:33 -0500 Subject: [PATCH 11/15] Update issue templates Former-commit-id: 3ea56ffb38efd4fbfbb096481668be69cc61b15f --- .github/ISSUE_TEMPLATE/feature_request.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 000000000..bbcbbe7d6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: '' +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. From f136492c45650332df2d46cdf0a18b254e1673fa Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 17:00:40 -0500 Subject: [PATCH 12/15] Additional asserts ensuring the client is creating on the correct thread Former-commit-id: 937702ea1dd0a4331dd7c66ee9f5c2c3f2354d10 --- src/networking.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.cpp b/src/networking.cpp index ef3ee2683..97744c410 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -99,6 +99,7 @@ client *createClient(int fd, int iel) { * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { + serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (cserver.tcpkeepalive) From 633b7398e1d039862c0a8d22a1d864778385598d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 19:44:31 -0500 Subject: [PATCH 13/15] Fix issue #107, active replicas do their own expires Former-commit-id: 8e4f323439df29a5e8c0de9db7a848291721fd07 --- src/db.cpp | 6 ++++-- src/expire.cpp | 2 +- src/replication.cpp | 23 +++++++++++++++++++++-- src/server.cpp | 6 +++--- src/server.h | 1 + tests/integration/replication-active.tcl | 21 +++++++++++++++++++++ 6 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index d8aa0dc47..605baaf40 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1374,7 +1374,9 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { if (g_pserver->aof_state != AOF_OFF) feedAppendOnlyFile(cserver.delCommand,db->id,argv,2); - replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); decrRefCount(argv[0]); decrRefCount(argv[1]); @@ -1442,7 +1444,7 @@ int expireIfNeeded(redisDb *db, robj *key) { * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ - if (listLength(g_pserver->masters)) return 1; + if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1; /* Delete the key */ g_pserver->stat_expiredkeys++; diff --git a/src/expire.cpp b/src/expire.cpp index 62b5e7a6e..fdad83638 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -535,7 +535,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { * * Instead we take the other branch of the IF statement setting an expire * (possibly in the past) and wait for an explicit DEL from the master. */ - if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) { + if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) { robj *aux; int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : diff --git a/src/replication.cpp b/src/replication.cpp index d2f948567..89f73eb06 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -669,6 +669,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ @@ -1002,6 +1003,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_EOF; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2")) c->slave_capa |= SLAVE_CAPA_PSYNC2; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) + c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. It is an @@ -1071,6 +1074,14 @@ void putSlaveOnline(client *replica) { refreshGoodSlavesCount(); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(replica)); + + if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica) + { + serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations." + "\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica)); + serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions." + "\n\tPlease fix your replica topology"); + } } void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -2094,8 +2105,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * * The master will ignore capabilities it does not understand. */ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", - "capa","eof","capa","psync2",NULL); + if (g_pserver->fActiveReplica) + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); + } + else + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2",NULL); + } if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; diff --git a/src/server.cpp b/src/server.cpp index f01c8bb00..6fc44f387 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1703,9 +1703,9 @@ void clientsCron(int iel) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) { + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (listLength(g_pserver->masters)) { + } else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { expireSlaveKeys(); } @@ -2105,7 +2105,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Send all the slaves an ACK request if at least one client blocked diff --git a/src/server.h b/src/server.h index 29c2db301..81a5da8c3 100644 --- a/src/server.h +++ b/src/server.h @@ -433,6 +433,7 @@ public: #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ +#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */ /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 2ba761766..4f37f2adf 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] + set master_pid [s process_id] # Use a short replication timeout on the slave, so that if there # are no bugs the timeout is triggered in a reasonable amount @@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid + after 3000 + # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info + assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} + } + exec kill -SIGCONT $slave_pid + test {Active replica different databases} { $master select 3 $master set testkey abcd From 88dcedc6e9a99116ce4cd57d098fa99908efb47e Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 20:35:14 -0500 Subject: [PATCH 14/15] Add missing dependency to dockerfile Former-commit-id: 39c6fbddb1dc6738c3ea7cbc2bd5d5bc7fb46ccf --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 931f30d59..d3cea108e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ FROM ubuntu:18.04 RUN apt-get update \ && DEBIAN_FRONTEND=noninteractive apt-get install -qqy \ - build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev \ + build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev libcurl4-openssl-dev \ && apt-get clean CMD make From 3668715ba3869e3fd10144fbbb6e66b2c720b004 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Nov 2019 20:05:16 -0500 Subject: [PATCH 15/15] Fix issue #83 Former-commit-id: 3028a890ef11cd99b2c7538de0f480d2466eb150 --- src/object.cpp | 28 +++++++++++++ src/rdb.cpp | 9 +++- src/replication.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++- src/server.cpp | 1 + src/server.h | 90 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 224 insertions(+), 4 deletions(-) diff --git a/src/object.cpp b/src/object.cpp index e7f053a24..2a9c3f215 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) { return C_OK; } +int getUnsignedLongLongFromObject(robj *o, uint64_t *target) { + uint64_t value; + + if (o == NULL) { + value = 0; + } else { + serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); + if (sdsEncodedObject(o)) { + char *pchEnd = nullptr; + errno = 0; + value = strtoull(szFromObj(o), &pchEnd, 10); + if (value == 0) { + // potential error + if (errno != 0) + return C_ERR; + if (pchEnd == szFromObj(o)) + return C_ERR; + } + } else if (o->encoding == OBJ_ENCODING_INT) { + value = (long)ptrFromObj(o); + } else { + serverPanic("Unknown string encoding"); + } + } + if (target) *target = value; + return C_OK; +} + int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) { long long value; if (getLongLongFromObject(o, &value) != C_OK) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 476b2e2e4..1c5b25d16 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2115,12 +2115,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; + bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) { + bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now; + if (fStaleMvccKey || fExpiredKey) { + if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) { + // We have a key that we've already deleted and is not back in our database. + // We'll need to inform the sending master of the delete if it is also a replica of us + rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key); + } decrRefCount(key); key = nullptr; decrRefCount(val); diff --git a/src/replication.cpp b/src/replication.cpp index 89f73eb06..c912abba8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd); void replicationSendAck(redisMaster *mi); void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); +static void propagateMasterStaleKeys(); /* --------------------------- Utility functions ---------------------------- */ @@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters() return false; } +client *replicaFromMaster(redisMaster *mi) +{ + if (mi->master == nullptr) + return nullptr; + + listIter liReplica; + listNode *lnReplica; + listRewind(g_pserver->slaves, &liReplica); + while ((lnReplica = listNext(&liReplica)) != nullptr) + { + client *replica = (client*)listNodeValue(lnReplica); + if (FSameHost(mi->master, replica)) + return replica; + } + return nullptr; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char uuid[40] = {'\0'}; uuid_unparse(cserver.uuid, uuid); char proto[1024]; - int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid); + int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); + int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); + cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that + + char szMvcc[128]; + uint64_t mvccTstamp = getMvccTstamp(); + int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); + int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); + cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); + feedReplicationBacklog(szMvcc, cchMvcc); } } @@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { { addReplyAsync(replica,shared.crlf); addReplyProtoAsync(replica, szDbNum, cchDbNum); + addReplyProtoAsync(replica, szMvcc, cchMvcc); } } @@ -1587,6 +1615,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (g_pserver->fActiveReplica) + { + rsi.mvccMinThreshold = mi->mvccLastSync; + if (mi->staleKeyMap != nullptr) + mi->staleKeyMap->clear(); + else + mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); + rsi.mi = mi; + } if (rdbLoadFile(rdb_filename, &rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(mi); @@ -2382,6 +2419,7 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + delete mi->staleKeyMap; zfree(mi); } @@ -3215,6 +3253,8 @@ void replicationCron(void) { } } + propagateMasterStaleKeys(); + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ @@ -3361,6 +3401,17 @@ void replicaReplayCommand(client *c) } } + uint64_t mvcc = 0; + if (c->argc >= 5) + { + if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK) + { + addReplyError(c, "Invalid MVCC Timestamp"); + s_pstate->Cancel(); + return; + } + } + if (FSameUuidNoNil(uuid, cserver.uuid)) { addReply(c, shared.ok); @@ -3387,6 +3438,11 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); + redisMaster *mi = MasterInfoFromClient(c); + if (mi != nullptr) // this should never be null but I'd prefer not to crash + { + mi->mvccLastSync = mvcc; + } } else { @@ -3421,3 +3477,43 @@ void updateMasterAuth() mi->masteruser = zstrdup(cserver.default_masteruser); } } + +static void propagateMasterStaleKeys() +{ + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + robj *rgobj[2]; + + rgobj[0] = createEmbeddedStringObject("DEL", 3); + + while ((ln = listNext(&li)) != nullptr) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->staleKeyMap != nullptr) + { + if (mi->master != nullptr) + { + for (auto &pair : *mi->staleKeyMap) + { + if (pair.second.empty()) + continue; + + client *replica = replicaFromMaster(mi); + if (replica == nullptr) + continue; + + for (auto &spkey : pair.second) + { + rgobj[1] = spkey.get(); + replicationFeedSlave(replica, pair.first, rgobj, 2, false); + } + } + delete mi->staleKeyMap; + mi->staleKeyMap = nullptr; + } + } + } + + decrRefCount(rgobj[0]); +} \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 6fc44f387..3c8c588cd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2319,6 +2319,7 @@ void initMasterInfo(redisMaster *master) master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ + master->mvccLastSync = 0; } void initServerConfig(void) { diff --git a/src/server.h b/src/server.h index 81a5da8c3..ad7f2826c 100644 --- a/src/server.h +++ b/src/server.h @@ -54,6 +54,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -144,6 +145,87 @@ public: } }; +void decrRefCount(robj_roptr o); +void incrRefCount(robj_roptr o); +class robj_sharedptr +{ + redisObject *m_ptr; + +public: + robj_sharedptr() + : m_ptr(nullptr) + {} + robj_sharedptr(redisObject *ptr) + : m_ptr(ptr) + { + incrRefCount(ptr); + } + ~robj_sharedptr() + { + if (m_ptr) + decrRefCount(m_ptr); + } + robj_sharedptr(const robj_sharedptr& other) + { + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + } + + robj_sharedptr(robj_sharedptr&& other) + { + m_ptr = other.m_ptr; + other.m_ptr = nullptr; + } + + robj_sharedptr &operator=(const robj_sharedptr& other) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + return *this; + } + robj_sharedptr &operator=(redisObject *ptr) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = ptr; + incrRefCount(m_ptr); + return *this; + } + + bool operator==(const robj_sharedptr &other) const + { + return m_ptr == other.m_ptr; + } + + bool operator!=(const robj_sharedptr &other) const + { + return m_ptr != other.m_ptr; + } + + redisObject* operator->() const + { + return m_ptr; + } + + bool operator!() const + { + return !m_ptr; + } + + operator bool() const{ + return !!m_ptr; + } + + operator redisObject *() + { + return (redisObject*)m_ptr; + } + + redisObject *get() { return m_ptr; } +}; + /* Error codes */ #define C_OK 0 #define C_ERR -1 @@ -1391,9 +1473,11 @@ typedef struct rdbSaveInfo { char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ long long repl_offset; /* Replication offset. */ int fForceSetKey; + uint64_t mvccMinThreshold; + struct redisMaster *mi; } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} +#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, nullptr} struct malloc_stats { size_t zmalloc_used; @@ -1467,6 +1551,9 @@ struct redisMaster { unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ + uint64_t mvccLastSync; + /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ + std::map> *staleKeyMap; }; // Const vars are not changed after worker threads are launched @@ -2156,6 +2243,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg); int getDoubleFromObject(const robj *o, double *target); int getLongLongFromObject(robj *o, long long *target); +int getUnsignedLongLongFromObject(robj *o, uint64_t *target); int getLongDoubleFromObject(robj *o, long double *target); int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg); const char *strEncoding(int encoding);