diff --git a/pkg/deb/master_changelog b/pkg/deb/master_changelog index 999c9c872..e92f8cc95 100644 --- a/pkg/deb/master_changelog +++ b/pkg/deb/master_changelog @@ -1,3 +1,9 @@ +keydb-pro (6:6.0.12-1distribution_placeholder) codename_placeholder; urgency=medium + + * 6.0.12 Enable SCAN for MVCC + + -- Ben Schermel Fri, 10 Jul 2020 20:00:37 +0000 + keydb-pro (6:6.0.11-1distribution_placeholder) codename_placeholder; urgency=medium * 6.0.11 fixes applied related to cluster usage and expires diff --git a/src/aof.cpp b/src/aof.cpp index 8055007f1..014a56b44 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1436,8 +1436,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { initStaticStringObject(key,(sds)keystr); - expireEntry *pexpire = db->getExpire(&key); - /* Save the key and associated value */ if (o->type == OBJ_STRING) { /* Emit a SET command */ @@ -1462,6 +1460,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { serverPanic("Unknown object type"); } /* Save the expire time */ + std::unique_lock ul(g_expireLock); + expireEntry *pexpire = db->getExpire(&key); if (pexpire != nullptr) { for (auto &subExpire : *pexpire) { if (subExpire.subkey() == nullptr) @@ -1480,6 +1480,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common } } + ul.unlock(); + /* Read some diff from the parent process from time to time. */ if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = aof->processed_bytes; diff --git a/src/cluster.cpp b/src/cluster.cpp index a13386cba..3a20c9897 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5332,6 +5332,7 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; + std::unique_lock ul(g_expireLock); expireEntry *pexpire = c->db->getExpire(kv[j]); long long expireat = -1; if (pexpire != nullptr) diff --git a/src/db.cpp b/src/db.cpp index 0c1cb0350..e3dfd103f 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -39,7 +39,7 @@ * C-level DB API *----------------------------------------------------------------------------*/ -int keyIsExpired(redisDb *db, robj *key); +int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); @@ -847,7 +847,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { keyobj = createStringObject(key,sdslen(key)); - if (!keyIsExpired(c->db,keyobj)) { + if (!keyIsExpired(db,keyobj)) { addReplyBulk(c,keyobj); numkeys++; } @@ -1325,6 +1325,7 @@ void renameGenericCommand(client *c, int nx) { std::unique_ptr spexpire; { // scope pexpireOld since it will be invalid soon + std::unique_lock ul(g_expireLock); expireEntry *pexpireOld = c->db->getExpire(c->argv[1]); if (pexpireOld != nullptr) spexpire = std::make_unique(std::move(*pexpireOld)); @@ -1403,6 +1404,7 @@ void moveCommand(client *c) { std::unique_ptr spexpire; { // scope pexpireOld + std::unique_lock ul(g_expireLock); expireEntry *pexpireOld = c->db->getExpire(c->argv[1]); if (pexpireOld != nullptr) spexpire = std::make_unique(std::move(*pexpireOld)); @@ -1526,6 +1528,7 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ serverAssertWithInfo(NULL,key,itr != nullptr); + std::unique_lock ul(g_expireLock); robj *val = itr.val(); if (!val->FExpires()) @@ -1542,7 +1545,8 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { auto de = find(szFromObj(key)); serverAssertWithInfo(NULL,key,de != nullptr); - + std::unique_lock ul(g_expireLock); + robj *val = de.val(); if (!val->FExpires()) return 0; @@ -1574,6 +1578,7 @@ int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { void redisDbPersistentData::resortExpire(expireEntry &e) { + std::unique_lock ul(g_expireLock); auto itr = m_setexpire->find(e.key()); expireEntry eT = std::move(e); m_setexpire->erase(itr); @@ -1732,8 +1737,9 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) } /* Check if the key is expired. Note, this does not check subexpires */ -int keyIsExpired(redisDb *db, robj *key) { - expireEntry *pexpire = db->getExpire(key); +int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) { + std::unique_lock ul(g_expireLock); + const expireEntry *pexpire = db->getExpire(key); mstime_t now; if (pexpire == nullptr) return 0; /* No expire for this key */ @@ -2358,6 +2364,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) { /* Reuse the sds from the main dict in the expire dict */ + std::unique_lock ul(g_expireLock); dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); trackkey(key, true /* fUpdate */); @@ -2387,12 +2394,14 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(expireEntry &&e) { + std::unique_lock ul(g_expireLock); trackkey(e.key(), true /* fUpdate */); m_setexpire->insert(e); } bool redisDb::FKeyExpires(const char *key) { + std::unique_lock ul(g_expireLock); return setexpireUnsafe()->find(key) != setexpire()->end(); } @@ -2412,6 +2421,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) { serverAssert(sdsKey != nullptr); serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid + std::unique_lock ul(g_expireLock); // First see if the key can be obtained from a snapshot if (*pde == nullptr && m_pdbSnapshot != nullptr) diff --git a/src/debug.cpp b/src/debug.cpp index 9aebe8885..fc565d436 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -127,6 +127,7 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) { void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); + std::unique_lock ul(g_expireLock); expireEntry *pexpire = db->getExpire(keyobj); long long expiretime = -1; char buf[128]; diff --git a/src/defrag.cpp b/src/defrag.cpp index e60705a3e..65aecfa34 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -811,6 +811,7 @@ long defragStream(redisDb *db, dictEntry *kde) { * all the various pointers it has. Returns a stat of how many pointers were * moved. */ long defragKey(redisDb *db, dictEntry *de) { + std::unique_lock ul(g_expireLock); sds keysds = (sds)dictGetKey(de); robj *newob, *ob; unsigned char *newzl; diff --git a/src/evict.cpp b/src/evict.cpp index 6965748c5..ce9c420c4 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -262,6 +262,7 @@ int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evi { if (setexpire != nullptr) { + std::unique_lock ul(g_expireLock); visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 }; setexpire->random_visit(visitor); return visitor.count; diff --git a/src/expire.cpp b/src/expire.cpp index 5784690ee..b442038d4 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -33,6 +33,8 @@ #include "server.h" #include "cron.h" +fastlock g_expireLock {"Expire"}; + /* Helper function for the activeExpireCycle() function. * This function will try to expire the key that is stored in the hash table * entry 'de' of the 'expires' hash table of a Redis database. @@ -371,6 +373,7 @@ void activeExpireCycle(int type) { continue; } + std::unique_lock ul(g_expireLock); size_t expired = 0; size_t tried = 0; long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough @@ -660,6 +663,7 @@ void ttlGenericCommand(client *c, int output_ms) { /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. */ + std::unique_lock ul(g_expireLock); expireEntry *pexpire = c->db->getExpire(c->argv[1]); if (c->argc == 2) { diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 141df38f2..1ee49103a 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -119,6 +119,7 @@ void freeObjAsync(robj *o) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void redisDbPersistentData::emptyDbAsync() { + std::unique_lock ul(g_expireLock); dict *oldht1 = m_pdict; auto *set = m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); diff --git a/src/module.cpp b/src/module.cpp index 27055e595..7df8f7c66 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -2167,6 +2167,7 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { + std::unique_lock ul(g_expireLock); expireEntry *pexpire = key->db->getExpire(key->key); mstime_t expire = -1; if (pexpire != nullptr) diff --git a/src/object.cpp b/src/object.cpp index 72ac1b961..a45a91db8 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1078,8 +1078,10 @@ struct redisMemOverhead *getMemoryOverheadData(void) { db->size() * sizeof(robj); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - + + std::unique_lock ul(g_expireLock); mem = db->setexpire()->bytes_used(); + mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; diff --git a/src/rdb.cpp b/src/rdb.cpp index 278c657e4..0f5735332 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1150,8 +1150,11 @@ int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t robj key; initStaticStringObject(key,(char*)keystr); + std::unique_lock ul(g_expireLock); const expireEntry *pexpire = db->getExpire(&key); serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr)); + if (pexpire == nullptr) + ul.unlock(); // no need to hold the lock if we're not saving the expire if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) return 0; diff --git a/src/server.cpp b/src/server.cpp index e6e7df3c2..daf1eb8a0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2859,6 +2859,7 @@ bool getCommandAsync(client *c) } // Are we expired? + std::unique_lock ul(g_expireLock); const expireEntry *expire = serverTL->rgdbSnapshot[idb]->getExpire(c->argv[1]); long long when; if (expire && expire->FGetPrimaryExpire(&when) && when > 0) { diff --git a/src/server.h b/src/server.h index cbff46156..683b9871e 100644 --- a/src/server.h +++ b/src/server.h @@ -1182,6 +1182,7 @@ public: explicit operator long long() const noexcept { return when(); } }; typedef semiorderedset expireset; +extern fastlock g_expireLock; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -2425,9 +2426,7 @@ struct redisServer { inline int redisServerThreadVars::getRdbKeySaveDelay() { if (rdb_key_save_delay < 0) { - aeAcquireLock(); - rdb_key_save_delay = g_pserver->rdb_key_save_delay; - aeReleaseLock(); + __atomic_load(&g_pserver->rdb_key_save_delay, &rdb_key_save_delay, __ATOMIC_ACQUIRE); } return rdb_key_save_delay; } diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 7cbbf94a2..38ab9b7b2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -49,8 +49,10 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 dictForceRehash(m_spdbSnapshotHOLDER->m_pdictTombstone); dictMerge(m_pdbSnapshot->m_pdict, m_pdict); dictEmpty(m_pdictTombstone, nullptr); - delete m_spdbSnapshotHOLDER->m_setexpire; - m_spdbSnapshotHOLDER->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); + { + std::unique_lock ul(g_expireLock); + (*m_spdbSnapshotHOLDER->m_setexpire) = *m_setexpire; + } m_pdbSnapshotASYNC = nullptr; serverAssert(m_pdbSnapshot->m_pdict->iterators == 1); @@ -82,6 +84,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_mvccCheckpoint = getMvccTstamp(); if (m_setexpire != nullptr) { + std::unique_lock ul(g_expireLock); spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); spdb->m_setexpire->pause_rehash(); // needs to be const } @@ -161,8 +164,11 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot size_t expectedSize = psnapshot->size(); dictEmpty(m_pdict, nullptr); dictEmpty(m_pdictTombstone, nullptr); + { + std::unique_lock ul(g_expireLock); delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire); + } endSnapshot(psnapshot); serverAssert(size() == expectedSize); } @@ -555,18 +561,30 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * spdb->initialize(); dictExpand(spdb->m_pdict, m_pdbSnapshot->size()); + volatile size_t skipped = 0; m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) { if (o != nullptr) { dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); incrRefCount(o); + } else { + ++skipped; } return true; }, true /*fKeyOnly*/, true /*fCacheOnly*/); spdb->m_spstorage = m_pdbSnapshot->m_spstorage; + { + std::unique_lock ul(g_expireLock); + delete spdb->m_setexpire; + spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_pdbSnapshot->m_setexpire); + } spdb->m_pdict->iterators++; - serverAssert(spdb->size() == m_pdbSnapshot->size()); + if (m_spstorage) { + serverAssert(spdb->size() == m_pdbSnapshot->size()); + } else { + serverAssert((spdb->size()+skipped) == m_pdbSnapshot->size()); + } // Now wire us in (Acquire the LOCK) AeLocker locker; diff --git a/src/version.h b/src/version.h index 94c254ac2..92689b67a 100644 --- a/src/version.h +++ b/src/version.h @@ -1,4 +1,4 @@ -#define KEYDB_REAL_VERSION "0.0.0" +#define KEYDB_REAL_VERSION "6.0.12" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config enum VersionCompareResult @@ -25,4 +25,4 @@ enum VersionCompareResult compareVersion(struct SymVer *pver); #ifdef __cplusplus } -#endif \ No newline at end of file +#endif