From b51ece810f5f2050a434f55cd396f1d844192aec Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Nov 2019 19:47:12 -0500 Subject: [PATCH] Final design of forkless background save. expires NYI Former-commit-id: e2dc24b441bf52b181c820c853e0bc7524254f3f --- src/db.cpp | 171 +++++++++++++++++++++++++++-------------------- src/lazyfree.cpp | 2 + src/rdb.cpp | 25 +++++-- src/server.cpp | 8 --- src/server.h | 39 +++++++++-- 5 files changed, 153 insertions(+), 92 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index f8d766c3c..125b23852 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -371,6 +371,8 @@ bool redisDbPersistentData::syncDelete(robj *key) if (itr != nullptr && itr.val()->FExpires()) removeExpire(key, itr); if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) { + if (m_pdbSnapshot != nullptr) + dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr); if (g_pserver->cluster_enabled) slotToKeyDel(key); return 1; } else { @@ -636,7 +638,7 @@ bool redisDbPersistentData::iterate(std::function fn) bool fResult = true; while((de = dictNext(di)) != nullptr) { - ensure(de); + ensure((const char*)dictGetKey(de), &de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) { fResult = false; @@ -1793,6 +1795,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) { void redisDbPersistentData::initialize() { m_pdict = dictCreate(&dbDictType,this); + m_pdictTombstone = dictCreate(&dbDictType,this); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = false; m_fTrackingChanges = 0; @@ -1846,24 +1849,27 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_setexpire = new (MALLOC_LOCAL) expireset(); if (m_pstorage != nullptr) m_pstorage->clear(); + m_pdbSnapshot = nullptr; } /* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) { - redisDbPersistentData aux = *db1; + redisDbPersistentData aux = std::move(*db1); db1->m_pdict = db2->m_pdict; db1->m_fTrackingChanges = db2->m_fTrackingChanges; 
db1->m_fAllChanged = db2->m_fAllChanged; db1->m_setexpire = db2->m_setexpire; db1->m_pstorage = db2->m_pstorage; - db1->m_spdbSnapshot = db2->m_spdbSnapshot; + db1->m_pdbSnapshot = db2->m_pdbSnapshot; + db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER); db2->m_pdict = aux.m_pdict; db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fAllChanged = aux.m_fAllChanged; db2->m_setexpire = aux.m_setexpire; db2->m_pstorage = aux.m_pstorage; - db2->m_spdbSnapshot = aux.m_spdbSnapshot; + db2->m_pdbSnapshot = aux.m_pdbSnapshot; + db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER); db1->m_pdict->privdata = static_cast(db1); db2->m_pdict->privdata = static_cast(db2); @@ -1919,41 +1925,45 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val) void redisDbPersistentData::ensure(const char *key) { dictEntry *de = dictFind(m_pdict, key); - ensure(de); + ensure(key, &de); } -void redisDbPersistentData::ensure(dictEntry *de) +void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) { - if (de != nullptr && dictGetVal(de) == nullptr) + serverAssert(sdsKey != nullptr); + if (*pde == nullptr && m_pdbSnapshot != nullptr) { - if (m_spdbSnapshot != nullptr) + dictEntry *deTombstone = dictFind(m_pdictTombstone, sdsKey); + if (deTombstone == nullptr) { - auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de)); - serverAssert(itr != m_spdbSnapshot->end()); + auto itr = m_pdbSnapshot->find(sdsKey); + if (itr == m_pdbSnapshot->end()) + return; // not found if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { - dictSetVal(m_pdict, de, itr.val()); + dictAdd(m_pdict, sdsdup(sdsKey), itr.val()); } else { sds strT = serializeStoredObject(itr.val()); robj *objNew = deserializeStoredObject(strT, sdslen(strT)); sdsfree(strT); - dictSetVal(m_pdict, de, objNew); + dictAdd(m_pdict, sdsdup(sdsKey), objNew); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); serverAssert(objNew->mvcc_tstamp == 
itr.val()->mvcc_tstamp); } + *pde = dictFind(m_pdict, sdsKey); } - else - { - serverAssert(m_pstorage != nullptr); - sds key = (sds)dictGetKey(de); - m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(data, cb); - serverAssert(o != nullptr); - dictSetVal(m_pdict, de, o); - }); - } + } + else if (*pde != nullptr && dictGetVal(*pde) == nullptr) + { + serverAssert(m_pstorage != nullptr); + sds key = (sds)dictGetKey(*pde); + m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ + robj *o = deserializeStoredObject(data, cb); + serverAssert(o != nullptr); + dictSetVal(m_pdict, *pde, o); + }); } } @@ -2012,81 +2022,100 @@ void redisDbPersistentData::processChanges() m_setchanged.clear(); } -std::shared_ptr redisDbPersistentData::createSnapshot() +redisDbPersistentData *redisDbPersistentData::createSnapshot() { serverAssert(GlobalLocksAcquired()); - serverAssert(m_spdbSnapshot == nullptr); - auto spdb = std::make_shared(); - spdb->m_pdict = dictCreate(&dbDictType,spdb.get()); + serverAssert(m_spdbSnapshotHOLDER == nullptr); + auto spdb = std::make_unique(); + spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; - spdb->m_pdict->rehashidx = m_pdict->rehashidx; + spdb->m_pdict = m_pdict; spdb->m_pdict->iterators++; // fake an iterator so it doesn't rehash if (m_setexpire != nullptr) - spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); + spdb->m_setexpire = m_setexpire; - for (unsigned iht = 0; iht < 2; ++iht) - { - spdb->m_pdict->ht[iht] = m_pdict->ht[iht]; - if (m_pdict->ht[iht].size) - m_pdict->ht[iht].table = (dictEntry**)zcalloc(m_pdict->ht[iht].size*sizeof(dictEntry*), MALLOC_SHARED); - else - m_pdict->ht[iht].table = nullptr; - - for (size_t idx = 0; idx < m_pdict->ht[iht].size; ++idx) - { - const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx]; - dictEntry **pdeDst = &m_pdict->ht[iht].table[idx]; - while (deSrc != 
nullptr) - { - *pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED); - (*pdeDst)->key = deSrc->key; - (*pdeDst)->v.val = nullptr; - (*pdeDst)->next = nullptr; - pdeDst = &(*pdeDst)->next; - deSrc = deSrc->next; - } - } - } + m_pdict = dictCreate(&dbDictType,this); + m_setexpire = new (MALLOC_LOCAL) expireset(); - m_spdbSnapshot = std::move(spdb); - return m_spdbSnapshot; + m_spdbSnapshotHOLDER = std::move(spdb); + m_pdbSnapshot = m_spdbSnapshotHOLDER.get(); + return m_pdbSnapshot; } void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) { - serverAssert(GlobalLocksAcquired()); - serverAssert(m_spdbSnapshot.get() == psnapshot); + if (!GlobalLocksAcquired()) + serverLog(LL_WARNING, "Global locks not acquired"); + serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot); + m_spdbSnapshotHOLDER->m_pdict->iterators--; - dictIterator *di = dictGetIterator(m_pdict); + if (m_pdbSnapshot == nullptr) + { + // the database was cleared so we don't need to recover the snapshot + dictEmpty(m_pdictTombstone, nullptr); + m_spdbSnapshotHOLDER = nullptr; + return; + } + + // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB + dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; while ((de = dictNext(di)) != NULL) { - dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de)); - if (dictGetVal(de) == nullptr) + dictEntry *deSnapshot = dictFind(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de)); + if (deSnapshot == nullptr) + continue; // sometimes we delete things that were never in the snapshot + + robj *obj = (robj*)dictGetVal(deSnapshot); + const char *key = (const char*)dictGetKey(deSnapshot); + if (obj == nullptr || obj->FExpires()) { - if (deSnapshot != nullptr) + auto itrExpire = m_spdbSnapshotHOLDER->m_setexpire->find(key); + if (itrExpire != m_spdbSnapshotHOLDER->m_setexpire->end()) { - de->v.val = deSnapshot->v.val; - deSnapshot->v.val = nullptr; + 
m_spdbSnapshotHOLDER->m_setexpire->erase(itrExpire); // Note: normally we would have to set obj::fexpire false but we're deleting it anyways... } } - if (deSnapshot && (dictGetKey(deSnapshot) == dictGetKey(de))) - { - // The key is owned by the parent snapshot, so we modify the DB key dtor - // to ensure the key is not free'd during the delete - m_spdbSnapshot->m_pdict->type = &dbSnapshotDictType; - dictDelete(m_spdbSnapshot->m_pdict, dictGetKey(de)); - m_spdbSnapshot->m_pdict->type = &dbDictType; - } + dictDelete(m_spdbSnapshotHOLDER->m_pdict, key); } dictReleaseIterator(di); - m_spdbSnapshot->m_pdict->iterators--; - m_spdbSnapshot = nullptr; + dictEmpty(m_pdictTombstone, nullptr); + + // Stage 2 Move all new keys to the snapshot DB + di = dictGetIterator(m_pdict); + while ((de = dictNext(di)) != NULL) + { + dictEntry *deExisting = dictFind(m_spdbSnapshotHOLDER->m_pdict, (const char*)dictGetKey(de)); + if (deExisting != nullptr) + { + decrRefCount((robj*)dictGetVal(deExisting)); + dictSetVal(m_spdbSnapshotHOLDER->m_pdict, deExisting, dictGetVal(de)); + } + else + { + dictAdd(m_spdbSnapshotHOLDER->m_pdict, sdsdup((sds)dictGetKey(de)), dictGetVal(de)); + } + incrRefCount((robj*)dictGetVal(de)); + } + dictReleaseIterator(di); + + // Stage 3 swap the databases with the snapshot + std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict); + + // Stage 4 merge all expires + // TODO + std::swap(m_setexpire, m_spdbSnapshotHOLDER->m_setexpire); + + // Finally free the snapshot + m_spdbSnapshotHOLDER = nullptr; + m_pdbSnapshot = nullptr; } redisDbPersistentData::~redisDbPersistentData() { dictRelease(m_pdict); + if (m_pdictTombstone) + dictRelease(m_pdictTombstone); delete m_setexpire; } \ No newline at end of file diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 58da000ee..1e2981105 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -56,6 +56,8 @@ bool redisDbPersistentData::asyncDelete(robj *key) { * is actually just slower... 
So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); + if (m_pdbSnapshot != nullptr) + dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr); if (de) { robj *val = (robj*)dictGetVal(de); if (val->FExpires()) diff --git a/src/rdb.cpp b/src/rdb.cpp index f3f53ebe6..12ed78627 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1348,7 +1348,10 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) return C_OK; werr: - serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); + if (g_pserver->rdbThreadVars.fRdbThreadCancel) + serverLog(LL_WARNING, "Background save cancelled"); + else + serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); return C_ERR; @@ -1378,10 +1381,13 @@ void *rdbSaveThread(void *vargs) sendChildInfo(CHILD_INFO_TYPE_RDB); } - aeAcquireLock(); + // If we were told to cancel the requesting thread holds the lock for us + if (!g_pserver->rdbThreadVars.fRdbThreadCancel) + aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); - aeReleaseLock(); + if (!g_pserver->rdbThreadVars.fRdbThreadCancel) + aeReleaseLock(); zfree(args); return (retval == C_OK) ? (void*)0 : (void*)1; } @@ -1397,7 +1403,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) args->rsi.master_repl_offset = g_pserver->master_repl_offset; for (int idb = 0; idb < cserver.dbnum; ++idb) - args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get(); + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(); g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.fRdbThreadCancel = false; @@ -2423,6 +2429,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { * the child did not exit for an error, but because we wanted), and performs * the cleanup needed. 
*/ void killRDBChild(void) { + serverAssert(GlobalLocksAcquired()); g_pserver->rdbThreadVars.fRdbThreadCancel = true; void *rval; pthread_join(g_pserver->rdbThreadVars.rdb_child_thread,&rval); @@ -2508,10 +2515,14 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) } zfree(msg); } - aeAcquireLock(); + + // If we were told to cancel the requesting thread is holding the lock for us + if (!g_pserver->rdbThreadVars.fRdbThreadCancel) + aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); - aeReleaseLock(); + if (!g_pserver->rdbThreadVars.fRdbThreadCancel) + aeReleaseLock(); zfree(args->clientids); zfree(args); rioFreeFdset(&slave_sockets); @@ -2574,7 +2585,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { start = ustime(); for (int idb = 0; idb < cserver.dbnum; ++idb) - args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get(); + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(); g_pserver->rdbThreadVars.tmpfileNum++; g_pserver->rdbThreadVars.fRdbThreadCancel = false; diff --git a/src/server.cpp b/src/server.cpp index e6529bc6b..3c49fa30b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1183,14 +1183,6 @@ void dictSdsNOPDestructor(void *, void *) {} void dictDbKeyDestructor(void *privdata, void *key) { - redisDbPersistentData *owner = (redisDbPersistentData*)privdata; - serverAssert(owner != nullptr); - if (owner->m_spdbSnapshot != nullptr) - { - dictEntry *deSnapshot = dictFind(owner->m_spdbSnapshot->m_pdict, key); - if (deSnapshot && (key == dictGetKey(deSnapshot))) - return; // don't free, it's now owned by the snapshot - } sdsfree((sds)key); } diff --git a/src/server.h b/src/server.h index 4c7835721..d49098b51 100644 --- a/src/server.h +++ b/src/server.h @@ -1108,10 +1108,16 @@ class redisDbPersistentData public: ~redisDbPersistentData(); + redisDbPersistentData() = default; + redisDbPersistentData(redisDbPersistentData &&) = default; + static void swap(redisDbPersistentData *db1, 
redisDbPersistentData *db2); size_t slots() const { return dictSlots(m_pdict); } - size_t size() const { return dictSize(m_pdict); } + size_t size() const + { + return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); + } void expand(uint64_t slots) { dictExpand(m_pdict, slots); } void trackkey(robj_roptr o) @@ -1128,7 +1134,7 @@ public: dict_iter find(const char *key) { dictEntry *de = dictFind(m_pdict, key); - ensure(de); + ensure(key, &de); return dict_iter(de); } @@ -1139,8 +1145,24 @@ public: dict_iter random() { + if (size() == 0) + return dict_iter(nullptr); + if (m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0) + { + dict_iter iter(nullptr); + double pctInSnapshot = (double)m_pdbSnapshot->size() / (size() + m_pdbSnapshot->size()); + double randval = (double)rand()/RAND_MAX; + if (randval <= pctInSnapshot) + { + iter = m_pdbSnapshot->random(); + ensure(iter.key()); + dictEntry *de = dictFind(m_pdict, iter.key()); + return dict_iter(de); + } + } dictEntry *de = dictGetRandomKey(m_pdict); - ensure(de); + if (de != nullptr) + ensure((const char*)dictGetKey(de), &de); return dict_iter(de); } @@ -1187,17 +1209,18 @@ public: expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } - std::shared_ptr createSnapshot(); + redisDbPersistentData *createSnapshot(); void endSnapshot(const redisDbPersistentData *psnapshot); private: void ensure(const char *key); - void ensure(dictEntry *de); + void ensure(const char *key, dictEntry **de); void storeDatabase(); void storeKey(const char *key, size_t cchKey, robj *o); // Keyspace dict *m_pdict = nullptr; /* The keyspace for this DB */ + dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ int m_fTrackingChanges = 0; // Note: Stack based bool m_fAllChanged = false; std::set m_setchanged; @@ -1206,7 +1229,11 @@ private: // Expire expireset *m_setexpire = nullptr; - std::shared_ptr m_spdbSnapshot; + // 
These two pointers are the same, UNLESS the database has been cleared, + // in which case m_pdbSnapshot is NULL and we continue as though we weren't + // in a snapshot + redisDbPersistentData *m_pdbSnapshot = nullptr; + std::unique_ptr m_spdbSnapshotHOLDER; }; /* Redis database representation. There are multiple databases identified