From 2192c97d35bbf18c6b7ae6251249d20dd922ddbd Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 10 Jul 2020 03:43:56 +0000 Subject: [PATCH] MVCC scan support filtering by type on the async thread Former-commit-id: 14f8c0ff686b93976eead5fa6bf526c2eecb5ae0 --- src/db.cpp | 16 ++++++++-------- src/server.h | 2 +- src/snapshot.cpp | 12 +++++++++--- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 1ce382686..39ba16cb9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1046,7 +1046,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ - auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys); + auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); if (use_pattern) { listNode *ln = listFirst(keys); int patlen = sdslen(patCopy); @@ -1060,8 +1060,12 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { ln = next; } } + if (patCopy != nullptr) + sdsfree(patCopy); + if (typeCopy != nullptr) + sdsfree(typeCopy); - aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ + aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{ aeReleaseLock(); // we need to lock with coordination of the client std::unique_locklock)> lock(c->lock); @@ -1069,11 +1073,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { locker.arm(c); unblockClient(c); - scanFilterAndReply(c, keys, nullptr, typeCopy, false, nullptr, cursorResult); - if (patCopy != nullptr) - sdsfree(patCopy); - if (typeCopy != nullptr) - sdsfree(typeCopy); + scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); db->endSnapshot(snapshot); listSetFreeMethod(keys,decrRefCountVoid); @@ -1111,7 +1111,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { if (ht) { if (ht == c->db->dictUnsafeKeyOnly()) { - cursor = c->db->scan_threadsafe(cursor, count, keys); + cursor = c->db->scan_threadsafe(cursor, count, nullptr, keys); } else { diff --git a/src/server.h b/src/server.h index a3759cba8..81f47304c 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,7 +1395,7 @@ public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; - unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys) const; + unsigned long scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index b52edd2e1..7cbbf94a2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -20,7 +20,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 if (m_spdbSnapshotHOLDER != nullptr) { - serverLog(LL_DEBUG, "Attempting reuse of snapshot, client tstamp: %llu snapshot tstamp: %llu", mvccCheckpoint, m_spdbSnapshotHOLDER->m_mvccCheckpoint); // If possible reuse an existing snapshot (we want to minimize nesting) if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint) { @@ -395,6 +394,7 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const struct scan_callback_data { dict *dictTombstone; + sds type; list *keys; }; void snapshot_scan_callback(void *privdata, const dictEntry *de) @@ -404,22 +404,28 @@ void snapshot_scan_callback(void *privdata, const dictEntry *de) return; sds sdskey = (sds)dictGetKey(de); + if (data->type != nullptr) + { + if (strcasecmp(data->type, getObjectTypeName((robj*)dictGetVal(de))) != 0) + return; + } listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); } -unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys) const +unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const { unsigned long iteratorReturn = 0; scan_callback_data data; data.dictTombstone = m_pdictTombstone; data.keys = keys; + data.type = type; const redisDbPersistentDataSnapshot *psnapshot; __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); if (psnapshot != nullptr) { // Always process the snapshot first as we assume its bigger than we are - iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys); + iteratorReturn = psnapshot->scan_threadsafe(iterator, count, type, keys); // Just catch up with our snapshot do