Add the IStorage interface and wire it up

Former-commit-id: 898efbfc0c7038818083ea29fdd63cafa47721fb
This commit is contained in:
John Sully 2019-09-20 14:52:22 -04:00
parent f9f4c719d4
commit a2bd45dbd3
10 changed files with 194 additions and 20 deletions

14
src/IStorage.h Normal file
View File

@ -0,0 +1,14 @@
#pragma once
#include <functional>
class IStorage
{
public:
typedef std::function<void(const char *, size_t, const void *, size_t)> callback;
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0;
virtual void erase(const char *key, size_t cchKey) = 0;
virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0;
virtual size_t clear() = 0;
virtual void enumerate(callback fn) = 0;
};

View File

@ -87,6 +87,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
if (flags & LOOKUP_UPDATEMVCC) {
val->mvcc_tstamp = getMvccTstamp();
db->trackkey(key);
}
return val;
} else {
@ -800,7 +801,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
/* Handle the case of a hash table. */
ht = NULL;
if (o == nullptr) {
ht = c->db->dictUnsafe();
ht = c->db->dictUnsafeKeyOnly();
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = (dict*)ptrFromObj(o);
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
@ -1221,12 +1222,12 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
if (!val->FExpires())
return 0;
trackkey(key);
auto itrExpire = m_setexpire->find(itr.key());
serverAssert(itrExpire != m_setexpire->end());
serverAssert(itrExpire->key() == itr.key());
m_setexpire->erase(itrExpire);
val->SetFExpires(false);
trackkey(key);
return 1;
}
@ -1805,13 +1806,13 @@ void redisDbPersistentData::initialize()
m_pdict = dictCreate(&dbDictType,NULL);
m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = false;
m_fTrackingChanges = false;
m_fTrackingChanges = 0;
}
void redisDb::initialize(int id)
{
m_persistentData.initialize();
this->expireitr = m_persistentData.setexpireUnsafe()->end();
this->expireitr = m_persistentData.setexpire()->end();
this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL);
@ -1843,7 +1844,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*))
} else {
m_persistentData.clear(callback);
}
expireitr = m_persistentData.setexpireUnsafe()->end();
expireitr = m_persistentData.setexpire()->end();
return removed;
}
@ -1854,6 +1855,8 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_fAllChanged = true;
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_pstorage != nullptr)
m_pstorage->clear();
}
/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2)
@ -1863,11 +1866,13 @@ void redisDbPersistentData::clear(void(callback)(void*))
db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire;
db1->m_pstorage = db2->m_pstorage;
db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire;
db2->m_pstorage = aux.m_pstorage;
}
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
@ -1875,6 +1880,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
/* Reuse the sds from the main dict in the expire dict */
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
trackkey(key);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
@ -1901,15 +1907,86 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
void redisDbPersistentData::setExpire(expireEntry &&e)
{
trackkey(e.key());
m_setexpire->insert(e);
}
bool redisDb::FKeyExpires(const char *key)
{
return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end();
return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpire()->end();
}
void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
{
trackkey(itr.key());
dictSetVal(m_pdict, itr.de, val);
}
void redisDbPersistentData::ensure(dictEntry *de)
{
if (de != nullptr && dictGetVal(de) == nullptr)
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(de);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, de, o);
});
}
}
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o)
{
sds temp = serializeStoredObject(o);
m_pstorage->insert(szKey, cchKey, temp, sdslen(temp));
sdsfree(temp);
}
void redisDbPersistentData::storeDatabase()
{
dictIterator *di = dictGetIterator(m_pdict);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
sds key = (sds)dictGetKey(de);
robj *o = (robj*)dictGetVal(de);
storeKey(key, sdslen(key), o);
}
dictReleaseIterator(di);
}
void redisDbPersistentData::processChanges()
{
--m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0);
if (m_pstorage != nullptr)
{
if (m_fTrackingChanges == 0)
{
if (m_fAllChanged)
{
m_pstorage->clear();
storeDatabase();
}
else
{
for (auto &str : m_setchanged)
{
sds sdsKey = sdsnewlen(str.data(), str.size());
robj *o = find(sdsKey);
if (o != nullptr)
{
storeKey(str.data(), str.size(), o);
}
else
{
m_pstorage->erase(str.data(), str.size());
}
sdsfree(sdsKey);
}
}
}
}
m_setchanged.clear();
}

View File

@ -787,6 +787,8 @@ long defragKey(redisDb *db, dictEntry *de) {
/* Try to defrag robj and / or string value. */
ob = (robj*)dictGetVal(de);
if (ob == nullptr)
return defragged;
if ((newob = activeDefragStringOb(ob, &defragged))) {
de->v.val = newob;
ob = newob;
@ -853,7 +855,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
g_pserver->stat_active_defrag_scanned++;
}
/* Defrag scan callback for each hash table bicket,
/* Defrag scan callback for each hash table bucket,
* used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
@ -1111,7 +1113,8 @@ void activeDefragCycle(void) {
break; /* this will exit the function and we'll continue on the next cycle */
}
cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db);
// we actually look at the objects too but defragScanCallback can handle missing values
cursor = dictScan(db->dictUnsafeKeyOnly(), cursor, defragScanCallback, defragDictBucketCallback, db);
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
* (if we have a lot of pointers in one hash bucket or rehasing),

View File

@ -256,15 +256,16 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev
{
if (setexpire != nullptr)
{
visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 };
visitFunctor visitor { dbid, db->m_persistentData.dictUnsafeKeyOnly(), pool, 0 };
setexpire->random_visit(visitor);
}
else
{
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples);
int count = dictGetSomeKeys(db->m_persistentData.dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
serverAssert(o != nullptr); // BUG!!! We have to get the info we need here without permanently rehydrating the obj
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
}
}

View File

@ -407,12 +407,12 @@ void expireSlaveKeys(void) {
// the expire is hashed based on the key pointer, so we need the point in the main db
auto itrDB = db->find(keyname);
auto itrExpire = db->m_persistentData.setexpireUnsafe()->end();
auto itrExpire = db->m_persistentData.setexpire()->end();
if (itrDB != nullptr)
itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key());
int expired = 0;
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end())
if (itrExpire != db->m_persistentData.setexpire()->end())
{
if (itrExpire->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start);
@ -424,7 +424,7 @@ void expireSlaveKeys(void) {
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) {
if (itrExpire != db->m_persistentData.setexpire()->end() && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}

View File

@ -116,6 +116,8 @@ void redisDbPersistentData::emptyDbAsync() {
auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,NULL);
if (m_pstorage != nullptr)
m_pstorage->clear();
if (m_fTrackingChanges)
m_fAllChanged = true;
atomicIncr(lazyfree_objects,dictSize(oldht1));

View File

@ -1493,3 +1493,54 @@ void redisObject::setrefcount(unsigned ref)
serverAssert(!FExpires());
refcount.store(ref, std::memory_order_relaxed);
}
sds serializeStoredStringObject(robj_roptr o)
{
sds str = sdsempty();
sdscatlen(str, &(*o), sizeof(robj));
sdscat(str, szFromObj(o));
return str;
}
robj *deserializeStoredStringObject(const char *data, size_t cb)
{
const robj *oT = (const robj*)data;
robj *newObject = nullptr;
switch (oT->encoding)
{
case OBJ_ENCODING_EMBSTR:
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL);
memcpy(newObject, data, cb);
return newObject;
case OBJ_ENCODING_RAW:
newObject = (robj*)zmalloc(sizeof(robj), MALLOC_SHARED);
memcpy(newObject, data, sizeof(robj));
newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj));
memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj));
return newObject;
}
serverPanic("Unknown string object encoding from storage");
return nullptr;
}
robj *deserializeStoredObject(const void *data, size_t cb)
{
const robj *oT = (const robj*)data;
switch (oT->type)
{
case OBJ_STRING:
return deserializeStoredStringObject((char*)data, cb);
}
serverPanic("Unknown object type loading from storage");
}
sds serializeStoredObject(robj_roptr o)
{
switch (o->type)
{
case OBJ_STRING:
return serializeStoredStringObject(o);
}
serverPanic("Attempting to store unknown object type");
}

View File

@ -93,9 +93,9 @@ public:
return end();
}
setiter end()
setiter end() const
{
setiter itr(this);
setiter itr(const_cast<semiorderedset<T,T_KEY,MEMMOVE_SAFE>*>(this));
itr.idxPrimary = m_data.size();
return itr;
}

View File

@ -3697,7 +3697,11 @@ int processCommand(client *c, int callFlags) {
addReply(c,shared.queued);
} else {
std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].trackChanges();
call(c,callFlags);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].processChanges();
c->woff = g_pserver->master_repl_offset;
if (listLength(g_pserver->ready_keys))
handleClientsBlockedOnKeys();

View File

@ -90,6 +90,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "sha1.h"
#include "endianconv.h"
#include "crc64.h"
#include "IStorage.h"
extern int g_fTestMode;
@ -1086,19 +1087,21 @@ public:
void trackkey(const char *key)
{
if (m_fTrackingChanges)
m_setchanged.insert(key);
if (m_fTrackingChanges && !m_fAllChanged)
m_setchanged.insert(std::string(key, sdslen(key)));
}
dict_iter find(const char *key)
{
dictEntry *de = dictFind(m_pdict, key);
ensure(de);
return dict_iter(de);
}
dict_iter random()
{
dictEntry *de = dictGetRandomKey(m_pdict);
ensure(de);
return dict_iter(de);
}
@ -1131,16 +1134,27 @@ public:
void setExpire(expireEntry &&e);
void initialize();
dict *dictUnsafe() { return m_pdict; }
void trackChanges() { m_fTrackingChanges++; }
void processChanges();
// This should only be used if you look at the key, we do not fixup
// objects stored elsewhere
dict *dictUnsafeKeyOnly() { return m_pdict; }
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
private:
void ensure(dictEntry *de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
// Keyspace
dict *m_pdict; /* The keyspace for this DB */
bool m_fTrackingChanges = false;
int m_fTrackingChanges = 0; // Note: Stack based
bool m_fAllChanged = false;
std::set<std::string> m_setchanged;
IStorage *m_pstorage = nullptr;
// Expire
expireset *m_setexpire;
@ -1181,6 +1195,10 @@ typedef struct redisDb {
void expand(uint64_t slots) { m_persistentData.expand(slots); }
void tryResize() { m_persistentData.tryResize(); }
const expireset *setexpire() { return m_persistentData.setexpire(); }
void trackChanges() { m_persistentData.trackChanges(); }
void processChanges() { m_persistentData.processChanges(); }
void trackkey(robj_roptr o) { m_persistentData.trackkey(o); }
iter find(robj_roptr key)
{
@ -1215,7 +1233,7 @@ typedef struct redisDb {
bool FKeyExpires(const char *key);
size_t clear(bool fAsync, void(callback)(void*));
dict *dictUnsafe() { return m_persistentData.dictUnsafe(); }
dict *dictUnsafeKeyOnly() { return m_persistentData.dictUnsafeKeyOnly(); }
expireEntry *getExpire(robj_roptr key);
private:
redisDbPersistentData m_persistentData;
@ -2338,6 +2356,10 @@ int collateStringObjects(robj *a, robj *b);
int equalStringObjects(robj *a, robj *b);
unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o);
robj *deserializeStoredObject(const void *data, size_t cb);
sds serializeStoredObject(robj_roptr o);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
/* Synchronous I/O with timeout */