refactor async to offload common code to client

Former-commit-id: 9a7547bfaa0ceff76e604262913fb11a64c627d8
This commit is contained in:
malavan 2021-08-25 20:06:06 +00:00
parent cb8f55c057
commit dcd3d47f7f
3 changed files with 43 additions and 30 deletions

View File

@ -4952,6 +4952,26 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
}, fLock) == AE_OK; }, fLock) == AE_OK;
} }
void client::asyncCommand(std::function<void()> &&preFn, std::function<void()> &&mainFn, std::function<void()> &&postFn) {
aeEventLoop *el = serverTL->el;
blockClient(this, BLOCKED_ASYNC);
g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] {
preFn();
aePostFunction(el, [this, mainFn, postFn] {
aeReleaseLock();
std::unique_lock<decltype(this->lock)> lock(this->lock);
AeLocker locker;
locker.arm(this);
unblockClient(this);
mainFn();
locker.disarm();
lock.unlock();
postFn();
aeAcquireLock();
});
});
}
/* ====================== Error lookup and execution ===================== */ /* ====================== Error lookup and execution ===================== */
void incrementErrorCount(const char *fullerr, size_t namelen) { void incrementErrorCount(const char *fullerr, size_t namelen) {

View File

@ -1661,6 +1661,7 @@ struct client {
// post a function from a non-client thread to run on its client thread // post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true); bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const; size_t argv_len_sum() const;
void asyncCommand(std::function<void()> &&preFn, std::function<void()> &&mainFn, std::function<void()> &&postFn);
}; };
struct saveparam { struct saveparam {

View File

@ -526,47 +526,39 @@ void getrangeCommand(client *c) {
void mgetCommand(client *c) { void mgetCommand(client *c) {
// Do async version for large number of arguments // Do async version for large number of arguments
if (c->argc > 100) { if (c->argc > 1) {
const redisDbPersistentDataSnapshot *snapshot = nullptr; const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
if (snapshot != nullptr) { if (snapshot != nullptr) {
list *keys = listCreate(); list *keys = listCreate();
aeEventLoop *el = serverTL->el;
blockClient(c, BLOCKED_ASYNC);
redisDb *db = c->db; redisDb *db = c->db;
g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] { c->asyncCommand(
[c, keys] {
for (int j = 1; j < c->argc; j++) { for (int j = 1; j < c->argc; j++) {
incrRefCount(c->argv[j]); incrRefCount(c->argv[j]);
listAddNodeTail(keys, c->argv[j]); listAddNodeTail(keys, c->argv[j]);
} }
aePostFunction(el, [c, keys, snapshot, db] { },
aeReleaseLock(); [c, keys, snapshot] {
std::unique_lock<decltype(c->lock)> lock(c->lock); addReplyArrayLen(c,listLength(keys));
AeLocker locker; listNode *ln = listFirst(keys);
locker.arm(c); while (ln != nullptr) {
unblockClient(c); robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val();
if (o == nullptr || o->type != OBJ_STRING) {
addReplyArrayLen(c,listLength(keys)); addReplyNull(c);
listNode *ln = listFirst(keys); } else {
while (ln != nullptr) { addReplyBulk(c,o);
robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val();
if (o == nullptr || o->type != OBJ_STRING) {
addReplyNull(c);
} else {
addReplyBulk(c,o);
}
ln = ln->next;
} }
ln = ln->next;
locker.disarm(); }
lock.unlock(); },
db->endSnapshotAsync(snapshot); [keys, snapshot, db] {
listSetFreeMethod(keys,decrRefCountVoid); db->endSnapshotAsync(snapshot);
listRelease(keys); listSetFreeMethod(keys,decrRefCountVoid);
aeAcquireLock(); listRelease(keys);
}); }
}); );
return; return;
} }
} }