From 4f47f6818feb8282ab244a8210a4033f0190525d Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 20 Sep 2019 14:52:22 -0400 Subject: [PATCH] Add the IStorage interface and wire it up Former-commit-id: 898efbfc0c7038818083ea29fdd63cafa47721fb --- src/IStorage.h | 14 +++++++ src/db.cpp | 89 +++++++++++++++++++++++++++++++++++++++++--- src/defrag.cpp | 7 +++- src/evict.cpp | 5 ++- src/expire.cpp | 6 +-- src/lazyfree.cpp | 2 + src/object.cpp | 51 +++++++++++++++++++++++++ src/semiorderedset.h | 4 +- src/server.cpp | 4 ++ src/server.h | 32 +++++++++++++--- 10 files changed, 194 insertions(+), 20 deletions(-) create mode 100644 src/IStorage.h diff --git a/src/IStorage.h b/src/IStorage.h new file mode 100644 index 000000000..b12b98260 --- /dev/null +++ b/src/IStorage.h @@ -0,0 +1,14 @@ +#pragma once +#include <functional> + +class IStorage +{ +public: + typedef std::function<void(const char *, size_t, const void *, size_t)> callback; + + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; + virtual void erase(const char *key, size_t cchKey) = 0; + virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0; + virtual size_t clear() = 0; + virtual void enumerate(callback fn) = 0; +}; diff --git a/src/db.cpp b/src/db.cpp index f8d1d00a6..ddb85a642 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -87,6 +87,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { if (flags & LOOKUP_UPDATEMVCC) { val->mvcc_tstamp = getMvccTstamp(); + db->trackkey(key); } return val; } else { @@ -800,7 +801,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { /* Handle the case of a hash table. 
*/ ht = NULL; if (o == nullptr) { - ht = c->db->dictUnsafe(); + ht = c->db->dictUnsafeKeyOnly(); } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = (dict*)ptrFromObj(o); } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { @@ -1221,12 +1222,12 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { if (!val->FExpires()) return 0; + trackkey(key); auto itrExpire = m_setexpire->find(itr.key()); serverAssert(itrExpire != m_setexpire->end()); serverAssert(itrExpire->key() == itr.key()); m_setexpire->erase(itrExpire); val->SetFExpires(false); - trackkey(key); return 1; } @@ -1805,13 +1806,13 @@ void redisDbPersistentData::initialize() m_pdict = dictCreate(&dbDictType,NULL); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = false; - m_fTrackingChanges = false; + m_fTrackingChanges = 0; } void redisDb::initialize(int id) { m_persistentData.initialize(); - this->expireitr = m_persistentData.setexpireUnsafe()->end(); + this->expireitr = m_persistentData.setexpire()->end(); this->blocking_keys = dictCreate(&keylistDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL); @@ -1843,7 +1844,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*)) } else { m_persistentData.clear(callback); } - expireitr = m_persistentData.setexpireUnsafe()->end(); + expireitr = m_persistentData.setexpire()->end(); return removed; } @@ -1854,6 +1855,8 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_fAllChanged = true; delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); + if (m_pstorage != nullptr) + m_pstorage->clear(); } /* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) @@ -1863,11 +1866,13 @@ void redisDbPersistentData::clear(void(callback)(void*)) db1->m_fTrackingChanges = db2->m_fTrackingChanges; db1->m_fAllChanged = db2->m_fAllChanged; db1->m_setexpire = 
db2->m_setexpire; + db1->m_pstorage = db2->m_pstorage; db2->m_pdict = aux.m_pdict; db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fAllChanged = aux.m_fAllChanged; db2->m_setexpire = aux.m_setexpire; + db2->m_pstorage = aux.m_pstorage; } void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) @@ -1875,6 +1880,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) /* Reuse the sds from the main dict in the expire dict */ dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); + trackkey(key); if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { @@ -1901,15 +1907,86 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(expireEntry &&e) { + trackkey(e.key()); m_setexpire->insert(e); } bool redisDb::FKeyExpires(const char *key) { - return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end(); + return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpire()->end(); } void redisDbPersistentData::updateValue(dict_iter itr, robj *val) { + trackkey(itr.key()); dictSetVal(m_pdict, itr.de, val); +} + +void redisDbPersistentData::ensure(dictEntry *de) +{ + if (de != nullptr && dictGetVal(de) == nullptr) + { + serverAssert(m_pstorage != nullptr); + sds key = (sds)dictGetKey(de); + m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ + robj *o = deserializeStoredObject(data, cb); + serverAssert(o != nullptr); + dictSetVal(m_pdict, de, o); + }); + } +} + +void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o) +{ + sds temp = serializeStoredObject(o); + m_pstorage->insert(szKey, cchKey, temp, sdslen(temp)); + sdsfree(temp); +} + +void redisDbPersistentData::storeDatabase() +{ + dictIterator *di = dictGetIterator(m_pdict); + dictEntry *de; + while 
((de = dictNext(di)) != NULL) { + sds key = (sds)dictGetKey(de); + robj *o = (robj*)dictGetVal(de); + storeKey(key, sdslen(key), o); + } + dictReleaseIterator(di); +} + +void redisDbPersistentData::processChanges() +{ + --m_fTrackingChanges; + serverAssert(m_fTrackingChanges >= 0); + + if (m_pstorage != nullptr) + { + if (m_fTrackingChanges == 0) + { + if (m_fAllChanged) + { + m_pstorage->clear(); + storeDatabase(); + } + else + { + for (auto &str : m_setchanged) + { + sds sdsKey = sdsnewlen(str.data(), str.size()); + robj *o = find(sdsKey); + if (o != nullptr) + { + storeKey(str.data(), str.size(), o); + } + else + { + m_pstorage->erase(str.data(), str.size()); + } + sdsfree(sdsKey); + } + } + } + } + m_setchanged.clear(); } \ No newline at end of file diff --git a/src/defrag.cpp b/src/defrag.cpp index adb354b3b..62a5b6f59 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -787,6 +787,8 @@ long defragKey(redisDb *db, dictEntry *de) { /* Try to defrag robj and / or string value. */ ob = (robj*)dictGetVal(de); + if (ob == nullptr) + return defragged; if ((newob = activeDefragStringOb(ob, &defragged))) { de->v.val = newob; ob = newob; @@ -853,7 +855,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { g_pserver->stat_active_defrag_scanned++; } -/* Defrag scan callback for each hash table bicket, +/* Defrag scan callback for each hash table bucket, * used in order to defrag the dictEntry allocations. */ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. 
don't use privdata */ @@ -1111,7 +1113,8 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db); + // we actually look at the objects too but defragScanCallback can handle missing values + cursor = dictScan(db->dictUnsafeKeyOnly(), cursor, defragScanCallback, defragDictBucketCallback, db); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehasing), diff --git a/src/evict.cpp b/src/evict.cpp index e10a9fceb..c2da5e926 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -256,15 +256,16 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev { if (setexpire != nullptr) { - visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 }; + visitFunctor visitor { dbid, db->m_persistentData.dictUnsafeKeyOnly(), pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->m_persistentData.dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); + serverAssert(o != nullptr); // BUG!!! 
We have to get the info we need here without permanently rehydrating the obj processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); } } diff --git a/src/expire.cpp b/src/expire.cpp index 0ab90afe4..3bd631459 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -407,12 +407,12 @@ void expireSlaveKeys(void) { // the expire is hashed based on the key pointer, so we need the point in the main db auto itrDB = db->find(keyname); - auto itrExpire = db->m_persistentData.setexpireUnsafe()->end(); + auto itrExpire = db->m_persistentData.setexpire()->end(); if (itrDB != nullptr) itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key()); int expired = 0; - if (itrExpire != db->m_persistentData.setexpireUnsafe()->end()) + if (itrExpire != db->m_persistentData.setexpire()->end()) { if (itrExpire->when() < start) { activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start); @@ -424,7 +424,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. 
*/ - if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) { + if (itrExpire != db->m_persistentData.setexpire()->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 471fb6260..89cd65540 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -116,6 +116,8 @@ void redisDbPersistentData::emptyDbAsync() { auto *set = m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); m_pdict = dictCreate(&dbDictType,NULL); + if (m_pstorage != nullptr) + m_pstorage->clear(); if (m_fTrackingChanges) m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); diff --git a/src/object.cpp b/src/object.cpp index 27d7ecaac..141cc7256 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1493,3 +1493,54 @@ void redisObject::setrefcount(unsigned ref) serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); } + +sds serializeStoredStringObject(robj_roptr o) +{ + sds str = sdsempty(); + sdscatlen(str, &(*o), sizeof(robj)); + sdscat(str, szFromObj(o)); + return str; +} + +robj *deserializeStoredStringObject(const char *data, size_t cb) +{ + const robj *oT = (const robj*)data; + robj *newObject = nullptr; + switch (oT->encoding) + { + case OBJ_ENCODING_EMBSTR: + newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); + memcpy(newObject, data, cb); + return newObject; + + case OBJ_ENCODING_RAW: + newObject = (robj*)zmalloc(sizeof(robj), MALLOC_SHARED); + memcpy(newObject, data, sizeof(robj)); + newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); + memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); + return newObject; + } + serverPanic("Unknown string object encoding from storage"); + return nullptr; +} + +robj *deserializeStoredObject(const void *data, size_t cb) +{ + const robj *oT = (const robj*)data; + switch (oT->type) + { + case OBJ_STRING: + return deserializeStoredStringObject((char*)data, cb); + } + serverPanic("Unknown object type loading from storage"); 
+} + +sds serializeStoredObject(robj_roptr o) +{ + switch (o->type) + { + case OBJ_STRING: + return serializeStoredStringObject(o); + } + serverPanic("Attempting to store unknown object type"); +} \ No newline at end of file diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 00a1f1d91..da704c4ce 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -93,9 +93,9 @@ public: return end(); } - setiter end() + setiter end() const { - setiter itr(this); + setiter itr(const_cast*>(this)); itr.idxPrimary = m_data.size(); return itr; } diff --git a/src/server.cpp b/src/server.cpp index e2db1c23d..ba8bfb4d6 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3697,7 +3697,11 @@ int processCommand(client *c, int callFlags) { addReply(c,shared.queued); } else { std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].trackChanges(); call(c,callFlags); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].processChanges(); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) handleClientsBlockedOnKeys(); diff --git a/src/server.h b/src/server.h index 73cdbfed9..4522ea8b8 100644 --- a/src/server.h +++ b/src/server.h @@ -90,6 +90,7 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #include "sha1.h" #include "endianconv.h" #include "crc64.h" +#include "IStorage.h" extern int g_fTestMode; @@ -1086,19 +1087,21 @@ public: void trackkey(const char *key) { - if (m_fTrackingChanges) - m_setchanged.insert(key); + if (m_fTrackingChanges && !m_fAllChanged) + m_setchanged.insert(std::string(key, sdslen(key))); } dict_iter find(const char *key) { dictEntry *de = dictFind(m_pdict, key); + ensure(de); return dict_iter(de); } dict_iter random() { dictEntry *de = dictGetRandomKey(m_pdict); + ensure(de); return dict_iter(de); } @@ -1131,16 +1134,27 @@ public: void setExpire(expireEntry &&e); void initialize(); - dict *dictUnsafe() { return m_pdict; } + void trackChanges() { m_fTrackingChanges++; } + void processChanges(); + + // This should only be used if you look at the key, we do not fixup + // objects stored elsewhere + dict *dictUnsafeKeyOnly() { return m_pdict; } + expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } private: + void ensure(dictEntry *de); + void storeDatabase(); + void storeKey(const char *key, size_t cchKey, robj *o); + // Keyspace dict *m_pdict; /* The keyspace for this DB */ - bool m_fTrackingChanges = false; + int m_fTrackingChanges = 0; // Note: Stack based bool m_fAllChanged = false; std::set<std::string> m_setchanged; + IStorage *m_pstorage = nullptr; // Expire expireset *m_setexpire; @@ -1181,6 +1195,10 @@ typedef struct redisDb { void expand(uint64_t slots) { m_persistentData.expand(slots); } void tryResize() { m_persistentData.tryResize(); } const expireset *setexpire() { return m_persistentData.setexpire(); } + + void trackChanges() { m_persistentData.trackChanges(); } + void processChanges() { m_persistentData.processChanges(); } + void trackkey(robj_roptr o) { m_persistentData.trackkey(o); } iter find(robj_roptr key) { @@ -1215,7 +1233,7 @@ typedef struct redisDb { bool FKeyExpires(const char *key); size_t clear(bool fAsync, void(callback)(void*)); - dict *dictUnsafe() { return 
m_persistentData.dictUnsafe(); } + dict *dictUnsafeKeyOnly() { return m_persistentData.dictUnsafeKeyOnly(); } expireEntry *getExpire(robj_roptr key); private: redisDbPersistentData m_persistentData; @@ -2338,6 +2356,10 @@ int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); + +robj *deserializeStoredObject(const void *data, size_t cb); +sds serializeStoredObject(robj_roptr o); + #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) /* Synchronous I/O with timeout */