Optimize remote storage by bulk saving keys after processing operations but before sending responses to clients

Former-commit-id: 63a6dc2a29680d5d3f3f245168ca7de5d6bee1eb
This commit is contained in:
John Sully 2019-12-08 10:56:05 -05:00
parent 5f481a206d
commit dcebb3ab11
3 changed files with 37 additions and 8 deletions

View File

@ -2110,12 +2110,14 @@ void redisDbPersistentData::storeDatabase()
void redisDbPersistentData::processChanges() void redisDbPersistentData::processChanges()
{ {
serverAssert(GlobalLocksAcquired());
--m_fTrackingChanges; --m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0); serverAssert(m_fTrackingChanges >= 0);
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
{ {
if (m_fTrackingChanges == 0) if (m_fTrackingChanges >= 0)
{ {
if (m_fAllChanged) if (m_fAllChanged)
{ {
@ -2139,9 +2141,9 @@ void redisDbPersistentData::processChanges()
sdsfree(sdsKey); sdsfree(sdsKey);
} }
} }
m_setchanged.clear();
} }
} }
m_setchanged.clear();
} }
redisDbPersistentData::~redisDbPersistentData() redisDbPersistentData::~redisDbPersistentData()
@ -2192,4 +2194,9 @@ void redisDbPersistentData::removeCachedValue(const char *key)
serverAssert(de != nullptr); serverAssert(de != nullptr);
decrRefCount((robj*)dictGetVal(de)); decrRefCount((robj*)dictGetVal(de));
dictSetVal(m_pdict, de, nullptr); dictSetVal(m_pdict, de, nullptr);
}
void redisDbPersistentData::trackChanges()
{
m_fTrackingChanges++;
} }

View File

@ -2174,8 +2174,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Write the AOF buffer on disk */ /* Write the AOF buffer on disk */
flushAppendOnlyFile(0); flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */ static thread_local bool fFirstRun = true;
if (!fFirstRun) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].processChanges();
}
else {
fFirstRun = false;
}
aeReleaseLock(); aeReleaseLock();
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
if (serverTL->gcEpoch != 0) if (serverTL->gcEpoch != 0)
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
@ -2204,6 +2213,19 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
/* Check if there are clients unblocked by modules that implement /* Check if there are clients unblocked by modules that implement
* blocking commands. */ * blocking commands. */
moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
static thread_local bool fFirstRun = true;
if (!fFirstRun) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].processChanges();
}
else {
fFirstRun = false;
}
aeReleaseLock(); aeReleaseLock();
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
@ -2233,6 +2255,10 @@ void afterSleep(struct aeEventLoop *eventLoop) {
serverAssert(serverTL->gcEpoch == 0); serverAssert(serverTL->gcEpoch == 0);
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].trackChanges();
aeReleaseLock();
} }
/* =========================== Server initialization ======================== */ /* =========================== Server initialization ======================== */
@ -3750,11 +3776,7 @@ int processCommand(client *c, int callFlags) {
queueMultiCommand(c); queueMultiCommand(c);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].trackChanges();
call(c,callFlags); call(c,callFlags);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].processChanges();
c->woff = g_pserver->master_repl_offset; c->woff = g_pserver->master_repl_offset;
if (listLength(g_pserver->ready_keys)) if (listLength(g_pserver->ready_keys))
handleClientsBlockedOnKeys(); handleClientsBlockedOnKeys();

View File

@ -1269,7 +1269,7 @@ public:
void setStorageProvider(IStorage *pstorage); void setStorageProvider(IStorage *pstorage);
void trackChanges() { m_fTrackingChanges++; } void trackChanges();
void processChanges(); void processChanges();
// This should only be used if you look at the key, we do not fixup // This should only be used if you look at the key, we do not fixup