From 175b266c34023a2160658c31c5136eaac5a9c352 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Fri, 26 Feb 2021 20:17:34 +0000 Subject: [PATCH 01/12] time thread priority + nanosleep Former-commit-id: c84f296edabc001a1836ab8437f746dcff811148 --- src/server.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 1bc9ab175..b9adea1b5 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6068,9 +6068,12 @@ void OnTerminate() } void *timeThreadMain(void*) { + timespec delay; + delay.tv_sec = 0; + delay.tv_nsec = 100; while (true) { updateCachedTime(); - usleep(1); + clock_nanosleep(CLOCK_REALTIME, 0, &delay, NULL); } } @@ -6422,6 +6425,9 @@ int main(int argc, char **argv) { serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr); + struct sched_param time_thread_priority; + time_thread_priority.sched_priority = sched_get_priority_max(SCHED_FIFO); + pthread_setschedparam(cserver.time_thread_id, SCHED_FIFO, &time_thread_priority); pthread_attr_t tattr; pthread_attr_init(&tattr); From 6065f276c4437036ce7455cbb2177fdce1f2c450 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 2 Mar 2021 01:38:12 +0000 Subject: [PATCH 02/12] Key prefetch error handling instead of crashing Former-commit-id: b322a14efb1b897134b229c1726b45264b57783f --- src/db.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 20e8ac014..9d05e8082 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2970,6 +2970,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command lock.arm(c); getKeysResult result = GETKEYS_RESULT_INIT; auto cmd = lookupCommand(szFromObj(command.argv[0])); + if (cmd == nullptr) + return; // Bad command?
It's not for us to judge, just bail int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); for (int ikey = 0; ikey < numkeys; ++ikey) { @@ -2994,10 +2996,14 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command serverAssert(o != nullptr); }, &sharedKey); - if (sharedKey == nullptr) - sharedKey = sdsdupshared(szFromObj(objKey)); + if (o != nullptr) { + if (sharedKey == nullptr) + sharedKey = sdsdupshared(szFromObj(objKey)); - vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); + vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); + } else if (sharedKey != nullptr) { + sdsfree(sharedKey); + } } lock.arm(c); From 76698beeafc45684462e056e86a556b2bcbb3f78 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 2 Mar 2021 04:16:20 +0000 Subject: [PATCH 03/12] Drastically improve perf when loading an RDB with a storage provider Former-commit-id: 0133b42d54676e8fac2c5cb006cc87988dced268 --- src/db.cpp | 77 ++++++++++++++++++++++++++++++++++++++-------------- src/server.h | 12 ++------ 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 9d05e8082..66c19552b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -49,10 +49,25 @@ struct dbBackup { int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); +void changedescDtor(void *privdata, void *obj); std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); +dictType dictChangeDescType { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + nullptr, /* key destructor */ + changedescDtor /* val destructor */ +}; + +void changedescDtor(void *, void *obj) { + redisDbPersistentData::changedesc *desc = 
(redisDbPersistentData::changedesc*)obj; + delete desc; +} + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -419,12 +434,13 @@ bool redisDbPersistentData::syncDelete(robj *key) fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; if (fDeleted) { - auto itrChange = m_setchanged.find(szFromObj(key)); - if (itrChange != m_setchanged.end()) + dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key)); + if (de != nullptr) { - if (!itrChange->fUpdate) + changedesc *desc = (changedesc*)dictGetVal(de); + if (!desc->fUpdate) --m_cnewKeysPending; - m_setchanged.erase(itrChange); + dictFreeUnlinkedEntry(m_dictChanged, de); } if (m_pdbSnapshot != nullptr) @@ -2505,7 +2521,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) dictEmpty(m_pdict,callback); if (m_fTrackingChanges) { - m_setchanged.clear(); + dictEmpty(m_dictChanged, nullptr); m_cnewKeysPending = 0; m_fAllChanged++; } @@ -2708,17 +2724,20 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) if (m_spstorage != nullptr) { - if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0) + if (!m_fAllChanged && dictSize(m_dictChanged) == 0 && m_cnewKeysPending == 0) return false; m_spstorage->beginWriteBatch(); serverAssert(m_pdbSnapshotStorageFlush == nullptr); - if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) + if (fSnapshot && !m_fAllChanged && dictSize(m_dictChanged) > 100) { // Do a snapshot based process if possible m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */); if (m_pdbSnapshotStorageFlush) { - m_setchangedStorageFlush = std::move(m_setchanged); + if (m_dictChangedStorageFlush) + dictRelease(m_dictChangedStorageFlush); + m_dictChangedStorageFlush = m_dictChanged; + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); } } @@ -2732,13 +2751,17 @@ bool 
redisDbPersistentData::processChanges(bool fSnapshot) } else { - for (auto &change : m_setchanged) + dictIterator *di = dictGetIterator(m_dictChanged); + dictEntry *de; + while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), this, change); + changedesc *change = (changedesc*)dictGetVal(de); + serializeAndStoreChange(m_spstorage.get(), this, *change); } + dictReleaseIterator(di); } } - m_setchanged.clear(); + dictEmpty(m_dictChanged, nullptr); m_cnewKeysPending = 0; } return (m_spstorage != nullptr); @@ -2748,12 +2771,16 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** { if (m_pdbSnapshotStorageFlush) { - - for (auto &change : m_setchangedStorageFlush) + dictIterator *di = dictGetIterator(m_dictChangedStorageFlush); + dictEntry *de; + while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change); + changedesc *change = (changedesc*)dictGetVal(de); + serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, *change); } - m_setchangedStorageFlush.clear(); + dictReleaseIterator(di); + dictRelease(m_dictChangedStorageFlush); + m_dictChangedStorageFlush = nullptr; *psnapshotFree = m_pdbSnapshotStorageFlush; m_pdbSnapshotStorageFlush = nullptr; } @@ -2773,6 +2800,12 @@ redisDbPersistentData::~redisDbPersistentData() dictRelease(m_pdict); if (m_pdictTombstone) dictRelease(m_pdictTombstone); + + if (m_dictChanged) + dictRelease(m_dictChanged); + if (m_dictChangedStorageFlush) + dictRelease(m_dictChangedStorageFlush); + delete m_setexpire; } @@ -2815,8 +2848,8 @@ bool redisDbPersistentData::removeCachedValue(const char *key) if (m_spstorage != nullptr) m_spstorage->batch_lock(); - auto itr = m_setchanged.find(key); - if (itr != m_setchanged.end()) + dictEntry *de = dictFind(m_dictChanged, key); + if (de != nullptr) { if (m_spstorage != nullptr) m_spstorage->batch_unlock(); @@ -2837,6 +2870,9 
@@ void redisDbPersistentData::trackChanges(bool fBulk) m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); if (fBulk) m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); + + if (m_dictChanged == nullptr) + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); } void redisDbPersistentData::removeAllCachedValues() @@ -2855,9 +2891,10 @@ void redisDbPersistentData::removeAllCachedValues() void redisDbPersistentData::trackkey(const char *key, bool fUpdate) { if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { - auto itr = m_setchanged.find(key); - if (itr == m_setchanged.end()) { - m_setchanged.emplace(sdsdupshared(key), fUpdate); + dictEntry *de = dictFind(m_dictChanged, key); + if (de == nullptr) { + changedesc *desc = new changedesc(sdsdupshared(key), fUpdate); + dictAdd(m_dictChanged, (void*)desc->strkey.get(), desc); if (!fUpdate) ++m_cnewKeysPending; } diff --git a/src/server.h b/src/server.h index 8e29ac196..2c962cb60 100644 --- a/src/server.h +++ b/src/server.h @@ -1042,6 +1042,7 @@ class redisDbPersistentDataSnapshot; class redisDbPersistentData { friend void dictDbKeyDestructor(void *privdata, void *key); + friend void changedescDtor(void*, void*); friend class redisDbPersistentDataSnapshot; public: @@ -1153,13 +1154,6 @@ private: changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} }; - struct changedescCmp - { - using is_transparent = void; // C++14 to allow comparisons with different types - bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; } - bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); } - bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; } - }; static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change); @@ -1174,7 +1168,7 @@ private: dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ 
std::atomic m_fTrackingChanges {0}; // Note: Stack based std::atomic m_fAllChanged {0}; - std::set m_setchanged; + dict *m_dictChanged = nullptr; size_t m_cnewKeysPending = 0; std::shared_ptr m_spstorage = nullptr; @@ -1189,7 +1183,7 @@ private: const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr; - std::set m_setchangedStorageFlush; + dict *m_dictChangedStorageFlush = nullptr; int m_refCount = 0; }; From 4066ce8f3ae0161d2d5d6465379fe28ef95747c2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 3 Mar 2021 07:05:12 +0000 Subject: [PATCH 04/12] Don't let dictionaries shrink too rapidly. It can cause massive perf issues while in the shrink rehash Former-commit-id: a7ad346e4f03c85d22a29c8268d35471e86283aa --- src/dict.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 0bf7e6c36..1d0a0e266 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -1267,6 +1267,7 @@ unsigned long dictScan(dict *d, /* Expand the hash table if needed */ static int _dictExpandIfNeeded(dict *d) { + static const size_t SHRINK_FACTOR = 4; /* Incremental rehashing already in progress. Return. 
*/ if (dictIsRehashing(d)) return DICT_OK; @@ -1283,10 +1284,10 @@ static int _dictExpandIfNeeded(dict *d) { return dictExpand(d, d->ht[0].used*2, false /*fShrink*/); } - else if (d->ht[0].used > 0 && d->ht[0].used * 16 < d->ht[0].size && dict_can_resize) + else if (d->ht[0].used > 0 && d->ht[0].size >= (1024*SHRINK_FACTOR) && (d->ht[0].used * 16) < d->ht[0].size && dict_can_resize) { // If the dictionary has shurnk a lot we'll need to shrink the hash table instead - return dictExpand(d, d->ht[0].used*2, true /*fShrink*/); + return dictExpand(d, d->ht[0].size/SHRINK_FACTOR, true /*fShrink*/); } return DICT_OK; } From ea6ba8370d685adc1cdbb7371a1a8be495c1aa32 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 3 Mar 2021 07:05:51 +0000 Subject: [PATCH 05/12] In low load async rehash may not complete in time, do it in the cron Former-commit-id: 0a27d30753d887b6251e645abe26118068c55587 --- src/server.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/server.cpp b/src/server.cpp index 1bc9ab175..b1124cac9 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2157,6 +2157,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); + if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { + aeReleaseLock(); + // If there is not enough lock contention we may not have made enough progress on the async + // rehash. Ensure we finish it outside the lock. + dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); + aeAcquireLock(); + } + /* If another threads unblocked one of our clients, and this thread has been idle then beforeSleep won't have a chance to process the unblocking. So we also process them here in the cron job to ensure they don't starve. 
@@ -2448,6 +2456,14 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(id); UNUSED(clientData); + if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { + aeReleaseLock(); + // If there is not enough lock contention we may not have made enough progress on the async + // rehash. Ensure we finish it outside the lock. + dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); + aeAcquireLock(); + } + int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); From 08b9f21d71dc533d39a00d6f26cb7391cbd2ee72 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 3 Mar 2021 07:06:19 +0000 Subject: [PATCH 06/12] Data loss issue due to async rehash interfering with snapshots Former-commit-id: 5245ca9a67c74b34139cafc9754543d1b8bed90a --- src/snapshot.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index fb9b294d2..5d479fb7f 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -93,7 +93,17 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); - dictRehashMilliseconds(m_pdict, 50); // Give us the best chance at a fast cleanup + // We can't have async rehash modifying these. 
Setting the asyncdata list to null + // will cause us to throw away the async work rather than modify the tables in flight + if (m_pdict->asyncdata != nullptr) { + m_pdict->asyncdata = nullptr; + m_pdict->rehashidx = 0; + } + if (m_pdictTombstone->asyncdata != nullptr) { + m_pdictTombstone->rehashidx = 0; + m_pdictTombstone->asyncdata = nullptr; + } + spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; spdb->m_pdict = m_pdict; From 50ce24a10c0ac780b833c957f976b90873ba71cc Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 3 Mar 2021 22:12:51 +0000 Subject: [PATCH 07/12] Fix compile warnings Former-commit-id: c314cab888e9c8b3e825b2dfe0c0392ee998bdc4 --- src/db.cpp | 1 + src/dict.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 66c19552b..8b6b24d31 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -620,6 +620,7 @@ const dbBackup *backupDb(void) { /* Discard a previously created backup, this can be slow (similar to FLUSHALL) * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ void discardDbBackup(const dbBackup *backup, int flags, void(callback)(void*)) { + UNUSED(callback); int async = (flags & EMPTYDB_ASYNC); /* Release main DBs backup . 
*/ diff --git a/src/dict.cpp b/src/dict.cpp index 1d0a0e266..7a05ed062 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -378,7 +378,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { int empty_visits = buckets * 10; - while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) { + while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) { dictEntry *de; /* Note that rehashidx can't overflow as we are sure there are more @@ -386,7 +386,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) goto LDone; - if (d->rehashidx >= d->ht[0].size) goto LDone; + if ((size_t)d->rehashidx >= d->ht[0].size) goto LDone; } de = d->ht[0].table[d->rehashidx]; @@ -666,7 +666,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { else d->ht[table].table[idx] = he->next; if (!nofree) { - if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) { + if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) { he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he->next; } else { @@ -746,7 +746,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { if ((he = ht->table[i]) == NULL) continue; while(he) { nextHe = he->next; - if (d->asyncdata && i < d->rehashidx) { + if (d->asyncdata && (ssize_t)i < d->rehashidx) { he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { From 902264efb7cd08c0959a0adcbe9d47522528c3a7 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 4 Mar 2021 07:41:06 +0000 Subject: [PATCH 08/12] Load perf fixes with a storage provider set Former-commit-id: 861b19de00c75c9167cc25031292284ad1c21893 --- src/StorageCache.cpp | 83 ++++++++++++++++++++++++++++++++++---------- src/StorageCache.h | 14 ++++---- src/db.cpp | 76 +++++++++++++++++++--------------------- src/rdb.cpp | 37 ++++++++++++++++---- src/server.h | 
13 ++----- 5 files changed, 140 insertions(+), 83 deletions(-) diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 23b9af90a..64ae0051e 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -1,36 +1,80 @@ #include "server.h" +uint64_t hashPassthrough(const void *hash) { + return static_cast(reinterpret_cast(hash)); +} + +int hashCompare(void *, const void *key1, const void *key2) { + auto diff = (reinterpret_cast(key1) - reinterpret_cast(key2)); + return !diff; +} + +dictType dbStorageCacheType = { + hashPassthrough, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + hashCompare, /* key compare */ + NULL, /* key destructor */ + NULL /* val destructor */ +}; + +StorageCache::StorageCache(IStorage *storage) + : m_spstorage(storage) +{ + m_pdict = dictCreate(&dbStorageCacheType, nullptr); +} + void StorageCache::clear() { std::unique_lock ul(m_lock); - if (m_setkeys != nullptr) - m_setkeys->clear(); + if (m_pdict != nullptr) + dictEmpty(m_pdict, nullptr); m_spstorage->clear(); + m_collisionCount = 0; } void StorageCache::cacheKey(sds key) { - if (m_setkeys == nullptr) + if (m_pdict == nullptr) return; - m_setkeys->insert(sdsimmutablestring(sdsdupshared(key))); + uintptr_t hash = dictSdsHash(key); + if (dictAdd(m_pdict, reinterpret_cast(hash), (void*)1) != DICT_OK) { + dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); + serverAssert(de != nullptr); + de->v.s64++; + m_collisionCount++; + } } void StorageCache::cacheKey(const char *rgch, size_t cch) { - if (m_setkeys == nullptr) + if (m_pdict == nullptr) return; - m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch))); + uintptr_t hash = dictGenHashFunction(rgch, (int)cch); + if (dictAdd(m_pdict, reinterpret_cast(hash), (void*)1) != DICT_OK) { + dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); + serverAssert(de != nullptr); + de->v.s64++; + m_collisionCount++; + } } bool StorageCache::erase(sds key) { bool result = m_spstorage->erase(key, sdslen(key)); 
std::unique_lock ul(m_lock); - if (result && m_setkeys != nullptr) + if (result && m_pdict != nullptr) { - auto itr = m_setkeys->find(sdsview(key)); - serverAssert(itr != m_setkeys->end()); - m_setkeys->erase(itr); + uint64_t hash = dictSdsHash(key); + dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); + serverAssert(de != nullptr); + de->v.s64--; + serverAssert(de->v.s64 >= 0); + if (de->v.s64 == 0) { + dictDelete(m_pdict, reinterpret_cast(hash)); + } else { + m_collisionCount--; + } } return result; } @@ -38,7 +82,7 @@ bool StorageCache::erase(sds key) void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) { std::unique_lock ul(m_lock); - if (!fOverwrite && m_setkeys != nullptr) + if (!fOverwrite && m_pdict != nullptr) { cacheKey(key); } @@ -54,16 +98,16 @@ const StorageCache *StorageCache::clone() return cacheNew; } -void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const +void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const { std::unique_lock ul(m_lock); - if (m_setkeys != nullptr) + if (m_pdict != nullptr) { - auto itr = m_setkeys->find(sdsview(key)); - if (itr == m_setkeys->end()) + uint64_t hash = dictSdsHash(key); + dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); + + if (de == nullptr) return; // Not found - if (cachedKey != nullptr) - *cachedKey = sdsdupshared(itr->get()); } ul.unlock(); m_spstorage->retrieve(key, sdslen(key), fn); @@ -73,8 +117,9 @@ size_t StorageCache::count() const { std::unique_lock ul(m_lock); size_t count = m_spstorage->count(); - if (m_setkeys != nullptr) - serverAssert(count == m_setkeys->size()); + if (m_pdict != nullptr) { + serverAssert(count == (dictSize(m_pdict) + m_collisionCount)); + } return count; } diff --git a/src/StorageCache.h b/src/StorageCache.h index 4fa3c3a08..33492cab1 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -4,12 +4,11 @@ class StorageCache { std::shared_ptr m_spstorage; - std::unique_ptr> 
m_setkeys; + dict *m_pdict = nullptr; + int m_collisionCount = 0; mutable fastlock m_lock {"StorageCache"}; - StorageCache(IStorage *storage) - : m_spstorage(storage) - {} + StorageCache(IStorage *storage); void cacheKey(sds key); void cacheKey(const char *rgchKey, size_t cchKey); @@ -31,9 +30,10 @@ class StorageCache public: static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { StorageCache *cache = new StorageCache(nullptr); - if (pfactory->FSlow()) + if (!pfactory->FSlow()) { - cache->m_setkeys = std::make_unique>(20); + dictRelease(cache->m_pdict); + cache->m_pdict = nullptr; } load_iter_data data = {cache, fn, privdata}; cache->m_spstorage = std::shared_ptr(pfactory->create(db, key_load_itr, (void*)&data)); @@ -42,7 +42,7 @@ public: void clear(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); - void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const; + void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } diff --git a/src/db.cpp b/src/db.cpp index 8b6b24d31..b60462c09 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -49,7 +49,6 @@ struct dbBackup { int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); -void changedescDtor(void *privdata, void *obj); std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); @@ -59,15 +58,10 @@ dictType dictChangeDescType { NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - nullptr, /* key destructor */ - changedescDtor /* val destructor */ + dictSdsDestructor, /* key destructor */ + nullptr /* val destructor */ }; -void 
changedescDtor(void *, void *obj) { - redisDbPersistentData::changedesc *desc = (redisDbPersistentData::changedesc*)obj; - delete desc; -} - /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -437,8 +431,8 @@ bool redisDbPersistentData::syncDelete(robj *key) dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key)); if (de != nullptr) { - changedesc *desc = (changedesc*)dictGetVal(de); - if (!desc->fUpdate) + bool fUpdate = (bool)dictGetVal(de); + if (!fUpdate) --m_cnewKeysPending; dictFreeUnlinkedEntry(m_dictChanged, de); } @@ -2641,20 +2635,18 @@ LNotFound: { if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database { - sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available robj *o = nullptr; + sds sdsNewKey = sdsdupshared(sdsKey); std::unique_ptr spexpire; m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ size_t offset = 0; - spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); - o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); + spexpire = deserializeExpire(sdsNewKey, (const char*)data, cb, &offset); + o = deserializeStoredObject(this, sdsNewKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); - }, &sdsNewKey); + }); if (o != nullptr) { - if (sdsNewKey == nullptr) - sdsNewKey = sdsdupshared(sdsKey); dictAdd(m_pdict, sdsNewKey, o); o->SetFExpires(spexpire != nullptr); @@ -2667,11 +2659,8 @@ LNotFound: serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); } serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); - } - else - { - if (sdsNewKey != nullptr) - sdsfree(sdsNewKey); // BUG but don't bother crashing + } else { + sdsfree(sdsNewKey); } *pde = dictFind(m_pdict, sdsKey); @@ -2705,14 
+2694,14 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change) +/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate) { - auto itr = db->find_cached_threadsafe(change.strkey.get()); + auto itr = db->find_cached_threadsafe(key); if (itr == nullptr) return; robj *o = itr.val(); sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); - storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate); + storage->insert((sds)key, temp, sdslen(temp), fUpdate); sdsfree(temp); } @@ -2756,8 +2745,7 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) dictEntry *de; while ((de = dictNext(di)) != nullptr) { - changedesc *change = (changedesc*)dictGetVal(de); - serializeAndStoreChange(m_spstorage.get(), this, *change); + serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de)); } dictReleaseIterator(di); } @@ -2776,8 +2764,7 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** dictEntry *de; while ((de = dictNext(di)) != nullptr) { - changedesc *change = (changedesc*)dictGetVal(de); - serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, *change); + serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de)); } dictReleaseIterator(di); dictRelease(m_dictChangedStorageFlush); @@ -2866,14 +2853,18 @@ bool redisDbPersistentData::removeCachedValue(const char *key) return true; } -void redisDbPersistentData::trackChanges(bool fBulk) +void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint) { m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); if (fBulk) 
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); - if (m_dictChanged == nullptr) + if (m_dictChanged == nullptr) { m_dictChanged = dictCreate(&dictChangeDescType, nullptr); + } + + if (sizeHint > 0) + dictExpand(m_dictChanged, sizeHint, false); } void redisDbPersistentData::removeAllCachedValues() @@ -2886,7 +2877,16 @@ void redisDbPersistentData::removeAllCachedValues() trackChanges(false); } - dictEmpty(m_pdict, nullptr); + if (m_pdict->iterators == 0) { + dict *dT = m_pdict; + m_pdict = dictCreate(&dbDictType, this); + dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash + g_pserver->asyncworkqueue->AddWorkFunction([dT]{ + dictRelease(dT); + }, true); + } else { + dictEmpty(m_pdict, nullptr); + } } void redisDbPersistentData::trackkey(const char *key, bool fUpdate) @@ -2894,8 +2894,7 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate) if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { dictEntry *de = dictFind(m_dictChanged, key); if (de == nullptr) { - changedesc *desc = new changedesc(sdsdupshared(key), fUpdate); - dictAdd(m_dictChanged, (void*)desc->strkey.get(), desc); + dictAdd(m_dictChanged, (void*)sdsdupshared(key), (void*)fUpdate); if (!fUpdate) ++m_cnewKeysPending; } @@ -3024,20 +3023,17 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command std::vector>> vecInserts; for (robj *objKey : veckeys) { - sds sharedKey = nullptr; + sds sharedKey = sdsdupshared((sds)szFromObj(objKey)); std::unique_ptr spexpire; robj *o = nullptr; m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ size_t offset = 0; - spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset); - o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast(data) + offset, cb - offset); + spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset); + o = deserializeStoredObject(this, 
sharedKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); - }, &sharedKey); + }); if (o != nullptr) { - if (sharedKey == nullptr) - sharedKey = sdsdupshared(szFromObj(objKey)); - vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); } else if (sharedKey != nullptr) { sdsfree(sharedKey); diff --git a/src/rdb.cpp b/src/rdb.cpp index aefc730c0..3d4b9f345 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2364,6 +2364,21 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } +class EvictionPolicyCleanup +{ + int oldpolicy; + +public: + EvictionPolicyCleanup() { + oldpolicy = g_pserver->maxmemory_policy; + g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM; + } + + ~EvictionPolicyCleanup() { + g_pserver->maxmemory_policy = oldpolicy; + } +}; + /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { @@ -2380,9 +2395,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { sds key = nullptr; bool fLastKeyExpired = false; + // If we're running flash we may evict during load. 
We want a fast eviction function + // because there isn't any difference in use times between keys anyways + EvictionPolicyCleanup ecleanup; + for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->trackChanges(true); + g_pserver->db[idb]->trackChanges(true, 1024); } rdb->update_cksum = rdbLoadProgressCallback; @@ -2403,7 +2422,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { now = mstime(); lru_clock = LRU_CLOCK(); - + while(1) { robj *val; @@ -2645,17 +2664,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } else { /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, do this every 16 keys to limit the perf impact */ - if (g_pserver->m_pstorageFactory && (ckeysLoaded % 16) == 0) + if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0) { - if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK || (ckeysLoaded % (1024)) == 0) + bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK); + if (fHighMemory || (ckeysLoaded % (1024)) == 0) { for (int idb = 0; idb < cserver.dbnum; ++idb) { if (g_pserver->db[idb]->processChanges(false)) g_pserver->db[idb]->commitChanges(); - g_pserver->db[idb]->trackChanges(false); + if (fHighMemory && !(rsi && rsi->fForceSetKey)) { + g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. 
an active replica) + fHighMemory = false; // we took care of it + } + g_pserver->db[idb]->trackChanges(false, 1024); } - freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); + if (fHighMemory) + freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); } } diff --git a/src/server.h b/src/server.h index 2c962cb60..42a32ecde 100644 --- a/src/server.h +++ b/src/server.h @@ -1042,7 +1042,6 @@ class redisDbPersistentDataSnapshot; class redisDbPersistentData { friend void dictDbKeyDestructor(void *privdata, void *key); - friend void changedescDtor(void*, void*); friend class redisDbPersistentDataSnapshot; public: @@ -1110,7 +1109,7 @@ public: void setStorageProvider(StorageCache *pstorage); - void trackChanges(bool fBulk); + void trackChanges(bool fBulk, size_t sizeHint = 0); // Process and commit changes for secondary storage. Note that process and commit are seperated // to allow you to release the global lock before commiting. To prevent deadlocks you *must* @@ -1147,15 +1146,7 @@ protected: uint64_t m_mvccCheckpoint = 0; private: - struct changedesc - { - sdsimmutablestring strkey; - bool fUpdate; - - changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} - }; - - static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change); + static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate); void ensure(const char *key); void ensure(const char *key, dictEntry **de); From 50060b4a139164621cc111298f4562a285d7eb6f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 5 Mar 2021 00:54:11 +0000 Subject: [PATCH 09/12] Fix crash saving an RDB Former-commit-id: 51c35f03a84f7ada0f150a1c3992df574ab89b95 --- src/StorageCache.cpp | 7 ++++--- src/StorageCache.h | 9 ++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 64ae0051e..7f86b595e 100644 --- 
a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -18,10 +18,11 @@ dictType dbStorageCacheType = { NULL /* val destructor */ }; -StorageCache::StorageCache(IStorage *storage) +StorageCache::StorageCache(IStorage *storage, bool fCache) : m_spstorage(storage) { - m_pdict = dictCreate(&dbStorageCacheType, nullptr); + if (fCache) + m_pdict = dictCreate(&dbStorageCacheType, nullptr); } void StorageCache::clear() @@ -94,7 +95,7 @@ const StorageCache *StorageCache::clone() { std::unique_lock ul(m_lock); // Clones never clone the cache - StorageCache *cacheNew = new StorageCache(const_cast(m_spstorage->clone())); + StorageCache *cacheNew = new StorageCache(const_cast(m_spstorage->clone()), false /*fCache*/); return cacheNew; } diff --git a/src/StorageCache.h b/src/StorageCache.h index 33492cab1..c2170b7d0 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -8,7 +8,7 @@ class StorageCache int m_collisionCount = 0; mutable fastlock m_lock {"StorageCache"}; - StorageCache(IStorage *storage); + StorageCache(IStorage *storage, bool fNoCache); void cacheKey(sds key); void cacheKey(const char *rgchKey, size_t cchKey); @@ -29,12 +29,7 @@ class StorageCache public: static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { - StorageCache *cache = new StorageCache(nullptr); - if (!pfactory->FSlow()) - { - dictRelease(cache->m_pdict); - cache->m_pdict = nullptr; - } + StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/); load_iter_data data = {cache, fn, privdata}; cache->m_spstorage = std::shared_ptr(pfactory->create(db, key_load_itr, (void*)&data)); return cache; From a2c2337cd9e85d29cd7a66a64707f349da483a4e Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Mar 2021 02:52:22 +0000 Subject: [PATCH 10/12] Fix failed merge due to overwriting the hashidx and pretending no rehash is in progress Former-commit-id: 0524b822a05b732e0f3e510dcaeb3304069d3595 --- src/dict.cpp | 2 -- 1 file 
changed, 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 7a05ed062..f25eff002 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -201,8 +201,6 @@ int dictMerge(dict *dst, dict *src) std::swap(dst->iterators, src->iterators); } - src->rehashidx = -1; - if (!dictIsRehashing(dst) && !dictIsRehashing(src)) { if (dst->ht[0].size >= src->ht[0].size) From 76366769881a7281f495c9813b43e60a207af329 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Mar 2021 02:53:15 +0000 Subject: [PATCH 11/12] When we add a delay to rdb save for test purposes we still need to check for aborts Former-commit-id: b057b4d05aae6c08b855bfc9ae48d41ad0e881f8 --- src/rdb.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 3d4b9f345..cedf787ce 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1146,8 +1146,14 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn if (rdbSaveObject(rdb,val,key) == -1) return -1; /* Delay return if required (for testing) */ - if (serverTL->getRdbKeySaveDelay()) - usleep(serverTL->getRdbKeySaveDelay()); + if (serverTL->getRdbKeySaveDelay()) { + int sleepTime = serverTL->getRdbKeySaveDelay(); + while (!g_pserver->rdbThreadVars.fRdbThreadCancel && sleepTime > 0) { + int sleepThisTime = std::min(100, sleepTime); + usleep(sleepThisTime); + sleepTime -= sleepThisTime; + } + } /* Save expire entry after as it will apply to the previously loaded key */ /* This is because we update the expire datastructure directly without buffering */ From bf81e55547b6ffb69cd587433a63022cddccef9e Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Mar 2021 02:53:52 +0000 Subject: [PATCH 12/12] If we're clearing the command queue we need to reset the parse length Former-commit-id: 19068f990a77fb428a50a8f751ed6f8cf59a8a74 --- src/replication.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index a6c13c044..0589868cb 100644 
--- a/src/replication.cpp +++ b/src/replication.cpp @@ -3434,12 +3434,9 @@ void replicationCacheMaster(redisMaster *mi, client *c) { * pending outputs to the master. */ sdsclear(mi->master->querybuf); if (!mi->master->vecqueuedcmd.empty()) { - // Clear out everything except for partially parsed commands (which we'll cache) - auto cmd = std::move(mi->master->vecqueuedcmd.front()); mi->master->vecqueuedcmd.clear(); - if (cmd.argc != cmd.argcMax) - mi->master->vecqueuedcmd.emplace_back(std::move(cmd)); } + mi->master->multibulklen = 0; sdsclear(mi->master->pending_querybuf); mi->master->read_reploff = mi->master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c);