Prevent issue where count can be out of sync temporarily, causing crashes where we expect the count to be perfect

Former-commit-id: 77c9f36413c6f0cbb0b13a7ec746746c97faadcd
This commit is contained in:
John Sully 2020-03-24 00:21:12 -04:00
parent ae81c227fe
commit 79f48a214e
5 changed files with 27 additions and 35 deletions

View File

@ -899,8 +899,8 @@ int loadAppendOnlyFile(char *filename) {
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
auto vec = g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges();
g_pserver->db[idb]->commitChanges(vec); g_pserver->db[idb]->commitChanges();
} }
fclose(fp); fclose(fp);
freeFakeClient(fakeClient); freeFakeClient(fakeClient);

View File

@ -2257,13 +2257,12 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di); dictReleaseIterator(di);
} }
redisDbPersistentData::changelist redisDbPersistentData::processChanges() void redisDbPersistentData::processChanges()
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
--m_fTrackingChanges; --m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0); serverAssert(m_fTrackingChanges >= 0);
changelist vecRet;
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
{ {
@ -2285,23 +2284,18 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
continue; continue;
robj *o = (robj*)dictGetVal(de); robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate);
sdsfree(temp);
} }
} }
m_setchanged.clear(); m_setchanged.clear();
m_cnewKeysPending = 0; m_cnewKeysPending = 0;
} }
} }
return vecRet;
} }
void redisDbPersistentData::commitChanges(const changelist &vec) void redisDbPersistentData::commitChanges()
{ {
for (auto &pair : vec)
{
m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), sdslen(pair.second.get()), pair.first.fUpdate);
}
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->endWriteBatch(); m_spstorage->endWriteBatch();
} }
@ -2379,8 +2373,8 @@ void redisDbPersistentData::removeAllCachedValues()
// First we have to flush the tracked changes // First we have to flush the tracked changes
if (m_fTrackingChanges) if (m_fTrackingChanges)
{ {
auto vec = processChanges(); processChanges();
commitChanges(vec); commitChanges();
trackChanges(false); trackChanges(false);
} }

View File

@ -2534,8 +2534,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
auto vec = g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges();
g_pserver->db[idb]->commitChanges(vec); g_pserver->db[idb]->commitChanges();
} }
return C_OK; return C_OK;

View File

@ -2339,21 +2339,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true; static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released // note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<std::pair<redisDb*, redisDbPersistentData::changelist>> vecchanges; std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (!fFirstRun) { if (!fFirstRun) {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb) {
{ vecdb.push_back(g_pserver->db[idb]);
auto vec = g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges();
vecchanges.emplace_back(g_pserver->db[idb], std::move(vec));
} }
} } else {
else {
fFirstRun = false; fFirstRun = false;
} }
aeReleaseLock(); aeReleaseLock();
for (auto &pair : vecchanges) for (redisDb *db : vecdb)
pair.first->commitChanges(pair.second); db->commitChanges();
handleClientsWithPendingWrites(iel); handleClientsWithPendingWrites(iel);
if (serverTL->gcEpoch != 0) if (serverTL->gcEpoch != 0)

View File

@ -1293,16 +1293,8 @@ public:
// to allow you to release the global lock before commiting. To prevent deadlocks you *must* // to allow you to release the global lock before commiting. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions as // either release the global lock or keep the same global lock between the two functions as
// a second look is kept to ensure writes to secondary storage are ordered // a second look is kept to ensure writes to secondary storage are ordered
struct changedesc void processChanges();
{ void commitChanges();
sdsimmutablestring strkey;
bool fUpdate;
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
};
typedef std::vector<std::pair<changedesc, unique_sds_ptr>> changelist;
changelist processChanges();
void commitChanges(const changelist &vec);
// This should only be used if you look at the key, we do not fixup // This should only be used if you look at the key, we do not fixup
// objects stored elsewhere // objects stored elsewhere
@ -1326,6 +1318,13 @@ protected:
uint64_t m_mvccCheckpoint = 0; uint64_t m_mvccCheckpoint = 0;
private: private:
struct changedesc
{
sdsimmutablestring strkey;
bool fUpdate;
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
};
struct changedescCmp struct changedescCmp
{ {
using is_transparent = void; // C++14 to allow comparisons with different types using is_transparent = void; // C++14 to allow comparisons with different types