From ce0fde973a03c9bbbfa06232a93484c20578299d Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 3 Jan 2020 15:53:36 -0500 Subject: [PATCH] Add support for storing expirations in FLASH Former-commit-id: 1dca07bd564042fce1b01d275641f35b918ae557 --- src/db.cpp | 118 +++++++++++++++++++++++++++++++++++++++++++++-- src/object.cpp | 16 +++++-- src/sds.c | 3 ++ src/server.h | 10 +++- src/snapshot.cpp | 8 +++- 5 files changed, 146 insertions(+), 9 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 65cec482f..4d14d35a1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -42,6 +42,9 @@ int keyIsExpired(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -565,6 +568,11 @@ void flushdbCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } c->db->removeAllCachedValues(); addReply(c,shared.ok); return; @@ -587,6 +595,11 @@ void flushallCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->removeAllCachedValues(); addReply(c,shared.ok); @@ -2100,9 +2113,24 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database { m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(this, sdsKey, data, cb); + size_t offset = 0; + sds sdsNewKey = sdsdupshared(sdsKey); + auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); + robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); - dictAdd(m_pdict, sdsdupshared(sdsKey), o); + dictAdd(m_pdict, sdsNewKey, o); + + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sdsKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); }); *pde = dictFind(m_pdict, sdsKey); } @@ -2117,7 +2145,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) { - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, szKey, o); m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2162,7 +2190,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() if (de == nullptr) continue; robj *o = (robj*)dictGetVal(de); - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); } } @@ -2273,4 +2301,86 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate) ++m_cnewKeysPending; } } +} + +sds serializeExpire(const expireEntry *pexpire) +{ + sds str = sdsnewlen(nullptr, sizeof(unsigned)); + + if (pexpire == nullptr) + { + unsigned zero = 0; + memcpy(str, &zero, sizeof(unsigned)); + return str; + } + + auto &e = *pexpire; + unsigned celem = (unsigned)e.size(); + memcpy(str, &celem, sizeof(unsigned)); + + for (auto itr = e.begin(); itr != e.end(); ++itr) + { + unsigned subkeylen = itr.subkey() ? (unsigned)sdslen(itr.subkey()) : 0; + size_t strOffset = sdslen(str); + str = sdsgrowzero(str, sdslen(str) + sizeof(unsigned) + subkeylen + sizeof(long long)); + memcpy(str + strOffset, &subkeylen, sizeof(unsigned)); + if (itr.subkey()) + memcpy(str + strOffset + sizeof(unsigned), itr.subkey(), subkeylen); + long long when = itr.when(); + memcpy(str + strOffset + sizeof(unsigned) + subkeylen, &when, sizeof(when)); + } + return str; +} + +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset) +{ + unsigned celem; + if (cch < sizeof(unsigned)) + throw "Corrupt expire entry"; + memcpy(&celem, str, sizeof(unsigned)); + std::unique_ptr spexpire; + + size_t offset = sizeof(unsigned); + for (; celem > 0; --celem) + { + serverAssert(cch > (offset+sizeof(unsigned))); + + unsigned subkeylen; + memcpy(&subkeylen, str + offset, sizeof(unsigned)); + offset += sizeof(unsigned); + + sds subkey = nullptr; + if (subkeylen != 0) + { + serverAssert(cch > (offset + subkeylen)); + subkey = sdsnewlen(nullptr, subkeylen); + memcpy(subkey, str + offset, subkeylen); + offset += subkeylen; + } + + long long when; + serverAssert(cch >= (offset + sizeof(long long))); + memcpy(&when, str + offset, sizeof(long long)); + offset += sizeof(long long); + + if (spexpire == nullptr) + spexpire = std::make_unique(key, subkey, when); + else + spexpire->update(subkey, when); + } + + *poffset = offset; + return spexpire; +} + +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o) +{ + auto itrExpire = db->setexpire()->find(key); + const expireEntry *pexpire = nullptr; + if (itrExpire != db->setexpire()->end()) + pexpire = &(*itrExpire); + + sds str = serializeExpire(pexpire); + str = serializeStoredObject(o, str); + return str; } \ No newline at end of file diff --git a/src/object.cpp b/src/object.cpp index bfbd1524f..450e202c9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1619,20 +1619,30 @@ robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, return o; } -sds serializeStoredObject(robj_roptr o) +sds serializeStoredObject(robj_roptr o, sds sdsPrefix) { switch (o->type) { case OBJ_STRING: { - sds sdsT = sdsnewlen(nullptr, 1); - sdsT[0] = RDB_TYPE_STRING; + sds sdsT = nullptr; + if (sdsPrefix) + sdsT = sdsgrowzero(sdsPrefix, sdslen(sdsPrefix)+1); + else + sdsT = sdsnewlen(nullptr, 1); + sdsT[sdslen(sdsT)-1] = RDB_TYPE_STRING; return serializeStoredStringObject(sdsT, o); } default: rio rdb; createDumpPayload(&rdb,o,nullptr); + if (sdsPrefix) + { + sds rval = sdscatsds(sdsPrefix, (sds)rdb.io.buffer.ptr); + sdsfree((sds)rdb.io.buffer.ptr); + return rval; + } return (sds)rdb.io.buffer.ptr; } serverPanic("Attempting to store unknown object type"); diff --git a/src/sds.c b/src/sds.c index e1678a95b..d6027725f 100644 --- a/src/sds.c +++ b/src/sds.c @@ -234,6 +234,9 @@ void sdsclear(sds s) { * Note: this does not change the *length* of the sds string as returned * by sdslen(), but only the free buffer space we have. */ sds sdsMakeRoomFor(sds s, size_t addlen) { + if (s == NULL) + return sdsnewlen(NULL, addlen); + void *sh, *newsh; size_t avail = sdsavail(s); size_t len, newlen; diff --git a/src/server.h b/src/server.h index b9289f46d..8bf1d344c 100644 --- a/src/server.h +++ b/src/server.h @@ -1163,6 +1163,13 @@ public: pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); } + size_t size() const + { + if (FFat()) + return u.m_pfatentry->size(); + return 1; + } + bool FGetPrimaryExpire(long long *pwhen) const { *pwhen = -1; @@ -2633,7 +2640,8 @@ unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); -sds serializeStoredObject(robj_roptr o); +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 5b2503b82..d0f81e7ef 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -283,7 +283,13 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function(data)+offset, cbData-offset); + } fContinue = fn(sdsKey, o); if (o != nullptr) decrRefCount(o);