Merge branch 'PRO_RELEASE_6' into keydbpro
Former-commit-id: bffe010ea5279bee869bc61cc6d933979e10bbea
This commit is contained in:
commit
8c2c0fba12
@ -1,3 +1,9 @@
|
|||||||
|
keydb-pro (6:6.0.12-1distribution_placeholder) codename_placeholder; urgency=medium
|
||||||
|
|
||||||
|
* 6.0.12 Enable SCAN for MVCC
|
||||||
|
|
||||||
|
-- Ben Schermel <ben@eqalpha.com> Fri, 10 Jul 2020 20:00:37 +0000
|
||||||
|
|
||||||
keydb-pro (6:6.0.11-1distribution_placeholder) codename_placeholder; urgency=medium
|
keydb-pro (6:6.0.11-1distribution_placeholder) codename_placeholder; urgency=medium
|
||||||
|
|
||||||
* 6.0.11 fixes applied related to cluster usage and expires
|
* 6.0.11 fixes applied related to cluster usage and expires
|
||||||
|
@ -1436,8 +1436,6 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
|
|
||||||
initStaticStringObject(key,(sds)keystr);
|
initStaticStringObject(key,(sds)keystr);
|
||||||
|
|
||||||
expireEntry *pexpire = db->getExpire(&key);
|
|
||||||
|
|
||||||
/* Save the key and associated value */
|
/* Save the key and associated value */
|
||||||
if (o->type == OBJ_STRING) {
|
if (o->type == OBJ_STRING) {
|
||||||
/* Emit a SET command */
|
/* Emit a SET command */
|
||||||
@ -1462,6 +1460,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
serverPanic("Unknown object type");
|
serverPanic("Unknown object type");
|
||||||
}
|
}
|
||||||
/* Save the expire time */
|
/* Save the expire time */
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
expireEntry *pexpire = db->getExpire(&key);
|
||||||
if (pexpire != nullptr) {
|
if (pexpire != nullptr) {
|
||||||
for (auto &subExpire : *pexpire) {
|
for (auto &subExpire : *pexpire) {
|
||||||
if (subExpire.subkey() == nullptr)
|
if (subExpire.subkey() == nullptr)
|
||||||
@ -1480,6 +1480,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common
|
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ul.unlock();
|
||||||
|
|
||||||
/* Read some diff from the parent process from time to time. */
|
/* Read some diff from the parent process from time to time. */
|
||||||
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
|
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
|
||||||
processed = aof->processed_bytes;
|
processed = aof->processed_bytes;
|
||||||
|
@ -5332,6 +5332,7 @@ try_again:
|
|||||||
/* Create RESTORE payload and generate the protocol to call the command. */
|
/* Create RESTORE payload and generate the protocol to call the command. */
|
||||||
for (j = 0; j < num_keys; j++) {
|
for (j = 0; j < num_keys; j++) {
|
||||||
long long ttl = 0;
|
long long ttl = 0;
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpire = c->db->getExpire(kv[j]);
|
expireEntry *pexpire = c->db->getExpire(kv[j]);
|
||||||
long long expireat = -1;
|
long long expireat = -1;
|
||||||
if (pexpire != nullptr)
|
if (pexpire != nullptr)
|
||||||
|
20
src/db.cpp
20
src/db.cpp
@ -39,7 +39,7 @@
|
|||||||
* C-level DB API
|
* C-level DB API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
int keyIsExpired(redisDb *db, robj *key);
|
int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key);
|
||||||
int expireIfNeeded(redisDb *db, robj *key, robj *o);
|
int expireIfNeeded(redisDb *db, robj *key, robj *o);
|
||||||
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
|
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
|
||||||
|
|
||||||
@ -847,7 +847,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p
|
|||||||
|
|
||||||
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
|
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
|
||||||
keyobj = createStringObject(key,sdslen(key));
|
keyobj = createStringObject(key,sdslen(key));
|
||||||
if (!keyIsExpired(c->db,keyobj)) {
|
if (!keyIsExpired(db,keyobj)) {
|
||||||
addReplyBulk(c,keyobj);
|
addReplyBulk(c,keyobj);
|
||||||
numkeys++;
|
numkeys++;
|
||||||
}
|
}
|
||||||
@ -1325,6 +1325,7 @@ void renameGenericCommand(client *c, int nx) {
|
|||||||
std::unique_ptr<expireEntry> spexpire;
|
std::unique_ptr<expireEntry> spexpire;
|
||||||
|
|
||||||
{ // scope pexpireOld since it will be invalid soon
|
{ // scope pexpireOld since it will be invalid soon
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
|
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
|
||||||
if (pexpireOld != nullptr)
|
if (pexpireOld != nullptr)
|
||||||
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
|
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
|
||||||
@ -1403,6 +1404,7 @@ void moveCommand(client *c) {
|
|||||||
|
|
||||||
std::unique_ptr<expireEntry> spexpire;
|
std::unique_ptr<expireEntry> spexpire;
|
||||||
{ // scope pexpireOld
|
{ // scope pexpireOld
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
|
expireEntry *pexpireOld = c->db->getExpire(c->argv[1]);
|
||||||
if (pexpireOld != nullptr)
|
if (pexpireOld != nullptr)
|
||||||
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
|
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
|
||||||
@ -1526,6 +1528,7 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
|
|||||||
/* An expire may only be removed if there is a corresponding entry in the
|
/* An expire may only be removed if there is a corresponding entry in the
|
||||||
* main dict. Otherwise, the key will never be freed. */
|
* main dict. Otherwise, the key will never be freed. */
|
||||||
serverAssertWithInfo(NULL,key,itr != nullptr);
|
serverAssertWithInfo(NULL,key,itr != nullptr);
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
|
||||||
robj *val = itr.val();
|
robj *val = itr.val();
|
||||||
if (!val->FExpires())
|
if (!val->FExpires())
|
||||||
@ -1542,7 +1545,8 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
|
|||||||
int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) {
|
int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) {
|
||||||
auto de = find(szFromObj(key));
|
auto de = find(szFromObj(key));
|
||||||
serverAssertWithInfo(NULL,key,de != nullptr);
|
serverAssertWithInfo(NULL,key,de != nullptr);
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
|
||||||
robj *val = de.val();
|
robj *val = de.val();
|
||||||
if (!val->FExpires())
|
if (!val->FExpires())
|
||||||
return 0;
|
return 0;
|
||||||
@ -1574,6 +1578,7 @@ int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) {
|
|||||||
|
|
||||||
void redisDbPersistentData::resortExpire(expireEntry &e)
|
void redisDbPersistentData::resortExpire(expireEntry &e)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
auto itr = m_setexpire->find(e.key());
|
auto itr = m_setexpire->find(e.key());
|
||||||
expireEntry eT = std::move(e);
|
expireEntry eT = std::move(e);
|
||||||
m_setexpire->erase(itr);
|
m_setexpire->erase(itr);
|
||||||
@ -1732,8 +1737,9 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Check if the key is expired. Note, this does not check subexpires */
|
/* Check if the key is expired. Note, this does not check subexpires */
|
||||||
int keyIsExpired(redisDb *db, robj *key) {
|
int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
|
||||||
expireEntry *pexpire = db->getExpire(key);
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
const expireEntry *pexpire = db->getExpire(key);
|
||||||
mstime_t now;
|
mstime_t now;
|
||||||
|
|
||||||
if (pexpire == nullptr) return 0; /* No expire for this key */
|
if (pexpire == nullptr) return 0; /* No expire for this key */
|
||||||
@ -2358,6 +2364,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
|
|||||||
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
|
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
|
||||||
{
|
{
|
||||||
/* Reuse the sds from the main dict in the expire dict */
|
/* Reuse the sds from the main dict in the expire dict */
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
|
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
|
||||||
serverAssertWithInfo(NULL,key,kde != NULL);
|
serverAssertWithInfo(NULL,key,kde != NULL);
|
||||||
trackkey(key, true /* fUpdate */);
|
trackkey(key, true /* fUpdate */);
|
||||||
@ -2387,12 +2394,14 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
|
|||||||
|
|
||||||
void redisDbPersistentData::setExpire(expireEntry &&e)
|
void redisDbPersistentData::setExpire(expireEntry &&e)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
trackkey(e.key(), true /* fUpdate */);
|
trackkey(e.key(), true /* fUpdate */);
|
||||||
m_setexpire->insert(e);
|
m_setexpire->insert(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool redisDb::FKeyExpires(const char *key)
|
bool redisDb::FKeyExpires(const char *key)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
return setexpireUnsafe()->find(key) != setexpire()->end();
|
return setexpireUnsafe()->find(key) != setexpire()->end();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2412,6 +2421,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
|
|||||||
{
|
{
|
||||||
serverAssert(sdsKey != nullptr);
|
serverAssert(sdsKey != nullptr);
|
||||||
serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid
|
serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
|
||||||
// First see if the key can be obtained from a snapshot
|
// First see if the key can be obtained from a snapshot
|
||||||
if (*pde == nullptr && m_pdbSnapshot != nullptr)
|
if (*pde == nullptr && m_pdbSnapshot != nullptr)
|
||||||
|
@ -127,6 +127,7 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) {
|
|||||||
void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) {
|
void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) {
|
||||||
uint32_t aux = htonl(o->type);
|
uint32_t aux = htonl(o->type);
|
||||||
mixDigest(digest,&aux,sizeof(aux));
|
mixDigest(digest,&aux,sizeof(aux));
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpire = db->getExpire(keyobj);
|
expireEntry *pexpire = db->getExpire(keyobj);
|
||||||
long long expiretime = -1;
|
long long expiretime = -1;
|
||||||
char buf[128];
|
char buf[128];
|
||||||
|
@ -811,6 +811,7 @@ long defragStream(redisDb *db, dictEntry *kde) {
|
|||||||
* all the various pointers it has. Returns a stat of how many pointers were
|
* all the various pointers it has. Returns a stat of how many pointers were
|
||||||
* moved. */
|
* moved. */
|
||||||
long defragKey(redisDb *db, dictEntry *de) {
|
long defragKey(redisDb *db, dictEntry *de) {
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
sds keysds = (sds)dictGetKey(de);
|
sds keysds = (sds)dictGetKey(de);
|
||||||
robj *newob, *ob;
|
robj *newob, *ob;
|
||||||
unsigned char *newzl;
|
unsigned char *newzl;
|
||||||
|
@ -262,6 +262,7 @@ int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evi
|
|||||||
{
|
{
|
||||||
if (setexpire != nullptr)
|
if (setexpire != nullptr)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 };
|
visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 };
|
||||||
setexpire->random_visit(visitor);
|
setexpire->random_visit(visitor);
|
||||||
return visitor.count;
|
return visitor.count;
|
||||||
|
@ -33,6 +33,8 @@
|
|||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cron.h"
|
#include "cron.h"
|
||||||
|
|
||||||
|
fastlock g_expireLock {"Expire"};
|
||||||
|
|
||||||
/* Helper function for the activeExpireCycle() function.
|
/* Helper function for the activeExpireCycle() function.
|
||||||
* This function will try to expire the key that is stored in the hash table
|
* This function will try to expire the key that is stored in the hash table
|
||||||
* entry 'de' of the 'expires' hash table of a Redis database.
|
* entry 'de' of the 'expires' hash table of a Redis database.
|
||||||
@ -371,6 +373,7 @@ void activeExpireCycle(int type) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
size_t expired = 0;
|
size_t expired = 0;
|
||||||
size_t tried = 0;
|
size_t tried = 0;
|
||||||
long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough
|
long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough
|
||||||
@ -660,6 +663,7 @@ void ttlGenericCommand(client *c, int output_ms) {
|
|||||||
|
|
||||||
/* The key exists. Return -1 if it has no expire, or the actual
|
/* The key exists. Return -1 if it has no expire, or the actual
|
||||||
* TTL value otherwise. */
|
* TTL value otherwise. */
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpire = c->db->getExpire(c->argv[1]);
|
expireEntry *pexpire = c->db->getExpire(c->argv[1]);
|
||||||
|
|
||||||
if (c->argc == 2) {
|
if (c->argc == 2) {
|
||||||
|
@ -119,6 +119,7 @@ void freeObjAsync(robj *o) {
|
|||||||
* create a new empty set of hash tables and scheduling the old ones for
|
* create a new empty set of hash tables and scheduling the old ones for
|
||||||
* lazy freeing. */
|
* lazy freeing. */
|
||||||
void redisDbPersistentData::emptyDbAsync() {
|
void redisDbPersistentData::emptyDbAsync() {
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
dict *oldht1 = m_pdict;
|
dict *oldht1 = m_pdict;
|
||||||
auto *set = m_setexpire;
|
auto *set = m_setexpire;
|
||||||
m_setexpire = new (MALLOC_LOCAL) expireset();
|
m_setexpire = new (MALLOC_LOCAL) expireset();
|
||||||
|
@ -2167,6 +2167,7 @@ int RM_UnlinkKey(RedisModuleKey *key) {
|
|||||||
* If no TTL is associated with the key or if the key is empty,
|
* If no TTL is associated with the key or if the key is empty,
|
||||||
* REDISMODULE_NO_EXPIRE is returned. */
|
* REDISMODULE_NO_EXPIRE is returned. */
|
||||||
mstime_t RM_GetExpire(RedisModuleKey *key) {
|
mstime_t RM_GetExpire(RedisModuleKey *key) {
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
expireEntry *pexpire = key->db->getExpire(key->key);
|
expireEntry *pexpire = key->db->getExpire(key->key);
|
||||||
mstime_t expire = -1;
|
mstime_t expire = -1;
|
||||||
if (pexpire != nullptr)
|
if (pexpire != nullptr)
|
||||||
|
@ -1078,8 +1078,10 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
|
|||||||
db->size() * sizeof(robj);
|
db->size() * sizeof(robj);
|
||||||
mh->db[mh->num_dbs].overhead_ht_main = mem;
|
mh->db[mh->num_dbs].overhead_ht_main = mem;
|
||||||
mem_total+=mem;
|
mem_total+=mem;
|
||||||
|
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
mem = db->setexpire()->bytes_used();
|
mem = db->setexpire()->bytes_used();
|
||||||
|
|
||||||
mh->db[mh->num_dbs].overhead_ht_expires = mem;
|
mh->db[mh->num_dbs].overhead_ht_expires = mem;
|
||||||
mem_total+=mem;
|
mem_total+=mem;
|
||||||
|
|
||||||
|
@ -1150,8 +1150,11 @@ int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t
|
|||||||
robj key;
|
robj key;
|
||||||
|
|
||||||
initStaticStringObject(key,(char*)keystr);
|
initStaticStringObject(key,(char*)keystr);
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
const expireEntry *pexpire = db->getExpire(&key);
|
const expireEntry *pexpire = db->getExpire(&key);
|
||||||
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
|
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
|
||||||
|
if (pexpire == nullptr)
|
||||||
|
ul.unlock(); // no need to hold the lock if we're not saving the expire
|
||||||
|
|
||||||
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
|
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2859,6 +2859,7 @@ bool getCommandAsync(client *c)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Are we expired?
|
// Are we expired?
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
const expireEntry *expire = serverTL->rgdbSnapshot[idb]->getExpire(c->argv[1]);
|
const expireEntry *expire = serverTL->rgdbSnapshot[idb]->getExpire(c->argv[1]);
|
||||||
long long when;
|
long long when;
|
||||||
if (expire && expire->FGetPrimaryExpire(&when) && when > 0) {
|
if (expire && expire->FGetPrimaryExpire(&when) && when > 0) {
|
||||||
|
@ -1182,6 +1182,7 @@ public:
|
|||||||
explicit operator long long() const noexcept { return when(); }
|
explicit operator long long() const noexcept { return when(); }
|
||||||
};
|
};
|
||||||
typedef semiorderedset<expireEntry, sdsview, true /*expireEntry can be memmoved*/> expireset;
|
typedef semiorderedset<expireEntry, sdsview, true /*expireEntry can be memmoved*/> expireset;
|
||||||
|
extern fastlock g_expireLock;
|
||||||
|
|
||||||
/* The a string name for an object's type as listed above
|
/* The a string name for an object's type as listed above
|
||||||
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
|
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
|
||||||
@ -2425,9 +2426,7 @@ struct redisServer {
|
|||||||
|
|
||||||
inline int redisServerThreadVars::getRdbKeySaveDelay() {
|
inline int redisServerThreadVars::getRdbKeySaveDelay() {
|
||||||
if (rdb_key_save_delay < 0) {
|
if (rdb_key_save_delay < 0) {
|
||||||
aeAcquireLock();
|
__atomic_load(&g_pserver->rdb_key_save_delay, &rdb_key_save_delay, __ATOMIC_ACQUIRE);
|
||||||
rdb_key_save_delay = g_pserver->rdb_key_save_delay;
|
|
||||||
aeReleaseLock();
|
|
||||||
}
|
}
|
||||||
return rdb_key_save_delay;
|
return rdb_key_save_delay;
|
||||||
}
|
}
|
||||||
|
@ -49,8 +49,10 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
|
|||||||
dictForceRehash(m_spdbSnapshotHOLDER->m_pdictTombstone);
|
dictForceRehash(m_spdbSnapshotHOLDER->m_pdictTombstone);
|
||||||
dictMerge(m_pdbSnapshot->m_pdict, m_pdict);
|
dictMerge(m_pdbSnapshot->m_pdict, m_pdict);
|
||||||
dictEmpty(m_pdictTombstone, nullptr);
|
dictEmpty(m_pdictTombstone, nullptr);
|
||||||
delete m_spdbSnapshotHOLDER->m_setexpire;
|
{
|
||||||
m_spdbSnapshotHOLDER->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
(*m_spdbSnapshotHOLDER->m_setexpire) = *m_setexpire;
|
||||||
|
}
|
||||||
|
|
||||||
m_pdbSnapshotASYNC = nullptr;
|
m_pdbSnapshotASYNC = nullptr;
|
||||||
serverAssert(m_pdbSnapshot->m_pdict->iterators == 1);
|
serverAssert(m_pdbSnapshot->m_pdict->iterators == 1);
|
||||||
@ -82,6 +84,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
|
|||||||
spdb->m_mvccCheckpoint = getMvccTstamp();
|
spdb->m_mvccCheckpoint = getMvccTstamp();
|
||||||
if (m_setexpire != nullptr)
|
if (m_setexpire != nullptr)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
|
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
|
||||||
spdb->m_setexpire->pause_rehash(); // needs to be const
|
spdb->m_setexpire->pause_rehash(); // needs to be const
|
||||||
}
|
}
|
||||||
@ -161,8 +164,11 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
|
|||||||
size_t expectedSize = psnapshot->size();
|
size_t expectedSize = psnapshot->size();
|
||||||
dictEmpty(m_pdict, nullptr);
|
dictEmpty(m_pdict, nullptr);
|
||||||
dictEmpty(m_pdictTombstone, nullptr);
|
dictEmpty(m_pdictTombstone, nullptr);
|
||||||
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
delete m_setexpire;
|
delete m_setexpire;
|
||||||
m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire);
|
m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire);
|
||||||
|
}
|
||||||
endSnapshot(psnapshot);
|
endSnapshot(psnapshot);
|
||||||
serverAssert(size() == expectedSize);
|
serverAssert(size() == expectedSize);
|
||||||
}
|
}
|
||||||
@ -555,18 +561,30 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
|
|||||||
spdb->initialize();
|
spdb->initialize();
|
||||||
dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
|
dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
|
||||||
|
|
||||||
|
volatile size_t skipped = 0;
|
||||||
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
||||||
if (o != nullptr) {
|
if (o != nullptr) {
|
||||||
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
||||||
incrRefCount(o);
|
incrRefCount(o);
|
||||||
|
} else {
|
||||||
|
++skipped;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}, true /*fKeyOnly*/, true /*fCacheOnly*/);
|
}, true /*fKeyOnly*/, true /*fCacheOnly*/);
|
||||||
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
|
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
|
||||||
|
{
|
||||||
|
std::unique_lock<fastlock> ul(g_expireLock);
|
||||||
|
delete spdb->m_setexpire;
|
||||||
|
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_pdbSnapshot->m_setexpire);
|
||||||
|
}
|
||||||
|
|
||||||
spdb->m_pdict->iterators++;
|
spdb->m_pdict->iterators++;
|
||||||
|
|
||||||
serverAssert(spdb->size() == m_pdbSnapshot->size());
|
if (m_spstorage) {
|
||||||
|
serverAssert(spdb->size() == m_pdbSnapshot->size());
|
||||||
|
} else {
|
||||||
|
serverAssert((spdb->size()+skipped) == m_pdbSnapshot->size());
|
||||||
|
}
|
||||||
|
|
||||||
// Now wire us in (Acquire the LOCK)
|
// Now wire us in (Acquire the LOCK)
|
||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
#define KEYDB_REAL_VERSION "0.0.0"
|
#define KEYDB_REAL_VERSION "6.0.12"
|
||||||
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
|
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
|
||||||
|
|
||||||
enum VersionCompareResult
|
enum VersionCompareResult
|
||||||
@ -25,4 +25,4 @@ enum VersionCompareResult compareVersion(struct SymVer *pver);
|
|||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
x
Reference in New Issue
Block a user