Add the IStorage interface and wire it up

Former-commit-id: 898efbfc0c7038818083ea29fdd63cafa47721fb
This commit is contained in:
John Sully 2019-09-20 14:52:22 -04:00
parent 63f2b3a987
commit 4f47f6818f
10 changed files with 194 additions and 20 deletions

14
src/IStorage.h Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include <functional>
class IStorage
{
public:
typedef std::function<void(const char *, size_t, const void *, size_t)> callback;
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0;
virtual void erase(const char *key, size_t cchKey) = 0;
virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0;
virtual size_t clear() = 0;
virtual void enumerate(callback fn) = 0;
};

View File

@ -87,6 +87,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
if (flags & LOOKUP_UPDATEMVCC) { if (flags & LOOKUP_UPDATEMVCC) {
val->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = getMvccTstamp();
db->trackkey(key);
} }
return val; return val;
} else { } else {
@ -800,7 +801,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
/* Handle the case of a hash table. */ /* Handle the case of a hash table. */
ht = NULL; ht = NULL;
if (o == nullptr) { if (o == nullptr) {
ht = c->db->dictUnsafe(); ht = c->db->dictUnsafeKeyOnly();
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = (dict*)ptrFromObj(o); ht = (dict*)ptrFromObj(o);
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
@ -1221,12 +1222,12 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
if (!val->FExpires()) if (!val->FExpires())
return 0; return 0;
trackkey(key);
auto itrExpire = m_setexpire->find(itr.key()); auto itrExpire = m_setexpire->find(itr.key());
serverAssert(itrExpire != m_setexpire->end()); serverAssert(itrExpire != m_setexpire->end());
serverAssert(itrExpire->key() == itr.key()); serverAssert(itrExpire->key() == itr.key());
m_setexpire->erase(itrExpire); m_setexpire->erase(itrExpire);
val->SetFExpires(false); val->SetFExpires(false);
trackkey(key);
return 1; return 1;
} }
@ -1805,13 +1806,13 @@ void redisDbPersistentData::initialize()
m_pdict = dictCreate(&dbDictType,NULL); m_pdict = dictCreate(&dbDictType,NULL);
m_setexpire = new(MALLOC_LOCAL) expireset(); m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = false; m_fAllChanged = false;
m_fTrackingChanges = false; m_fTrackingChanges = 0;
} }
void redisDb::initialize(int id) void redisDb::initialize(int id)
{ {
m_persistentData.initialize(); m_persistentData.initialize();
this->expireitr = m_persistentData.setexpireUnsafe()->end(); this->expireitr = m_persistentData.setexpire()->end();
this->blocking_keys = dictCreate(&keylistDictType,NULL); this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL);
@ -1843,7 +1844,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*))
} else { } else {
m_persistentData.clear(callback); m_persistentData.clear(callback);
} }
expireitr = m_persistentData.setexpireUnsafe()->end(); expireitr = m_persistentData.setexpire()->end();
return removed; return removed;
} }
@ -1854,6 +1855,8 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_fAllChanged = true; m_fAllChanged = true;
delete m_setexpire; delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset(); m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_pstorage != nullptr)
m_pstorage->clear();
} }
/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) /* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2)
@ -1863,11 +1866,13 @@ void redisDbPersistentData::clear(void(callback)(void*))
db1->m_fTrackingChanges = db2->m_fTrackingChanges; db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged; db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire; db1->m_setexpire = db2->m_setexpire;
db1->m_pstorage = db2->m_pstorage;
db2->m_pdict = aux.m_pdict; db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged; db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire; db2->m_setexpire = aux.m_setexpire;
db2->m_pstorage = aux.m_pstorage;
} }
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
@ -1875,6 +1880,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL); serverAssertWithInfo(NULL,key,kde != NULL);
trackkey(key);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{ {
@ -1901,15 +1907,86 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
void redisDbPersistentData::setExpire(expireEntry &&e) void redisDbPersistentData::setExpire(expireEntry &&e)
{ {
trackkey(e.key());
m_setexpire->insert(e); m_setexpire->insert(e);
} }
bool redisDb::FKeyExpires(const char *key) bool redisDb::FKeyExpires(const char *key)
{ {
return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end(); return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpire()->end();
} }
void redisDbPersistentData::updateValue(dict_iter itr, robj *val) void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
{ {
trackkey(itr.key());
dictSetVal(m_pdict, itr.de, val); dictSetVal(m_pdict, itr.de, val);
}
void redisDbPersistentData::ensure(dictEntry *de)
{
if (de != nullptr && dictGetVal(de) == nullptr)
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(de);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, de, o);
});
}
}
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o)
{
sds temp = serializeStoredObject(o);
m_pstorage->insert(szKey, cchKey, temp, sdslen(temp));
sdsfree(temp);
}
void redisDbPersistentData::storeDatabase()
{
dictIterator *di = dictGetIterator(m_pdict);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
sds key = (sds)dictGetKey(de);
robj *o = (robj*)dictGetVal(de);
storeKey(key, sdslen(key), o);
}
dictReleaseIterator(di);
}
void redisDbPersistentData::processChanges()
{
--m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0);
if (m_pstorage != nullptr)
{
if (m_fTrackingChanges == 0)
{
if (m_fAllChanged)
{
m_pstorage->clear();
storeDatabase();
}
else
{
for (auto &str : m_setchanged)
{
sds sdsKey = sdsnewlen(str.data(), str.size());
robj *o = find(sdsKey);
if (o != nullptr)
{
storeKey(str.data(), str.size(), o);
}
else
{
m_pstorage->erase(str.data(), str.size());
}
sdsfree(sdsKey);
}
}
}
}
m_setchanged.clear();
} }

View File

@ -787,6 +787,8 @@ long defragKey(redisDb *db, dictEntry *de) {
/* Try to defrag robj and / or string value. */ /* Try to defrag robj and / or string value. */
ob = (robj*)dictGetVal(de); ob = (robj*)dictGetVal(de);
if (ob == nullptr)
return defragged;
if ((newob = activeDefragStringOb(ob, &defragged))) { if ((newob = activeDefragStringOb(ob, &defragged))) {
de->v.val = newob; de->v.val = newob;
ob = newob; ob = newob;
@ -853,7 +855,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
g_pserver->stat_active_defrag_scanned++; g_pserver->stat_active_defrag_scanned++;
} }
/* Defrag scan callback for each hash table bicket, /* Defrag scan callback for each hash table bucket,
* used in order to defrag the dictEntry allocations. */ * used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
@ -1111,7 +1113,8 @@ void activeDefragCycle(void) {
break; /* this will exit the function and we'll continue on the next cycle */ break; /* this will exit the function and we'll continue on the next cycle */
} }
cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db); // we actually look at the objects too but defragScanCallback can handle missing values
cursor = dictScan(db->dictUnsafeKeyOnly(), cursor, defragScanCallback, defragDictBucketCallback, db);
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
* (if we have a lot of pointers in one hash bucket or rehasing), * (if we have a lot of pointers in one hash bucket or rehasing),

View File

@ -256,15 +256,16 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev
{ {
if (setexpire != nullptr) if (setexpire != nullptr)
{ {
visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 }; visitFunctor visitor { dbid, db->m_persistentData.dictUnsafeKeyOnly(), pool, 0 };
setexpire->random_visit(visitor); setexpire->random_visit(visitor);
} }
else else
{ {
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples); int count = dictGetSomeKeys(db->m_persistentData.dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) { for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]); robj *o = (robj*)dictGetVal(samples[j]);
serverAssert(o != nullptr); // BUG!!! We have to get the info we need here without permanently rehydrating the obj
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
} }
} }

View File

@ -407,12 +407,12 @@ void expireSlaveKeys(void) {
// the expire is hashed based on the key pointer, so we need the point in the main db // the expire is hashed based on the key pointer, so we need the point in the main db
auto itrDB = db->find(keyname); auto itrDB = db->find(keyname);
auto itrExpire = db->m_persistentData.setexpireUnsafe()->end(); auto itrExpire = db->m_persistentData.setexpire()->end();
if (itrDB != nullptr) if (itrDB != nullptr)
itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key()); itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key());
int expired = 0; int expired = 0;
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end()) if (itrExpire != db->m_persistentData.setexpire()->end())
{ {
if (itrExpire->when() < start) { if (itrExpire->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start); activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start);
@ -424,7 +424,7 @@ void expireSlaveKeys(void) {
* corresponding bit in the new bitmap we set as value. * corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we * At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */ * no longer need to keep track of this key. */
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) { if (itrExpire != db->m_persistentData.setexpire()->end() && !expired) {
noexpire++; noexpire++;
new_dbids |= (uint64_t)1 << dbid; new_dbids |= (uint64_t)1 << dbid;
} }

View File

@ -116,6 +116,8 @@ void redisDbPersistentData::emptyDbAsync() {
auto *set = m_setexpire; auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset(); m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,NULL); m_pdict = dictCreate(&dbDictType,NULL);
if (m_pstorage != nullptr)
m_pstorage->clear();
if (m_fTrackingChanges) if (m_fTrackingChanges)
m_fAllChanged = true; m_fAllChanged = true;
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));

View File

@ -1493,3 +1493,54 @@ void redisObject::setrefcount(unsigned ref)
serverAssert(!FExpires()); serverAssert(!FExpires());
refcount.store(ref, std::memory_order_relaxed); refcount.store(ref, std::memory_order_relaxed);
} }
sds serializeStoredStringObject(robj_roptr o)
{
sds str = sdsempty();
sdscatlen(str, &(*o), sizeof(robj));
sdscat(str, szFromObj(o));
return str;
}
robj *deserializeStoredStringObject(const char *data, size_t cb)
{
const robj *oT = (const robj*)data;
robj *newObject = nullptr;
switch (oT->encoding)
{
case OBJ_ENCODING_EMBSTR:
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL);
memcpy(newObject, data, cb);
return newObject;
case OBJ_ENCODING_RAW:
newObject = (robj*)zmalloc(sizeof(robj), MALLOC_SHARED);
memcpy(newObject, data, sizeof(robj));
newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj));
memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj));
return newObject;
}
serverPanic("Unknown string object encoding from storage");
return nullptr;
}
robj *deserializeStoredObject(const void *data, size_t cb)
{
const robj *oT = (const robj*)data;
switch (oT->type)
{
case OBJ_STRING:
return deserializeStoredStringObject((char*)data, cb);
}
serverPanic("Unknown object type loading from storage");
}
sds serializeStoredObject(robj_roptr o)
{
switch (o->type)
{
case OBJ_STRING:
return serializeStoredStringObject(o);
}
serverPanic("Attempting to store unknown object type");
}

View File

@ -93,9 +93,9 @@ public:
return end(); return end();
} }
setiter end() setiter end() const
{ {
setiter itr(this); setiter itr(const_cast<semiorderedset<T,T_KEY,MEMMOVE_SAFE>*>(this));
itr.idxPrimary = m_data.size(); itr.idxPrimary = m_data.size();
return itr; return itr;
} }

View File

@ -3697,7 +3697,11 @@ int processCommand(client *c, int callFlags) {
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock); std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].trackChanges();
call(c,callFlags); call(c,callFlags);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].processChanges();
c->woff = g_pserver->master_repl_offset; c->woff = g_pserver->master_repl_offset;
if (listLength(g_pserver->ready_keys)) if (listLength(g_pserver->ready_keys))
handleClientsBlockedOnKeys(); handleClientsBlockedOnKeys();

View File

@ -90,6 +90,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "sha1.h" #include "sha1.h"
#include "endianconv.h" #include "endianconv.h"
#include "crc64.h" #include "crc64.h"
#include "IStorage.h"
extern int g_fTestMode; extern int g_fTestMode;
@ -1086,19 +1087,21 @@ public:
void trackkey(const char *key) void trackkey(const char *key)
{ {
if (m_fTrackingChanges) if (m_fTrackingChanges && !m_fAllChanged)
m_setchanged.insert(key); m_setchanged.insert(std::string(key, sdslen(key)));
} }
dict_iter find(const char *key) dict_iter find(const char *key)
{ {
dictEntry *de = dictFind(m_pdict, key); dictEntry *de = dictFind(m_pdict, key);
ensure(de);
return dict_iter(de); return dict_iter(de);
} }
dict_iter random() dict_iter random()
{ {
dictEntry *de = dictGetRandomKey(m_pdict); dictEntry *de = dictGetRandomKey(m_pdict);
ensure(de);
return dict_iter(de); return dict_iter(de);
} }
@ -1131,16 +1134,27 @@ public:
void setExpire(expireEntry &&e); void setExpire(expireEntry &&e);
void initialize(); void initialize();
dict *dictUnsafe() { return m_pdict; } void trackChanges() { m_fTrackingChanges++; }
void processChanges();
// This should only be used if you look at the key, we do not fixup
// objects stored elsewhere
dict *dictUnsafeKeyOnly() { return m_pdict; }
expireset *setexpireUnsafe() { return m_setexpire; } expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; }
private: private:
void ensure(dictEntry *de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
// Keyspace // Keyspace
dict *m_pdict; /* The keyspace for this DB */ dict *m_pdict; /* The keyspace for this DB */
bool m_fTrackingChanges = false; int m_fTrackingChanges = 0; // Note: Stack based
bool m_fAllChanged = false; bool m_fAllChanged = false;
std::set<std::string> m_setchanged; std::set<std::string> m_setchanged;
IStorage *m_pstorage = nullptr;
// Expire // Expire
expireset *m_setexpire; expireset *m_setexpire;
@ -1181,6 +1195,10 @@ typedef struct redisDb {
void expand(uint64_t slots) { m_persistentData.expand(slots); } void expand(uint64_t slots) { m_persistentData.expand(slots); }
void tryResize() { m_persistentData.tryResize(); } void tryResize() { m_persistentData.tryResize(); }
const expireset *setexpire() { return m_persistentData.setexpire(); } const expireset *setexpire() { return m_persistentData.setexpire(); }
void trackChanges() { m_persistentData.trackChanges(); }
void processChanges() { m_persistentData.processChanges(); }
void trackkey(robj_roptr o) { m_persistentData.trackkey(o); }
iter find(robj_roptr key) iter find(robj_roptr key)
{ {
@ -1215,7 +1233,7 @@ typedef struct redisDb {
bool FKeyExpires(const char *key); bool FKeyExpires(const char *key);
size_t clear(bool fAsync, void(callback)(void*)); size_t clear(bool fAsync, void(callback)(void*));
dict *dictUnsafe() { return m_persistentData.dictUnsafe(); } dict *dictUnsafeKeyOnly() { return m_persistentData.dictUnsafeKeyOnly(); }
expireEntry *getExpire(robj_roptr key); expireEntry *getExpire(robj_roptr key);
private: private:
redisDbPersistentData m_persistentData; redisDbPersistentData m_persistentData;
@ -2338,6 +2356,10 @@ int collateStringObjects(robj *a, robj *b);
int equalStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b);
unsigned long long estimateObjectIdleTime(robj_roptr o); unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o); void trimStringObjectIfNeeded(robj *o);
robj *deserializeStoredObject(const void *data, size_t cb);
sds serializeStoredObject(robj_roptr o);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
/* Synchronous I/O with timeout */ /* Synchronous I/O with timeout */