Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: 066f984e5223f58b239d99c115901ea9845da513
This commit is contained in:
John Sully 2021-03-10 04:03:13 +00:00
commit ff3c13a938
9 changed files with 240 additions and 115 deletions

View File

@ -1,36 +1,81 @@
#include "server.h" #include "server.h"
/* Identity hash callback: the value passed in as the key pointer is returned
 * unchanged as the 64-bit hash. No memory is dereferenced. */
uint64_t hashPassthrough(const void *hash) {
    const auto bits = reinterpret_cast<uintptr_t>(hash);
    return static_cast<uint64_t>(bits);
}
/* Key-compare callback operating on the pointer values themselves (the
 * pointers are never dereferenced). The first argument is unused.
 * Returns 1 when both keys hold the same value and 0 otherwise. */
int hashCompare(void *, const void *key1, const void *key2) {
    const uintptr_t lhs = reinterpret_cast<uintptr_t>(key1);
    const uintptr_t rhs = reinterpret_cast<uintptr_t>(key2);
    return (lhs == rhs) ? 1 : 0;
}
/* dictType used by StorageCache for its key-hash dictionary. Keys are hash
 * values carried directly in the key pointer, so nothing is duplicated or
 * freed by the dict. */
dictType dbStorageCacheType = {
    hashPassthrough,    /* hash function: key value is already the hash */
    nullptr,            /* key dup: none */
    nullptr,            /* val dup: none */
    hashCompare,        /* key compare: equality of the pointer values */
    nullptr,            /* key destructor: none */
    nullptr             /* val destructor: none */
};
/* Construct a cache wrapper around `storage`.
 * When fCache is true a dict using dbStorageCacheType is created and stored
 * in m_pdict; when false m_pdict is not assigned here. */
StorageCache::StorageCache(IStorage *storage, bool fCache)
    : m_spstorage(storage)
{
    if (!fCache)
        return;

    m_pdict = dictCreate(&dbStorageCacheType, nullptr);
}
void StorageCache::clear() void StorageCache::clear()
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (m_setkeys != nullptr) if (m_pdict != nullptr)
m_setkeys->clear(); dictEmpty(m_pdict, nullptr);
m_spstorage->clear(); m_spstorage->clear();
m_collisionCount = 0;
} }
void StorageCache::cacheKey(sds key) void StorageCache::cacheKey(sds key)
{ {
if (m_setkeys == nullptr) if (m_pdict == nullptr)
return; return;
m_setkeys->insert(sdsimmutablestring(sdsdupshared(key))); uintptr_t hash = dictSdsHash(key);
if (dictAdd(m_pdict, reinterpret_cast<void*>(hash), (void*)1) != DICT_OK) {
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
serverAssert(de != nullptr);
de->v.s64++;
m_collisionCount++;
}
} }
void StorageCache::cacheKey(const char *rgch, size_t cch) void StorageCache::cacheKey(const char *rgch, size_t cch)
{ {
if (m_setkeys == nullptr) if (m_pdict == nullptr)
return; return;
m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch))); uintptr_t hash = dictGenHashFunction(rgch, (int)cch);
if (dictAdd(m_pdict, reinterpret_cast<void*>(hash), (void*)1) != DICT_OK) {
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
serverAssert(de != nullptr);
de->v.s64++;
m_collisionCount++;
}
} }
bool StorageCache::erase(sds key) bool StorageCache::erase(sds key)
{ {
bool result = m_spstorage->erase(key, sdslen(key)); bool result = m_spstorage->erase(key, sdslen(key));
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (result && m_setkeys != nullptr) if (result && m_pdict != nullptr)
{ {
auto itr = m_setkeys->find(sdsview(key)); uint64_t hash = dictSdsHash(key);
serverAssert(itr != m_setkeys->end()); dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
m_setkeys->erase(itr); serverAssert(de != nullptr);
de->v.s64--;
serverAssert(de->v.s64 >= 0);
if (de->v.s64 == 0) {
dictDelete(m_pdict, reinterpret_cast<void*>(hash));
} else {
m_collisionCount--;
}
} }
return result; return result;
} }
@ -38,7 +83,7 @@ bool StorageCache::erase(sds key)
void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (!fOverwrite && m_setkeys != nullptr) if (!fOverwrite && m_pdict != nullptr)
{ {
cacheKey(key); cacheKey(key);
} }
@ -50,20 +95,20 @@ const StorageCache *StorageCache::clone()
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
// Clones never clone the cache // Clones never clone the cache
StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone())); StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone()), false /*fCache*/);
return cacheNew; return cacheNew;
} }
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (m_setkeys != nullptr) if (m_pdict != nullptr)
{ {
auto itr = m_setkeys->find(sdsview(key)); uint64_t hash = dictSdsHash(key);
if (itr == m_setkeys->end()) dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
if (de == nullptr)
return; // Not found return; // Not found
if (cachedKey != nullptr)
*cachedKey = sdsdupshared(itr->get());
} }
ul.unlock(); ul.unlock();
m_spstorage->retrieve(key, sdslen(key), fn); m_spstorage->retrieve(key, sdslen(key), fn);
@ -73,8 +118,9 @@ size_t StorageCache::count() const
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
size_t count = m_spstorage->count(); size_t count = m_spstorage->count();
if (m_setkeys != nullptr) if (m_pdict != nullptr) {
serverAssert(count == m_setkeys->size()); serverAssert(count == (dictSize(m_pdict) + m_collisionCount));
}
return count; return count;
} }

View File

@ -4,12 +4,11 @@
class StorageCache class StorageCache
{ {
std::shared_ptr<IStorage> m_spstorage; std::shared_ptr<IStorage> m_spstorage;
std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys; dict *m_pdict = nullptr;
int m_collisionCount = 0;
mutable fastlock m_lock {"StorageCache"}; mutable fastlock m_lock {"StorageCache"};
StorageCache(IStorage *storage) StorageCache(IStorage *storage, bool fNoCache);
: m_spstorage(storage)
{}
void cacheKey(sds key); void cacheKey(sds key);
void cacheKey(const char *rgchKey, size_t cchKey); void cacheKey(const char *rgchKey, size_t cchKey);
@ -30,11 +29,7 @@ class StorageCache
public: public:
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
StorageCache *cache = new StorageCache(nullptr); StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/);
if (pfactory->FSlow())
{
cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>(20);
}
load_iter_data data = {cache, fn, privdata}; load_iter_data data = {cache, fn, privdata};
cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data)); cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
return cache; return cache;
@ -42,7 +37,7 @@ public:
void clear(); void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const; void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key); bool erase(sds key);
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File

@ -53,6 +53,15 @@ void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o);
/* dictType for the dictionaries of changed keys. Keys are sds strings hashed
 * and compared with the sds helpers and released through dictSdsDestructor;
 * values have no dup or destructor callback. */
dictType dictChangeDescType {
    dictSdsHash,        /* hash function */
    nullptr,            /* key dup */
    nullptr,            /* val dup */
    dictSdsKeyCompare,  /* key compare */
    dictSdsDestructor,  /* key destructor */
    nullptr             /* val destructor */
};
/* Update LFU when an object is accessed. /* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached. * Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */ * Then logarithmically increment the counter, and update the access time. */
@ -419,12 +428,13 @@ bool redisDbPersistentData::syncDelete(robj *key)
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
if (fDeleted) { if (fDeleted) {
auto itrChange = m_setchanged.find(szFromObj(key)); dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key));
if (itrChange != m_setchanged.end()) if (de != nullptr)
{ {
if (!itrChange->fUpdate) bool fUpdate = (bool)dictGetVal(de);
if (!fUpdate)
--m_cnewKeysPending; --m_cnewKeysPending;
m_setchanged.erase(itrChange); dictFreeUnlinkedEntry(m_dictChanged, de);
} }
if (m_pdbSnapshot != nullptr) if (m_pdbSnapshot != nullptr)
@ -604,6 +614,7 @@ const dbBackup *backupDb(void) {
/* Discard a previously created backup, this can be slow (similar to FLUSHALL) /* Discard a previously created backup, this can be slow (similar to FLUSHALL)
* Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */ * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
void discardDbBackup(const dbBackup *backup, int flags, void(callback)(void*)) { void discardDbBackup(const dbBackup *backup, int flags, void(callback)(void*)) {
UNUSED(callback);
int async = (flags & EMPTYDB_ASYNC); int async = (flags & EMPTYDB_ASYNC);
/* Release main DBs backup . */ /* Release main DBs backup . */
@ -2505,7 +2516,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
dictEmpty(m_pdict,callback); dictEmpty(m_pdict,callback);
if (m_fTrackingChanges) if (m_fTrackingChanges)
{ {
m_setchanged.clear(); dictEmpty(m_dictChanged, nullptr);
m_cnewKeysPending = 0; m_cnewKeysPending = 0;
m_fAllChanged++; m_fAllChanged++;
} }
@ -2624,20 +2635,18 @@ LNotFound:
{ {
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{ {
sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available
robj *o = nullptr; robj *o = nullptr;
sds sdsNewKey = sdsdupshared(sdsKey);
std::unique_ptr<expireEntry> spexpire; std::unique_ptr<expireEntry> spexpire;
m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); spexpire = deserializeExpire(sdsNewKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(this, sdsNewKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}, &sdsNewKey); });
if (o != nullptr) if (o != nullptr)
{ {
if (sdsNewKey == nullptr)
sdsNewKey = sdsdupshared(sdsKey);
dictAdd(m_pdict, sdsNewKey, o); dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr); o->SetFExpires(spexpire != nullptr);
@ -2650,11 +2659,8 @@ LNotFound:
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
} }
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
} } else {
else sdsfree(sdsNewKey);
{
if (sdsNewKey != nullptr)
sdsfree(sdsNewKey); // BUG but don't bother crashing
} }
*pde = dictFind(m_pdict, sdsKey); *pde = dictFind(m_pdict, sdsKey);
@ -2688,14 +2694,14 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di); dictReleaseIterator(di);
} }
/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change) /* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate)
{ {
auto itr = db->find_cached_threadsafe(change.strkey.get()); auto itr = db->find_cached_threadsafe(key);
if (itr == nullptr) if (itr == nullptr)
return; return;
robj *o = itr.val(); robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate); storage->insert((sds)key, temp, sdslen(temp), fUpdate);
sdsfree(temp); sdsfree(temp);
} }
@ -2708,17 +2714,20 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
{ {
if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0) if (!m_fAllChanged && dictSize(m_dictChanged) == 0 && m_cnewKeysPending == 0)
return false; return false;
m_spstorage->beginWriteBatch(); m_spstorage->beginWriteBatch();
serverAssert(m_pdbSnapshotStorageFlush == nullptr); serverAssert(m_pdbSnapshotStorageFlush == nullptr);
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) if (fSnapshot && !m_fAllChanged && dictSize(m_dictChanged) > 100)
{ {
// Do a snapshot based process if possible // Do a snapshot based process if possible
m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */); m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */);
if (m_pdbSnapshotStorageFlush) if (m_pdbSnapshotStorageFlush)
{ {
m_setchangedStorageFlush = std::move(m_setchanged); if (m_dictChangedStorageFlush)
dictRelease(m_dictChangedStorageFlush);
m_dictChangedStorageFlush = m_dictChanged;
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
} }
} }
@ -2732,13 +2741,16 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
} }
else else
{ {
for (auto &change : m_setchanged) dictIterator *di = dictGetIterator(m_dictChanged);
dictEntry *de;
while ((de = dictNext(di)) != nullptr)
{ {
serializeAndStoreChange(m_spstorage.get(), this, change); serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de));
} }
dictReleaseIterator(di);
} }
} }
m_setchanged.clear(); dictEmpty(m_dictChanged, nullptr);
m_cnewKeysPending = 0; m_cnewKeysPending = 0;
} }
return (m_spstorage != nullptr); return (m_spstorage != nullptr);
@ -2748,12 +2760,15 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
{ {
if (m_pdbSnapshotStorageFlush) if (m_pdbSnapshotStorageFlush)
{ {
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
for (auto &change : m_setchangedStorageFlush) dictEntry *de;
while ((de = dictNext(di)) != nullptr)
{ {
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change); serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de));
} }
m_setchangedStorageFlush.clear(); dictReleaseIterator(di);
dictRelease(m_dictChangedStorageFlush);
m_dictChangedStorageFlush = nullptr;
*psnapshotFree = m_pdbSnapshotStorageFlush; *psnapshotFree = m_pdbSnapshotStorageFlush;
m_pdbSnapshotStorageFlush = nullptr; m_pdbSnapshotStorageFlush = nullptr;
} }
@ -2773,6 +2788,12 @@ redisDbPersistentData::~redisDbPersistentData()
dictRelease(m_pdict); dictRelease(m_pdict);
if (m_pdictTombstone) if (m_pdictTombstone)
dictRelease(m_pdictTombstone); dictRelease(m_pdictTombstone);
if (m_dictChanged)
dictRelease(m_dictChanged);
if (m_dictChangedStorageFlush)
dictRelease(m_dictChangedStorageFlush);
delete m_setexpire; delete m_setexpire;
} }
@ -2815,8 +2836,8 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->batch_lock(); m_spstorage->batch_lock();
auto itr = m_setchanged.find(key); dictEntry *de = dictFind(m_dictChanged, key);
if (itr != m_setchanged.end()) if (de != nullptr)
{ {
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->batch_unlock(); m_spstorage->batch_unlock();
@ -2832,11 +2853,18 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
return true; return true;
} }
void redisDbPersistentData::trackChanges(bool fBulk) void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
{ {
m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
if (fBulk) if (fBulk)
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
if (m_dictChanged == nullptr) {
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
}
if (sizeHint > 0)
dictExpand(m_dictChanged, sizeHint, false);
} }
void redisDbPersistentData::removeAllCachedValues() void redisDbPersistentData::removeAllCachedValues()
@ -2849,15 +2877,24 @@ void redisDbPersistentData::removeAllCachedValues()
trackChanges(false); trackChanges(false);
} }
dictEmpty(m_pdict, nullptr); if (m_pdict->iterators == 0) {
dict *dT = m_pdict;
m_pdict = dictCreate(&dbDictType, this);
dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash
g_pserver->asyncworkqueue->AddWorkFunction([dT]{
dictRelease(dT);
}, true);
} else {
dictEmpty(m_pdict, nullptr);
}
} }
void redisDbPersistentData::trackkey(const char *key, bool fUpdate) void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
{ {
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
auto itr = m_setchanged.find(key); dictEntry *de = dictFind(m_dictChanged, key);
if (itr == m_setchanged.end()) { if (de == nullptr) {
m_setchanged.emplace(sdsdupshared(key), fUpdate); dictAdd(m_dictChanged, (void*)sdsdupshared(key), (void*)fUpdate);
if (!fUpdate) if (!fUpdate)
++m_cnewKeysPending; ++m_cnewKeysPending;
} }
@ -2970,6 +3007,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
lock.arm(c); lock.arm(c);
getKeysResult result = GETKEYS_RESULT_INIT; getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0])); auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail
int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey) for (int ikey = 0; ikey < numkeys; ++ikey)
{ {
@ -2984,20 +3023,21 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts; std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
for (robj *objKey : veckeys) for (robj *objKey : veckeys)
{ {
sds sharedKey = nullptr; sds sharedKey = sdsdupshared((sds)szFromObj(objKey));
std::unique_ptr<expireEntry> spexpire; std::unique_ptr<expireEntry> spexpire;
robj *o = nullptr; robj *o = nullptr;
m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset); spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(this, sharedKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}, &sharedKey); });
if (sharedKey == nullptr) if (o != nullptr) {
sharedKey = sdsdupshared(szFromObj(objKey)); vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
} else if (sharedKey != nullptr) {
vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); sdsfree(sharedKey);
}
} }
lock.arm(c); lock.arm(c);

View File

@ -201,8 +201,6 @@ int dictMerge(dict *dst, dict *src)
std::swap(dst->iterators, src->iterators); std::swap(dst->iterators, src->iterators);
} }
src->rehashidx = -1;
if (!dictIsRehashing(dst) && !dictIsRehashing(src)) if (!dictIsRehashing(dst) && !dictIsRehashing(src))
{ {
if (dst->ht[0].size >= src->ht[0].size) if (dst->ht[0].size >= src->ht[0].size)
@ -378,7 +376,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
int empty_visits = buckets * 10; int empty_visits = buckets * 10;
while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) { while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) {
dictEntry *de; dictEntry *de;
/* Note that rehashidx can't overflow as we are sure there are more /* Note that rehashidx can't overflow as we are sure there are more
@ -386,7 +384,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
while(d->ht[0].table[d->rehashidx] == NULL) { while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++; d->rehashidx++;
if (--empty_visits == 0) goto LDone; if (--empty_visits == 0) goto LDone;
if (d->rehashidx >= d->ht[0].size) goto LDone; if ((size_t)d->rehashidx >= d->ht[0].size) goto LDone;
} }
de = d->ht[0].table[d->rehashidx]; de = d->ht[0].table[d->rehashidx];
@ -666,7 +664,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
else else
d->ht[table].table[idx] = he->next; d->ht[table].table[idx] = he->next;
if (!nofree) { if (!nofree) {
if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) { if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
he->next = d->asyncdata->deGCList; he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he->next; d->asyncdata->deGCList = he->next;
} else { } else {
@ -746,7 +744,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
if ((he = ht->table[i]) == NULL) continue; if ((he = ht->table[i]) == NULL) continue;
while(he) { while(he) {
nextHe = he->next; nextHe = he->next;
if (d->asyncdata && i < d->rehashidx) { if (d->asyncdata && (ssize_t)i < d->rehashidx) {
he->next = d->asyncdata->deGCList; he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he; d->asyncdata->deGCList = he;
} else { } else {
@ -1267,6 +1265,7 @@ unsigned long dictScan(dict *d,
/* Expand the hash table if needed */ /* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d) static int _dictExpandIfNeeded(dict *d)
{ {
static const size_t SHRINK_FACTOR = 4;
/* Incremental rehashing already in progress. Return. */ /* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK; if (dictIsRehashing(d)) return DICT_OK;
@ -1283,10 +1282,10 @@ static int _dictExpandIfNeeded(dict *d)
{ {
return dictExpand(d, d->ht[0].used*2, false /*fShrink*/); return dictExpand(d, d->ht[0].used*2, false /*fShrink*/);
} }
else if (d->ht[0].used > 0 && d->ht[0].used * 16 < d->ht[0].size && dict_can_resize) else if (d->ht[0].used > 0 && d->ht[0].size >= (1024*SHRINK_FACTOR) && (d->ht[0].used * 16) < d->ht[0].size && dict_can_resize)
{ {
// If the dictionary has shrunk a lot we'll need to shrink the hash table instead // If the dictionary has shrunk a lot we'll need to shrink the hash table instead
return dictExpand(d, d->ht[0].used*2, true /*fShrink*/); return dictExpand(d, d->ht[0].size/SHRINK_FACTOR, true /*fShrink*/);
} }
return DICT_OK; return DICT_OK;
} }

View File

@ -1146,8 +1146,14 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn
if (rdbSaveObject(rdb,val,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Delay return if required (for testing) */ /* Delay return if required (for testing) */
if (serverTL->getRdbKeySaveDelay()) if (serverTL->getRdbKeySaveDelay()) {
usleep(serverTL->getRdbKeySaveDelay()); int sleepTime = serverTL->getRdbKeySaveDelay();
while (!g_pserver->rdbThreadVars.fRdbThreadCancel && sleepTime > 0) {
int sleepThisTime = std::min(100, sleepTime);
usleep(sleepThisTime);
sleepTime -= sleepThisTime;
}
}
/* Save expire entry after as it will apply to the previously loaded key */ /* Save expire entry after as it will apply to the previously loaded key */
/* This is because we update the expire datastructure directly without buffering */ /* This is because we update the expire datastructure directly without buffering */
@ -2364,6 +2370,21 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
} }
} }
/* Scope guard for the server eviction policy.
 *
 * On construction it remembers g_pserver->maxmemory_policy and switches the
 * policy to MAXMEMORY_ALLKEYS_RANDOM; on destruction it writes the remembered
 * policy back. */
class EvictionPolicyCleanup
{
    const int oldpolicy;    // policy that was active when the guard was created

public:
    EvictionPolicyCleanup()
        : oldpolicy(g_pserver->maxmemory_policy)
    {
        g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM;
    }

    // Copying the guard would make a second destructor write the saved policy
    // back again, possibly clobbering a value set in between, so forbid it.
    EvictionPolicyCleanup(const EvictionPolicyCleanup &) = delete;
    EvictionPolicyCleanup &operator=(const EvictionPolicyCleanup &) = delete;

    ~EvictionPolicyCleanup() {
        g_pserver->maxmemory_policy = oldpolicy;
    }
};
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
@ -2380,9 +2401,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false; bool fLastKeyExpired = false;
// If we're running flash we may evict during load. We want a fast eviction function
// because there isn't any difference in use times between keys anyways
EvictionPolicyCleanup ecleanup;
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->trackChanges(true); g_pserver->db[idb]->trackChanges(true, 1024);
} }
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
@ -2403,7 +2428,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
now = mstime(); now = mstime();
lru_clock = LRU_CLOCK(); lru_clock = LRU_CLOCK();
while(1) { while(1) {
robj *val; robj *val;
@ -2645,17 +2670,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} else { } else {
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit, /* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */ do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 16) == 0) if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{ {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK || (ckeysLoaded % (1024)) == 0) bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
{ {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
if (g_pserver->db[idb]->processChanges(false)) if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false); if (fHighMemory && !(rsi && rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
} }
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
} }
} }

View File

@ -3434,12 +3434,9 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
* pending outputs to the master. */ * pending outputs to the master. */
sdsclear(mi->master->querybuf); sdsclear(mi->master->querybuf);
if (!mi->master->vecqueuedcmd.empty()) { if (!mi->master->vecqueuedcmd.empty()) {
// Clear out everything except for partially parsed commands (which we'll cache)
auto cmd = std::move(mi->master->vecqueuedcmd.front());
mi->master->vecqueuedcmd.clear(); mi->master->vecqueuedcmd.clear();
if (cmd.argc != cmd.argcMax)
mi->master->vecqueuedcmd.emplace_back(std::move(cmd));
} }
mi->master->multibulklen = 0;
sdsclear(mi->master->pending_querybuf); sdsclear(mi->master->pending_querybuf);
mi->master->read_reploff = mi->master->reploff; mi->master->read_reploff = mi->master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c); if (c->flags & CLIENT_MULTI) discardTransaction(c);

View File

@ -2157,6 +2157,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(id); UNUSED(id);
UNUSED(clientData); UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
/* If another thread unblocked one of our clients, and this thread has been idle /* If another thread unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also then beforeSleep won't have a chance to process the unblocking. So we also
process them here in the cron job to ensure they don't starve. process them here in the cron job to ensure they don't starve.
@ -2448,6 +2456,14 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
UNUSED(id); UNUSED(id);
UNUSED(clientData); UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
int iel = ielFromEventLoop(eventLoop); int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN); serverAssert(iel != IDX_EVENT_LOOP_MAIN);
@ -6068,9 +6084,12 @@ void OnTerminate()
} }
void *timeThreadMain(void*) { void *timeThreadMain(void*) {
timespec delay;
delay.tv_sec = 0;
delay.tv_nsec = 100;
while (true) { while (true) {
updateCachedTime(); updateCachedTime();
usleep(1); clock_nanosleep(CLOCK_REALTIME, 0, &delay, NULL);
} }
} }
@ -6422,6 +6441,9 @@ int main(int argc, char **argv) {
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr); pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
struct sched_param time_thread_priority;
time_thread_priority.sched_priority = sched_get_priority_max(SCHED_FIFO);
pthread_setschedparam(cserver.time_thread_id, SCHED_FIFO, &time_thread_priority);
pthread_attr_t tattr; pthread_attr_t tattr;
pthread_attr_init(&tattr); pthread_attr_init(&tattr);

View File

@ -1109,7 +1109,7 @@ public:
void setStorageProvider(StorageCache *pstorage); void setStorageProvider(StorageCache *pstorage);
void trackChanges(bool fBulk); void trackChanges(bool fBulk, size_t sizeHint = 0);
// Process and commit changes for secondary storage. Note that process and commit are separated // Process and commit changes for secondary storage. Note that process and commit are separated
// to allow you to release the global lock before committing. To prevent deadlocks you *must* // to allow you to release the global lock before committing. To prevent deadlocks you *must*
@ -1146,22 +1146,7 @@ protected:
uint64_t m_mvccCheckpoint = 0; uint64_t m_mvccCheckpoint = 0;
private: private:
struct changedesc static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate);
{
sdsimmutablestring strkey;
bool fUpdate;
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
};
struct changedescCmp
{
using is_transparent = void; // C++14 to allow comparisons with different types
bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; }
bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); }
bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; }
};
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change);
void ensure(const char *key); void ensure(const char *key);
void ensure(const char *key, dictEntry **de); void ensure(const char *key, dictEntry **de);
@ -1174,7 +1159,7 @@ private:
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
std::atomic<int> m_fTrackingChanges {0}; // Note: Stack based std::atomic<int> m_fTrackingChanges {0}; // Note: Stack based
std::atomic<int> m_fAllChanged {0}; std::atomic<int> m_fAllChanged {0};
std::set<changedesc, changedescCmp> m_setchanged; dict *m_dictChanged = nullptr;
size_t m_cnewKeysPending = 0; size_t m_cnewKeysPending = 0;
std::shared_ptr<StorageCache> m_spstorage = nullptr; std::shared_ptr<StorageCache> m_spstorage = nullptr;
@ -1189,7 +1174,7 @@ private:
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr;
std::set<changedesc, changedescCmp> m_setchangedStorageFlush; dict *m_dictChangedStorageFlush = nullptr;
int m_refCount = 0; int m_refCount = 0;
}; };

View File

@ -93,7 +93,17 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
dictRehashMilliseconds(m_pdict, 50); // Give us the best chance at a fast cleanup // We can't have async rehash modifying these. Setting the asyncdata list to null
// will cause us to throw away the async work rather than modify the tables in flight
if (m_pdict->asyncdata != nullptr) {
m_pdict->asyncdata = nullptr;
m_pdict->rehashidx = 0;
}
if (m_pdictTombstone->asyncdata != nullptr) {
m_pdictTombstone->rehashidx = 0;
m_pdictTombstone->asyncdata = nullptr;
}
spdb->m_fAllChanged = false; spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0; spdb->m_fTrackingChanges = 0;
spdb->m_pdict = m_pdict; spdb->m_pdict = m_pdict;