From 29e4973d7baf75a33f1bac8bcb54c7f7a498f524 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 9 Jul 2020 12:57:35 -0400 Subject: [PATCH 1/8] Initial scan implementation (not tested) Former-commit-id: 0f3911d56e6878d91b6e3cacc7d637934ffe6099 --- src/db.cpp | 58 +++++++++++++++++++++++++++++++++++++++++++----- src/server.h | 2 ++ src/snapshot.cpp | 55 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index a908f409c..08349bcd5 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -961,10 +961,10 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { * * In the case of a Hash object the function returns both the field and value * of every element on the Hash. */ +void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor); void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { int i, j; list *keys = listCreate(); - listNode *node, *nextnode; long count = 10; sds pat = NULL; sds type = NULL; @@ -1014,6 +1014,46 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } } + if (o == nullptr && count > 100) + { + // Do an async version + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); + if (snapshot != nullptr) + { + aeEventLoop *el = serverTL->el; + blockClient(c, BLOCKED_ASYNC); + redisDb *db = c->db; + sds patCopy = pat ? sdsdup(pat) : nullptr; + sds typeCopy = type ? sdsdup(type) : nullptr; + g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ + auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr); + + aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ + aeReleaseLock(); // we need to lock with coordination of the client + + std::unique_locklock)> lock(c->lock); + AeLocker locker; + locker.arm(c); + + unblockClient(c); + scanFilterAndReply(c, keys, patCopy, typeCopy, use_pattern, nullptr, cursorResult); + if (patCopy != nullptr) + sdsfree(patCopy); + if (typeCopy != nullptr) + sdsfree(typeCopy); + + db->endSnapshot(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + aeAcquireLock(); + }); + }); + return; + } + } + /* Step 2: Iterate the collection. * * Note that if the object is encoded with a ziplist, intset, or any other @@ -1080,6 +1120,18 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { serverPanic("Not handled encoding in SCAN."); } + scanFilterAndReply(c, keys, pat, type, use_pattern, o, cursor); + +cleanup: + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); +} + +void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor) +{ + listNode *node, *nextnode; + int patlen = (pat != nullptr) ? sdslen(pat) : 0; + /* Step 3: Filter elements. */ node = listFirst(keys); while (node) { @@ -1144,10 +1196,6 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { decrRefCount(kobj); listDelNode(keys, node); } - -cleanup: - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); } /* The SCAN command completely relies on scanGenericCommand. */ diff --git a/src/server.h b/src/server.h index ad15c319a..1aab7bb85 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,6 +1395,8 @@ public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; + unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys, struct scan_callback_data *pdata = nullptr) const; + using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::endSnapshotAsync; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index cb65eb30a..229c56a4f 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -388,6 +388,61 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const return dict_iter(de); } +struct scan_callback_data +{ + dict *dictTombstone; + list *keys; +}; +void snapshot_scan_callback(void *privdata, const dictEntry *de) +{ + scan_callback_data *data = (scan_callback_data*)privdata; + if (data->dictTombstone != nullptr && dictFind(data->dictTombstone, dictGetKey(de)) != nullptr) + return; + + sds sdskey = (sds)dictGetKey(de); + listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); +} +unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys, scan_callback_data *pdata) const +{ + unsigned long iteratorReturn = 0; + + scan_callback_data dataT; + if (pdata == nullptr) + { + dataT.dictTombstone = m_pdictTombstone; + dataT.keys = keys; + pdata = &dataT; + } + + const redisDbPersistentDataSnapshot *psnapshot; + __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); + if (psnapshot != nullptr) + { + // Always process the snapshot first as we assume its bigger than we are + iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys, pdata); + } + + if (psnapshot == nullptr) + { + long maxiterations = count * 10; // allow more iterations than keys for sparse tables + iteratorReturn = iterator; + do { + iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, pdata); + } while (iteratorReturn && + maxiterations-- && + listLength(keys) < (unsigned long)count); + } + else + { + // Just catch up with our snapshot + do + { + iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata); + } while (iterator != 0 && iterator < iteratorReturn); + } + return iteratorReturn; +} + bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly, bool fCacheOnly) const { // Take the size so we can ensure we visited every element exactly once From bce5cb098bf291df2cf50bfc5efbfe47d140bfb3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 9 Jul 2020 21:58:06 +0000 Subject: [PATCH 2/8] Do actual filtering on the async thread Former-commit-id: 64b17139b7b5ff55bb52f27dc6f91cb81adde031 --- src/db.cpp | 46 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 08349bcd5..022f013c9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -950,6 +950,24 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { return C_OK; } + +static bool filterKey(robj_roptr kobj, sds pat, int patlen) +{ + bool filter = false; + if (sdsEncodedObject(kobj)) { + if (!stringmatchlen(pat, patlen, szFromObj(kobj), sdslen(szFromObj(kobj)), 0)) + filter = true; + } else { + char buf[LONG_STR_SIZE]; + int len; + + serverAssert(kobj->encoding == OBJ_ENCODING_INT); + len = ll2string(buf,sizeof(buf),(long)ptrFromObj(kobj)); + if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = true; + } + return filter; +} + /* This command implements SCAN, HSCAN and SSCAN commands. * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise * if 'o' is NULL the command will operate on the dictionary associated with @@ -1029,6 +1047,19 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sds typeCopy = type ? sdsdup(type) : nullptr; g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr); + if (use_pattern) { + listNode *ln = listFirst(keys); + int patlen = sdslen(patCopy); + while (ln != nullptr) + { + listNode *next = ln->next; + if (filterKey((robj*)listNodeValue(ln), patCopy, patlen)) + { + listDelNode(keys, ln); + } + ln = next; + } + } aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ aeReleaseLock(); // we need to lock with coordination of the client @@ -1038,7 +1069,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { locker.arm(c); unblockClient(c); - scanFilterAndReply(c, keys, patCopy, typeCopy, use_pattern, nullptr, cursorResult); + scanFilterAndReply(c, keys, nullptr, typeCopy, false, nullptr, cursorResult); if (patCopy != nullptr) sdsfree(patCopy); if (typeCopy != nullptr) @@ -1141,17 +1172,8 @@ void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_patter /* Filter element if it does not match the pattern. */ if (!filter && use_pattern) { - if (sdsEncodedObject(kobj)) { - if (!stringmatchlen(pat, patlen, szFromObj(kobj), sdslen(szFromObj(kobj)), 0)) - filter = 1; - } else { - char buf[LONG_STR_SIZE]; - int len; - - serverAssert(kobj->encoding == OBJ_ENCODING_INT); - len = ll2string(buf,sizeof(buf),(long)ptrFromObj(kobj)); - if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1; - } + if (filterKey(kobj, pat, patlen)) + filter = 1; } /* Filter an element if it isn't the type we want. */ From 993359e1bb95c299d2b7ea20bf9355b844af02ca Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 9 Jul 2020 22:29:27 +0000 Subject: [PATCH 3/8] Fix race in db iterators in scan Former-commit-id: a6444870660c0d3f52cd7b1dc0b80223f0d58e70 --- src/dict.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 0e8827165..ef7365fdb 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -952,7 +952,7 @@ unsigned long dictScan(dict *d, /* Having a safe iterator means no rehashing can happen, see _dictRehashStep. * This is needed in case the scan callback tries to do dictFind or alike. */ - d->iterators++; + __atomic_fetch_add(&d->iterators, 1, __ATOMIC_SEQ_CST); if (!dictIsRehashing(d)) { t0 = &(d->ht[0]); @@ -1021,7 +1021,7 @@ unsigned long dictScan(dict *d, } /* undo the ++ at the top */ - d->iterators--; + __atomic_fetch_sub(&d->iterators, 1, __ATOMIC_SEQ_CST); return v; } From 2db9eef2b1d3f64aa48243c5f825be7855e9502f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 00:17:39 +0000 Subject: [PATCH 4/8] Graceful shutdown running async jobs Former-commit-id: 2ce5b94741d384b2dfd6d3b347fa1582cd45df8f --- src/AsyncWorkQueue.cpp | 12 ++++++++++++ src/AsyncWorkQueue.h | 2 ++ src/server.cpp | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index 91fc2d2d2..be85f5ac2 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -73,6 +73,18 @@ bool AsyncWorkQueue::removeClientAsyncWrites(client *c) return fFound; } +void AsyncWorkQueue::shutdown() +{ + std::unique_lock lock(m_mutex); + serverAssert(!GlobalLocksAcquired()); + m_fQuitting = true; + m_cvWakeup.notify_all(); + lock.unlock(); + + for (auto &thread : m_vecthreads) + thread.join(); +} + void AsyncWorkQueue::abandonThreads() { std::unique_lock lock(m_mutex); diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h index 2a413404d..1f019324a 100644 --- a/src/AsyncWorkQueue.h +++ b/src/AsyncWorkQueue.h @@ -34,5 +34,7 @@ public: void AddWorkFunction(std::function &&fnAsync, bool fHiPri = false); bool removeClientAsyncWrites(struct client *c); + void shutdown(); + void abandonThreads(); }; \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index f5f2a93bd..539295de1 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4225,6 +4225,13 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); + if (g_pserver->asyncworkqueue) + { + aeReleaseLock(); + g_pserver->asyncworkqueue->shutdown(); + aeAcquireLock(); + } + for (int iel = 0; iel < cserver.cthreads; ++iel) { aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{ From 4ec5c3eb3166360d062255fb671b86bc1d9fbcd0 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 00:18:10 +0000 Subject: [PATCH 5/8] Fix issue where we fail to return all data Former-commit-id: 7341c9ed14d13386a045afd6deda9c0db422cd5f --- src/snapshot.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 229c56a4f..df9155cef 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -438,7 +438,7 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera do { iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata); - } while (iterator != 0 && iterator < iteratorReturn); + } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0)); } return iteratorReturn; } From 1fef6c42b77d85271159c7909ac685e7e052c7da Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 01:43:32 +0000 Subject: [PATCH 6/8] Disable MVCC GET, the overhead is not worth it Former-commit-id: 8c7e1001e98be0d0e6fe0079fc9406d5d65366ff --- src/server.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server.cpp b/src/server.cpp index 539295de1..4a1b1ff72 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4096,11 +4096,13 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { queueMultiCommand(c); addReply(c,shared.queued); } else { +#if 0 if (cserver.cthreads >= 2 && !g_fTestMode && g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->monitors) == 0 && c->cmd->proc == getCommand) { if (getCommandAsync(c)) return C_OK; } +#endif locker.arm(c); incrementMvccTstamp(); call(c,callFlags); From 36927714571a6caddf4795da8f0b406fb26faaae Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 01:43:51 +0000 Subject: [PATCH 7/8] Fix issue where SCAN misses elements while snapshot is in flight Former-commit-id: ce005d748ebf0e116d674a96f74d698d17394010 --- src/db.cpp | 43 +++++++++++++++++++++++++------------------ src/server.h | 2 +- src/snapshot.cpp | 40 +++++++++++++++++++--------------------- 3 files changed, 45 insertions(+), 40 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 022f013c9..1ce382686 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1032,7 +1032,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } } - if (o == nullptr && count > 100) + if (o == nullptr && count >= 100) { // Do an async version const redisDbPersistentDataSnapshot *snapshot = nullptr; @@ -1046,7 +1046,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ - auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr); + auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys); if (use_pattern) { listNode *ln = listFirst(keys); int patlen = sdslen(patCopy); @@ -1109,23 +1109,30 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } if (ht) { - void *privdata[2]; - /* We set the max number of iterations to ten times the specified - * COUNT, so if the hash table is in a pathological state (very - * sparsely populated) we avoid to block too much time at the cost - * of returning no or very few elements. */ - long maxiterations = count*10; + if (ht == c->db->dictUnsafeKeyOnly()) + { + cursor = c->db->scan_threadsafe(cursor, count, keys); + } + else + { + void *privdata[2]; + /* We set the max number of iterations to ten times the specified + * COUNT, so if the hash table is in a pathological state (very + * sparsely populated) we avoid to block too much time at the cost + * of returning no or very few elements. */ + long maxiterations = count*10; - /* We pass two pointers to the callback: the list to which it will - * add new elements, and the object containing the dictionary so that - * it is possible to fetch more data in a type-dependent way. */ - privdata[0] = keys; - privdata[1] = o.unsafe_robjcast(); - do { - cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); - } while (cursor && - maxiterations-- && - listLength(keys) < (unsigned long)count); + /* We pass two pointers to the callback: the list to which it will + * add new elements, and the object containing the dictionary so that + * it is possible to fetch more data in a type-dependent way. */ + privdata[0] = keys; + privdata[1] = o.unsafe_robjcast(); + do { + cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); + } while (cursor && + maxiterations-- && + listLength(keys) < (unsigned long)count); + } } else if (o->type == OBJ_SET) { int pos = 0; int64_t ll; diff --git a/src/server.h b/src/server.h index 1aab7bb85..a3759cba8 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,7 +1395,7 @@ public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; - unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys, struct scan_callback_data *pdata = nullptr) const; + unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index df9155cef..b52edd2e1 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -20,6 +20,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 if (m_spdbSnapshotHOLDER != nullptr) { + serverLog(LL_DEBUG, "Attempting reuse of snapshot, client tstamp: %llu snapshot tstamp: %llu", mvccCheckpoint, m_spdbSnapshotHOLDER->m_mvccCheckpoint); // If possible reuse an existing snapshot (we want to minimize nesting) if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint) { @@ -59,8 +60,11 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 } // See if we have too many levels and can bail out of this to reduce load - if (fOptional && (levels >= 4)) + if (fOptional && (levels >= 6)) + { + serverLog(LL_DEBUG, "Snapshot nesting too deep, abondoning"); return nullptr; + } auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); @@ -402,44 +406,38 @@ void snapshot_scan_callback(void *privdata, const dictEntry *de) sds sdskey = (sds)dictGetKey(de); listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); } -unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys, scan_callback_data *pdata) const +unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys) const { unsigned long iteratorReturn = 0; - scan_callback_data dataT; - if (pdata == nullptr) - { - dataT.dictTombstone = m_pdictTombstone; - dataT.keys = keys; - pdata = &dataT; - } + scan_callback_data data; + data.dictTombstone = m_pdictTombstone; + data.keys = keys; const redisDbPersistentDataSnapshot *psnapshot; __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); if (psnapshot != nullptr) { // Always process the snapshot first as we assume its bigger than we are - iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys, pdata); - } + iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys); - if (psnapshot == nullptr) + // Just catch up with our snapshot + do + { + iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, &data); + } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0)); + } + else { long maxiterations = count * 10; // allow more iterations than keys for sparse tables iteratorReturn = iterator; do { - iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, pdata); + iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, &data); } while (iteratorReturn && maxiterations-- && listLength(keys) < (unsigned long)count); } - else - { - // Just catch up with our snapshot - do - { - iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata); - } while (iterator != 0 && (iterator < iteratorReturn || iteratorReturn == 0)); - } + return iteratorReturn; } From 2192c97d35bbf18c6b7ae6251249d20dd922ddbd Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 03:43:56 +0000 Subject: [PATCH 8/8] MVCC scan support filtering by type on the async thread Former-commit-id: 14f8c0ff686b93976eead5fa6bf526c2eecb5ae0 --- src/db.cpp | 16 ++++++++-------- src/server.h | 2 +- src/snapshot.cpp | 12 +++++++++--- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 1ce382686..39ba16cb9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1046,7 +1046,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ - auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys); + auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); if (use_pattern) { listNode *ln = listFirst(keys); int patlen = sdslen(patCopy); @@ -1060,8 +1060,12 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { ln = next; } } + if (patCopy != nullptr) + sdsfree(patCopy); + if (typeCopy != nullptr) + sdsfree(typeCopy); - aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ + aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{ aeReleaseLock(); // we need to lock with coordination of the client std::unique_locklock)> lock(c->lock); @@ -1069,11 +1073,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { locker.arm(c); unblockClient(c); - scanFilterAndReply(c, keys, nullptr, typeCopy, false, nullptr, cursorResult); - if (patCopy != nullptr) - sdsfree(patCopy); - if (typeCopy != nullptr) - sdsfree(typeCopy); + scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); db->endSnapshot(snapshot); listSetFreeMethod(keys,decrRefCountVoid); @@ -1111,7 +1111,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { if (ht) { if (ht == c->db->dictUnsafeKeyOnly()) { - cursor = c->db->scan_threadsafe(cursor, count, keys); + cursor = c->db->scan_threadsafe(cursor, count, nullptr, keys); } else { diff --git a/src/server.h b/src/server.h index a3759cba8..81f47304c 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,7 +1395,7 @@ public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; - unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys) const; + unsigned long scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index b52edd2e1..7cbbf94a2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -20,7 +20,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 if (m_spdbSnapshotHOLDER != nullptr) { - serverLog(LL_DEBUG, "Attempting reuse of snapshot, client tstamp: %llu snapshot tstamp: %llu", mvccCheckpoint, m_spdbSnapshotHOLDER->m_mvccCheckpoint); // If possible reuse an existing snapshot (we want to minimize nesting) if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint) { @@ -395,6 +394,7 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const struct scan_callback_data { dict *dictTombstone; + sds type; list *keys; }; void snapshot_scan_callback(void *privdata, const dictEntry *de) @@ -404,22 +404,28 @@ void snapshot_scan_callback(void *privdata, const dictEntry *de) return; sds sdskey = (sds)dictGetKey(de); + if (data->type != nullptr) + { + if (strcasecmp(data->type, getObjectTypeName((robj*)dictGetVal(de))) != 0) + return; + } listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); } -unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys) const +unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const { unsigned long iteratorReturn = 0; scan_callback_data data; data.dictTombstone = m_pdictTombstone; data.keys = keys; + data.type = type; const redisDbPersistentDataSnapshot *psnapshot; __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); if (psnapshot != nullptr) { // Always process the snapshot first as we assume its bigger than we are - iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys); + iteratorReturn = psnapshot->scan_threadsafe(iterator, count, type, keys); // Just catch up with our snapshot do