refactor of asyncCommand to include snapshot creation

Former-commit-id: c0908362162b5f2834b90cd9ce84fd1ee6768834
This commit is contained in:
malavan 2021-08-26 13:10:59 +00:00
parent 3a988e0232
commit d658891bf0
3 changed files with 64 additions and 50 deletions

View File

@ -4952,24 +4952,35 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
}, fLock) == AE_OK;
}
void client::asyncCommand(std::function<void()> &&preFn, std::function<void()> &&mainFn, std::function<void()> &&postFn) {
bool client::asyncCommand(std::function<void *(const redisDbPersistentDataSnapshot *)> &&preFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&postFn)
{
const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */);
if (snapshot == nullptr) {
return false;
}
aeEventLoop *el = serverTL->el;
blockClient(this, BLOCKED_ASYNC);
g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] {
preFn();
aePostFunction(el, [this, mainFn, postFn] {
g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] {
void *preData = preFn(snapshot);
aePostFunction(el, [this, mainFn, postFn, snapshot, preData] {
aeReleaseLock();
std::unique_lock<decltype(this->lock)> lock(this->lock);
AeLocker locker;
locker.arm(this);
unblockClient(this);
mainFn();
mainFn(snapshot, preData);
locker.disarm();
lock.unlock();
postFn();
postFn(snapshot, preData);
this->db->endSnapshotAsync(snapshot);
aeAcquireLock();
});
});
return true;
}
/* ====================== Error lookup and execution ===================== */

View File

@ -1661,7 +1661,9 @@ struct client {
// post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const;
void asyncCommand(std::function<void()> &&preFn, std::function<void()> &&mainFn, std::function<void()> &&postFn);
bool asyncCommand(std::function<void *(const redisDbPersistentDataSnapshot *)> &&preFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&postFn);
};
struct saveparam {

View File

@ -524,58 +524,59 @@ void getrangeCommand(client *c) {
}
}
list *mgetKeysFromClient(client *c) {
list *keys = listCreate();
for (int j = 1; j < c->argc; j++) {
incrRefCount(c->argv[j]);
listAddNodeTail(keys, c->argv[j]);
}
return keys;
}
void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) {
addReplyArrayLen(c,listLength(keys));
listNode *ln = listFirst(keys);
while (ln != nullptr) {
robj_roptr o;
if (snapshot)
o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val();
else
o = lookupKeyRead(c->db,(robj*)listNodeValue(ln));
if (o == nullptr || o->type != OBJ_STRING) {
addReplyNull(c);
} else {
addReplyBulk(c,o);
}
ln = ln->next;
}
}
void mgetClearKeys(list *keys) {
listSetFreeMethod(keys,decrRefCountVoid);
listRelease(keys);
}
void mgetCommand(client *c) {
// Do async version for large number of arguments
if (c->argc > 100) {
const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
if (snapshot != nullptr) {
list *keys = listCreate();
redisDb *db = c->db;
c->asyncCommand(
[c, keys] {
for (int j = 1; j < c->argc; j++) {
incrRefCount(c->argv[j]);
listAddNodeTail(keys, c->argv[j]);
}
if (c->asyncCommand(
[c] (const redisDbPersistentDataSnapshot *snapshot) {
return mgetKeysFromClient(c);
},
[c, keys, snapshot] {
addReplyArrayLen(c,listLength(keys));
listNode *ln = listFirst(keys);
while (ln != nullptr) {
robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val();
if (o == nullptr || o->type != OBJ_STRING) {
addReplyNull(c);
} else {
addReplyBulk(c,o);
}
ln = ln->next;
}
[c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) {
mgetCore(c, (list *)keys, snapshot);
},
[keys, snapshot, db] {
db->endSnapshotAsync(snapshot);
listSetFreeMethod(keys,decrRefCountVoid);
listRelease(keys);
[] (const redisDbPersistentDataSnapshot *snapshot, void *keys) {
mgetClearKeys((list *)keys);
}
);
)) {
return;
}
}
addReplyArrayLen(c,c->argc-1);
for (int j = 1; j < c->argc; j++) {
robj_roptr o = lookupKeyRead(c->db,c->argv[j]);
if (o == nullptr) {
addReplyNull(c);
} else {
if (o->type != OBJ_STRING) {
addReplyNull(c);
} else {
addReplyBulk(c,o);
}
}
}
list *keys = mgetKeysFromClient(c);
mgetCore(c, keys);
mgetClearKeys(keys);
}
void msetGenericCommand(client *c, int nx) {