diff --git a/src/db.cpp b/src/db.cpp index 38f62c7ee..f129e7c03 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2110,12 +2110,14 @@ void redisDbPersistentData::storeDatabase() void redisDbPersistentData::processChanges() { + serverAssert(GlobalLocksAcquired()); + --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); if (m_spstorage != nullptr) { - if (m_fTrackingChanges == 0) + if (m_fTrackingChanges >= 0) { if (m_fAllChanged) { @@ -2139,9 +2141,9 @@ void redisDbPersistentData::processChanges() sdsfree(sdsKey); } } + m_setchanged.clear(); } } - m_setchanged.clear(); } redisDbPersistentData::~redisDbPersistentData() @@ -2192,4 +2194,9 @@ void redisDbPersistentData::removeCachedValue(const char *key) serverAssert(de != nullptr); decrRefCount((robj*)dictGetVal(de)); dictSetVal(m_pdict, de, nullptr); +} + +void redisDbPersistentData::trackChanges() +{ + m_fTrackingChanges++; } \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 535f26caa..bf6c0d5e0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2174,8 +2174,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); - /* Handle writes with pending output buffers. */ + static thread_local bool fFirstRun = true; + if (!fFirstRun) { + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].processChanges(); + } + else { + fFirstRun = false; + } aeReleaseLock(); + + /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); if (serverTL->gcEpoch != 0) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); @@ -2204,6 +2213,19 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Check if there are clients unblocked by modules that implement * blocking commands. */ moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); + + static thread_local bool fFirstRun = true; + if (!fFirstRun) { + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].processChanges(); + } + else { + fFirstRun = false; + } + aeReleaseLock(); /* Handle writes with pending output buffers. */ @@ -2233,6 +2255,10 @@ void afterSleep(struct aeEventLoop *eventLoop) { serverAssert(serverTL->gcEpoch == 0); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); + aeAcquireLock(); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].trackChanges(); + aeReleaseLock(); } /* =========================== Server initialization ======================== */ @@ -3750,11 +3776,7 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { - for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb].trackChanges(); call(c,callFlags); - for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb].processChanges(); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) handleClientsBlockedOnKeys(); diff --git a/src/server.h b/src/server.h index fb7de567b..96e6e0c1f 100644 --- a/src/server.h +++ b/src/server.h @@ -1269,7 +1269,7 @@ public: void setStorageProvider(IStorage *pstorage); - void trackChanges() { m_fTrackingChanges++; } + void trackChanges(); void processChanges(); // This should only be used if you look at the key, we do not fixup