Remove Expireset (#217)

Major refactor to place expiry information directly in the object struct.
This commit is contained in:
John Sully 2023-08-21 16:36:45 -04:00 committed by GitHub Enterprise
parent 80dcbad30f
commit ffac55273a
17 changed files with 429 additions and 484 deletions

View File

@ -1592,8 +1592,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
} }
/* Save the expire time */ /* Save the expire time */
if (o->FExpires()) { if (o->FExpires()) {
std::unique_lock<fastlock> ul(g_expireLock); expireEntry *pexpire = &o->expire;
expireEntry *pexpire = db->getExpire(&key);
for (auto &subExpire : *pexpire) { for (auto &subExpire : *pexpire) {
if (subExpire.subkey() == nullptr) if (subExpire.subkey() == nullptr)
{ {

View File

@ -5610,7 +5610,6 @@ try_again:
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) { for (j = 0; j < num_keys; j++) {
long long ttl = 0; long long ttl = 0;
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpire = c->db->getExpire(kv[j]); expireEntry *pexpire = c->db->getExpire(kv[j]);
long long expireat = INVALID_EXPIRE; long long expireat = INVALID_EXPIRE;
if (pexpire != nullptr) if (pexpire != nullptr)

View File

@ -2908,7 +2908,7 @@ standardConfig configs[] = {
createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->key_load_delay, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, cserver.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, g_pserver->active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),

View File

@ -55,8 +55,8 @@ struct dbBackup {
int expireIfNeeded(redisDb *db, robj *key, robj *o); int expireIfNeeded(redisDb *db, robj *key, robj *o);
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); sds serializeStoredObjectAndExpire(robj_roptr o);
dictType dictChangeDescType { dictType dictChangeDescType {
dictSdsHash, /* hash function */ dictSdsHash, /* hash function */
@ -83,6 +83,7 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
serverAssert(db->FKeyExpires((const char*)key)); serverAssert(db->FKeyExpires((const char*)key));
valNew->expire = std::move(valOld->expire);
valNew->SetFExpires(true); valNew->SetFExpires(true);
valOld->SetFExpires(false); valOld->SetFExpires(false);
return; return;
@ -281,8 +282,8 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o; return o;
} }
bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr) { bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr, bool fValExpires = false) {
serverAssert(!val->FExpires()); serverAssert(fValExpires || !val->FExpires());
sds copy = sdsdupshared(key); sds copy = sdsdupshared(key);
uint64_t mvcc = getMvccTstamp(); uint64_t mvcc = getMvccTstamp();
@ -1494,15 +1495,6 @@ void renameGenericCommand(client *c, int nx) {
incrRefCount(o); incrRefCount(o);
std::unique_ptr<expireEntry> spexpire;
{ // scope pexpireOld since it will be invalid soon
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
if (pexpireOld != nullptr)
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
}
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
if (nx) { if (nx) {
decrRefCount(o); decrRefCount(o);
@ -1513,10 +1505,12 @@ void renameGenericCommand(client *c, int nx) {
* with the same name. */ * with the same name. */
dbDelete(c->db,c->argv[2]); dbDelete(c->db,c->argv[2]);
} }
bool fExpires = o->FExpires();
long long whenT = o->expire.when();
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
dbAdd(c->db,c->argv[2],o); o->SetFExpires(fExpires);
if (spexpire != nullptr) dbAddCore(c->db,szFromObj(c->argv[2]),o,true /*fUpdateMvcc*/,true/*fAssumeNew*/,nullptr,true/*fValExpires*/);
setExpire(c,c->db,c->argv[2],std::move(*spexpire)); serverAssert(whenT == o->expire.when()); // dbDelete and dbAdd must not modify the expire, just the FExpire bit
signalModifiedKey(c,c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[2]); signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
@ -1579,22 +1573,15 @@ void moveCommand(client *c) {
return; return;
} }
std::unique_ptr<expireEntry> spexpire;
{ // scope pexpireOld
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
if (pexpireOld != nullptr)
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
}
if (o->FExpires())
removeExpire(c->db,c->argv[1]);
serverAssert(!o->FExpires());
incrRefCount(o); incrRefCount(o);
bool fExpire = o->FExpires();
long long whenT = o->expire.when();
dbDelete(src,c->argv[1]); dbDelete(src,c->argv[1]);
g_pserver->dirty++; g_pserver->dirty++;
dbAdd(dst,c->argv[1],o); o->SetFExpires(fExpire);
if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire)); dbAddCore(dst, szFromObj(c->argv[1]), o, true /*fUpdateMvcc*/, true /*fAssumeNew*/, nullptr, true /*fValExpires*/);
serverAssert(whenT == o->expire.when()); // add/delete must not modify the expire time
signalModifiedKey(c,src,c->argv[1]); signalModifiedKey(c,src,c->argv[1]);
signalModifiedKey(c,dst,c->argv[1]); signalModifiedKey(c,dst,c->argv[1]);
@ -1662,7 +1649,7 @@ void copyCommand(client *c) {
addReply(c,shared.czero); addReply(c,shared.czero);
return; return;
} }
expire = c->db->getExpire(key); expire = o->FExpires() ? &o->expire : nullptr;
/* Return zero if the key already exists in the target DB. /* Return zero if the key already exists in the target DB.
* If REPLACE option is selected, delete newkey from targetDB. */ * If REPLACE option is selected, delete newkey from targetDB. */
@ -1829,63 +1816,48 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
/* An expire may only be removed if there is a corresponding entry in the /* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */ * main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,itr != nullptr); serverAssertWithInfo(NULL,key,itr != nullptr);
std::unique_lock<fastlock> ul(g_expireLock);
robj *val = itr.val(); robj *val = itr.val();
if (!val->FExpires()) if (!val->FExpires())
return 0; return 0;
trackkey(key, true /* fUpdate */); trackkey(key, true /* fUpdate */);
auto itrExpire = m_setexpire->find(itr.key());
serverAssert(itrExpire != m_setexpire->end());
m_setexpire->erase(itrExpire);
val->SetFExpires(false); val->SetFExpires(false);
serverAssert(m_numexpires > 0);
m_numexpires--;
return 1; return 1;
} }
int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) {
auto de = find(szFromObj(key)); auto de = find(szFromObj(key));
serverAssertWithInfo(NULL,key,de != nullptr); serverAssertWithInfo(NULL,key,de != nullptr);
std::unique_lock<fastlock> ul(g_expireLock);
robj *val = de.val(); robj *val = de.val();
if (!val->FExpires()) if (!val->FExpires())
return 0; return 0;
auto itr = m_setexpire->find(de.key()); if (!val->expire.FFat())
serverAssert(itr != m_setexpire->end());
serverAssert(itr->key() == de.key());
if (!itr->FFat())
return 0; return 0;
int found = 0; int found = 0;
for (auto subitr : *itr) for (auto subitr : val->expire)
{ {
if (subitr.subkey() == nullptr) if (subitr.subkey() == nullptr)
continue; continue;
if (sdscmp((sds)subitr.subkey(), szFromObj(subkey)) == 0) if (sdscmp((sds)subitr.subkey(), szFromObj(subkey)) == 0)
{ {
itr->erase(subitr); val->expire.erase(subitr);
found = 1; found = 1;
break; break;
} }
} }
if (itr->pfatentry()->size() == 0) if (val->expire.pfatentry()->size() == 0)
this->removeExpire(key, de); this->removeExpire(key, de);
return found; return found;
} }
void redisDbPersistentData::resortExpire(expireEntry &e)
{
std::unique_lock<fastlock> ul(g_expireLock);
auto itr = m_setexpire->find(e.key());
expireEntry eT = std::move(e);
m_setexpire->erase(itr);
m_setexpire->insert(eT);
}
/* Set an expire to the specified key. If the expire is set in the context /* Set an expire to the specified key. If the expire is set in the context
* of an user calling a command 'c' is the client, otherwise 'c' is set * of an user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds * to NULL. The 'when' parameter is the absolute unix time in milliseconds
@ -1940,10 +1912,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
if (kde.val()->FExpires()) if (kde.val()->FExpires())
removeExpire(db, key); removeExpire(db, key);
e.setKeyUnsafe(kde.key()); db->setExpire(kde.key(), std::move(e));
db->setExpire(std::move(e));
kde.val()->SetFExpires(true);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica;
if (c && writable_slave && !(c->flags & CLIENT_MASTER)) if (c && writable_slave && !(c->flags & CLIENT_MASTER))
@ -1954,14 +1923,15 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
* is associated with this key (i.e. the key is non volatile) */ * is associated with this key (i.e. the key is non volatile) */
expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) { expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) {
/* No expire? return ASAP */ /* No expire? return ASAP */
std::unique_lock<fastlock> ul(g_expireLock);
if (expireSize() == 0) if (expireSize() == 0)
return nullptr; return nullptr;
auto itrExpire = m_setexpire->find(key); auto itr = find_cached_threadsafe(key);
if (itrExpire == m_setexpire->end()) if (itr == end())
return nullptr; return nullptr;
return itrExpire.operator->(); if (!itr.val()->FExpires())
return nullptr;
return &itr.val()->expire;
} }
const expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) const const expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) const
@ -2062,15 +2032,13 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */ /* Don't expire anything while loading. It will be done later. */
if (g_pserver->loading) return 0; if (g_pserver->loading) return 0;
std::unique_lock<fastlock> ul(g_expireLock);
const expireEntry *pexpire = db->getExpire(key); const expireEntry *pexpire = db->getExpire(key);
mstime_t now; mstime_t now;
long long when;
if (pexpire == nullptr) return 0; /* No expire for this key */ if (pexpire == nullptr) return 0; /* No expire for this key */
long long when = pexpire->FGetPrimaryExpire(); if (!pexpire->FGetPrimaryExpire(&when))
if (when == INVALID_EXPIRE)
return 0; return 0;
/* If we are in the context of a Lua script, we pretend that time is /* If we are in the context of a Lua script, we pretend that time is
@ -2632,7 +2600,6 @@ void redisDbPersistentData::initialize()
m_pdbSnapshot = nullptr; m_pdbSnapshot = nullptr;
m_pdict = dictCreate(&dbDictType,this); m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbTombstoneDictType,this); m_pdictTombstone = dictCreate(&dbTombstoneDictType,this);
m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = 0; m_fAllChanged = 0;
m_fTrackingChanges = 0; m_fTrackingChanges = 0;
} }
@ -2668,7 +2635,6 @@ void moduleClusterLoadCallback(const char * rgchKey, size_t cchKey, void *data)
void redisDb::initialize(int id) void redisDb::initialize(int id)
{ {
redisDbPersistentData::initialize(); redisDbPersistentData::initialize();
this->expireitr = setexpire()->end();
this->blocking_keys = dictCreate(&keylistDictType,NULL); this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL);
@ -2714,6 +2680,8 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_ite
serverAssert(dictFind(m_pdictTombstone, key) != nullptr); serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
} }
#endif #endif
if (o->FExpires())
++m_numexpires;
trackkey(key, false /* fUpdate */); trackkey(key, false /* fUpdate */);
} }
else else
@ -2761,7 +2729,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*))
} else { } else {
redisDbPersistentData::clear(callback); redisDbPersistentData::clear(callback);
} }
expireitr = setexpire()->end(); expires_cursor = 0;
return removed; return removed;
} }
@ -2774,59 +2742,57 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_cnewKeysPending = 0; m_cnewKeysPending = 0;
m_fAllChanged++; m_fAllChanged++;
} }
{
std::unique_lock<fastlock> ul(g_expireLock);
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
}
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->clear(callback); m_spstorage->clear(callback);
dictEmpty(m_pdictTombstone,callback); dictEmpty(m_pdictTombstone,callback);
m_pdbSnapshot = nullptr; m_pdbSnapshot = nullptr;
m_numexpires = 0;
} }
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
{ {
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
std::unique_lock<fastlock> ul(g_expireLock);
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL); serverAssertWithInfo(NULL,key,kde != NULL);
trackkey(key, true /* fUpdate */); trackkey(key, true /* fUpdate */);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) robj *o = (robj*)dictGetVal(kde);
if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{ {
// shared objects cannot have the expire bit set, create a real object // shared objects cannot have the expire bit set, create a real object
dictSetVal(m_pdict, kde, dupStringObject((robj*)dictGetVal(kde))); dictSetVal(m_pdict, kde, dupStringObject(o));
o = (robj*)dictGetVal(kde);
} }
const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr;
if (((robj*)dictGetVal(kde))->FExpires()) { if (o->FExpires()) {
auto itr = m_setexpire->find((sds)dictGetKey(kde)); o->expire.update(szSubKey, when);
serverAssert(itr != m_setexpire->end());
expireEntry eNew(std::move(*itr));
eNew.update(szSubKey, when);
m_setexpire->erase(itr);
m_setexpire->insert(eNew);
} }
else else
{ {
expireEntry e((sds)dictGetKey(kde), szSubKey, when); expireEntry e(szSubKey, when);
((robj*)dictGetVal(kde))->SetFExpires(true); o->expire = std::move(e);
m_setexpire->insert(e); o->SetFExpires(true);
++m_numexpires;
} }
} }
void redisDbPersistentData::setExpire(expireEntry &&e) void redisDbPersistentData::setExpire(const char *key, expireEntry &&e)
{ {
std::unique_lock<fastlock> ul(g_expireLock); trackkey(key, true /* fUpdate */);
trackkey(e.key(), true /* fUpdate */); auto itr = find(key);
m_setexpire->insert(e); if (!itr->FExpires())
m_numexpires++;
itr->expire = std::move(e);
itr->SetFExpires(true);
} }
bool redisDb::FKeyExpires(const char *key) bool redisDb::FKeyExpires(const char *key)
{ {
std::unique_lock<fastlock> ul(g_expireLock); auto itr = find(key);
return setexpireUnsafe()->find(key) != setexpire()->end(); if (itr == end())
return false;
return itr->FExpires();
} }
void redisDbPersistentData::updateValue(dict_iter itr, robj *val) void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
@ -2850,7 +2816,6 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
serverAssert(m_refCount == 0); serverAssert(m_refCount == 0);
if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr) if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr)
return; return;
std::unique_lock<fastlock> ul(g_expireLock);
// First see if the key can be obtained from a snapshot // First see if the key can be obtained from a snapshot
if (*pde == nullptr && m_pdbSnapshot != nullptr) if (*pde == nullptr && m_pdbSnapshot != nullptr)
@ -2872,7 +2837,11 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
else else
{ {
sds strT = serializeStoredObject(itr.val()); sds strT = serializeStoredObject(itr.val());
robj *objNew = deserializeStoredObject(this, sdsKey, strT, sdslen(strT)); robj *objNew = deserializeStoredObject(strT, sdslen(strT));
if (itr->FExpires()) {
objNew->expire = itr->expire;
objNew->SetFExpires(true);
}
sdsfree(strT); sdsfree(strT);
dictAdd(m_pdict, keyNew, objNew); dictAdd(m_pdict, keyNew, objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
@ -2902,26 +2871,19 @@ LNotFound:
std::unique_ptr<expireEntry> spexpire; std::unique_ptr<expireEntry> spexpire;
m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire(sdsNewKey, (const char*)data, cb, &offset); spexpire = deserializeExpire((const char*)data, cb, &offset);
o = deserializeStoredObject(this, sdsNewKey, reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}); });
if (o != nullptr) if (o != nullptr)
{ {
dictAdd(m_pdict, sdsNewKey, o); dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr);
std::unique_lock<fastlock> ul(g_expireLock); if (spexpire != nullptr) {
if (spexpire != nullptr) o->expire = std::move(*spexpire);
{
auto itr = m_setexpire->find(sdsKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
} }
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); o->SetFExpires(spexpire != nullptr);
g_pserver->stat_storage_provider_read_hits++; g_pserver->stat_storage_provider_read_hits++;
} else { } else {
sdsfree(sdsNewKey); sdsfree(sdsNewKey);
@ -2931,18 +2893,11 @@ LNotFound:
*pde = dictFind(m_pdict, sdsKey); *pde = dictFind(m_pdict, sdsKey);
} }
} }
if (*pde != nullptr && dictGetVal(*pde) != nullptr)
{
robj *o = (robj*)dictGetVal(*pde);
std::unique_lock<fastlock> ul(g_expireLock);
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
}
} }
void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite) void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite)
{ {
sds temp = serializeStoredObjectAndExpire(this, key, o); sds temp = serializeStoredObjectAndExpire(o);
m_spstorage->insert(key, temp, sdslen(temp), fOverwrite); m_spstorage->insert(key, temp, sdslen(temp), fOverwrite);
sdsfree(temp); sdsfree(temp);
} }
@ -2966,7 +2921,7 @@ void redisDbPersistentData::storeDatabase()
if (itr == nullptr) if (itr == nullptr)
return; return;
robj *o = itr.val(); robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); sds temp = serializeStoredObjectAndExpire(o);
storage->insert((sds)key, temp, sdslen(temp), fUpdate); storage->insert((sds)key, temp, sdslen(temp), fUpdate);
sdsfree(temp); sdsfree(temp);
} }
@ -3042,7 +2997,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
while ((de = dictNext(di)) != nullptr) while ((de = dictNext(di)) != nullptr)
{ {
robj *o = (robj*)dictGetVal(de); robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); sds temp = serializeStoredObjectAndExpire(o);
veckeys.push_back((sds)dictGetKey(de)); veckeys.push_back((sds)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de))); veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(temp); vecvals.push_back(temp);
@ -3106,9 +3061,7 @@ redisDbPersistentData::~redisDbPersistentData()
if (m_dictChanged) if (m_dictChanged)
dictRelease(m_dictChanged); dictRelease(m_dictChanged);
if (m_dictChangedStorageFlush) if (m_dictChangedStorageFlush)
dictRelease(m_dictChangedStorageFlush); dictRelease(m_dictChangedStorageFlush);
delete m_setexpire;
} }
dict_iter redisDbPersistentData::random() dict_iter redisDbPersistentData::random()
@ -3262,7 +3215,7 @@ sds serializeExpire(const expireEntry *pexpire)
return str; return str;
} }
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset) std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset)
{ {
unsigned celem; unsigned celem;
if (cch < sizeof(unsigned)) if (cch < sizeof(unsigned))
@ -3294,7 +3247,7 @@ std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t
offset += sizeof(long long); offset += sizeof(long long);
if (spexpire == nullptr) if (spexpire == nullptr)
spexpire = std::make_unique<expireEntry>(key, subkey, when); spexpire = std::make_unique<expireEntry>(subkey, when);
else else
spexpire->update(subkey, when); spexpire->update(subkey, when);
@ -3306,13 +3259,9 @@ std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t
return spexpire; return spexpire;
} }
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o) sds serializeStoredObjectAndExpire(robj_roptr o)
{ {
std::unique_lock<fastlock> ul(g_expireLock); const expireEntry *pexpire = o->FExpires() ? &o->expire : nullptr;
auto itrExpire = db->setexpire()->find(key);
const expireEntry *pexpire = nullptr;
if (itrExpire != db->setexpire()->end())
pexpire = &(*itrExpire);
sds str = serializeExpire(pexpire); sds str = serializeExpire(pexpire);
str = serializeStoredObject(o, str); str = serializeStoredObject(o, str);
@ -3395,8 +3344,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
robj *o = nullptr; robj *o = nullptr;
m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset); spexpire = deserializeExpire((const char*)data, cb, &offset);
o = deserializeStoredObject(this, sharedKey, reinterpret_cast<const char*>(data) + offset, cb - offset); o = deserializeStoredObject(reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
}); });
@ -3431,18 +3380,9 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
} }
} }
dictAdd(m_pdict, sharedKey, o); dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
std::unique_lock<fastlock> ul(g_expireLock);
if (spexpire != nullptr) if (spexpire != nullptr)
{ o->expire = std::move(*spexpire);
auto itr = m_setexpire->find(sharedKey); o->SetFExpires(spexpire != nullptr);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
} }
} }
else else

View File

@ -146,11 +146,10 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) {
* Note that this function does not reset the initial 'digest' passed, it * Note that this function does not reset the initial 'digest' passed, it
* will continue mixing this object digest to anything that was already * will continue mixing this object digest to anything that was already
* present. */ * present. */
void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { void xorObjectDigest(unsigned char *digest, robj_roptr o) {
uint32_t aux = htonl(o->type); uint32_t aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux)); mixDigest(digest,&aux,sizeof(aux));
std::unique_lock<fastlock> ul(g_expireLock); const expireEntry *pexpire = o->FExpires() ? &o->expire : nullptr;
expireEntry *pexpire = db->getExpire(keyobj);
long long expiretime = INVALID_EXPIRE; long long expiretime = INVALID_EXPIRE;
char buf[128]; char buf[128];
@ -318,7 +317,7 @@ void computeDatasetDigest(unsigned char *final) {
mixDigest(digest,key,sdslen(key)); mixDigest(digest,key,sdslen(key));
xorObjectDigest(db,keyobj,digest,o); xorObjectDigest(digest,o);
/* We can finally xor the key-val digest to the final digest */ /* We can finally xor the key-val digest to the final digest */
xorDigest(final,digest,20); xorDigest(final,digest,20);
@ -716,7 +715,7 @@ NULL
* work on logically expired keys */ * work on logically expired keys */
auto itr = c->db->find(c->argv[j]); auto itr = c->db->find(c->argv[j]);
robj* o = (robj*)(itr == NULL ? NULL : itr.val()); robj* o = (robj*)(itr == NULL ? NULL : itr.val());
if (o) xorObjectDigest(c->db,c->argv[j],digest,o); if (o) xorObjectDigest(digest,o);
sds d = sdsempty(); sds d = sdsempty();
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
@ -843,10 +842,6 @@ NULL
g_pserver->db[dbid]->getStats(buf,sizeof(buf)); g_pserver->db[dbid]->getStats(buf,sizeof(buf));
stats = sdscat(stats,buf); stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires set]\n");
g_pserver->db[dbid]->getExpireStats(buf, sizeof(buf));
stats = sdscat(stats, buf);
addReplyVerbatim(c,stats,sdslen(stats),"txt"); addReplyVerbatim(c,stats,sdslen(stats),"txt");
sdsfree(stats); sdsfree(stats);
} else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) {

View File

@ -47,7 +47,6 @@ extern "C" int je_get_defrag_hint(void* ptr);
/* forward declarations*/ /* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref); void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
bool replaceSatelliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations. /* Defrag helper for generic allocations.
* *
@ -425,20 +424,6 @@ dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, s
return NULL; return NULL;
} }
bool replaceSatelliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey);
if (itr != set.end())
{
expireEntry eNew(std::move(*itr));
eNew.setKeyUnsafe(newkey);
set.erase(itr);
set.insert(eNew);
serverAssert(set.find(newkey) != set.end());
return true;
}
return false;
}
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref; quicklistNode *newnode, *node = *node_ref;
long defragged = 0; long defragged = 0;
@ -851,7 +836,6 @@ long defragModule(redisDb *db, dictEntry *kde) {
* all the various pointers it has. Returns a stat of how many pointers were * all the various pointers it has. Returns a stat of how many pointers were
* moved. */ * moved. */
long defragKey(redisDb *db, dictEntry *de) { long defragKey(redisDb *db, dictEntry *de) {
std::unique_lock<fastlock> ul(g_expireLock);
sds keysds = (sds)dictGetKey(de); sds keysds = (sds)dictGetKey(de);
robj *newob, *ob; robj *newob, *ob;
unsigned char *newzl; unsigned char *newzl;
@ -862,15 +846,8 @@ long defragKey(redisDb *db, dictEntry *de) {
/* Try to defrag the key name. */ /* Try to defrag the key name. */
newsds = activeDefragSds(keysds); newsds = activeDefragSds(keysds);
if (newsds) if (newsds) {
{
defragged++, de->key = newsds; defragged++, de->key = newsds;
if (!db->setexpire()->empty()) {
bool fReplaced = replaceSatelliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds);
serverAssert(fReplaced == ob->FExpires());
} else {
serverAssert(!ob->FExpires());
}
} }
if ((newob = activeDefragStringOb(ob, &defragged))) { if ((newob = activeDefragStringOb(ob, &defragged))) {

View File

@ -222,52 +222,23 @@ void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e,
* idle time are on the left, and keys with the higher idle time on the * idle time are on the left, and keys with the higher idle time on the
* right. */ * right. */
struct visitFunctor int evictionPoolPopulate(int dbid, redisDb *db, bool fVolatile, struct evictionPoolEntry *pool)
{ {
int dbid; int returnCount = 0;
dict *dbdict; dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
struct evictionPoolEntry *pool; int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples);
int count = 0; for (int j = 0; j < count; j++) {
int tries = 0; robj *o = (robj*)dictGetVal(samples[j]);
// If the object is in second tier storage we don't need to evict it (since it already is)
bool operator()(const expireEntry &e) if (o != nullptr)
{
dictEntry *de = dictFind(dbdict, e.key());
if (de != nullptr)
{ {
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); if (!fVolatile || o->FExpires()) {
++count; processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, &o->expire, pool);
}
++tries;
return tries < g_pserver->maxmemory_samples;
}
};
int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool)
{
if (setexpire != nullptr)
{
std::unique_lock<fastlock> ul(g_expireLock);
visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 };
setexpire->random_visit(visitor);
return visitor.count;
}
else
{
int returnCount = 0;
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
// If the object is in second tier storage we don't need to evict it (since it alrady is)
if (o != nullptr)
{
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
++returnCount; ++returnCount;
} }
} }
return returnCount;
} }
return 0; return returnCount;
} }
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------
@ -718,14 +689,14 @@ int performEvictions(bool fPreSnapshot) {
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
{ {
if ((keys = db->size()) != 0) { if ((keys = db->size()) != 0) {
total_keys += evictionPoolPopulate(i, db, nullptr, pool); total_keys += evictionPoolPopulate(i, db, false, pool);
} }
} }
else else
{ {
keys = db->expireSize(); keys = db->expireSize();
if (keys != 0) if (keys != 0)
total_keys += evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool); total_keys += evictionPoolPopulate(i, db, true, pool);
} }
} }
if (!total_keys) break; /* No keys to evict. */ if (!total_keys) break; /* No keys to evict. */
@ -786,7 +757,7 @@ int performEvictions(bool fPreSnapshot) {
{ {
if (db->expireSize()) if (db->expireSize())
{ {
bestkey = (sds)db->random_expire().key(); db->random_expire(&bestkey);
bestdbid = j; bestdbid = j;
break; break;
} }

View File

@ -33,8 +33,6 @@
#include "server.h" #include "server.h"
#include "cron.h" #include "cron.h"
fastlock g_expireLock {"Expire"};
/* Helper function for the activeExpireCycle() function. /* Helper function for the activeExpireCycle() function.
* This function will try to expire the key that is stored in the hash table * This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database. * entry 'de' of the 'expires' hash table of a Redis database.
@ -74,21 +72,20 @@ void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &tried) { int activeExpireCycleExpire(redisDb *db, const char *key, expireEntry &e, long long now, size_t &tried) {
if (!e.FFat()) if (!e.FFat())
{ {
activeExpireCycleExpireFullKey(db, e.key()); activeExpireCycleExpireFullKey(db, key);
++tried; ++tried;
return 1; return 1;
} }
expireEntryFat *pfat = e.pfatentry(); expireEntryFat *pfat = e.pfatentry();
robj *val = db->find(e.key()); robj *val = db->find(key);
int deleted = 0; int deleted = 0;
redisObjectStack objKey; redisObjectStack objKey;
initStaticStringObject(objKey, (char*)e.key()); initStaticStringObject(objKey, (char*)key);
bool fTtlChanged = false;
while (!pfat->FEmpty()) while (!pfat->FEmpty())
{ {
@ -99,7 +96,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
// Is it the full key expiration? // Is it the full key expiration?
if (pfat->nextExpireEntry().spsubkey == nullptr) if (pfat->nextExpireEntry().spsubkey == nullptr)
{ {
activeExpireCycleExpireFullKey(db, e.key()); activeExpireCycleExpireFullKey(db, key);
return ++deleted; return ++deleted;
} }
@ -109,7 +106,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) { if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) {
deleted++; deleted++;
if (setTypeSize(val) == 0) { if (setTypeSize(val) == 0) {
activeExpireCycleExpireFullKey(db, e.key()); activeExpireCycleExpireFullKey(db, key);
return deleted; return deleted;
} }
} }
@ -119,7 +116,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
if (hashTypeDelete(val,(sds)pfat->nextExpireEntry().spsubkey.get())) { if (hashTypeDelete(val,(sds)pfat->nextExpireEntry().spsubkey.get())) {
deleted++; deleted++;
if (hashTypeLength(val) == 0) { if (hashTypeLength(val) == 0) {
activeExpireCycleExpireFullKey(db, e.key()); activeExpireCycleExpireFullKey(db, key);
return deleted; return deleted;
} }
} }
@ -129,7 +126,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
if (zsetDel(val,(sds)pfat->nextExpireEntry().spsubkey.get())) { if (zsetDel(val,(sds)pfat->nextExpireEntry().spsubkey.get())) {
deleted++; deleted++;
if (zsetLength(val) == 0) { if (zsetLength(val) == 0) {
activeExpireCycleExpireFullKey(db, e.key()); activeExpireCycleExpireFullKey(db, key);
return deleted; return deleted;
} }
} }
@ -137,15 +134,15 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
case OBJ_CRON: case OBJ_CRON:
{ {
sds keyCopy = sdsdup(e.key()); sds keyCopy = sdsdup(key);
incrRefCount(val); incrRefCount(val);
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{ aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{
executeCronJobExpireHook(keyCopy, val); executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy); sdsfree(keyCopy);
decrRefCount(val); decrRefCount(val);
}, true /*fLock*/, true /*fForceQueue*/); }, true /*fLock*/, true /*fForceQueue*/);
break;
} }
return deleted;
case OBJ_LIST: case OBJ_LIST:
default: default:
@ -157,7 +154,6 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
pfat->popfrontExpireEntry(); pfat->popfrontExpireEntry();
fTtlChanged = true;
if ((tried % ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP) == 0) { if ((tried % ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP) == 0) {
break; break;
} }
@ -167,11 +163,6 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &
{ {
removeExpire(db, &objKey); removeExpire(db, &objKey);
} }
else if (!pfat->FEmpty() && fTtlChanged)
{
// We need to resort the expire entry since it may no longer be in the correct position
db->resortExpire(e);
}
if (deleted) if (deleted)
{ {
@ -317,8 +308,26 @@ void pexpireMemberAtCommand(client *c)
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period * executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
we do extra efforts. */
/*static*/ void redisDbPersistentData::activeExpireCycleCore(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = g_pserver->active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
void activeExpireCycleCore(int type) {
/* This function has some global state in order to continue the work /* This function has some global state in order to continue the work
* incrementally across calls. */ * incrementally across calls. */
static unsigned int current_db = 0; /* Next DB to test. */ static unsigned int current_db = 0; /* Next DB to test. */
@ -336,10 +345,16 @@ void activeExpireCycleCore(int type) {
if (type == ACTIVE_EXPIRE_CYCLE_FAST) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit /* Don't start a fast cycle if the previous cycle did not exit
* for time limit. Also don't repeat a fast cycle for the same period * for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */ * as the fast cycle total duration itself. */
if (!timelimit_exit) return; if (!timelimit_exit &&
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; g_pserver->stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start; last_fast_cycle = start;
} }
@ -353,16 +368,16 @@ void activeExpireCycleCore(int type) {
if (dbs_per_call > cserver.dbnum || timelimit_exit) if (dbs_per_call > cserver.dbnum || timelimit_exit)
dbs_per_call = cserver.dbnum; dbs_per_call = cserver.dbnum;
/* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* per iteration. Since this function gets called with a frequency of * time per iteration. Since this function gets called with a frequency of
* g_pserver->hz times per second, the following is the max amount of * server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */ * microseconds we can spend in this function. */
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/g_pserver->hz/100; timelimit = config_cycle_slow_time_perc*1000000/g_pserver->hz/100;
timelimit_exit = 0; timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1; if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea /* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still * about the number of keys that are already logically expired, but still
@ -371,6 +386,9 @@ void activeExpireCycleCore(int type) {
long total_expired = 0; long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
redisDb *db = g_pserver->db[(current_db % cserver.dbnum)]; redisDb *db = g_pserver->db[(current_db % cserver.dbnum)];
/* Increment the DB now so we are sure if we run out of time /* Increment the DB now so we are sure if we run out of time
@ -378,48 +396,130 @@ void activeExpireCycleCore(int type) {
* distribute the time evenly across DBs. */ * distribute the time evenly across DBs. */
current_db++; current_db++;
long long now; /* Continue to expire if at the end of the cycle there are still
iteration++; * a big percentage of keys to expire, compared to the number of keys
now = mstime(); * we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */ /* If there is nothing to expire try next DB ASAP. */
if (db->setexpireUnsafe()->empty()) if (db->expireSize() == 0) {
{ db->avg_ttl = 0;
db->avg_ttl = 0; break;
db->last_expire_set = now; }
continue; num = dictSize(db->m_pdict);
} slots = dictSlots(db->m_pdict);
now = mstime();
std::unique_lock<fastlock> ul(g_expireLock);
size_t expired = 0; /* When there are less than 1% filled slots, sampling the key
size_t tried = 0; * space is expensive, so stop here waiting for better times...
long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough * The dictionary will be resized asap. */
db->expireitr = db->setexpireUnsafe()->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (slots > DICT_HT_INITIAL_SIZE &&
if (e.when() < now) (num*100/slots < 1)) break;
{
expired += activeExpireCycleExpire(db, e, now, tried); /* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->m_pdict)) break;
unsigned long idx = db->expires_cursor;
idx &= db->m_pdict->ht[table].sizemask;
dictEntry *de = db->m_pdict->ht[table].table[idx];
long long ttl;
/* Scan the current bucket of the current table. */
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
robj *o = (robj*)dictGetVal(de);
de = de->next;
if (!o->FExpires())
continue;
expireEntry *exp = &o->expire;
serverAssert(exp->when() > 0);
ttl = exp->when()-now;
size_t tried = 0;
if (exp->when() <= now) {
if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++;
serverAssert(ttl <= 0);
} else {
serverAssert(ttl > 0);
}
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
} }
if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0) /* We can't block forever here even if there are many keys to
{ * expire. So after a given amount of milliseconds return to the
/* We can't block forever here even if there are many keys to * caller waiting for the other active expire cycle. */
* expire. So after a given amount of milliseconds return to the if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
* caller waiting for the other active expire cycle. */
elapsed = ustime()-start; elapsed = ustime()-start;
if (elapsed > timelimit) { if (elapsed > timelimit) {
timelimit_exit = 1; timelimit_exit = 1;
g_pserver->stat_expired_time_cap_reached_count++; g_pserver->stat_expired_time_cap_reached_count++;
return false; break;
} }
check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION;
} }
return true; /* We don't repeat the cycle for the current database if there are
}, &check); * an acceptable amount of stale keys (logically expired but yet
* not reclaimed). */
total_expired += expired; } while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
} }
elapsed = ustime()-start; elapsed = ustime()-start;
g_pserver->stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired. /* Update our estimate of keys existing but yet to be expired.
@ -435,7 +535,7 @@ void activeExpireCycleCore(int type) {
void activeExpireCycle(int type) void activeExpireCycle(int type)
{ {
runAndPropogateToReplicas(activeExpireCycleCore, type); runAndPropogateToReplicas(redisDbPersistentData::activeExpireCycleCore, type);
} }
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
@ -481,7 +581,6 @@ void expireSlaveKeys(void) {
if (slaveKeysWithExpire == NULL || if (slaveKeysWithExpire == NULL ||
dictSize(slaveKeysWithExpire) == 0) return; dictSize(slaveKeysWithExpire) == 0) return;
std::unique_lock<fastlock> ul(g_expireLock);
int cycles = 0, noexpire = 0; int cycles = 0, noexpire = 0;
mstime_t start = mstime(); mstime_t start = mstime();
while(1) { while(1) {
@ -496,19 +595,14 @@ void expireSlaveKeys(void) {
while(dbids && dbid < cserver.dbnum) { while(dbids && dbid < cserver.dbnum) {
if ((dbids & 1) != 0) { if ((dbids & 1) != 0) {
redisDb *db = g_pserver->db[dbid]; redisDb *db = g_pserver->db[dbid];
// the expire is hashed based on the key pointer, so we need the point in the main db
auto itrDB = db->find(keyname); auto itrDB = db->find(keyname);
auto itrExpire = db->setexpire()->end();
if (itrDB != nullptr)
itrExpire = db->setexpireUnsafe()->find(itrDB.key());
int expired = 0; int expired = 0;
if (itrExpire != db->setexpire()->end()) if (itrDB != db->end() && itrDB->FExpires())
{ {
if (itrExpire->when() < start) { if (itrDB->expire.when() < start) {
size_t tried = 0; size_t tried = 0;
expired = activeExpireCycleExpire(g_pserver->db[dbid],*itrExpire,start,tried); expired = activeExpireCycleExpire(g_pserver->db[dbid],itrDB.key(),itrDB->expire,start,tried);
} }
} }
@ -516,7 +610,7 @@ void expireSlaveKeys(void) {
* corresponding bit in the new bitmap we set as value. * corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we * At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */ * no longer need to keep track of this key. */
if (itrExpire != db->setexpire()->end() && !expired) { if (itrDB != db->end() && itrDB->FExpires() && !expired) {
noexpire++; noexpire++;
new_dbids |= (uint64_t)1 << dbid; new_dbids |= (uint64_t)1 << dbid;
} }
@ -694,7 +788,6 @@ void ttlGenericCommand(client *c, int output_ms) {
/* The key exists. Return -1 if it has no expire, or the actual /* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */ * TTL value otherwise. */
std::unique_lock<fastlock> ul(g_expireLock);
expireEntry *pexpire = c->db->getExpire(c->argv[1]); expireEntry *pexpire = c->db->getExpire(c->argv[1]);
if (c->argc == 2) { if (c->argc == 2) {
@ -784,18 +877,11 @@ expireEntryFat::~expireEntryFat()
} }
expireEntryFat::expireEntryFat(const expireEntryFat &e) expireEntryFat::expireEntryFat(const expireEntryFat &e)
: m_keyPrimary(e.m_keyPrimary), m_vecexpireEntries(e.m_vecexpireEntries) : m_vecexpireEntries(e.m_vecexpireEntries)
{ {
// Note: dictExpires is not copied // Note: dictExpires is not copied
} }
expireEntryFat::expireEntryFat(expireEntryFat &&e)
: m_keyPrimary(std::move(e.m_keyPrimary)), m_vecexpireEntries(std::move(e.m_vecexpireEntries))
{
m_dictIndex = e.m_dictIndex;
e.m_dictIndex = nullptr;
}
void expireEntryFat::createIndex() void expireEntryFat::createIndex()
{ {
serverAssert(m_dictIndex == nullptr); serverAssert(m_dictIndex == nullptr);

View File

@ -22,9 +22,11 @@ public:
{} {}
subexpireEntry(const subexpireEntry &other) subexpireEntry(const subexpireEntry &other)
: spsubkey((const char*)sdsdupshared(other.spsubkey.get()), sdsfree) : spsubkey(nullptr, sdsfree)
{ {
when = other.when; when = other.when;
if (other.spsubkey != nullptr)
spsubkey = std::unique_ptr<const char, void(*)(const char*)>((const char*)sdsdupshared(other.spsubkey.get()), sdsfree);
} }
subexpireEntry(subexpireEntry &&) = default; subexpireEntry(subexpireEntry &&) = default;
@ -41,27 +43,30 @@ public:
}; };
private: private:
sdsimmutablestring m_keyPrimary;
std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key
dict *m_dictIndex = nullptr; dict *m_dictIndex = nullptr;
long long m_whenPrimary = LLONG_MAX;
void createIndex(); void createIndex();
public: public:
expireEntryFat(const sdsimmutablestring &keyPrimary) expireEntryFat() = default;
: m_keyPrimary(keyPrimary) expireEntryFat(const expireEntryFat &);
{}
~expireEntryFat(); ~expireEntryFat();
expireEntryFat(const expireEntryFat &e);
expireEntryFat(expireEntryFat &&e);
long long when() const noexcept { return m_vecexpireEntries.front().when; } long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return static_cast<const char*>(m_keyPrimary); }
bool operator<(long long when) const noexcept { return this->when() < when; } bool operator<(long long when) const noexcept { return this->when() < when; }
void expireSubKey(const char *szSubkey, long long when); void expireSubKey(const char *szSubkey, long long when);
bool FGetPrimaryExpire(long long *pwhen) const {
if (m_whenPrimary != LLONG_MAX) {
*pwhen = m_whenPrimary;
return true;
}
return false;
}
bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
void popfrontExpireEntry(); void popfrontExpireEntry();
@ -70,19 +75,11 @@ public:
}; };
class expireEntry { class expireEntry {
struct struct {
{ uint64_t m_whenAndPtrUnion : 63,
sdsimmutablestring m_key; fFat : 1;
expireEntryFat *m_pfatentry = nullptr; } s;
} u; static_assert(sizeof(expireEntryFat*) <= sizeof(int64_t), "The pointer must fit in the union");
long long m_when; // bit wise and with FFatMask means this is a fat entry and we should use the pointer
/* Mask to check if an entry is Fat, most significant bit of m_when being set means it is Fat otherwise it is not */
long long FFatMask() const noexcept {
return (1LL) << (sizeof(long long)*CHAR_BIT - 1);
}
expireEntry() = default;
public: public:
class iter class iter
{ {
@ -118,92 +115,103 @@ public:
const iter &operator*() const { return *this; } const iter &operator*() const { return *this; }
}; };
expireEntry(sds key, const char *subkey, long long when) expireEntry()
{
s.fFat = 0;
s.m_whenAndPtrUnion = 0;
}
expireEntry(const char *subkey, long long when)
{ {
if (subkey != nullptr) if (subkey != nullptr)
{ {
m_when = FFatMask() | INVALID_EXPIRE; auto pfatentry = new (MALLOC_LOCAL) expireEntryFat();
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key))); pfatentry->expireSubKey(subkey, when);
u.m_pfatentry->expireSubKey(subkey, when); s.m_whenAndPtrUnion = reinterpret_cast<long long>(pfatentry);
s.fFat = true;
} }
else else
{ {
u.m_key = sdsimmutablestring(sdsdupshared(key)); s.m_whenAndPtrUnion = when;
m_when = when; s.fFat = false;
} }
} }
expireEntry(const expireEntry &e)
{
*this = e;
}
expireEntry(expireEntry &&e)
{
u.m_key = std::move(e.u.m_key);
u.m_pfatentry = std::move(e.u.m_pfatentry);
m_when = e.m_when;
e.m_when = 0;
e.u.m_pfatentry = nullptr;
}
expireEntry(expireEntryFat *pfatentry) expireEntry(expireEntryFat *pfatentry)
{ {
u.m_pfatentry = pfatentry; assert(pfatentry != nullptr);
m_when = FFatMask() | INVALID_EXPIRE; s.m_whenAndPtrUnion = reinterpret_cast<long long>(pfatentry);
for (auto itr : *this) s.fFat = true;
{ }
if (itr.subkey() == nullptr)
{ expireEntry(const expireEntry &e) {
m_when = FFatMask() | itr.when(); if (e.FFat()) {
break; s.m_whenAndPtrUnion = reinterpret_cast<long long>(new expireEntryFat(*e.pfatentry()));
} s.fFat = true;
} else {
s = e.s;
} }
} }
expireEntry(expireEntry &&e)
{
s = e.s;
}
expireEntry &operator=(expireEntry &&e)
{
s = e.s;
e.s.m_whenAndPtrUnion = 0;
e.s.fFat = false;
return *this;
}
expireEntry &operator=(expireEntry &e) {
if (e.FFat()) {
s.m_whenAndPtrUnion = reinterpret_cast<long long>(new expireEntryFat(*e.pfatentry()));
s.fFat = true;
} else {
s = e.s;
}
return *this;
}
// Duplicate the expire, note this is intended to be passed directly to setExpire // Duplicate the expire, note this is intended to be passed directly to setExpire
expireEntry duplicate() const { expireEntry duplicate() const {
expireEntry dst; expireEntry dst;
dst.m_when = m_when;
if (FFat()) { if (FFat()) {
dst.u.m_pfatentry = new expireEntryFat(*u.m_pfatentry); auto pfatentry = new expireEntryFat(*expireEntry::pfatentry());
dst.s.m_whenAndPtrUnion = reinterpret_cast<long long>(pfatentry);
dst.s.fFat = true;
} else { } else {
dst.u.m_key = u.m_key; dst.s.m_whenAndPtrUnion = s.m_whenAndPtrUnion;
dst.s.fFat = false;
} }
return dst; return dst;
} }
void reset() {
if (FFat())
delete pfatentry();
s.fFat = false;
s.m_whenAndPtrUnion = 0;
}
~expireEntry() ~expireEntry()
{ {
if (FFat()) if (FFat())
delete u.m_pfatentry; delete pfatentry();
} }
expireEntry &operator=(const expireEntry &e) inline bool FFat() const noexcept { return s.fFat; }
{ expireEntryFat *pfatentry() {
u.m_key = e.u.m_key; assert(FFat());
m_when = e.m_when; return reinterpret_cast<expireEntryFat*>(s.m_whenAndPtrUnion);
if (e.FFat()) }
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); const expireEntryFat *pfatentry() const {
return *this; return const_cast<expireEntry*>(this)->pfatentry();
} }
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key));
else
u.m_key = sdsimmutablestring(sdsdupshared(key));
}
inline bool FFat() const noexcept { return m_when & FFatMask(); }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; }
bool operator==(const sdsview &key) const noexcept
{
return key == this->key();
}
bool operator<(const expireEntry &e) const noexcept bool operator<(const expireEntry &e) const noexcept
{ {
@ -214,17 +222,11 @@ public:
return this->when() < when; return this->when() < when;
} }
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return static_cast<const char*>(u.m_key);
}
long long when() const noexcept long long when() const noexcept
{ {
if (FFat()) if (FFat())
return u.m_pfatentry->when(); return pfatentry()->when();
return FGetPrimaryExpire(); return s.m_whenAndPtrUnion;
} }
void update(const char *subkey, long long when) void update(const char *subkey, long long when)
@ -233,30 +235,27 @@ public:
{ {
if (subkey == nullptr) if (subkey == nullptr)
{ {
m_when = when; s.m_whenAndPtrUnion = when;
return; return;
} }
else else
{ {
// we have to upgrade to a fat entry // we have to upgrade to a fat entry
long long whenT = m_when; auto pfatentry = new (MALLOC_LOCAL) expireEntryFat();
sdsimmutablestring keyPrimary = u.m_key; pfatentry->expireSubKey(nullptr, s.m_whenAndPtrUnion);
m_when |= FFatMask(); s.m_whenAndPtrUnion = reinterpret_cast<long long>(pfatentry);
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); s.fFat = true;
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through // at this point we're fat so fall through
} }
} }
if (subkey == nullptr) pfatentry()->expireSubKey(subkey, when);
m_when = when | FFatMask();
u.m_pfatentry->expireSubKey(subkey, when);
} }
iter begin() const { return iter(this, 0); } iter begin() const { return iter(this, 0); }
iter end() const iter end() const
{ {
if (FFat()) if (FFat())
return iter(this, u.m_pfatentry->size()); return iter(this, pfatentry()->size());
return iter(this, 1); return iter(this, 1);
} }
@ -268,26 +267,39 @@ public:
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
} }
size_t size() const size_t size() const {
{
if (FFat()) if (FFat())
return u.m_pfatentry->size(); return pfatentry()->size();
return 1; return 1;
} }
long long FGetPrimaryExpire() const noexcept
{
return m_when & (~FFatMask());
}
bool FGetPrimaryExpire(long long *pwhen) const noexcept bool FGetPrimaryExpire(long long *pwhen) const noexcept
{ {
*pwhen = FGetPrimaryExpire(); if (FFat()) {
return *pwhen != INVALID_EXPIRE; return pfatentry()->FGetPrimaryExpire(pwhen);
} else {
*pwhen = s.m_whenAndPtrUnion;
return true;
}
}
void *release_as_void() {
uint64_t whenT = s.m_whenAndPtrUnion;
whenT |= static_cast<uint64_t>(s.fFat) << 63;
s.m_whenAndPtrUnion = 0;
s.fFat = 0;
return reinterpret_cast<void*>(whenT);
}
static expireEntry *from_void(void **src) {
uintptr_t llV = reinterpret_cast<uintptr_t>(src);
return reinterpret_cast<expireEntry*>(llV);
}
static const expireEntry *from_void(void *const*src) {
uintptr_t llV = reinterpret_cast<uintptr_t>(src);
return reinterpret_cast<expireEntry*>(llV);
} }
explicit operator sdsview() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); } explicit operator long long() const noexcept { return when(); }
}; };
typedef semiorderedset<expireEntry, sdsview, true /*expireEntry can be memmoved*/> expireset; static_assert(sizeof(expireEntry) == sizeof(long long), "This must fit in a long long so it can be put in a dictEntry");
extern fastlock g_expireLock;

View File

@ -20,11 +20,9 @@ void lazyfreeFreeObject(void *args[]) {
* when the database was logically deleted. */ * when the database was logically deleted. */
void lazyfreeFreeDatabase(void *args[]) { void lazyfreeFreeDatabase(void *args[]) {
dict *ht1 = (dict *) args[0]; dict *ht1 = (dict *) args[0];
expireset *setexpire = (expireset *) args[1];
size_t numkeys = dictSize(ht1); size_t numkeys = dictSize(ht1);
dictRelease(ht1); dictRelease(ht1);
delete setexpire;
atomicDecr(lazyfree_objects,numkeys); atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys); atomicIncr(lazyfreed_objects,numkeys);
} }
@ -217,17 +215,15 @@ void freeObjAsync(robj *key, robj *obj) {
* create a new empty set of hash tables and scheduling the old ones for * create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */ * lazy freeing. */
void redisDbPersistentData::emptyDbAsync() { void redisDbPersistentData::emptyDbAsync() {
std::unique_lock<fastlock> ul(g_expireLock);
dict *oldht1 = m_pdict; dict *oldht1 = m_pdict;
auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,this); m_pdict = dictCreate(&dbDictType,this);
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->clearAsync(); m_spstorage->clearAsync();
if (m_fTrackingChanges) if (m_fTrackingChanges)
m_fAllChanged = true; m_fAllChanged = true;
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,set); m_numexpires = 0;
bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,nullptr);
} }
/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */

View File

@ -2442,11 +2442,10 @@ int RM_UnlinkKey(RedisModuleKey *key) {
* If no TTL is associated with the key or if the key is empty, * If no TTL is associated with the key or if the key is empty,
* REDISMODULE_NO_EXPIRE is returned. */ * REDISMODULE_NO_EXPIRE is returned. */
mstime_t RM_GetExpire(RedisModuleKey *key) { mstime_t RM_GetExpire(RedisModuleKey *key) {
std::unique_lock<fastlock> ul(g_expireLock); auto itr = key->db->find(key->key);
expireEntry *pexpire = key->db->getExpire(key->key);
mstime_t expire = INVALID_EXPIRE; mstime_t expire = INVALID_EXPIRE;
if (pexpire != nullptr) if (itr->FExpires())
pexpire->FGetPrimaryExpire(&expire); itr->expire.FGetPrimaryExpire(&expire);
if (expire == INVALID_EXPIRE || key->value == NULL) if (expire == INVALID_EXPIRE || key->value == NULL)
return REDISMODULE_NO_EXPIRE; return REDISMODULE_NO_EXPIRE;

View File

@ -1141,10 +1141,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->db[mh->num_dbs].overhead_ht_main = mem; mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem; mem_total+=mem;
std::unique_lock<fastlock> ul(g_expireLock); mh->db[mh->num_dbs].overhead_ht_expires = 0;
mem = db->setexpire()->estimated_bytes_used();
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
mh->num_dbs++; mh->num_dbs++;
} }
@ -1628,7 +1625,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
return newObject; return newObject;
} }
robj *deserializeStoredObjectCore(const void *data, size_t cb) robj *deserializeStoredObject(const void *data, size_t cb)
{ {
switch (((char*)data)[0]) switch (((char*)data)[0])
{ {
@ -1665,14 +1662,6 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb)
serverPanic("Unknown object type loading from storage"); serverPanic("Unknown object type loading from storage");
} }
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb)
{
robj *o = deserializeStoredObjectCore(data, cb);
std::unique_lock<fastlock> ul(g_expireLock);
o->SetFExpires(db->setexpire()->exists(key));
return o;
}
sds serializeStoredObject(robj_roptr o, sds sdsPrefix) sds serializeStoredObject(robj_roptr o, sds sdsPrefix)
{ {
switch (o->type) switch (o->type)

View File

@ -1237,18 +1237,14 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
return 1; return 1;
} }
int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o) int saveKey(rio *rdb, int flags, size_t *processed, const char *keystr, robj_roptr o)
{ {
redisObjectStack key; redisObjectStack key;
initStaticStringObject(key,(char*)keystr); initStaticStringObject(key,(char*)keystr);
std::unique_lock<fastlock> ul(g_expireLock, std::defer_lock);
const expireEntry *pexpire = nullptr; const expireEntry *pexpire = nullptr;
if (o->FExpires()) if (o->FExpires()) {
{ pexpire = &o->expire;
ul.lock();
pexpire = db->getExpire(&key);
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
} }
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
@ -1355,7 +1351,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error
if (o->FExpires()) if (o->FExpires())
++ckeysExpired; ++ckeysExpired;
if (!saveKey(rdb, db, rdbflags, &processed, keystr, o)) if (!saveKey(rdb, rdbflags, &processed, keystr, o))
return false; return false;
/* Update child info every 1 second (approximately). /* Update child info every 1 second (approximately).

View File

@ -1518,6 +1518,16 @@ dictType dbDictType = {
dictGCAsyncFree /* async free destructor */ dictGCAsyncFree /* async free destructor */
}; };
dictType dbExpiresDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
dictExpandAllowed /* allow to expand */
};
/* db->pdict, keys are sds strings, vals are Redis objects. */ /* db->pdict, keys are sds strings, vals are Redis objects. */
dictType dbTombstoneDictType = { dictType dbTombstoneDictType = {
dictSdsHash, /* hash function */ dictSdsHash, /* hash function */
@ -1550,17 +1560,6 @@ dictType shaScriptObjectDictType = {
NULL /* allow to expand */ NULL /* allow to expand */
}; };
/* Db->expires */
dictType dbExpiresDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
dictExpandAllowed /* allow to expand */
};
/* Command table. sds string -> command struct pointer. */ /* Command table. sds string -> command struct pointer. */
dictType commandTableDictType = { dictType commandTableDictType = {
dictSdsCaseHash, /* hash function */ dictSdsCaseHash, /* hash function */

View File

@ -978,6 +978,7 @@ public:
private: private:
mutable std::atomic<unsigned> refcount {0}; mutable std::atomic<unsigned> refcount {0};
public: public:
expireEntry expire;
void *m_ptr; void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
@ -988,7 +989,7 @@ public:
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
} robj; } robj;
static_assert(sizeof(redisObject) <= 16, "object size is critical, don't increase"); static_assert(sizeof(redisObject) <= 24, "object size is critical, don't increase");
class redisObjectStack : public redisObjectExtended, public redisObject class redisObjectStack : public redisObjectExtended, public redisObject
{ {
@ -1144,16 +1145,20 @@ public:
dict_iter random(); dict_iter random();
const expireEntry &random_expire() const expireEntry *random_expire(sds *key)
{ {
return m_setexpire->random_value(); auto itr = random();
if (itr->FExpires()) {
*key = itr.key();
return &itr->expire;
}
return nullptr;
} }
dict_iter end() { return dict_iter(nullptr, nullptr); } dict_iter end() { return dict_iter(nullptr, nullptr); }
dict_const_iter end() const { return dict_const_iter(nullptr); } dict_const_iter end() const { return dict_const_iter(nullptr); }
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); }
void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); }
bool insert(char *k, robj *o, bool fAssumeNew = false, dict_iter *existing = nullptr); bool insert(char *k, robj *o, bool fAssumeNew = false, dict_iter *existing = nullptr);
void tryResize(); void tryResize();
@ -1161,16 +1166,15 @@ public:
void updateValue(dict_iter itr, robj *val); void updateValue(dict_iter itr, robj *val);
bool syncDelete(robj *key); bool syncDelete(robj *key);
bool asyncDelete(robj *key); bool asyncDelete(robj *key);
size_t expireSize() const { return m_setexpire->size(); } size_t expireSize() const { return m_numexpires; }
int removeExpire(robj *key, dict_iter itr); int removeExpire(robj *key, dict_iter itr);
int removeSubkeyExpire(robj *key, robj *subkey); int removeSubkeyExpire(robj *key, robj *subkey);
void resortExpire(expireEntry &e);
void clear(void(callback)(void*)); void clear(void(callback)(void*));
void emptyDbAsync(); void emptyDbAsync();
// Note: If you do not need the obj then use the objless iterator version. It's faster // Note: If you do not need the obj then use the objless iterator version. It's faster
bool iterate(std::function<bool(const char*, robj*)> fn); bool iterate(std::function<bool(const char*, robj*)> fn);
void setExpire(robj *key, robj *subkey, long long when); void setExpire(robj *key, robj *subkey, long long when);
void setExpire(expireEntry &&e); void setExpire(const char *key, expireEntry &&e);
void initialize(); void initialize();
void prepOverwriteForSnapshot(char *key); void prepOverwriteForSnapshot(char *key);
@ -1194,9 +1198,6 @@ public:
// objects stored elsewhere // objects stored elsewhere
dict *dictUnsafeKeyOnly() { return m_pdict; } dict *dictUnsafeKeyOnly() { return m_pdict; }
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() const { return m_setexpire; }
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot); void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot); void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot);
@ -1218,6 +1219,8 @@ public:
dict_iter find_cached_threadsafe(const char *key) const; dict_iter find_cached_threadsafe(const char *key) const;
static void activeExpireCycleCore(int type);
protected: protected:
uint64_t m_mvccCheckpoint = 0; uint64_t m_mvccCheckpoint = 0;
@ -1240,7 +1243,7 @@ private:
std::shared_ptr<StorageCache> m_spstorage = nullptr; std::shared_ptr<StorageCache> m_spstorage = nullptr;
// Expire // Expire
expireset *m_setexpire = nullptr; size_t m_numexpires = 0;
// These two pointers are the same, UNLESS the database has been cleared. // These two pointers are the same, UNLESS the database has been cleared.
// in which case m_pdbSnapshot is NULL and we continue as though we weren' // in which case m_pdbSnapshot is NULL and we continue as though we weren'
@ -1310,7 +1313,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
friend int removeExpire(redisDb *db, robj *key); friend int removeExpire(redisDb *db, robj *key);
friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when); friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when);
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend int evictionPoolPopulate(int dbid, redisDb *db, bool fVolatile, struct evictionPoolEntry *pool);
friend void activeDefragCycle(void); friend void activeDefragCycle(void);
friend void activeExpireCycle(int); friend void activeExpireCycle(int);
friend void expireSlaveKeys(void); friend void expireSlaveKeys(void);
@ -1319,9 +1322,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
typedef ::dict_const_iter const_iter; typedef ::dict_const_iter const_iter;
typedef ::dict_iter iter; typedef ::dict_iter iter;
redisDb() redisDb() = default;
: expireitr(nullptr)
{}
void initialize(int id); void initialize(int id);
void storageProviderInitialize(); void storageProviderInitialize();
@ -1343,7 +1344,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::random_expire; using redisDbPersistentData::random_expire;
using redisDbPersistentData::end; using redisDbPersistentData::end;
using redisDbPersistentData::getStats; using redisDbPersistentData::getStats;
using redisDbPersistentData::getExpireStats;
using redisDbPersistentData::insert; using redisDbPersistentData::insert;
using redisDbPersistentData::tryResize; using redisDbPersistentData::tryResize;
using redisDbPersistentData::incrementallyRehash; using redisDbPersistentData::incrementallyRehash;
@ -1361,15 +1361,12 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::processChanges; using redisDbPersistentData::processChanges;
using redisDbPersistentData::processChangesAsync; using redisDbPersistentData::processChangesAsync;
using redisDbPersistentData::commitChanges; using redisDbPersistentData::commitChanges;
using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire;
using redisDbPersistentData::endSnapshot; using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::restoreSnapshot; using redisDbPersistentData::restoreSnapshot;
using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::removeAllCachedValues;
using redisDbPersistentData::disableKeyCache; using redisDbPersistentData::disableKeyCache;
using redisDbPersistentData::keycacheIsEnabled; using redisDbPersistentData::keycacheIsEnabled;
using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::dictUnsafeKeyOnly;
using redisDbPersistentData::resortExpire;
using redisDbPersistentData::prefetchKeysAsync; using redisDbPersistentData::prefetchKeysAsync;
using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing; using redisDbPersistentData::FRehashing;
@ -1386,7 +1383,7 @@ public:
return psnapshot; return psnapshot;
} }
expireset::setiter expireitr; unsigned long expires_cursor = 0;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */ dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
@ -2247,7 +2244,6 @@ struct redisServerConst {
int maxidletime; /* Client timeout in seconds */ int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_expire_enabled; /* Can be disabled for testing purposes. */
int active_expire_effort; /* From 1 (default) to 10, active effort. */
int active_defrag_enabled; int active_defrag_enabled;
int jemalloc_bg_thread; /* Enable jemalloc background thread */ int jemalloc_bg_thread; /* Enable jemalloc background thread */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
@ -2357,6 +2353,7 @@ struct redisServer {
unsigned int loading_process_events_interval_keys; unsigned int loading_process_events_interval_keys;
int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_expire_enabled; /* Can be disabled for testing purposes. */
int active_expire_effort; /* From 1 (default) to 10, active effort. */
int replicaIsolationFactor = 1; int replicaIsolationFactor = 1;
@ -3180,8 +3177,8 @@ int equalStringObjects(robj *a, robj *b);
unsigned long long estimateObjectIdleTime(robj_roptr o); unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o); void trimStringObjectIfNeeded(robj *o);
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); robj *deserializeStoredObject(const void *data, size_t cb);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr); sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)

View File

@ -74,6 +74,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
spdb->m_fTrackingChanges = 0; spdb->m_fTrackingChanges = 0;
spdb->m_pdict = m_pdict; spdb->m_pdict = m_pdict;
spdb->m_pdictTombstone = m_pdictTombstone; spdb->m_pdictTombstone = m_pdictTombstone;
spdb->m_numexpires = m_numexpires;
// Add a fake iterator so the dicts don't rehash (they need to be read only) // Add a fake iterator so the dicts don't rehash (they need to be read only)
dictPauseRehashing(spdb->m_pdict); dictPauseRehashing(spdb->m_pdict);
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
@ -83,12 +84,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1; spdb->m_refCount = 1;
spdb->m_mvccCheckpoint = getMvccTstamp(); spdb->m_mvccCheckpoint = getMvccTstamp();
if (m_setexpire != nullptr)
{
std::unique_lock<fastlock> ul(g_expireLock);
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
spdb->m_setexpire->pause_rehash(); // needs to be const
}
if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) { if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) {
serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot"); serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot");
@ -171,11 +166,6 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
size_t expectedSize = psnapshot->size(); size_t expectedSize = psnapshot->size();
dictEmpty(m_pdict, nullptr); dictEmpty(m_pdict, nullptr);
dictEmpty(m_pdictTombstone, nullptr); dictEmpty(m_pdictTombstone, nullptr);
{
std::unique_lock<fastlock> ul(g_expireLock);
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire);
}
endSnapshot(psnapshot); endSnapshot(psnapshot);
serverAssert(size() == expectedSize); serverAssert(size() == expectedSize);
} }
@ -597,8 +587,8 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(c
if (!fKeyOnly) if (!fKeyOnly)
{ {
size_t offset = 0; size_t offset = 0;
deserializeExpire(sdsKey, (const char*)data, cbData, &offset); deserializeExpire((const char*)data, cbData, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data)+offset, cbData-offset); o = deserializeStoredObject(reinterpret_cast<const char*>(data)+offset, cbData-offset);
} }
fContinue = fn(sdsKey, o); fContinue = fn(sdsKey, o);
if (o != nullptr) if (o != nullptr)

View File

@ -18,7 +18,7 @@ start_server {tags {"CRON"} overrides {hz 100} } {
test {keydb.cron repeat works} { test {keydb.cron repeat works} {
r flushall r flushall
r keydb.cron testjob repeat 0 600 {redis.call("incr","testkey")} r keydb.cron testjob repeat 0 900 {redis.call("incr","testkey")}
after 1000 after 1000
assert_equal 2 [r get testkey] assert_equal 2 [r get testkey]
} }