MVCC scan support filtering by type on the async thread

Former-commit-id: 14f8c0ff686b93976eead5fa6bf526c2eecb5ae0
This commit is contained in:
John Sully 2020-07-10 03:43:56 +00:00
parent 3692771457
commit 2192c97d35
3 changed files with 18 additions and 12 deletions

View File

@ -1046,7 +1046,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
sds patCopy = pat ? sdsdup(pat) : nullptr; sds patCopy = pat ? sdsdup(pat) : nullptr;
sds typeCopy = type ? sdsdup(type) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr;
g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys); auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys);
if (use_pattern) { if (use_pattern) {
listNode *ln = listFirst(keys); listNode *ln = listFirst(keys);
int patlen = sdslen(patCopy); int patlen = sdslen(patCopy);
@ -1060,8 +1060,12 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
ln = next; ln = next;
} }
} }
if (patCopy != nullptr)
sdsfree(patCopy);
if (typeCopy != nullptr)
sdsfree(typeCopy);
aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{ aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{
aeReleaseLock(); // we need to lock with coordination of the client aeReleaseLock(); // we need to lock with coordination of the client
std::unique_lock<decltype(c->lock)> lock(c->lock); std::unique_lock<decltype(c->lock)> lock(c->lock);
@ -1069,11 +1073,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
locker.arm(c); locker.arm(c);
unblockClient(c); unblockClient(c);
scanFilterAndReply(c, keys, nullptr, typeCopy, false, nullptr, cursorResult); scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
if (patCopy != nullptr)
sdsfree(patCopy);
if (typeCopy != nullptr)
sdsfree(typeCopy);
db->endSnapshot(snapshot); db->endSnapshot(snapshot);
listSetFreeMethod(keys,decrRefCountVoid); listSetFreeMethod(keys,decrRefCountVoid);
@ -1111,7 +1111,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
if (ht) { if (ht) {
if (ht == c->db->dictUnsafeKeyOnly()) if (ht == c->db->dictUnsafeKeyOnly())
{ {
cursor = c->db->scan_threadsafe(cursor, count, keys); cursor = c->db->scan_threadsafe(cursor, count, nullptr, keys);
} }
else else
{ {

View File

@ -1395,7 +1395,7 @@ public:
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const; bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const;
unsigned long scan_threadsafe(unsigned long iterator, long count, list *keys) const; unsigned long scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const;
using redisDbPersistentData::createSnapshot; using redisDbPersistentData::createSnapshot;
using redisDbPersistentData::endSnapshot; using redisDbPersistentData::endSnapshot;

View File

@ -20,7 +20,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
if (m_spdbSnapshotHOLDER != nullptr) if (m_spdbSnapshotHOLDER != nullptr)
{ {
serverLog(LL_DEBUG, "Attempting reuse of snapshot, client tstamp: %llu snapshot tstamp: %llu", mvccCheckpoint, m_spdbSnapshotHOLDER->m_mvccCheckpoint);
// If possible reuse an existing snapshot (we want to minimize nesting) // If possible reuse an existing snapshot (we want to minimize nesting)
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint) if (mvccCheckpoint <= m_spdbSnapshotHOLDER->m_mvccCheckpoint)
{ {
@ -395,6 +394,7 @@ dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const
struct scan_callback_data struct scan_callback_data
{ {
dict *dictTombstone; dict *dictTombstone;
sds type;
list *keys; list *keys;
}; };
void snapshot_scan_callback(void *privdata, const dictEntry *de) void snapshot_scan_callback(void *privdata, const dictEntry *de)
@ -404,22 +404,28 @@ void snapshot_scan_callback(void *privdata, const dictEntry *de)
return; return;
sds sdskey = (sds)dictGetKey(de); sds sdskey = (sds)dictGetKey(de);
if (data->type != nullptr)
{
if (strcasecmp(data->type, getObjectTypeName((robj*)dictGetVal(de))) != 0)
return;
}
listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey))); listAddNodeHead(data->keys, createStringObject(sdskey, sdslen(sdskey)));
} }
unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, list *keys) const unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long iterator, long count, sds type, list *keys) const
{ {
unsigned long iteratorReturn = 0; unsigned long iteratorReturn = 0;
scan_callback_data data; scan_callback_data data;
data.dictTombstone = m_pdictTombstone; data.dictTombstone = m_pdictTombstone;
data.keys = keys; data.keys = keys;
data.type = type;
const redisDbPersistentDataSnapshot *psnapshot; const redisDbPersistentDataSnapshot *psnapshot;
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
if (psnapshot != nullptr) if (psnapshot != nullptr)
{ {
// Always process the snapshot first as we assume its bigger than we are // Always process the snapshot first as we assume its bigger than we are
iteratorReturn = psnapshot->scan_threadsafe(iterator, count, keys); iteratorReturn = psnapshot->scan_threadsafe(iterator, count, type, keys);
// Just catch up with our snapshot // Just catch up with our snapshot
do do