From 154c9d1f79f47ed4b0ffaea7396760ab0d9928de Mon Sep 17 00:00:00 2001
From: John Sully
Date: Thu, 23 Sep 2021 17:46:56 +0000
Subject: [PATCH] Rework the tuning to be better for mixed read/write
 workloads

Former-commit-id: a4fdd3e3cb41160b20e92e1b1f4f4ebd2ee86a4a
---
 src/db.cpp         |  8 ++++++--
 src/networking.cpp | 13 +++++++++++--
 src/server.cpp     | 18 ++++++++++--------
 src/server.h       |  9 ++++++++-
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/src/db.cpp b/src/db.cpp
index 6afdfde9a..78f627643 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -220,12 +220,16 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
         if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
             AeLocker locker;
             locker.arm(serverTL->current_client);
-            if (serverTL->rgdbSnapshot[idb] != nullptr)
+            if (serverTL->rgdbSnapshot[idb] != nullptr) {
                 db->endSnapshot(serverTL->rgdbSnapshot[idb]);
-            serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
+                serverTL->rgdbSnapshot[idb] = nullptr;
+            } else {
+                serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
+            }
             if (serverTL->rgdbSnapshot[idb] == nullptr) {
                 // We still need to service the read
                 o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
+                serverTL->disable_async_commands = true; // don't try this again
             }
         }
         if (serverTL->rgdbSnapshot[idb] != nullptr) {
diff --git a/src/networking.cpp b/src/networking.cpp
index 893b4a5c0..60f446837 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -2712,8 +2712,17 @@ void readQueryFromClient(connection *conn) {
 
     if (cserver.cthreads > 1) {
         parseClientCommandBuffer(c);
-        if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode))
-            processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
+        if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
+            // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
+            //  so we exclude them unless the snapshot we need already exists
+            bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
+            bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2);
+
+            // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
+            if (!fWriteTooRecent || fSnapshotExists) {
+                processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
+            }
+        }
         if (!c->vecqueuedcmd.empty()) serverTL->vecclientsProcess.push_back(c);
     } else {
diff --git a/src/server.cpp b/src/server.cpp
index e78ec485d..1e1e67e68 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -2071,14 +2071,6 @@ int hash_spin_worker() {
 void databasesCron(bool fMainThread) {
     serverAssert(GlobalLocksAcquired());
 
-    /* end any snapshots created by fast async commands */
-    for (int idb = 0; idb < cserver.dbnum; ++idb) {
-        if (serverTL->rgdbSnapshot[idb] != nullptr) {
-            g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
-            serverTL->rgdbSnapshot[idb] = nullptr;
-        }
-    }
-
     if (fMainThread) {
         /* Expire keys by random sampling. Not required for slaves
          * as master will synthesize DELs for us. */
@@ -2792,6 +2784,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
 
     locker.arm();
 
+    /* end any snapshots created by fast async commands */
+    for (int idb = 0; idb < cserver.dbnum; ++idb) {
+        if (serverTL->rgdbSnapshot[idb] != nullptr) {
+            g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
+            serverTL->rgdbSnapshot[idb] = nullptr;
+        }
+    }
+
     size_t zmalloc_used = zmalloc_used_memory();
     if (zmalloc_used > g_pserver->stat_peak_memory)
         g_pserver->stat_peak_memory = zmalloc_used;
@@ -2993,6 +2993,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
             serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
             for (int idb = 0; idb < cserver.dbnum; ++idb)
                 g_pserver->db[idb]->trackChanges(false);
+
+            serverTL->disable_async_commands = false;
         }
     }
 }
diff --git a/src/server.h b/src/server.h
index 482be14a3..0d0b0de9d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1333,7 +1333,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
     using redisDbPersistentData::commitChanges;
     using redisDbPersistentData::setexpireUnsafe;
     using redisDbPersistentData::setexpire;
-    using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
     using redisDbPersistentData::restoreSnapshot;
     using redisDbPersistentData::removeAllCachedValues;
@@ -1344,6 +1343,13 @@ struct redisDb : public redisDbPersistentDataSnapshot
     using redisDbPersistentData::FRehashing;
 
 public:
+    const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
+        auto psnapshot = redisDbPersistentData::createSnapshot(mvccCheckpoint, fOptional);
+        if (psnapshot != nullptr)
+            mvccLastSnapshot = psnapshot->mvccCheckpoint();
+        return psnapshot;
+    }
+
     expireset::setiter expireitr;
     dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
     dict *ready_keys;           /* Blocked keys that received a PUSH */
@@ -2026,6 +2032,7 @@ struct redisServerThreadVars {
     bool fRetrySetAofEvent = false;
    bool modulesEnabledThisAeLoop = false;  /* In this loop of aeMain, were modules enabled before the thread went to sleep? */
+    bool disable_async_commands = false;    /* this is only valid for one cycle of the AE loop and is reset in afterSleep */
     std::vector<client*> vecclientsProcess;
     dictAsyncRehashCtl *rehashCtl = nullptr;
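
Reviewer note: the gating heuristic introduced in the networking.cpp hunk can be read in isolation as follows. This is a minimal, self-contained sketch of the decision only, not code from the patch; the shift width, the stale threshold, and the allowAsyncCommand helper are assumptions chosen solely so the example compiles and runs, and are not the values or names KeyDB uses.

// Sketch of the "frequent writer" gate around the async command fast path.
// Assumed stand-ins: MVCC_MS_SHIFT, msStaleThreshold, allowAsyncCommand().
#include <cstdint>
#include <iostream>

namespace sketch {

constexpr uint64_t MVCC_MS_SHIFT   = 20;   // assumed: low bits of an MVCC stamp are a sequence counter
constexpr uint64_t msStaleThreshold = 500; // assumed: snapshots older than ~500 ms count as stale

// Decide whether a client may take the async (snapshot-backed) path.
//   nowTstamp          - current MVCC timestamp (getMvccTstamp() in the patch)
//   lastWriteTstamp    - MVCC timestamp of the client's last write (c->mvccCheckpoint)
//   lastSnapshotTstamp - MVCC timestamp of the db's newest snapshot (db->mvccLastSnapshot)
bool allowAsyncCommand(uint64_t nowTstamp, uint64_t lastWriteTstamp, uint64_t lastSnapshotTstamp)
{
    // A snapshot new enough to cover the client's own writes already exists,
    // so serving the read from it costs nothing extra.
    bool fSnapshotExists = lastSnapshotTstamp >= lastWriteTstamp;

    // The client wrote within half the stale threshold: treating it as an
    // async candidate would force a fresh snapshot on almost every read.
    bool fWriteTooRecent =
        ((nowTstamp - lastWriteTstamp) >> MVCC_MS_SHIFT) < msStaleThreshold / 2;

    return !fWriteTooRecent || fSnapshotExists;
}

} // namespace sketch

int main()
{
    using namespace sketch;
    // Client whose last write was 1000 ms ago: not a frequent writer, allowed.
    uint64_t now   = 5000ull << MVCC_MS_SHIFT;
    uint64_t wrote = 4000ull << MVCC_MS_SHIFT;
    std::cout << allowAsyncCommand(now, wrote, 0) << "\n";      // prints 1

    // Client that wrote 50 ms ago with no covering snapshot: blocked.
    uint64_t recent = 4950ull << MVCC_MS_SHIFT;
    std::cout << allowAsyncCommand(now, recent, 0) << "\n";     // prints 0

    // Same recent writer, but a snapshot already covers its write: allowed.
    std::cout << allowAsyncCommand(now, recent, recent) << "\n"; // prints 1
}

The effect of the gate is that read-mostly clients keep the async fast path, while a client that has written recently only takes it when an existing snapshot already covers its writes, so write-heavy clients stop forcing snapshot churn on every read.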