diff --git a/src/networking.cpp b/src/networking.cpp index b1ee4a9a8..893b4a5c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2712,7 +2712,7 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || g_fTestMode)) + if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); if (!c->vecqueuedcmd.empty()) serverTL->vecclientsProcess.push_back(c); diff --git a/src/server.cpp b/src/server.cpp index eb50c18f5..ddc6af2e9 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2070,6 +2070,15 @@ int hash_spin_worker() { * rehashing. */ void databasesCron(bool fMainThread) { serverAssert(GlobalLocksAcquired()); + + /* end any snapshots created by fast async commands */ + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (serverTL->rgdbSnapshot[idb] != nullptr) { + g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); + serverTL->rgdbSnapshot[idb] = nullptr; + } + } + if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -2783,13 +2792,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); - for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (serverTL->rgdbSnapshot[idb] != nullptr) { - g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]); - serverTL->rgdbSnapshot[idb] = nullptr; - } - } - size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > g_pserver->stat_peak_memory) g_pserver->stat_peak_memory = zmalloc_used;