Merge pull request #313 from MalavanEQAlpha/fixissue295
Resolve Issue #295 by speeding up keyIsExpired and setting timeout on subkey expires. Former-commit-id: 9e5e6c2f86548b18ae27b4e1ac20c72517392b07
This commit is contained in:
commit
e0af0bcb95
@ -5556,11 +5556,11 @@ try_again:
|
||||
for (j = 0; j < num_keys; j++) {
|
||||
long long ttl = 0;
|
||||
expireEntry *pexpire = getExpire(c->db,kv[j]);
|
||||
long long expireat = -1;
|
||||
long long expireat = INVALID_EXPIRE;
|
||||
if (pexpire != nullptr)
|
||||
pexpire->FGetPrimaryExpire(&expireat);
|
||||
|
||||
if (expireat != -1) {
|
||||
if (expireat != INVALID_EXPIRE) {
|
||||
ttl = expireat-mstime();
|
||||
if (ttl < 0) {
|
||||
continue;
|
||||
|
12
src/db.cpp
12
src/db.cpp
@ -1751,17 +1751,9 @@ int keyIsExpired(redisDb *db, robj *key) {
|
||||
/* Don't expire anything while loading. It will be done later. */
|
||||
if (g_pserver->loading) return 0;
|
||||
|
||||
long long when = -1;
|
||||
for (auto &exp : *pexpire)
|
||||
{
|
||||
if (exp.subkey() == nullptr)
|
||||
{
|
||||
when = exp.when();
|
||||
break;
|
||||
}
|
||||
}
|
||||
long long when = pexpire->FGetPrimaryExpire();
|
||||
|
||||
if (when == -1)
|
||||
if (when == INVALID_EXPIRE)
|
||||
return 0;
|
||||
|
||||
/* If we are in the context of a Lua script, we pretend that time is
|
||||
|
@ -149,7 +149,7 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj
|
||||
uint32_t aux = htonl(o->type);
|
||||
mixDigest(digest,&aux,sizeof(aux));
|
||||
expireEntry *pexpire = getExpire(db,keyobj);
|
||||
long long expiretime = -1;
|
||||
long long expiretime = INVALID_EXPIRE;
|
||||
char buf[128];
|
||||
|
||||
if (pexpire != nullptr)
|
||||
@ -281,7 +281,7 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
/* If the key has an expire, add it to the mix */
|
||||
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
|
||||
if (expiretime != INVALID_EXPIRE) xorDigest(digest,"!!expire!!",10);
|
||||
}
|
||||
|
||||
/* Compute the dataset digest. Since keys, sets elements, hashes elements
|
||||
|
@ -72,11 +72,12 @@ void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
|
||||
void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &tried) {
|
||||
if (!e.FFat())
|
||||
{
|
||||
activeExpireCycleExpireFullKey(db, e.key());
|
||||
return;
|
||||
++tried;
|
||||
return 1;
|
||||
}
|
||||
|
||||
expireEntryFat *pfat = e.pfatentry();
|
||||
@ -90,6 +91,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
|
||||
while (!pfat->FEmpty())
|
||||
{
|
||||
++tried;
|
||||
if (pfat->nextExpireEntry().when > now)
|
||||
break;
|
||||
|
||||
@ -97,7 +99,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
if (pfat->nextExpireEntry().spsubkey == nullptr)
|
||||
{
|
||||
activeExpireCycleExpireFullKey(db, e.key());
|
||||
return;
|
||||
return ++deleted;
|
||||
}
|
||||
|
||||
switch (val->type)
|
||||
@ -107,7 +109,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
deleted++;
|
||||
if (setTypeSize(val) == 0) {
|
||||
activeExpireCycleExpireFullKey(db, e.key());
|
||||
return;
|
||||
return deleted;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -117,7 +119,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
deleted++;
|
||||
if (hashTypeLength(val) == 0) {
|
||||
activeExpireCycleExpireFullKey(db, e.key());
|
||||
return;
|
||||
return deleted;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -127,7 +129,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
deleted++;
|
||||
if (zsetLength(val) == 0) {
|
||||
activeExpireCycleExpireFullKey(db, e.key());
|
||||
return;
|
||||
return deleted;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -142,7 +144,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
decrRefCount(val);
|
||||
}, true /*fLock*/, true /*fForceQueue*/);
|
||||
}
|
||||
return;
|
||||
return deleted;
|
||||
|
||||
case OBJ_LIST:
|
||||
default:
|
||||
@ -155,6 +157,9 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
|
||||
pfat->popfrontExpireEntry();
|
||||
fTtlChanged = true;
|
||||
if ((tried % ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP) == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!pfat->FEmpty() && fTtlChanged)
|
||||
@ -181,6 +186,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
{
|
||||
removeExpire(db, &objKey);
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
int parseUnitString(const char *sz)
|
||||
@ -392,10 +398,8 @@ void activeExpireCycleCore(int type) {
|
||||
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) {
|
||||
if (e.when() < now)
|
||||
{
|
||||
activeExpireCycleExpire(db, e, now);
|
||||
++expired;
|
||||
expired += activeExpireCycleExpire(db, e, now, tried);
|
||||
}
|
||||
++tried;
|
||||
|
||||
if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0)
|
||||
{
|
||||
@ -503,8 +507,8 @@ void expireSlaveKeys(void) {
|
||||
if (itr != db->setexpire->end())
|
||||
{
|
||||
if (itr->when() < start) {
|
||||
activeExpireCycleExpire(g_pserver->db+dbid,*itr,start);
|
||||
expired = 1;
|
||||
size_t tried = 0;
|
||||
expired = activeExpireCycleExpire(g_pserver->db+dbid,*itr,start,tried);
|
||||
}
|
||||
}
|
||||
|
||||
@ -680,7 +684,7 @@ void pexpireatCommand(client *c) {
|
||||
|
||||
/* Implements TTL and PTTL */
|
||||
void ttlGenericCommand(client *c, int output_ms) {
|
||||
long long expire = -1, ttl = -1;
|
||||
long long expire = INVALID_EXPIRE, ttl = -1;
|
||||
|
||||
/* If the key does not exist at all, return -2 */
|
||||
if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) {
|
||||
@ -714,7 +718,7 @@ void ttlGenericCommand(client *c, int output_ms) {
|
||||
}
|
||||
|
||||
|
||||
if (expire != -1) {
|
||||
if (expire != INVALID_EXPIRE) {
|
||||
ttl = expire-mstime();
|
||||
if (ttl < 0) ttl = 0;
|
||||
}
|
||||
|
56
src/expire.h
56
src/expire.h
@ -1,5 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
/*
|
||||
* INVALID_EXPIRE is the value we set the expireEntry's m_when value to when the main key is not expired and the value we return when we try to get the expire time of a key or subkey that is not expired
|
||||
* Want this value to be LLONG_MAX however we use the most significant bit of m_when as a flag to see if the expireEntry is Fat or not so we want to ensure that it is unset hence the (& ~((1LL) << (sizeof(long long)*CHAR_BIT - 1)))
|
||||
* Eventually this number might actually end up being a valid expire time, this could cause bugs so at that time it might be a good idea to use a larger data type.
|
||||
*/
|
||||
#define INVALID_EXPIRE (LLONG_MAX & ~((1LL) << (sizeof(long long)*CHAR_BIT - 1)))
|
||||
|
||||
class expireEntryFat
|
||||
{
|
||||
friend class expireEntry;
|
||||
@ -47,6 +54,7 @@ public:
|
||||
~expireEntryFat();
|
||||
|
||||
long long when() const noexcept { return m_vecexpireEntries.front().when; }
|
||||
|
||||
const char *key() const noexcept { return m_keyPrimary; }
|
||||
|
||||
bool operator<(long long when) const noexcept { return this->when() < when; }
|
||||
@ -66,7 +74,12 @@ class expireEntry {
|
||||
sds m_key;
|
||||
expireEntryFat *m_pfatentry;
|
||||
} u;
|
||||
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
|
||||
long long m_when; // bit wise and with FFatMask means this is a fat entry and we should use the pointer
|
||||
|
||||
/* Mask to check if an entry is Fat, most significant bit of m_when being set means it is Fat otherwise it is not */
|
||||
long long FFatMask() const noexcept {
|
||||
return (1LL) << (sizeof(long long)*CHAR_BIT - 1);
|
||||
}
|
||||
|
||||
expireEntry() = default;
|
||||
public:
|
||||
@ -108,7 +121,7 @@ public:
|
||||
{
|
||||
if (subkey != nullptr)
|
||||
{
|
||||
m_when = LLONG_MIN;
|
||||
m_when = FFatMask() | INVALID_EXPIRE;
|
||||
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key);
|
||||
u.m_pfatentry->expireSubKey(subkey, when);
|
||||
}
|
||||
@ -122,7 +135,15 @@ public:
|
||||
expireEntry(expireEntryFat *pfatentry)
|
||||
{
|
||||
u.m_pfatentry = pfatentry;
|
||||
m_when = LLONG_MIN;
|
||||
m_when = FFatMask() | INVALID_EXPIRE;
|
||||
for (auto itr : *this)
|
||||
{
|
||||
if (itr.subkey() == nullptr)
|
||||
{
|
||||
m_when = FFatMask() | itr.when();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expireEntry(expireEntry &&e)
|
||||
@ -159,7 +180,7 @@ public:
|
||||
u.m_key = key;
|
||||
}
|
||||
|
||||
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
|
||||
inline bool FFat() const noexcept { return m_when & FFatMask(); }
|
||||
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
|
||||
|
||||
|
||||
@ -187,7 +208,7 @@ public:
|
||||
{
|
||||
if (FFat())
|
||||
return u.m_pfatentry->when();
|
||||
return m_when;
|
||||
return FGetPrimaryExpire();
|
||||
}
|
||||
|
||||
void update(const char *subkey, long long when)
|
||||
@ -204,12 +225,14 @@ public:
|
||||
// we have to upgrade to a fat entry
|
||||
long long whenT = m_when;
|
||||
sds keyPrimary = u.m_key;
|
||||
m_when = LLONG_MIN;
|
||||
m_when |= FFatMask();
|
||||
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
|
||||
u.m_pfatentry->expireSubKey(nullptr, whenT);
|
||||
// at this point we're fat so fall through
|
||||
}
|
||||
}
|
||||
if (subkey == nullptr)
|
||||
m_when = when | FFatMask();
|
||||
u.m_pfatentry->expireSubKey(subkey, when);
|
||||
}
|
||||
|
||||
@ -229,18 +252,15 @@ public:
|
||||
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
|
||||
}
|
||||
|
||||
bool FGetPrimaryExpire(long long *pwhen)
|
||||
{
|
||||
*pwhen = -1;
|
||||
for (auto itr : *this)
|
||||
{
|
||||
if (itr.subkey() == nullptr)
|
||||
{
|
||||
*pwhen = itr.when();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
long long FGetPrimaryExpire() const noexcept
|
||||
{
|
||||
return m_when & (~FFatMask());
|
||||
}
|
||||
|
||||
bool FGetPrimaryExpire(long long *pwhen) const noexcept
|
||||
{
|
||||
*pwhen = FGetPrimaryExpire();
|
||||
return *pwhen != INVALID_EXPIRE;
|
||||
}
|
||||
|
||||
explicit operator const char*() const noexcept { return key(); }
|
||||
|
@ -2446,11 +2446,11 @@ int RM_UnlinkKey(RedisModuleKey *key) {
|
||||
* REDISMODULE_NO_EXPIRE is returned. */
|
||||
mstime_t RM_GetExpire(RedisModuleKey *key) {
|
||||
expireEntry *pexpire = getExpire(key->db,key->key);
|
||||
mstime_t expire = -1;
|
||||
mstime_t expire = INVALID_EXPIRE;
|
||||
if (pexpire != nullptr)
|
||||
pexpire->FGetPrimaryExpire(&expire);
|
||||
|
||||
if (expire == -1 || key->value == NULL)
|
||||
if (expire == INVALID_EXPIRE || key->value == NULL)
|
||||
return REDISMODULE_NO_EXPIRE;
|
||||
expire -= mstime();
|
||||
return expire >= 0 ? expire : 0;
|
||||
|
@ -1124,7 +1124,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
|
||||
int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU;
|
||||
|
||||
/* Save the expire time */
|
||||
long long expiretime = -1;
|
||||
long long expiretime = INVALID_EXPIRE;
|
||||
if (pexpire != nullptr && pexpire->FGetPrimaryExpire(&expiretime)) {
|
||||
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
|
||||
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
|
||||
|
@ -306,6 +306,7 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs)
|
||||
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP 16384 /* Subkey loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
|
||||
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
|
||||
|
Loading…
x
Reference in New Issue
Block a user