Load perf fixes with a storage provider set

Former-commit-id: 861b19de00c75c9167cc25031292284ad1c21893
This commit is contained in:
John Sully 2021-03-04 07:41:06 +00:00
parent 269cbd5c25
commit 1bf6ca0645
5 changed files with 140 additions and 83 deletions

View File

@ -1,36 +1,80 @@
#include "server.h" #include "server.h"
uint64_t hashPassthrough(const void *hash) {
return static_cast<uint64_t>(reinterpret_cast<uintptr_t>(hash));
}
int hashCompare(void *, const void *key1, const void *key2) {
auto diff = (reinterpret_cast<uintptr_t>(key1) - reinterpret_cast<uintptr_t>(key2));
return !diff;
}
dictType dbStorageCacheType = {
hashPassthrough, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
hashCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
StorageCache::StorageCache(IStorage *storage)
: m_spstorage(storage)
{
m_pdict = dictCreate(&dbStorageCacheType, nullptr);
}
void StorageCache::clear() void StorageCache::clear()
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (m_setkeys != nullptr) if (m_pdict != nullptr)
m_setkeys->clear(); dictEmpty(m_pdict, nullptr);
m_spstorage->clear(); m_spstorage->clear();
m_collisionCount = 0;
} }
void StorageCache::cacheKey(sds key) void StorageCache::cacheKey(sds key)
{ {
if (m_setkeys == nullptr) if (m_pdict == nullptr)
return; return;
m_setkeys->insert(sdsimmutablestring(sdsdupshared(key))); uintptr_t hash = dictSdsHash(key);
if (dictAdd(m_pdict, reinterpret_cast<void*>(hash), (void*)1) != DICT_OK) {
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
serverAssert(de != nullptr);
de->v.s64++;
m_collisionCount++;
}
} }
void StorageCache::cacheKey(const char *rgch, size_t cch) void StorageCache::cacheKey(const char *rgch, size_t cch)
{ {
if (m_setkeys == nullptr) if (m_pdict == nullptr)
return; return;
m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch))); uintptr_t hash = dictGenHashFunction(rgch, (int)cch);
if (dictAdd(m_pdict, reinterpret_cast<void*>(hash), (void*)1) != DICT_OK) {
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
serverAssert(de != nullptr);
de->v.s64++;
m_collisionCount++;
}
} }
bool StorageCache::erase(sds key) bool StorageCache::erase(sds key)
{ {
bool result = m_spstorage->erase(key, sdslen(key)); bool result = m_spstorage->erase(key, sdslen(key));
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (result && m_setkeys != nullptr) if (result && m_pdict != nullptr)
{ {
auto itr = m_setkeys->find(sdsview(key)); uint64_t hash = dictSdsHash(key);
serverAssert(itr != m_setkeys->end()); dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
m_setkeys->erase(itr); serverAssert(de != nullptr);
de->v.s64--;
serverAssert(de->v.s64 >= 0);
if (de->v.s64 == 0) {
dictDelete(m_pdict, reinterpret_cast<void*>(hash));
} else {
m_collisionCount--;
}
} }
return result; return result;
} }
@ -38,7 +82,7 @@ bool StorageCache::erase(sds key)
void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (!fOverwrite && m_setkeys != nullptr) if (!fOverwrite && m_pdict != nullptr)
{ {
cacheKey(key); cacheKey(key);
} }
@ -54,16 +98,16 @@ const StorageCache *StorageCache::clone()
return cacheNew; return cacheNew;
} }
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
if (m_setkeys != nullptr) if (m_pdict != nullptr)
{ {
auto itr = m_setkeys->find(sdsview(key)); uint64_t hash = dictSdsHash(key);
if (itr == m_setkeys->end()) dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
if (de == nullptr)
return; // Not found return; // Not found
if (cachedKey != nullptr)
*cachedKey = sdsdupshared(itr->get());
} }
ul.unlock(); ul.unlock();
m_spstorage->retrieve(key, sdslen(key), fn); m_spstorage->retrieve(key, sdslen(key), fn);
@ -73,8 +117,9 @@ size_t StorageCache::count() const
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
size_t count = m_spstorage->count(); size_t count = m_spstorage->count();
if (m_setkeys != nullptr) if (m_pdict != nullptr) {
serverAssert(count == m_setkeys->size()); serverAssert(count == (dictSize(m_pdict) + m_collisionCount));
}
return count; return count;
} }

View File

@ -4,12 +4,11 @@
class StorageCache class StorageCache
{ {
std::shared_ptr<IStorage> m_spstorage; std::shared_ptr<IStorage> m_spstorage;
std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys; dict *m_pdict = nullptr;
int m_collisionCount = 0;
mutable fastlock m_lock {"StorageCache"}; mutable fastlock m_lock {"StorageCache"};
StorageCache(IStorage *storage) StorageCache(IStorage *storage);
: m_spstorage(storage)
{}
void cacheKey(sds key); void cacheKey(sds key);
void cacheKey(const char *rgchKey, size_t cchKey); void cacheKey(const char *rgchKey, size_t cchKey);
@ -31,9 +30,10 @@ class StorageCache
public: public:
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
StorageCache *cache = new StorageCache(nullptr); StorageCache *cache = new StorageCache(nullptr);
if (pfactory->FSlow()) if (!pfactory->FSlow())
{ {
cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>(20); dictRelease(cache->m_pdict);
cache->m_pdict = nullptr;
} }
load_iter_data data = {cache, fn, privdata}; load_iter_data data = {cache, fn, privdata};
cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data)); cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
@ -42,7 +42,7 @@ public:
void clear(); void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const; void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key); bool erase(sds key);
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File

@ -49,7 +49,6 @@ struct dbBackup {
int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key); int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o); int expireIfNeeded(redisDb *db, robj *key, robj *o);
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
void changedescDtor(void *privdata, void *obj);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o);
@ -59,15 +58,10 @@ dictType dictChangeDescType {
NULL, /* key dup */ NULL, /* key dup */
NULL, /* val dup */ NULL, /* val dup */
dictSdsKeyCompare, /* key compare */ dictSdsKeyCompare, /* key compare */
nullptr, /* key destructor */ dictSdsDestructor, /* key destructor */
changedescDtor /* val destructor */ nullptr /* val destructor */
}; };
void changedescDtor(void *, void *obj) {
redisDbPersistentData::changedesc *desc = (redisDbPersistentData::changedesc*)obj;
delete desc;
}
/* Update LFU when an object is accessed. /* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached. * Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */ * Then logarithmically increment the counter, and update the access time. */
@ -437,8 +431,8 @@ bool redisDbPersistentData::syncDelete(robj *key)
dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key)); dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key));
if (de != nullptr) if (de != nullptr)
{ {
changedesc *desc = (changedesc*)dictGetVal(de); bool fUpdate = (bool)dictGetVal(de);
if (!desc->fUpdate) if (!fUpdate)
--m_cnewKeysPending; --m_cnewKeysPending;
dictFreeUnlinkedEntry(m_dictChanged, de); dictFreeUnlinkedEntry(m_dictChanged, de);
} }
@ -2641,20 +2635,18 @@ LNotFound:
{ {
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{ {
sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available
robj *o = nullptr; robj *o = nullptr;
sds sdsNewKey = sdsdupshared(sdsKey);
std::unique_ptr<expireEntry> spexpire; std::unique_ptr<expireEntry> spexpire;
m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); spexpire = deserializeExpire(sdsNewKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(this, sdsNewKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}, &sdsNewKey); });
if (o != nullptr) if (o != nullptr)
{ {
if (sdsNewKey == nullptr)
sdsNewKey = sdsdupshared(sdsKey);
dictAdd(m_pdict, sdsNewKey, o); dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr); o->SetFExpires(spexpire != nullptr);
@ -2667,11 +2659,8 @@ LNotFound:
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
} }
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
} } else {
else sdsfree(sdsNewKey);
{
if (sdsNewKey != nullptr)
sdsfree(sdsNewKey); // BUG but don't bother crashing
} }
*pde = dictFind(m_pdict, sdsKey); *pde = dictFind(m_pdict, sdsKey);
@ -2705,14 +2694,14 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di); dictReleaseIterator(di);
} }
/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change) /* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate)
{ {
auto itr = db->find_cached_threadsafe(change.strkey.get()); auto itr = db->find_cached_threadsafe(key);
if (itr == nullptr) if (itr == nullptr)
return; return;
robj *o = itr.val(); robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate); storage->insert((sds)key, temp, sdslen(temp), fUpdate);
sdsfree(temp); sdsfree(temp);
} }
@ -2756,8 +2745,7 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
dictEntry *de; dictEntry *de;
while ((de = dictNext(di)) != nullptr) while ((de = dictNext(di)) != nullptr)
{ {
changedesc *change = (changedesc*)dictGetVal(de); serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de));
serializeAndStoreChange(m_spstorage.get(), this, *change);
} }
dictReleaseIterator(di); dictReleaseIterator(di);
} }
@ -2776,8 +2764,7 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
dictEntry *de; dictEntry *de;
while ((de = dictNext(di)) != nullptr) while ((de = dictNext(di)) != nullptr)
{ {
changedesc *change = (changedesc*)dictGetVal(de); serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de));
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, *change);
} }
dictReleaseIterator(di); dictReleaseIterator(di);
dictRelease(m_dictChangedStorageFlush); dictRelease(m_dictChangedStorageFlush);
@ -2866,16 +2853,20 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
return true; return true;
} }
void redisDbPersistentData::trackChanges(bool fBulk) void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
{ {
m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
if (fBulk) if (fBulk)
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
if (m_dictChanged == nullptr) if (m_dictChanged == nullptr) {
m_dictChanged = dictCreate(&dictChangeDescType, nullptr); m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
} }
if (sizeHint > 0)
dictExpand(m_dictChanged, sizeHint, false);
}
void redisDbPersistentData::removeAllCachedValues() void redisDbPersistentData::removeAllCachedValues()
{ {
// First we have to flush the tracked changes // First we have to flush the tracked changes
@ -2886,16 +2877,24 @@ void redisDbPersistentData::removeAllCachedValues()
trackChanges(false); trackChanges(false);
} }
if (m_pdict->iterators == 0) {
dict *dT = m_pdict;
m_pdict = dictCreate(&dbDictType, this);
dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash
g_pserver->asyncworkqueue->AddWorkFunction([dT]{
dictRelease(dT);
}, true);
} else {
dictEmpty(m_pdict, nullptr); dictEmpty(m_pdict, nullptr);
} }
}
void redisDbPersistentData::trackkey(const char *key, bool fUpdate) void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
{ {
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
dictEntry *de = dictFind(m_dictChanged, key); dictEntry *de = dictFind(m_dictChanged, key);
if (de == nullptr) { if (de == nullptr) {
changedesc *desc = new changedesc(sdsdupshared(key), fUpdate); dictAdd(m_dictChanged, (void*)sdsdupshared(key), (void*)fUpdate);
dictAdd(m_dictChanged, (void*)desc->strkey.get(), desc);
if (!fUpdate) if (!fUpdate)
++m_cnewKeysPending; ++m_cnewKeysPending;
} }
@ -3024,20 +3023,17 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts; std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
for (robj *objKey : veckeys) for (robj *objKey : veckeys)
{ {
sds sharedKey = nullptr; sds sharedKey = sdsdupshared((sds)szFromObj(objKey));
std::unique_ptr<expireEntry> spexpire; std::unique_ptr<expireEntry> spexpire;
robj *o = nullptr; robj *o = nullptr;
m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset); spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(this, sharedKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}, &sharedKey); });
if (o != nullptr) { if (o != nullptr) {
if (sharedKey == nullptr)
sharedKey = sdsdupshared(szFromObj(objKey));
vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
} else if (sharedKey != nullptr) { } else if (sharedKey != nullptr) {
sdsfree(sharedKey); sdsfree(sharedKey);

View File

@ -2364,6 +2364,21 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
} }
} }
class EvictionPolicyCleanup
{
int oldpolicy;
public:
EvictionPolicyCleanup() {
oldpolicy = g_pserver->maxmemory_policy;
g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM;
}
~EvictionPolicyCleanup() {
g_pserver->maxmemory_policy = oldpolicy;
}
};
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
@ -2380,9 +2395,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false; bool fLastKeyExpired = false;
// If we're running flash we may evict during load. We want a fast eviction function
// because there isn't any difference in use times between keys anyways
EvictionPolicyCleanup ecleanup;
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->trackChanges(true); g_pserver->db[idb]->trackChanges(true, 1024);
} }
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
@ -2645,16 +2664,22 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} else { } else {
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit, /* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */ do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 16) == 0) if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{ {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK || (ckeysLoaded % (1024)) == 0) bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
{ {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
if (g_pserver->db[idb]->processChanges(false)) if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false); if (fHighMemory && !(rsi && rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
} }
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
} }
} }

View File

@ -1042,7 +1042,6 @@ class redisDbPersistentDataSnapshot;
class redisDbPersistentData class redisDbPersistentData
{ {
friend void dictDbKeyDestructor(void *privdata, void *key); friend void dictDbKeyDestructor(void *privdata, void *key);
friend void changedescDtor(void*, void*);
friend class redisDbPersistentDataSnapshot; friend class redisDbPersistentDataSnapshot;
public: public:
@ -1110,7 +1109,7 @@ public:
void setStorageProvider(StorageCache *pstorage); void setStorageProvider(StorageCache *pstorage);
void trackChanges(bool fBulk); void trackChanges(bool fBulk, size_t sizeHint = 0);
// Process and commit changes for secondary storage. Note that process and commit are seperated // Process and commit changes for secondary storage. Note that process and commit are seperated
// to allow you to release the global lock before commiting. To prevent deadlocks you *must* // to allow you to release the global lock before commiting. To prevent deadlocks you *must*
@ -1147,15 +1146,7 @@ protected:
uint64_t m_mvccCheckpoint = 0; uint64_t m_mvccCheckpoint = 0;
private: private:
struct changedesc static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate);
{
sdsimmutablestring strkey;
bool fUpdate;
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
};
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change);
void ensure(const char *key); void ensure(const char *key);
void ensure(const char *key, dictEntry **de); void ensure(const char *key, dictEntry **de);