Add support for storing expirations in FLASH

Former-commit-id: 1dca07bd564042fce1b01d275641f35b918ae557
This commit is contained in:
John Sully 2020-01-03 15:53:36 -05:00
parent 55361c65d8
commit ce0fde973a
5 changed files with 146 additions and 9 deletions

View File

@ -42,6 +42,9 @@
int keyIsExpired(redisDb *db, robj *key); int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o); int expireIfNeeded(redisDb *db, robj *key, robj *o);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o);
/* Update LFU when an object is accessed. /* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached. * Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */ * Then logarithmically increment the counter, and update the access time. */
@ -565,6 +568,11 @@ void flushdbCommand(client *c) {
{ {
if (!strcasecmp(szFromObj(c->argv[1]), "cache")) if (!strcasecmp(szFromObj(c->argv[1]), "cache"))
{ {
if (g_pserver->m_pstorageFactory == nullptr)
{
addReplyError(c, "Cannot flush cache without a storage provider set");
return;
}
c->db->removeAllCachedValues(); c->db->removeAllCachedValues();
addReply(c,shared.ok); addReply(c,shared.ok);
return; return;
@ -587,6 +595,11 @@ void flushallCommand(client *c) {
{ {
if (!strcasecmp(szFromObj(c->argv[1]), "cache")) if (!strcasecmp(szFromObj(c->argv[1]), "cache"))
{ {
if (g_pserver->m_pstorageFactory == nullptr)
{
addReplyError(c, "Cannot flush cache without a storage provider set");
return;
}
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->removeAllCachedValues(); g_pserver->db[idb]->removeAllCachedValues();
addReply(c,shared.ok); addReply(c,shared.ok);
@ -2100,9 +2113,24 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{ {
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(this, sdsKey, data, cb); size_t offset = 0;
sds sdsNewKey = sdsdupshared(sdsKey);
auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset);
robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
dictAdd(m_pdict, sdsdupshared(sdsKey), o); dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sdsKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
}); });
*pde = dictFind(m_pdict, sdsKey); *pde = dictFind(m_pdict, sdsKey);
} }
@ -2117,7 +2145,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite)
{ {
sds temp = serializeStoredObject(o); sds temp = serializeStoredObjectAndExpire(this, szKey, o);
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite);
sdsfree(temp); sdsfree(temp);
} }
@ -2162,7 +2190,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
if (de == nullptr) if (de == nullptr)
continue; continue;
robj *o = (robj*)dictGetVal(de); robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObject(o); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); vecRet.emplace_back(std::move(change), unique_sds_ptr(temp));
} }
} }
@ -2274,3 +2302,85 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
} }
} }
} }
sds serializeExpire(const expireEntry *pexpire)
{
sds str = sdsnewlen(nullptr, sizeof(unsigned));
if (pexpire == nullptr)
{
unsigned zero = 0;
memcpy(str, &zero, sizeof(unsigned));
return str;
}
auto &e = *pexpire;
unsigned celem = (unsigned)e.size();
memcpy(str, &celem, sizeof(unsigned));
for (auto itr = e.begin(); itr != e.end(); ++itr)
{
unsigned subkeylen = itr.subkey() ? (unsigned)sdslen(itr.subkey()) : 0;
size_t strOffset = sdslen(str);
str = sdsgrowzero(str, sdslen(str) + sizeof(unsigned) + subkeylen + sizeof(long long));
memcpy(str + strOffset, &subkeylen, sizeof(unsigned));
if (itr.subkey())
memcpy(str + strOffset + sizeof(unsigned), itr.subkey(), subkeylen);
long long when = itr.when();
memcpy(str + strOffset + sizeof(unsigned) + subkeylen, &when, sizeof(when));
}
return str;
}
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset)
{
unsigned celem;
if (cch < sizeof(unsigned))
throw "Corrupt expire entry";
memcpy(&celem, str, sizeof(unsigned));
std::unique_ptr<expireEntry> spexpire;
size_t offset = sizeof(unsigned);
for (; celem > 0; --celem)
{
serverAssert(cch > (offset+sizeof(unsigned)));
unsigned subkeylen;
memcpy(&subkeylen, str + offset, sizeof(unsigned));
offset += sizeof(unsigned);
sds subkey = nullptr;
if (subkeylen != 0)
{
serverAssert(cch > (offset + subkeylen));
subkey = sdsnewlen(nullptr, subkeylen);
memcpy(subkey, str + offset, subkeylen);
offset += subkeylen;
}
long long when;
serverAssert(cch >= (offset + sizeof(long long)));
memcpy(&when, str + offset, sizeof(long long));
offset += sizeof(long long);
if (spexpire == nullptr)
spexpire = std::make_unique<expireEntry>(key, subkey, when);
else
spexpire->update(subkey, when);
}
*poffset = offset;
return spexpire;
}
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o)
{
auto itrExpire = db->setexpire()->find(key);
const expireEntry *pexpire = nullptr;
if (itrExpire != db->setexpire()->end())
pexpire = &(*itrExpire);
sds str = serializeExpire(pexpire);
str = serializeStoredObject(o, str);
return str;
}

View File

@ -1619,20 +1619,30 @@ robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key,
return o; return o;
} }
sds serializeStoredObject(robj_roptr o) sds serializeStoredObject(robj_roptr o, sds sdsPrefix)
{ {
switch (o->type) switch (o->type)
{ {
case OBJ_STRING: case OBJ_STRING:
{ {
sds sdsT = sdsnewlen(nullptr, 1); sds sdsT = nullptr;
sdsT[0] = RDB_TYPE_STRING; if (sdsPrefix)
sdsT = sdsgrowzero(sdsPrefix, sdslen(sdsPrefix)+1);
else
sdsT = sdsnewlen(nullptr, 1);
sdsT[sdslen(sdsT)-1] = RDB_TYPE_STRING;
return serializeStoredStringObject(sdsT, o); return serializeStoredStringObject(sdsT, o);
} }
default: default:
rio rdb; rio rdb;
createDumpPayload(&rdb,o,nullptr); createDumpPayload(&rdb,o,nullptr);
if (sdsPrefix)
{
sds rval = sdscatsds(sdsPrefix, (sds)rdb.io.buffer.ptr);
sdsfree((sds)rdb.io.buffer.ptr);
return rval;
}
return (sds)rdb.io.buffer.ptr; return (sds)rdb.io.buffer.ptr;
} }
serverPanic("Attempting to store unknown object type"); serverPanic("Attempting to store unknown object type");

View File

@ -234,6 +234,9 @@ void sdsclear(sds s) {
* Note: this does not change the *length* of the sds string as returned * Note: this does not change the *length* of the sds string as returned
* by sdslen(), but only the free buffer space we have. */ * by sdslen(), but only the free buffer space we have. */
sds sdsMakeRoomFor(sds s, size_t addlen) { sds sdsMakeRoomFor(sds s, size_t addlen) {
if (s == NULL)
return sdsnewlen(NULL, addlen);
void *sh, *newsh; void *sh, *newsh;
size_t avail = sdsavail(s); size_t avail = sdsavail(s);
size_t len, newlen; size_t len, newlen;

View File

@ -1163,6 +1163,13 @@ public:
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
} }
size_t size() const
{
if (FFat())
return u.m_pfatentry->size();
return 1;
}
bool FGetPrimaryExpire(long long *pwhen) const bool FGetPrimaryExpire(long long *pwhen) const
{ {
*pwhen = -1; *pwhen = -1;
@ -2633,7 +2640,8 @@ unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o); void trimStringObjectIfNeeded(robj *o);
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb);
sds serializeStoredObject(robj_roptr o); std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)

View File

@ -283,7 +283,13 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
bool fContinue = true; bool fContinue = true;
if (de == nullptr) if (de == nullptr)
{ {
robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData); robj *o = nullptr;
if (!fKeyOnly)
{
size_t offset = 0;
deserializeExpire(sdsKey, (const char*)data, cbData, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data)+offset, cbData-offset);
}
fContinue = fn(sdsKey, o); fContinue = fn(sdsKey, o);
if (o != nullptr) if (o != nullptr)
decrRefCount(o); decrRefCount(o);