diff --git a/src/db.cpp b/src/db.cpp index b3d355b1a..6a23848ff 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -219,12 +219,16 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) { AeLocker locker; locker.arm(serverTL->current_client); - if (serverTL->rgdbSnapshot[idb] != nullptr) + if (serverTL->rgdbSnapshot[idb] != nullptr) { db->endSnapshot(serverTL->rgdbSnapshot[idb]); - serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true); + serverTL->rgdbSnapshot[idb] = nullptr; + } else { + serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true); + } if (serverTL->rgdbSnapshot[idb] == nullptr) { // We still need to service the read o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE); + serverTL->disable_async_commands = true; // don't try this again } } if (serverTL->rgdbSnapshot[idb] != nullptr) { @@ -3159,6 +3163,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command if (command.argc >= 2) { const char *cmd = szFromObj(command.argv[0]); if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { + if (c->db->m_spdbSnapshotHOLDER != nullptr) + return; // this is dangerous enough without a snapshot around auto h = dictSdsHash(szFromObj(command.argv[1])); for (int iht = 0; iht < 2; ++iht) { auto hT = h & c->db->m_pdict->ht[iht].sizemask; diff --git a/src/networking.cpp b/src/networking.cpp index 7ec72e60c..6b40d24f0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -200,7 +200,7 @@ client *createClient(connection *conn, int iel) { c->paused_list_node = NULL; c->client_tracking_redirection = 0; c->casyncOpsPending = 0; - c->mvccCheckpoint = 0; + c->mvccCheckpoint = getMvccTstamp(); c->master_error = 0; memset(c->uuid, 0, UUID_BINARY_LEN); @@ -2712,8 +2712,17 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || g_fTestMode)) - processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); + if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { + // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often + // so we exclude them unless the snapshot we need already exists + bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint; + bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2); + + // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it + if (!fWriteTooRecent || fSnapshotExists) { + processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); + } + } if (!c->vecqueuedcmd.empty()) serverTL->vecclientsProcess.push_back(c); } else { diff --git a/src/server.cpp b/src/server.cpp index 3abbec816..3de23c0bb 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2070,6 +2070,7 @@ int hash_spin_worker() { * rehashing. */ void databasesCron(bool fMainThread) { serverAssert(GlobalLocksAcquired()); + if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -2783,6 +2784,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); + /* end any snapshots created by fast async commands */ for (int idb = 0; idb < cserver.dbnum; ++idb) { if (serverTL->rgdbSnapshot[idb] != nullptr) { g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); @@ -2991,6 +2993,8 @@ void afterSleep(struct aeEventLoop *eventLoop) { serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->trackChanges(false); + + serverTL->disable_async_commands = false; } } @@ -4402,7 +4406,10 @@ void call(client *c, int flags) { serverTL->commandsExecuted++; const long duration = elapsedUs(call_timer); c->duration = duration; - dirty = g_pserver->dirty-dirty; + if (flags & CMD_CALL_ASYNC) + dirty = 0; // dirty is bogus in this case as there's no synchronization + else + dirty = g_pserver->dirty-dirty; if (dirty < 0) dirty = 0; if (dirty) diff --git a/src/server.h b/src/server.h index 421aed87f..f83756d95 100644 --- a/src/server.h +++ b/src/server.h @@ -1263,6 +1263,8 @@ public: // These need to be fixed using redisDbPersistentData::size; using redisDbPersistentData::expireSize; + + static const uint64_t msStaleThreshold = 500; }; /* Redis database representation. There are multiple databases identified @@ -1331,7 +1333,6 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::commitChanges; using redisDbPersistentData::setexpireUnsafe; using redisDbPersistentData::setexpire; - using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::restoreSnapshot; using redisDbPersistentData::removeAllCachedValues; @@ -1342,6 +1343,13 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::FRehashing; public: + const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { + auto psnapshot = redisDbPersistentData::createSnapshot(mvccCheckpoint, fOptional); + if (psnapshot != nullptr) + mvccLastSnapshot = psnapshot->mvccCheckpoint(); + return psnapshot; + } + expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ @@ -1350,6 +1358,7 @@ public: long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ + uint64_t mvccLastSnapshot = 0; }; /* Declare database backup that include redis main DBs and slots to keys map. @@ -2024,6 +2033,7 @@ struct redisServerThreadVars { bool fRetrySetAofEvent = false; bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before the thread went to sleep? */ + bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */ std::vector vecclientsProcess; dictAsyncRehashCtl *rehashCtl = nullptr; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 795df287d..a13f5d9f5 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -654,9 +654,7 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const bool redisDbPersistentDataSnapshot::FStale() const { - // 0.5 seconds considered stale; - static const uint64_t msStale = 500; - return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale; + return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= redisDbPersistentDataSnapshot::msStaleThreshold; } void dictGCAsyncFree(dictAsyncRehashCtl *async) {