Dramatically improve the performance of subkey expires

Former-commit-id: 368f67f42217c5fd2cfb3cb3643984917793e994
This commit is contained in:
John Sully 2020-09-20 23:30:21 +00:00
parent ff1a43ea18
commit f333cf98c9
3 changed files with 308 additions and 237 deletions

View File

@ -745,3 +745,90 @@ void touchCommand(client *c) {
addReplyLongLong(c,touched);
}
expireEntryFat::~expireEntryFat()
{
if (m_dictIndex != nullptr)
dictRelease(m_dictIndex);
}
void expireEntryFat::createIndex()
{
serverAssert(m_dictIndex == nullptr);
m_dictIndex = dictCreate(&keyptrDictType, nullptr);
for (auto &entry : m_vecexpireEntries)
{
if (entry.spsubkey != nullptr)
{
dictEntry *de = dictAddRaw(m_dictIndex, (void*)entry.spsubkey.get(), nullptr);
de->v.s64 = entry.when;
}
}
}
void expireEntryFat::expireSubKey(const char *szSubkey, long long when)
{
if (m_vecexpireEntries.size() >= INDEX_THRESHOLD && m_dictIndex == nullptr)
createIndex();
// First check if the subkey already has an expiration
if (m_dictIndex != nullptr && szSubkey != nullptr)
{
dictEntry *de = dictFind(m_dictIndex, szSubkey);
if (de != nullptr)
{
auto itr = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), de->v.u64);
while (itr != m_vecexpireEntries.end() && itr->when == de->v.s64)
{
bool fFound = false;
if (szSubkey == nullptr && itr->spsubkey == nullptr) {
fFound = true;
} else if (szSubkey != nullptr && itr->spsubkey != nullptr && sdscmp((sds)itr->spsubkey.get(), (sds)szSubkey) == 0) {
fFound = true;
}
if (fFound) {
m_vecexpireEntries.erase(itr);
dictDelete(m_dictIndex, szSubkey);
break;
}
++itr;
}
}
}
else
{
for (auto &entry : m_vecexpireEntries)
{
if (szSubkey != nullptr)
{
// if this is a subkey expiry then its not a match if the expireEntry is either for the
// primary key or a different subkey
if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0)
continue;
}
else
{
if (entry.spsubkey != nullptr)
continue;
}
m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data()));
break;
}
}
auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr;
auto itr = m_vecexpireEntries.emplace(itrInsert, when, subkey);
if (m_dictIndex && subkey) {
dictEntry *de = dictAddRaw(m_dictIndex, (void*)itr->spsubkey.get(), nullptr);
de->v.s64 = when;
}
}
void expireEntryFat::popfrontExpireEntry()
{
if (m_dictIndex != nullptr && m_vecexpireEntries.begin()->spsubkey) {
int res = dictDelete(m_dictIndex, (void*)m_vecexpireEntries.begin()->spsubkey.get());
serverAssert(res == DICT_OK);
}
m_vecexpireEntries.erase(m_vecexpireEntries.begin());
}

220
src/expire.h Normal file
View File

@ -0,0 +1,220 @@
#pragma once
class expireEntryFat
{
friend class expireEntry;
static const int INDEX_THRESHOLD = 16;
public:
struct subexpireEntry
{
long long when;
std::unique_ptr<const char, void(*)(const char*)> spsubkey;
subexpireEntry(long long when, const char *subkey)
: when(when), spsubkey(subkey, sdsfree)
{}
bool operator<(long long when) const noexcept { return this->when < when; }
bool operator<(const subexpireEntry &se) { return this->when < se.when; }
};
private:
sds m_keyPrimary;
std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key
dict *m_dictIndex = nullptr;
void createIndex();
public:
expireEntryFat(sds keyPrimary)
: m_keyPrimary(keyPrimary)
{}
~expireEntryFat();
long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return m_keyPrimary; }
bool operator<(long long when) const noexcept { return this->when() < when; }
void expireSubKey(const char *szSubkey, long long when);
bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
void popfrontExpireEntry();
const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; }
size_t size() const noexcept { return m_vecexpireEntries.size(); }
};
class expireEntry {
union
{
sds m_key;
expireEntryFat *m_pfatentry;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
class iter
{
friend class expireEntry;
expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
iter &operator++() { ++m_idx; return *this; }
const char *subkey() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].spsubkey.get();
return nullptr;
}
long long when() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].when;
return m_pentry->when();
}
bool operator!=(const iter &other)
{
return m_idx != other.m_idx;
}
const iter &operator*() const { return *this; }
};
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key);
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = key;
m_when = when;
}
}
expireEntry(expireEntryFat *pfatentry)
{
u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
}
expireEntry(expireEntry &&e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
e.u.m_key = (char*)key(); // we do this so it can still be found in the set
e.m_when = 0;
}
~expireEntry()
{
if (FFat())
delete u.m_pfatentry;
}
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = key;
else
u.m_key = key;
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
bool operator==(const char *key) const noexcept
{
return this->key() == key;
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return u.m_key;
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sds keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() { return iter(this, 0); }
iter end()
{
if (FFat())
return iter(this, u.m_pfatentry->size());
return iter(this, 1);
}
void erase(iter &itr)
{
if (!FFat())
throw -1; // assert
pfatentry()->m_vecexpireEntries.erase(
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
bool FGetPrimaryExpire(long long *pwhen)
{
*pwhen = -1;
for (auto itr : *this)
{
if (itr.subkey() == nullptr)
{
*pwhen = itr.when();
return true;
}
}
return false;
}
explicit operator const char*() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
};
typedef semiorderedset<expireEntry, const char *, true /*expireEntry can be memmoved*/> expireset;

View File

@ -94,6 +94,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "semiorderedset.h"
#include "connection.h" /* Connection abstraction */
#include "serverassert.h"
#include "expire.h"
#define REDISMODULE_CORE 1
#include "redismodule.h" /* Redis modules API defines. */
@ -864,243 +865,6 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o);
}
class expireEntryFat
{
friend class expireEntry;
public:
struct subexpireEntry
{
long long when;
std::unique_ptr<const char, void(*)(const char*)> spsubkey;
subexpireEntry(long long when, const char *subkey)
: when(when), spsubkey(subkey, sdsfree)
{}
bool operator<(long long when) const noexcept { return this->when < when; }
bool operator<(const subexpireEntry &se) { return this->when < se.when; }
};
private:
sds m_keyPrimary;
std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key
public:
expireEntryFat(sds keyPrimary)
: m_keyPrimary(keyPrimary)
{}
long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return m_keyPrimary; }
bool operator<(long long when) const noexcept { return this->when() < when; }
void expireSubKey(const char *szSubkey, long long when)
{
// First check if the subkey already has an expiration
for (auto &entry : m_vecexpireEntries)
{
if (szSubkey != nullptr)
{
// if this is a subkey expiry then its not a match if the expireEntry is either for the
// primary key or a different subkey
if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0)
continue;
}
else
{
if (entry.spsubkey != nullptr)
continue;
}
m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data()));
break;
}
auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr;
m_vecexpireEntries.emplace(itrInsert, when, subkey);
}
bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); }
const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; }
size_t size() const noexcept { return m_vecexpireEntries.size(); }
};
class expireEntry {
union
{
sds m_key;
expireEntryFat *m_pfatentry;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
class iter
{
friend class expireEntry;
expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
iter &operator++() { ++m_idx; return *this; }
const char *subkey() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].spsubkey.get();
return nullptr;
}
long long when() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].when;
return m_pentry->when();
}
bool operator!=(const iter &other)
{
return m_idx != other.m_idx;
}
const iter &operator*() const { return *this; }
};
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key);
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = key;
m_when = when;
}
}
expireEntry(expireEntryFat *pfatentry)
{
u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
}
expireEntry(expireEntry &&e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
e.u.m_key = (char*)key(); // we do this so it can still be found in the set
e.m_when = 0;
}
~expireEntry()
{
if (FFat())
delete u.m_pfatentry;
}
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = key;
else
u.m_key = key;
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
bool operator==(const char *key) const noexcept
{
return this->key() == key;
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return u.m_key;
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sds keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() { return iter(this, 0); }
iter end()
{
if (FFat())
return iter(this, u.m_pfatentry->size());
return iter(this, 1);
}
void erase(iter &itr)
{
if (!FFat())
throw -1; // assert
pfatentry()->m_vecexpireEntries.erase(
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
bool FGetPrimaryExpire(long long *pwhen)
{
*pwhen = -1;
for (auto itr : *this)
{
if (itr.subkey() == nullptr)
{
*pwhen = itr.when();
return true;
}
}
return false;
}
explicit operator const char*() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
};
typedef semiorderedset<expireEntry, const char *, true /*expireEntry can be memmoved*/> expireset;
/* The a string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
* and Module types have their registered name returned. */