diff --git a/src/t_string.cpp b/src/t_string.cpp index c1f2e134a..7ffe24eff 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -29,6 +29,7 @@ #include "server.h" #include /* isnan(), isinf() */ +#include "aelocker.h" /* Forward declarations */ int getGenericCommand(client *c); @@ -524,10 +525,54 @@ void getrangeCommand(client *c) { } void mgetCommand(client *c) { - int j; + // Do async version for large number of arguments + if (c->argc > 100) { + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); + if (snapshot != nullptr) { + list *keys = listCreate(); + aeEventLoop *el = serverTL->el; + blockClient(c, BLOCKED_ASYNC); + redisDb *db = c->db; + g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] { + for (int j = 1; j < c->argc; j++) { + incrRefCount(c->argv[j]); + listAddNodeTail(keys, c->argv[j]); + } + aePostFunction(el, [c, keys, snapshot, db] { + aeReleaseLock(); + std::unique_locklock)> lock(c->lock); + AeLocker locker; + locker.arm(c); + unblockClient(c); + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); + } + ln = ln->next; + } + + locker.disarm(); + lock.unlock(); + db->endSnapshotAsync(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + aeAcquireLock(); + }); + }); + return; + } + } + addReplyArrayLen(c,c->argc-1); - for (j = 1; j < c->argc; j++) { + for (int j = 1; j < c->argc; j++) { robj_roptr o = lookupKeyRead(c->db,c->argv[j]); if (o == nullptr) { addReplyNull(c);