diff --git a/monkey/monkey.py b/monkey/monkey.py new file mode 100644 index 000000000..653970728 --- /dev/null +++ b/monkey/monkey.py @@ -0,0 +1,209 @@ +import keydb +import random +import sched, time +import socket +import asyncore +import threading +import sys +from pprint import pprint + +# Parameters +numclients = 50 +#numkeys = 1000000 +numkeys = 100000 + +# Globals +ops=0 +s = sched.scheduler(time.time, time.sleep) +g_exit = False + +def _buildResp(*args): + result = "*" + str(len(args)) + "\r\n" + for v in args: + result = result + "$" + str(len(v)) + "\r\n" + result = result + v + "\r\n" + return result.encode('utf-8') + +class Client(asyncore.dispatcher): + def __init__(self, host, port): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((host, port)) + self.buf = b'' + self.inbuf = b'' + self.callbacks = list() + + def handle_connect(self): + pass + + def handle_read(self): + self.inbuf += self.recv(8192) + self.parse_response() + + def handle_write(self): + sent = self.send(self.buf) + self.buf = self.buf[sent:] + + def handle_close(self): + self.close() + + def writable(self): + return len(self.buf) > 0 + + def parse_array(self, startpos): + assert(self.inbuf[startpos] == ord('*')) + endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos + assert(endrange > 0) + numargs = int(self.inbuf[startpos+1:endrange]) + assert(numargs > 0) + args = list() + startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char + + while len(args) < numargs: + # We're parsing entries of the form "$N\r\nnnnnnn\r\n" + if startpos >= len(self.inbuf): + return # Not the full response + if self.inbuf[startpos] == ord('*'): + startpos, arr = self.parse_array(startpos) + args.append(arr) + else: + assert(self.inbuf[startpos] == ord('$')) + startpos = startpos + 1 + endrange = self.inbuf[startpos:].find(b'\r') + if endrange < 0: + return + endrange += startpos + assert(endrange <= len(self.inbuf)) + length = int(self.inbuf[startpos:endrange]) + if length < 0: + return + startpos = endrange + 2 + assert((startpos + length) <= len(self.inbuf)) + assert(self.inbuf[startpos+length] == ord('\r')) + assert(self.inbuf[startpos+length+1] == ord('\n')) + args.append(self.inbuf[startpos:(startpos+length)]) + startpos += length + 2 + assert(len(args) == numargs) + return startpos, args + + def parse_response(self): + if len(self.inbuf) == 0: + return + + while len(self.inbuf) > 0: + if self.inbuf[0] == ord('+') or self.inbuf[0] == ord('-') or self.inbuf[0] == ord(':'): + # This is a single line response + endpos = self.inbuf.find(b'\n') + if endpos < 0: + return # incomplete response + self.callbacks[0](self, self.inbuf[0:endpos-1]) + self.callbacks.pop(0) + self.inbuf = self.inbuf[endpos+1:] + + elif self.inbuf[0] == ord('*'): + #RESP response + try: + startpos, args = self.parse_array(0) + except: + return # Not all data here yet + self.callbacks[0](self, args) + self.callbacks.pop(0) + self.inbuf = self.inbuf[startpos:] + else: + print("ERROR: Unknown response:") + pprint(self.inbuf) + assert(False) + + + def default_result_handler(self, result): + pprint(result) + + # Public Methods + def set(self, key, val, callback = default_result_handler): + self.buf += _buildResp("set", key, val) + self.callbacks.append(callback) + + def lpush(self, key, val, callback = default_result_handler): + self.buf += _buildResp("lpush", key, val) + self.callbacks.append(callback) + + def delete(self, key, callback = default_result_handler): + self.buf += _buildResp("del", key) + self.callbacks.append(callback) + + def scan(self, iter, match=None, count=None, callback = default_result_handler): + args = ["scan", str(iter)] + if match != None: + args.append("MATCH") + args.append(match) + if count != None: + args.append("COUNT") + args.append(str(count)) + self.buf += _buildResp(*args) + self.callbacks.append(callback) + + def get(self, key, callback = None): + return + + +def getrandomkey(): + return str(random.randrange(0, numkeys)) + +def handle_lpush_response(c, resp): + global ops + if resp != None: + ops = ops + 1 + assert(resp[0] == ord(':')) + c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) + +def handle_set_response(c, resp): + global ops + if resp != None: + ops = ops + 1 + assert(resp[0] == ord('+')) + c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) + +def handle_del_response(c, resp): + global ops + if resp != None: + ops = ops + 1 + c.delete("list_" + getrandomkey(), handle_del_response) + +def scan_callback(c, resp): + global ops + nextstart = int(resp[0]) + c.scan(nextstart, count=500, callback=scan_callback) + ops = ops+1 + +def stats_thread(): + global ops + global g_exit + while not g_exit: + time.sleep(1) + print("Ops per second: " + str(ops)) + ops = 0 + +def main(): + global g_exit + clients = [] + + for i in range(numclients): + clients.append(Client('127.0.0.1', 6379)) + for i in range (10): + handle_lpush_response(clients[-1], None) + #handle_set_response(clients[-1], None) + + scan_client = Client('127.0.0.1', 6379) + scan_client.scan(0, count=500, callback=scan_callback) + + del_client = Client('127.0.0.1', 6379) + handle_del_response(del_client, None) + + threading.Thread(target=stats_thread).start() + asyncore.loop() + g_exit = True + sys.exit(0) + print("DONE") + +if __name__ == "__main__": + main() diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index 48252ac97..dae36cc06 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -49,6 +49,11 @@ void AsyncWorkQueue::WorkerThreadMain() } listRelease(vars.clients_pending_asyncwrite); + + std::unique_lock lockf(serverTL->lockPendingWrite); + serverTL->vecclientsProcess.clear(); + serverTL->clients_pending_write.clear(); + std::atomic_thread_fence(std::memory_order_seq_cst); } bool AsyncWorkQueue::removeClientAsyncWrites(client *c) diff --git a/src/Makefile b/src/Makefile index 3f2ad9dd6..d76255fd4 100644 --- a/src/Makefile +++ b/src/Makefile @@ -55,7 +55,9 @@ ifneq ($(strip $(SANITIZE)),) USEASM=false endif - +ifeq ($(CHECKED),true) + CXXFLAGS+= -DCHECKED_BUILD +endif # Do we use our assembly spinlock? X64 only ifeq ($(uname_S),Linux) diff --git a/src/cron.cpp b/src/cron.cpp index 584407fac..876c69d88 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -31,7 +31,8 @@ void cronCommand(client *c) if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK) return; - long long base = g_pserver->mstime; + long long base; + __atomic_load(&g_pserver->mstime, &base, __ATOMIC_ACQUIRE); if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) { arg_offset++; std::swap(base, interval); @@ -120,18 +121,20 @@ void executeCronJobExpireHook(const char *key, robj *o) else { job->startTime += job->interval; - if (job->startTime < (uint64_t)g_pserver->mstime) + mstime_t mstime; + __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE); + if (job->startTime < (uint64_t)mstime) { // If we are more than one interval in the past then fast forward to // the first interval still in the future. If startTime wasn't zero align // this to the original startTime, if it was zero align to now if (job->startTime == job->interval) { // startTime was 0 - job->startTime = g_pserver->mstime + job->interval; + job->startTime = mstime + job->interval; } else { - auto delta = g_pserver->mstime - job->startTime; + auto delta = mstime - job->startTime; auto multiple = (delta / job->interval)+1; job->startTime += job->interval * multiple; } diff --git a/src/db.cpp b/src/db.cpp index b60462c09..20b70fe02 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1713,7 +1713,8 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ - long long now = g_pserver->mstime; + mstime_t now; + __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE); db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed if (db->expireSize() == 0) db->avg_ttl = 0; @@ -1894,7 +1895,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) { * open object in a next call, if the next call will see the key expired, * while the first did not. */ else if (serverTL->fixed_time_expire > 0) { - now = g_pserver->mstime; + __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE); } /* For the other cases, we want to use the most fresh time we have. */ else { @@ -2588,6 +2589,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) { serverAssert(sdsKey != nullptr); serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid + serverAssert(m_refCount == 0); std::unique_lock ul(g_expireLock); // First see if the key can be obtained from a snapshot @@ -2781,7 +2783,7 @@ redisDbPersistentData::~redisDbPersistentData() if (m_spdbSnapshotHOLDER != nullptr) endSnapshot(m_spdbSnapshotHOLDER.get()); - //serverAssert(m_pdbSnapshot == nullptr); + serverAssert(m_pdbSnapshot == nullptr); serverAssert(m_refCount == 0); //serverAssert(m_pdict->iterators == 0); serverAssert(m_pdictTombstone == nullptr || m_pdictTombstone->iterators == 0); @@ -2853,16 +2855,16 @@ bool redisDbPersistentData::removeCachedValue(const char *key) return true; } +redisDbPersistentData::redisDbPersistentData() { + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); +} + void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint) { m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); if (fBulk) m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); - if (m_dictChanged == nullptr) { - m_dictChanged = dictCreate(&dictChangeDescType, nullptr); - } - if (sizeHint > 0) dictExpand(m_dictChanged, sizeHint, false); } diff --git a/src/dict.cpp b/src/dict.cpp index f25eff002..c682e2ec9 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -129,6 +129,7 @@ int _dictInit(dict *d, dictType *type, d->rehashidx = -1; d->iterators = 0; d->asyncdata = nullptr; + d->refcount = 1; return DICT_OK; } @@ -195,7 +196,7 @@ int dictMerge(dict *dst, dict *src) } size_t expectedSize = dictSize(src) + dictSize(dst); - if (dictSize(src) > dictSize(dst)) + if (dictSize(src) > dictSize(dst) && src->asyncdata == nullptr && dst->asyncdata == nullptr) { std::swap(*dst, *src); std::swap(dst->iterators, src->iterators); @@ -369,8 +370,14 @@ int dictRehash(dict *d, int n) { return 1; } +dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { + queue.reserve(c_targetQueueSize); + __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); +} + dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { - if (!dictIsRehashing(d)) return 0; + assert(d->type->asyncfree != nullptr); + if (!dictIsRehashing(d) || d->iterators != 0) return nullptr; d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); @@ -454,7 +461,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { } } - if (fUnlinked && !ctl->release) { + if (fUnlinked && !ctl->abondon) { if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash for (auto &wi : ctl->queue) { // We need to remove it from the source hash table, and store it in the dest. @@ -487,23 +494,10 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { } if (fFree) { - while (ctl->deGCList != nullptr) { - auto next = ctl->deGCList->next; - dictFreeKey(d, ctl->deGCList); - dictFreeVal(d, ctl->deGCList); - zfree(ctl->deGCList); - ctl->deGCList = next; - } + d->type->asyncfree(ctl); - // Was the dictionary free'd while we were in flight? - if (ctl->release) { - if (d->asyncdata != nullptr) - d->asyncdata->release = true; - else - dictRelease(d); - } - - delete ctl; + // Remove our reference + dictRelease(d); } } @@ -514,6 +508,16 @@ long long timeInMilliseconds(void) { return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000); } +dictAsyncRehashCtl::~dictAsyncRehashCtl() { + while (deGCList != nullptr) { + auto next = deGCList->next; + dictFreeKey(dict, deGCList); + dictFreeVal(dict, deGCList); + zfree(deGCList); + deGCList = next; + } +} + /* Rehash in ms+"delta" milliseconds. The value of "delta" is larger * than 0, and is smaller than 1 in most cases. The exact upper bound * depends on the running time of dictRehash(d,100).*/ @@ -537,7 +541,7 @@ int dictRehashMilliseconds(dict *d, int ms) { * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { - unsigned long iterators; + unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); if (iterators == 0) dictRehash(d,2); } @@ -766,13 +770,11 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { /* Clear & Release the hash table */ void dictRelease(dict *d) { - if (d->asyncdata) { - d->asyncdata->release = true; - return; + if (__atomic_sub_fetch(&d->refcount, 1, __ATOMIC_ACQ_REL) == 0) { + _dictClear(d,&d->ht[0],NULL); + _dictClear(d,&d->ht[1],NULL); + zfree(d); } - _dictClear(d,&d->ht[0],NULL); - _dictClear(d,&d->ht[1],NULL); - zfree(d); } dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare) @@ -1460,7 +1462,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) { void dictForceRehash(dict *d) { - unsigned long iterators; + unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d); } diff --git a/src/dict.h b/src/dict.h index 54519996b..ab57a7d7f 100644 --- a/src/dict.h +++ b/src/dict.h @@ -53,6 +53,7 @@ extern "C" { /* Unused arguments generate annoying warnings... */ #define DICT_NOTUSED(V) ((void) V) +struct dictAsyncRehashCtl; typedef struct dictEntry { void *key; @@ -72,6 +73,7 @@ typedef struct dictType { int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); + void (*asyncfree)(dictAsyncRehashCtl *); } dictType; /* This is our hash table structure. Every dictionary has two of this as we @@ -98,13 +100,12 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; - bool release = false; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; + std::atomic abondon { false }; - dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { - queue.reserve(c_targetQueueSize); - } + dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next); + ~dictAsyncRehashCtl(); }; #else struct dictAsyncRehashCtl; @@ -115,7 +116,8 @@ typedef struct dict { void *privdata; dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 */ - unsigned long iterators; /* number of iterators currently running */ + unsigned iterators; /* number of iterators currently running */ + unsigned refcount; dictAsyncRehashCtl *asyncdata; } dict; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 6e6b6e4d2..38afaa075 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -433,7 +433,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock) serverAssert(pidT >= 0); // unlock after free int t = -1; __atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE); - std::atomic_thread_fence(std::memory_order_release); + std::atomic_thread_fence(std::memory_order_acq_rel); ANNOTATE_RWLOCK_RELEASED(lock, true); uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case #ifdef __linux__ diff --git a/src/gc.h b/src/gc.h index 5c0596963..4715bc8de 100644 --- a/src/gc.h +++ b/src/gc.h @@ -29,6 +29,11 @@ class GarbageCollector }; public: + ~GarbageCollector() { + // Silence TSAN errors + m_lock.lock(); + } + uint64_t startEpoch() { std::unique_lock lock(m_lock); @@ -41,6 +46,7 @@ public: { std::unique_lock lock(m_lock); m_vecepochs.clear(); + m_setepochOutstanding.clear(); } void endEpoch(uint64_t epoch, bool fNoFree = false) diff --git a/src/rdb.cpp b/src/rdb.cpp index cedf787ce..ec510546d 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2370,21 +2370,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } -class EvictionPolicyCleanup -{ - int oldpolicy; - -public: - EvictionPolicyCleanup() { - oldpolicy = g_pserver->maxmemory_policy; - g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM; - } - - ~EvictionPolicyCleanup() { - g_pserver->maxmemory_policy = oldpolicy; - } -}; - /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { @@ -2401,10 +2386,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { sds key = nullptr; bool fLastKeyExpired = false; - // If we're running flash we may evict during load. We want a fast eviction function - // because there isn't any difference in use times between keys anyways - EvictionPolicyCleanup ecleanup; - for (int idb = 0; idb < cserver.dbnum; ++idb) { g_pserver->db[idb]->trackChanges(true, 1024); diff --git a/src/sds.c b/src/sds.c index 2c099a160..1c5fef555 100644 --- a/src/sds.c +++ b/src/sds.c @@ -195,7 +195,7 @@ void sdsfree(const char *s) { if ((flags & SDS_TYPE_MASK) == SDS_TYPE_REFCOUNTED) { SDS_HDR_VAR_REFCOUNTED(s); - if (__atomic_fetch_sub(&sh->refcount, 1, __ATOMIC_RELAXED) > 1) + if (__atomic_fetch_sub(&sh->refcount, 1, __ATOMIC_ACQ_REL) > 1) return; } s_free((char*)s-sdsHdrSize(s[-1])); diff --git a/src/server.cpp b/src/server.cpp index 56c476aa8..96d62e4ea 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1359,6 +1359,8 @@ uint64_t dictEncObjHash(const void *key) { } } +void dictGCAsyncFree(dictAsyncRehashCtl *async); + /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. */ dictType objectKeyPointerValueDictType = { @@ -1407,8 +1409,9 @@ dictType dbDictType = { NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - dictDbKeyDestructor, /* key destructor */ - dictObjectDestructor /* val destructor */ + dictDbKeyDestructor, /* key destructor */ + dictObjectDestructor, /* val destructor */ + dictGCAsyncFree /* async free destructor */ }; /* db->pdict, keys are sds strings, vals are Redis objects. */ @@ -2424,18 +2427,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - run_with_period(100) { - bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) - fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots) - { - g_pserver->asyncworkqueue->AddWorkFunction([]{ - g_pserver->db[0]->consolidate_snapshot(); - }, true /*HiPri*/); - } - } - /* Fire the cron loop modules event. */ RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz}; moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, @@ -3926,7 +3917,7 @@ void call(client *c, int flags) { dirty = g_pserver->dirty; incrementMvccTstamp(); __atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST); - start = g_pserver->ustime; + try { c->cmd->proc(c); } catch (robj_roptr o) { @@ -5403,10 +5394,12 @@ sds genRedisInfoString(const char *section) { vkeys = g_pserver->db[j]->expireSize(); // Adjust TTL by the current time - g_pserver->db[j]->avg_ttl -= (g_pserver->mstime - g_pserver->db[j]->last_expire_set); + mstime_t mstime; + __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE); + g_pserver->db[j]->avg_ttl -= (mstime - g_pserver->db[j]->last_expire_set); if (g_pserver->db[j]->avg_ttl < 0) g_pserver->db[j]->avg_ttl = 0; - g_pserver->db[j]->last_expire_set = g_pserver->mstime; + g_pserver->db[j]->last_expire_set = mstime; if (keys || vkeys) { info = sdscatprintf(info, diff --git a/src/server.h b/src/server.h index 42a32ecde..07f025c46 100644 --- a/src/server.h +++ b/src/server.h @@ -1045,9 +1045,9 @@ class redisDbPersistentData friend class redisDbPersistentDataSnapshot; public: + redisDbPersistentData(); virtual ~redisDbPersistentData(); - redisDbPersistentData() = default; redisDbPersistentData(redisDbPersistentData &&) = default; size_t slots() const { return dictSlots(m_pdict); } @@ -1130,8 +1130,6 @@ public: void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot); void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot); - void consolidate_snapshot(); - bool FStorageProvider() { return m_spstorage != nullptr; } bool removeCachedValue(const char *key); void removeAllCachedValues(); @@ -1187,7 +1185,6 @@ private: protected: static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); - void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); bool freeTombstoneObjects(int depth); public: @@ -1289,7 +1286,6 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::restoreSnapshot; - using redisDbPersistentData::consolidate_snapshot; using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 5d479fb7f..29fa4a663 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -26,6 +26,17 @@ public: std::vector vecde; }; +void discontinueAsyncRehash(dict *d) { + if (d->asyncdata != nullptr) { + auto adata = d->asyncdata; + while (adata != nullptr) { + adata->abondon = true; + adata = adata->next; + } + d->rehashidx = 0; + } +} + const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { serverAssert(GlobalLocksAcquired()); @@ -56,34 +67,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 } } - if (m_pdbSnapshot != nullptr && m_pdbSnapshot == m_pdbSnapshotASYNC && m_spdbSnapshotHOLDER->m_refCount == 1 && dictSize(m_pdictTombstone) < c_elementsSmallLimit) - { - serverLog(LL_WARNING, "Reusing old snapshot"); - // is there an existing snapshot only owned by us? - - dictIterator *di = dictGetIterator(m_pdictTombstone); - dictEntry *de; - while ((de = dictNext(di)) != nullptr) - { - if (dictDelete(m_pdbSnapshot->m_pdict, dictGetKey(de)) != DICT_OK) - dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); - } - dictReleaseIterator(di); - - dictForceRehash(m_spdbSnapshotHOLDER->m_pdictTombstone); - dictMerge(m_pdbSnapshot->m_pdict, m_pdict); - dictEmpty(m_pdictTombstone, nullptr); - { - std::unique_lock ul(g_expireLock); - (*m_spdbSnapshotHOLDER->m_setexpire) = *m_setexpire; - } - - m_pdbSnapshotASYNC = nullptr; - serverAssert(m_pdbSnapshot->m_pdict->iterators == 1); - serverAssert(m_spdbSnapshotHOLDER->m_refCount == 1); - return m_pdbSnapshot; - } - // See if we have too many levels and can bail out of this to reduce load if (fOptional && (levels >= 6)) { @@ -95,14 +78,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 // We can't have async rehash modifying these. Setting the asyncdata list to null // will cause us to throw away the async work rather than modify the tables in flight - if (m_pdict->asyncdata != nullptr) { - m_pdict->asyncdata = nullptr; - m_pdict->rehashidx = 0; - } - if (m_pdictTombstone->asyncdata != nullptr) { - m_pdictTombstone->rehashidx = 0; - m_pdictTombstone->asyncdata = nullptr; - } + discontinueAsyncRehash(m_pdict); + discontinueAsyncRehash(m_pdictTombstone); spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; @@ -125,7 +102,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 } if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) { - serverLog(LL_NOTICE, "NOTICE: Suboptimal snapshot"); + serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot"); } m_pdict = dictCreate(&dbDictType,this); @@ -152,6 +129,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 m_pdbSnapshotASYNC = nullptr; } + std::atomic_thread_fence(std::memory_order_seq_cst); return m_pdbSnapshot; } @@ -189,7 +167,7 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot //psnapshot->m_pdict->iterators--; psnapshot->m_spdbSnapshotHOLDER.release(); - //psnapshot->m_pdbSnapshot = nullptr; + psnapshot->m_pdbSnapshot = nullptr; g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr(psnapshot)); serverLog(LL_VERBOSE, "Garbage collected snapshot"); } @@ -275,7 +253,6 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot aeReleaseLock(); return; } - const_cast(psnapshotT)->consolidate_children(this, true); // Final Cleanup aeAcquireLock(); latencyStartMonitor(latency); @@ -347,6 +324,12 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictForceRehash(dictTombstoneNew); aeAcquireLock(); + if (m_pdbSnapshot->m_pdict->asyncdata != nullptr) { + // In this case we use the asyncdata to free us, not our own lazy free + for (auto de : splazy->vecde) + dictFreeUnlinkedEntry(m_pdbSnapshot->m_pdict, de); + splazy->vecde.clear(); + } dict *dT = m_pdbSnapshot->m_pdict; splazy->vecdictLazyFree.push_back(m_pdictTombstone); __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE); @@ -421,7 +404,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn #ifdef CHECKED_BUILD serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); #endif - dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); + dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de)); continue; } else if (deSnapshot == nullptr) @@ -431,8 +414,14 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - splazy->vecde.push_back(deSnapshot); - *dePrev = deSnapshot->next; + *dePrev = deSnapshot->next; // Unlink it first + if (deSnapshot != nullptr) { + if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) { + dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); + } else { + splazy->vecde.push_back(deSnapshot); + } + } ht->used--; } @@ -454,12 +443,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn if (m_pdbSnapshot != nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr) { m_pdbSnapshot = m_spdbSnapshotHOLDER->m_pdbSnapshot; - m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr; } else { m_pdbSnapshot = nullptr; } + m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr; // Fixup the about to free'd snapshots iterator count so the dtor doesn't complain if (m_refCount) @@ -663,111 +652,23 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const return 0; } - -void redisDbPersistentData::consolidate_snapshot() -{ - aeAcquireLock(); - auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr; - if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0) - { - aeReleaseLock(); - return; - } - - psnapshot->m_refCount++; // ensure it's not free'd - aeReleaseLock(); - psnapshot->consolidate_children(this, false /* fForce */); - aeAcquireLock(); - endSnapshot(psnapshot); - aeReleaseLock(); -} - -// only call this on the "real" database to consolidate the first child -void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) -{ - std::unique_lock lock(s_lock, std::defer_lock); - if (!lock.try_lock()) - return; // this is a best effort function - - if (!fForce && snapshot_depth() < 2) - return; - - auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); - spdb->initialize(); - dictExpand(spdb->m_pdict, m_pdbSnapshot->size()); - - volatile size_t skipped = 0; - m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) { - if (o != nullptr || !m_spstorage) { - dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); - if (o != nullptr) { - incrRefCount(o); - } - } else { - ++skipped; - } - return true; - }, true /*fKeyOnly*/, true /*fCacheOnly*/); - spdb->m_spstorage = m_pdbSnapshot->m_spstorage; - { - std::unique_lock ul(g_expireLock); - delete spdb->m_setexpire; - spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_pdbSnapshot->m_setexpire); - } - - spdb->m_pdict->iterators++; - - if (m_spstorage) { - serverAssert(spdb->size() == m_pdbSnapshot->size()); - } else { - serverAssert((spdb->size()+skipped) == m_pdbSnapshot->size()); - } - - // Now wire us in (Acquire the LOCK) - AeLocker locker; - locker.arm(nullptr); - - int depth = 0; - redisDbPersistentDataSnapshot *psnapshotT = pdbPrimary->m_spdbSnapshotHOLDER.get(); - while (psnapshotT != nullptr) - { - ++depth; - if (psnapshotT == this) - break; - psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get(); - } - if (psnapshotT != this) - { - locker.disarm(); // don't run spdb's dtor in the lock - return; // we were unlinked and this was a waste of time - } - - serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1); - spdb->m_refCount = depth; - // Drop our refs from this snapshot and its children - psnapshotT = this; - std::vector vecT; - while ((psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get()) != nullptr) - { - vecT.push_back(psnapshotT); - } - for (auto itr = vecT.rbegin(); itr != vecT.rend(); ++itr) - { - psnapshotT = *itr; - psnapshotT->m_refCount -= (depth-1); // -1 because dispose will sub another - gcDisposeSnapshot(psnapshotT); - } - std::atomic_thread_fence(std::memory_order_seq_cst); - m_spdbSnapshotHOLDER.release(); // GC has responsibility for it now - m_spdbSnapshotHOLDER = std::move(spdb); - const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get(); - __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST); - locker.disarm(); // ensure we're not locked for any dtors -} - bool redisDbPersistentDataSnapshot::FStale() const { // 0.5 seconds considered stale; static const uint64_t msStale = 500; return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale; +} + +void dictGCAsyncFree(dictAsyncRehashCtl *async) { + if (async->deGCList != nullptr && serverTL != nullptr && !serverTL->gcEpoch.isReset()) { + auto splazy = std::make_unique(); + auto *de = async->deGCList; + while (de != nullptr) { + splazy->vecde.push_back(de); + de = de->next; + } + async->deGCList = nullptr; + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + } + delete async; } \ No newline at end of file