add async mget
Former-commit-id: c7bd2327f8e4330e2bcd857fc87ce7e86e075d20
This commit is contained in:
parent
ebbf84fb15
commit
6f7283c065
@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include <cmath> /* isnan(), isinf() */
|
#include <cmath> /* isnan(), isinf() */
|
||||||
|
#include "aelocker.h"
|
||||||
|
|
||||||
/* Forward declarations */
|
/* Forward declarations */
|
||||||
int getGenericCommand(client *c);
|
int getGenericCommand(client *c);
|
||||||
@ -524,10 +525,54 @@ void getrangeCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void mgetCommand(client *c) {
|
void mgetCommand(client *c) {
|
||||||
int j;
|
// Do async version for large number of arguments
|
||||||
|
if (c->argc > 100) {
|
||||||
|
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
||||||
|
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
||||||
|
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
|
||||||
|
if (snapshot != nullptr) {
|
||||||
|
list *keys = listCreate();
|
||||||
|
aeEventLoop *el = serverTL->el;
|
||||||
|
blockClient(c, BLOCKED_ASYNC);
|
||||||
|
redisDb *db = c->db;
|
||||||
|
g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] {
|
||||||
|
for (int j = 1; j < c->argc; j++) {
|
||||||
|
incrRefCount(c->argv[j]);
|
||||||
|
listAddNodeTail(keys, c->argv[j]);
|
||||||
|
}
|
||||||
|
aePostFunction(el, [c, keys, snapshot, db] {
|
||||||
|
aeReleaseLock();
|
||||||
|
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||||
|
AeLocker locker;
|
||||||
|
locker.arm(c);
|
||||||
|
unblockClient(c);
|
||||||
|
|
||||||
|
addReplyArrayLen(c,listLength(keys));
|
||||||
|
listNode *ln = listFirst(keys);
|
||||||
|
while (ln != nullptr) {
|
||||||
|
robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val();
|
||||||
|
if (o == nullptr || o->type != OBJ_STRING) {
|
||||||
|
addReplyNull(c);
|
||||||
|
} else {
|
||||||
|
addReplyBulk(c,o);
|
||||||
|
}
|
||||||
|
ln = ln->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
locker.disarm();
|
||||||
|
lock.unlock();
|
||||||
|
db->endSnapshotAsync(snapshot);
|
||||||
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
|
listRelease(keys);
|
||||||
|
aeAcquireLock();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
addReplyArrayLen(c,c->argc-1);
|
addReplyArrayLen(c,c->argc-1);
|
||||||
for (j = 1; j < c->argc; j++) {
|
for (int j = 1; j < c->argc; j++) {
|
||||||
robj_roptr o = lookupKeyRead(c->db,c->argv[j]);
|
robj_roptr o = lookupKeyRead(c->db,c->argv[j]);
|
||||||
if (o == nullptr) {
|
if (o == nullptr) {
|
||||||
addReplyNull(c);
|
addReplyNull(c);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user