diff --git a/src/db.cpp b/src/db.cpp index a908f409c..08349bcd5 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -961,10 +961,10 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) { * * In the case of a Hash object the function returns both the field and value * of every element on the Hash. */ +void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor); void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { int i, j; list *keys = listCreate(); - listNode *node, *nextnode; long count = 10; sds pat = NULL; sds type = NULL; @@ -1014,6 +1014,46 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } } + if (o == nullptr && count > 100) + { + // Do an async version + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); + if (snapshot != nullptr) + { + aeEventLoop *el = serverTL->el; + blockClient(c, BLOCKED_ASYNC); + redisDb *db = c->db; + sds patCopy = pat ? sdsdup(pat) : nullptr; + sds typeCopy = type ? 
sdsdup(type) : nullptr; + g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ + auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr); + + aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ + aeReleaseLock(); // we need to lock with coordination of the client + + std::unique_lock<decltype(c->lock)> lock(c->lock); + AeLocker locker; + locker.arm(c); + + unblockClient(c); + scanFilterAndReply(c, keys, patCopy, typeCopy, use_pattern, nullptr, cursorResult); + if (patCopy != nullptr) + sdsfree(patCopy); + if (typeCopy != nullptr) + sdsfree(typeCopy); + + db->endSnapshot(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + aeAcquireLock(); + }); + }); + return; + } + } + /* Step 2: Iterate the collection. * * Note that if the object is encoded with a ziplist, intset, or any other @@ -1080,6 +1120,18 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { serverPanic("Not handled encoding in SCAN."); } + scanFilterAndReply(c, keys, pat, type, use_pattern, o, cursor); + +cleanup: + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); +} + +void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_pattern, robj_roptr o, unsigned long cursor) +{ + listNode *node, *nextnode; + int patlen = (pat != nullptr) ? sdslen(pat) : 0; + /* Step 3: Filter elements. */ node = listFirst(keys); while (node) { @@ -1144,10 +1196,6 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { decrRefCount(kobj); listDelNode(keys, node); } - -cleanup: - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); } /* The SCAN command completely relies on scanGenericCommand. 
*/ diff --git a/src/server.h b/src/server.h index ad15c319a..1aab7bb85 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,6 +1395,8 @@ public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const; + unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys, struct scan_callback_data *pdata = nullptr) const; + using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::endSnapshotAsync; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index cb65eb30a..229c56a4f 100644 @@ -388,6 +388,61 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const return dict_iter(de); } +struct scan_callback_data +{ + dict *dictTombstone; + list *keys; +}; +void snapshot_scan_callback(void *privdata, const dictEntry *de) +{ + scan_callback_data *data = (scan_callback_data*)privdata; + if (data->dictTombstone != nullptr && dictFind(data->dictTombstone, dictGetKey(de)) != nullptr) + return; + + sds sdskey = (sds)dictGetKey(de); + listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); +} +unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys, scan_callback_data *pdata) const +{ + unsigned long iteratorReturn = 0; + + scan_callback_data dataT; + if (pdata == nullptr) + { + dataT.dictTombstone = m_pdictTombstone; + dataT.keys = keys; + pdata = &dataT; + } + + const redisDbPersistentDataSnapshot *psnapshot; + __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); + if (psnapshot != nullptr) + { + // Always process the snapshot first as we assume its bigger than we are + iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys, pdata); + } + + if (psnapshot == nullptr) + { + long maxiterations = count * 10; // allow more iterations than keys for 
sparse tables + iteratorReturn = iterator; + do { + iteratorReturn = dictScan(m_pdict, iteratorReturn, snapshot_scan_callback, NULL, pdata); + } while (iteratorReturn && + maxiterations-- && + listLength(keys) < (unsigned long)count); + } + else + { + // Just catch up with our snapshot + do + { + iterator = dictScan(m_pdict, iterator, snapshot_scan_callback, nullptr, pdata); + } while (iterator != 0 && iterator < iteratorReturn); + } + return iteratorReturn; +} + bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const { // Take the size so we can ensure we visited every element exactly once