From 96c825d300dbec6e44ca3da331e1623e2b930d17 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 00:15:47 +0000 Subject: [PATCH 1/6] Clients should initialize with an mvcc checkpoint Former-commit-id: 5183cb721774be1b769d130359e4bddb1eb8224b --- src/networking.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.cpp b/src/networking.cpp index b25eeaf71..b1ee4a9a8 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -200,7 +200,7 @@ client *createClient(connection *conn, int iel) { c->paused_list_node = NULL; c->client_tracking_redirection = 0; c->casyncOpsPending = 0; - c->mvccCheckpoint = 0; + c->mvccCheckpoint = getMvccTstamp(); c->master_error = 0; memset(c->uuid, 0, UUID_BINARY_LEN); From 34d7a95ea6d2af1fdf7c75042e96cd68554a155d Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 00:19:52 +0000 Subject: [PATCH 2/6] Create snapshots less often, and use them if they exist Former-commit-id: ba8a00074171d346813247de0b218e08c8f07b92 --- src/networking.cpp | 2 +- src/server.cpp | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index b1ee4a9a8..893b4a5c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2712,7 +2712,7 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || g_fTestMode)) + if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); if (!c->vecqueuedcmd.empty()) serverTL->vecclientsProcess.push_back(c); diff --git a/src/server.cpp b/src/server.cpp index eb50c18f5..ddc6af2e9 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2070,6 +2070,15 @@ int hash_spin_worker() { * rehashing. */ void databasesCron(bool fMainThread) { serverAssert(GlobalLocksAcquired()); + + /* end any snapshots created by fast async commands */ + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (serverTL->rgdbSnapshot[idb] != nullptr) { + g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); + serverTL->rgdbSnapshot[idb] = nullptr; + } + } + if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -2783,13 +2792,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); - for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (serverTL->rgdbSnapshot[idb] != nullptr) { - g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); - serverTL->rgdbSnapshot[idb] = nullptr; - } - } - size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > g_pserver->stat_peak_memory) g_pserver->stat_peak_memory = zmalloc_used; From bba9b516bfc0f6455d7827b6cf01fa66eb6e1069 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 05:24:28 +0000 Subject: [PATCH 3/6] Async commands incorrectly think they are dirty Former-commit-id: 4874247931425767156ab3da934c00a4d4832bcf --- src/server.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index ddc6af2e9..e78ec485d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4405,7 +4405,10 @@ void call(client *c, int flags) { serverTL->commandsExecuted++; const long duration = elapsedUs(call_timer); c->duration = duration; - dirty = g_pserver->dirty-dirty; + if (flags & CMD_CALL_ASYNC) + dirty = 0; // dirty is bogus in this case as there's no synchronization + else + dirty = g_pserver->dirty-dirty; if (dirty < 0) dirty = 0; if (dirty) From 99ff67283c901deedd26d808fb27edda5b336152 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 17:44:06 +0000 Subject: [PATCH 4/6] prefetch crashes intermittently when a snapshot exists Former-commit-id: 4a2657023d5d8218c815ce77c2676fd53a634b3f --- src/db.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index a87b64f8c..6afdfde9a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3160,6 +3160,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command if (command.argc >= 2) { const char *cmd = szFromObj(command.argv[0]); if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { + if (c->db->m_spdbSnapshotHOLDER != nullptr) + return; // this is dangerous enough without a snapshot around auto h = dictSdsHash(szFromObj(command.argv[1])); for (int iht = 0; iht < 2; ++iht) { auto hT = h & c->db->m_pdict->ht[iht].sizemask; From 3294e4f7884df12d7c88bc934431961b10781187 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 17:45:49 +0000 Subject: [PATCH 5/6] Move the snapshot stale threshold to a named constant Former-commit-id: 46d2aaf17abb0fb021aa6d7b393ffc143493d339 --- src/server.h | 3 +++ src/snapshot.cpp | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server.h b/src/server.h index 819690b62..482be14a3 100644 --- a/src/server.h +++ b/src/server.h @@ -1263,6 +1263,8 @@ public: // These need to be fixed using redisDbPersistentData::size; using redisDbPersistentData::expireSize; + + static const uint64_t msStaleThreshold = 500; }; /* Redis database representation. There are multiple databases identified @@ -1350,6 +1352,7 @@ public: long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ + uint64_t mvccLastSnapshot = 0; }; /* Declare database backup that include redis main DBs and slots to keys map. diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 795df287d..a13f5d9f5 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -654,9 +654,7 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const bool redisDbPersistentDataSnapshot::FStale() const { - // 0.5 seconds considered stale; - static const uint64_t msStale = 500; - return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale; + return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= redisDbPersistentDataSnapshot::msStaleThreshold; } void dictGCAsyncFree(dictAsyncRehashCtl *async) { From 154c9d1f79f47ed4b0ffaea7396760ab0d9928de Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Sep 2021 17:46:56 +0000 Subject: [PATCH 6/6] Rework the tuning to be better for mixed read/write workloads Former-commit-id: a4fdd3e3cb41160b20e92e1b1f4f4ebd2ee86a4a --- src/db.cpp | 8 ++++++-- src/networking.cpp | 13 +++++++++++-- src/server.cpp | 18 ++++++++++-------- src/server.h | 9 ++++++++- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 6afdfde9a..78f627643 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -220,12 +220,16 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) { AeLocker locker; locker.arm(serverTL->current_client); - if (serverTL->rgdbSnapshot[idb] != nullptr) + if (serverTL->rgdbSnapshot[idb] != nullptr) { db->endSnapshot(serverTL->rgdbSnapshot[idb]); - serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true); + serverTL->rgdbSnapshot[idb] = nullptr; + } else { + serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true); + } if (serverTL->rgdbSnapshot[idb] == nullptr) { // We still need to service the read o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE); + serverTL->disable_async_commands = true; // don't try this again } } if (serverTL->rgdbSnapshot[idb] != nullptr) { diff --git a/src/networking.cpp b/src/networking.cpp index 893b4a5c0..60f446837 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2712,8 +2712,17 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) - processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); + if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { + // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often + // so we exclude them unless the snapshot we need already exists + bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint; + bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2); + + // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it + if (!fWriteTooRecent || fSnapshotExists) { + processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); + } + } if (!c->vecqueuedcmd.empty()) serverTL->vecclientsProcess.push_back(c); } else { diff --git a/src/server.cpp b/src/server.cpp index e78ec485d..1e1e67e68 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2071,14 +2071,6 @@ int hash_spin_worker() { void databasesCron(bool fMainThread) { serverAssert(GlobalLocksAcquired()); - /* end any snapshots created by fast async commands */ - for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (serverTL->rgdbSnapshot[idb] != nullptr) { - g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); - serverTL->rgdbSnapshot[idb] = nullptr; - } - } - if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -2792,6 +2784,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); + /* end any snapshots created by fast async commands */ + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (serverTL->rgdbSnapshot[idb] != nullptr) { + g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); + serverTL->rgdbSnapshot[idb] = nullptr; + } + } + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > g_pserver->stat_peak_memory) g_pserver->stat_peak_memory = zmalloc_used; @@ -2993,6 +2993,8 @@ void afterSleep(struct aeEventLoop *eventLoop) { serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->trackChanges(false); + + serverTL->disable_async_commands = false; } } diff --git a/src/server.h b/src/server.h index 482be14a3..0d0b0de9d 100644 --- a/src/server.h +++ b/src/server.h @@ -1333,7 +1333,6 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::commitChanges; using redisDbPersistentData::setexpireUnsafe; using redisDbPersistentData::setexpire; - using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::restoreSnapshot; using redisDbPersistentData::removeAllCachedValues; @@ -1344,6 +1343,13 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::FRehashing; public: + const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { + auto psnapshot = redisDbPersistentData::createSnapshot(mvccCheckpoint, fOptional); + if (psnapshot != nullptr) + mvccLastSnapshot = psnapshot->mvccCheckpoint(); + return psnapshot; + } + expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ @@ -2026,6 +2032,7 @@ struct redisServerThreadVars { bool fRetrySetAofEvent = false; bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before the thread went to sleep? */ + bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */ std::vector vecclientsProcess; dictAsyncRehashCtl *rehashCtl = nullptr;