Do actual filtering on the async thread

Former-commit-id: 64b17139b7b5ff55bb52f27dc6f91cb81adde031
This commit is contained in:
John Sully 2020-07-09 21:58:06 +00:00
parent d3b72484e4
commit ecea6ffe67

View File

@ -950,6 +950,24 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
return C_OK;
}
static bool filterKey(robj_roptr kobj, sds pat, int patlen)
{
bool filter = false;
if (sdsEncodedObject(kobj)) {
if (!stringmatchlen(pat, patlen, szFromObj(kobj), sdslen(szFromObj(kobj)), 0))
filter = true;
} else {
char buf[LONG_STR_SIZE];
int len;
serverAssert(kobj->encoding == OBJ_ENCODING_INT);
len = ll2string(buf,sizeof(buf),(long)ptrFromObj(kobj));
if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = true;
}
return filter;
}
/* This command implements SCAN, HSCAN and SSCAN commands.
* If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
* if 'o' is NULL the command will operate on the dictionary associated with
@ -1029,6 +1047,19 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
sds typeCopy = type ? sdsdup(type) : nullptr;
g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
auto cursorResult = snapshot->scan_threadsafe(cursor, count, keys, nullptr);
if (use_pattern) {
listNode *ln = listFirst(keys);
int patlen = sdslen(patCopy);
while (ln != nullptr)
{
listNode *next = ln->next;
if (filterKey((robj*)listNodeValue(ln), patCopy, patlen))
{
listDelNode(keys, ln);
}
ln = next;
}
}
aePostFunction(el, [c, snapshot, keys, db, cursorResult, patCopy, typeCopy, use_pattern]{
aeReleaseLock(); // we need to lock with coordination of the client
@ -1038,7 +1069,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
locker.arm(c);
unblockClient(c);
scanFilterAndReply(c, keys, patCopy, typeCopy, use_pattern, nullptr, cursorResult);
scanFilterAndReply(c, keys, nullptr, typeCopy, false, nullptr, cursorResult);
if (patCopy != nullptr)
sdsfree(patCopy);
if (typeCopy != nullptr)
@ -1141,17 +1172,8 @@ void scanFilterAndReply(client *c, list *keys, sds pat, sds type, int use_patter
/* Filter element if it does not match the pattern. */
if (!filter && use_pattern) {
if (sdsEncodedObject(kobj)) {
if (!stringmatchlen(pat, patlen, szFromObj(kobj), sdslen(szFromObj(kobj)), 0))
filter = 1;
} else {
char buf[LONG_STR_SIZE];
int len;
serverAssert(kobj->encoding == OBJ_ENCODING_INT);
len = ll2string(buf,sizeof(buf),(long)ptrFromObj(kobj));
if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
}
if (filterKey(kobj, pat, patlen))
filter = 1;
}
/* Filter an element if it isn't the type we want. */