diff --git a/src/server.cpp b/src/server.cpp index 05ca231bd..7bedf8747 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,6 +4952,26 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } +void client::asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn) { + aeEventLoop *el = serverTL->el; + blockClient(this, BLOCKED_ASYNC); + g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] { + preFn(); + aePostFunction(el, [this, mainFn, postFn] { + aeReleaseLock(); + std::unique_locklock)> lock(this->lock); + AeLocker locker; + locker.arm(this); + unblockClient(this); + mainFn(); + locker.disarm(); + lock.unlock(); + postFn(); + aeAcquireLock(); + }); + }); +} + /* ====================== Error lookup and execution ===================== */ void incrementErrorCount(const char *fullerr, size_t namelen) { diff --git a/src/server.h b/src/server.h index b18989603..0911e860e 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,6 +1661,7 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; + void asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 7ffe24eff..3d40a2a62 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -526,47 +526,39 @@ void getrangeCommand(client *c) { void mgetCommand(client *c) { // Do async version for large number of arguments - if (c->argc > 100) { + if (c->argc > 1) { const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); if (snapshot != nullptr) { list *keys = listCreate(); - aeEventLoop *el = serverTL->el; - blockClient(c, BLOCKED_ASYNC); redisDb *db = c->db; - g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] { + c->asyncCommand( + [c, keys] { for (int j = 1; j < c->argc; j++) { incrRefCount(c->argv[j]); listAddNodeTail(keys, c->argv[j]); } - aePostFunction(el, [c, keys, snapshot, db] { - aeReleaseLock(); - std::unique_locklock)> lock(c->lock); - AeLocker locker; - locker.arm(c); - unblockClient(c); - - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { - robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); - if (o == nullptr || o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - ln = ln->next; + }, + [c, keys, snapshot] { + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); } - - locker.disarm(); - lock.unlock(); - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); - aeAcquireLock(); - }); - }); + ln = ln->next; + } + }, + [keys, snapshot, db] { + db->endSnapshotAsync(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + } + ); return; } }