diff --git a/src/db.cpp b/src/db.cpp index cdbeb3d38..357559a38 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1184,7 +1184,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { { // Do an async version if (c->asyncCommand( - [c, cursor, keys, pat, type, use_pattern, count] (const redisDbPersistentDataSnapshot * snapshot) { + [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, std::vector) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); @@ -1207,16 +1207,13 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sdsfree(patCopy); if (typeCopy != nullptr) sdsfree(typeCopy); - return (void *)cursorResult; - }, - [c, keys] (const redisDbPersistentDataSnapshot *, void *data) { mstime_t timeScanFilter; latencyStartMonitor(timeScanFilter); - scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data); + scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); latencyEndMonitor(timeScanFilter); latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); }, - [keys] (const redisDbPersistentDataSnapshot *, void *) { + [keys] (const redisDbPersistentDataSnapshot *) { listSetFreeMethod(keys,decrRefCountVoid); listRelease(keys); } diff --git a/src/server.cpp b/src/server.cpp index 94c95d500..573fdcb3e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,23 +4952,16 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } -list *client::argsAsList() { - list *args = listCreate(); +std::vector client::args() { + std::vector args; for (int j = 1; j < this->argc; j++) { - incrRefCount(this->argv[j]); - listAddNodeTail(args, this->argv[j]); + args.push_back(robj_sharedptr(argv[j])); } return args; } -void client::freeArgList(list* args) { - listSetFreeMethod(args,decrRefCountVoid); - listRelease(args); -} - -bool client::asyncCommand(std::function &&preFn, - std::function &&mainFn, - std::function &&postFn) +bool client::asyncCommand(std::function)> &&mainFn, + std::function &&postFn) { serverAssert(FCorrectThread(this)); const redisDbPersistentDataSnapshot *snapshot = nullptr; @@ -4979,18 +4972,19 @@ bool client::asyncCommand(std::functionel; blockClient(this, BLOCKED_ASYNC); - g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { - void *preData = preFn(snapshot); - aePostFunction(el, [this, mainFn, postFn, snapshot, preData] { + g_pserver->asyncworkqueue->AddWorkFunction([el, this, mainFn, postFn, snapshot] { + std::vector args = this->args(); + aePostFunction(el, [this, mainFn, postFn, snapshot, args] { aeReleaseLock(); std::unique_locklock)> lock(this->lock); AeLocker locker; locker.arm(this); unblockClient(this); - mainFn(snapshot, preData); + mainFn(snapshot, args); locker.disarm(); lock.unlock(); - postFn(snapshot, preData); + if (postFn) + postFn(snapshot); this->db->endSnapshotAsync(snapshot); aeAcquireLock(); }); diff --git a/src/server.h b/src/server.h index d1d22dbd1..101ca34eb 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,11 +1661,9 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; - list *argsAsList(); - void freeArgList(list* args); - bool asyncCommand(std::function &&preFn, - std::function &&mainFn, - std::function &&postFn); + std::vector args(); + bool asyncCommand(std::function)> &&mainFn, + std::function &&postFn = nullptr); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 5823738c9..0e555351e 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -524,21 +524,19 @@ void getrangeCommand(client *c) { } } -void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { +void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnapshot *snapshot = nullptr) { + addReplyArrayLen(c,count); + for (int i = 0; i < count; i++) { robj_roptr o; if (snapshot) - o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + o = snapshot->find_cached_threadsafe(szFromObj(keys[i])).val(); else - o = lookupKeyRead(c->db,(robj*)listNodeValue(ln)); + o = lookupKeyRead(c->db,keys[i]); if (o == nullptr || o->type != OBJ_STRING) { addReplyNull(c); } else { addReplyBulk(c,o); } - ln = ln->next; } } @@ -546,23 +544,15 @@ void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { if (c->asyncCommand( - [c] (const redisDbPersistentDataSnapshot *) { - return c->argsAsList(); - }, - [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { - mgetCore(c, (list *)keys, snapshot); - }, - [c] (const redisDbPersistentDataSnapshot *, void *keys) { - c->freeArgList((list *)keys); + [c] (const redisDbPersistentDataSnapshot *snapshot, std::vector keys) { + mgetCore(c, (robj **)keys.data(), keys.size(), snapshot); } )) { return; } } - list *keys = c->argsAsList(); - mgetCore(c, keys); - c->freeArgList(keys); + mgetCore(c, c->argv + 1, c->argc - 1); } void msetGenericCommand(client *c, int nx) {