diff --git a/src/server.cpp b/src/server.cpp index 7bedf8747..3e8517ad8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,24 +4952,35 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } -void client::asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn) { +bool client::asyncCommand(std::function &&preFn, + std::function &&mainFn, + std::function &&postFn) +{ + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */); + if (snapshot == nullptr) { + return false; + } aeEventLoop *el = serverTL->el; blockClient(this, BLOCKED_ASYNC); - g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] { - preFn(); - aePostFunction(el, [this, mainFn, postFn] { + g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { + void *preData = preFn(snapshot); + aePostFunction(el, [this, mainFn, postFn, snapshot, preData] { aeReleaseLock(); std::unique_locklock)> lock(this->lock); AeLocker locker; locker.arm(this); unblockClient(this); - mainFn(); + mainFn(snapshot, preData); locker.disarm(); lock.unlock(); - postFn(); + postFn(snapshot, preData); + this->db->endSnapshotAsync(snapshot); aeAcquireLock(); }); }); + return true; } /* ====================== Error lookup and execution ===================== */ diff --git a/src/server.h b/src/server.h index 0911e860e..e617bafb6 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,7 +1661,9 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; - void asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn); + bool asyncCommand(std::function &&preFn, + std::function &&mainFn, + std::function &&postFn); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 4f965e44f..dc8adccda 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -524,58 +524,59 @@ void getrangeCommand(client *c) { } } +list *mgetKeysFromClient(client *c) { + list *keys = listCreate(); + for (int j = 1; j < c->argc; j++) { + incrRefCount(c->argv[j]); + listAddNodeTail(keys, c->argv[j]); + } + return keys; +} + +void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o; + if (snapshot) + o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + else + o = lookupKeyRead(c->db,(robj*)listNodeValue(ln)); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); + } + ln = ln->next; + } +} + +void mgetClearKeys(list *keys) { + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); +} + void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { - const redisDbPersistentDataSnapshot *snapshot = nullptr; - if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) - snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); - if (snapshot != nullptr) { - list *keys = listCreate(); - redisDb *db = c->db; - c->asyncCommand( - [c, keys] { - for (int j = 1; j < c->argc; j++) { - incrRefCount(c->argv[j]); - listAddNodeTail(keys, c->argv[j]); - } + if (c->asyncCommand( + [c] (const redisDbPersistentDataSnapshot *snapshot) { + return mgetKeysFromClient(c); }, - [c, keys, snapshot] { - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { - robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); - if (o == nullptr || o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - ln = ln->next; - } + [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + mgetCore(c, (list *)keys, snapshot); }, - [keys, snapshot, db] { - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); + [] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + mgetClearKeys((list *)keys); } - ); + )) { return; } } - - addReplyArrayLen(c,c->argc-1); - for (int j = 1; j < c->argc; j++) { - robj_roptr o = lookupKeyRead(c->db,c->argv[j]); - if (o == nullptr) { - addReplyNull(c); - } else { - if (o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - } - } + + list *keys = mgetKeysFromClient(c); + mgetCore(c, keys); + mgetClearKeys(keys); } void msetGenericCommand(client *c, int nx) {