refactor asyncCommand

Former-commit-id: 6af5775e01872f130bc18791fdb4c0b22507b37f
This commit is contained in:
malavan 2021-09-01 20:18:41 +00:00
parent 1fe8506536
commit 3239f678d5
4 changed files with 25 additions and 46 deletions

View File

@ -1184,7 +1184,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
{ {
// Do an async version // Do an async version
if (c->asyncCommand( if (c->asyncCommand(
[c, cursor, keys, pat, type, use_pattern, count] (const redisDbPersistentDataSnapshot * snapshot) { [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, std::vector<robj_sharedptr>) {
sds patCopy = pat ? sdsdup(pat) : nullptr; sds patCopy = pat ? sdsdup(pat) : nullptr;
sds typeCopy = type ? sdsdup(type) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr;
auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys);
@ -1207,16 +1207,13 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
sdsfree(patCopy); sdsfree(patCopy);
if (typeCopy != nullptr) if (typeCopy != nullptr)
sdsfree(typeCopy); sdsfree(typeCopy);
return (void *)cursorResult;
},
[c, keys] (const redisDbPersistentDataSnapshot *, void *data) {
mstime_t timeScanFilter; mstime_t timeScanFilter;
latencyStartMonitor(timeScanFilter); latencyStartMonitor(timeScanFilter);
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data); scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
latencyEndMonitor(timeScanFilter); latencyEndMonitor(timeScanFilter);
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
}, },
[keys] (const redisDbPersistentDataSnapshot *, void *) { [keys] (const redisDbPersistentDataSnapshot *) {
listSetFreeMethod(keys,decrRefCountVoid); listSetFreeMethod(keys,decrRefCountVoid);
listRelease(keys); listRelease(keys);
} }

View File

@ -4952,23 +4952,16 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
}, fLock) == AE_OK; }, fLock) == AE_OK;
} }
list *client::argsAsList() { std::vector<robj_sharedptr> client::args() {
list *args = listCreate(); std::vector<robj_sharedptr> args;
for (int j = 1; j < this->argc; j++) { for (int j = 1; j < this->argc; j++) {
incrRefCount(this->argv[j]); args.push_back(robj_sharedptr(argv[j]));
listAddNodeTail(args, this->argv[j]);
} }
return args; return args;
} }
void client::freeArgList(list* args) { bool client::asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, std::vector<robj_sharedptr>)> &&mainFn,
listSetFreeMethod(args,decrRefCountVoid); std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn)
listRelease(args);
}
bool client::asyncCommand(std::function<void *(const redisDbPersistentDataSnapshot *)> &&preFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&postFn)
{ {
serverAssert(FCorrectThread(this)); serverAssert(FCorrectThread(this));
const redisDbPersistentDataSnapshot *snapshot = nullptr; const redisDbPersistentDataSnapshot *snapshot = nullptr;
@ -4979,18 +4972,19 @@ bool client::asyncCommand(std::function<void *(const redisDbPersistentDataSnapsh
} }
aeEventLoop *el = serverTL->el; aeEventLoop *el = serverTL->el;
blockClient(this, BLOCKED_ASYNC); blockClient(this, BLOCKED_ASYNC);
g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { g_pserver->asyncworkqueue->AddWorkFunction([el, this, mainFn, postFn, snapshot] {
void *preData = preFn(snapshot); std::vector<robj_sharedptr> args = this->args();
aePostFunction(el, [this, mainFn, postFn, snapshot, preData] { aePostFunction(el, [this, mainFn, postFn, snapshot, args] {
aeReleaseLock(); aeReleaseLock();
std::unique_lock<decltype(this->lock)> lock(this->lock); std::unique_lock<decltype(this->lock)> lock(this->lock);
AeLocker locker; AeLocker locker;
locker.arm(this); locker.arm(this);
unblockClient(this); unblockClient(this);
mainFn(snapshot, preData); mainFn(snapshot, args);
locker.disarm(); locker.disarm();
lock.unlock(); lock.unlock();
postFn(snapshot, preData); if (postFn)
postFn(snapshot);
this->db->endSnapshotAsync(snapshot); this->db->endSnapshotAsync(snapshot);
aeAcquireLock(); aeAcquireLock();
}); });

View File

@ -1661,11 +1661,9 @@ struct client {
// post a function from a non-client thread to run on its client thread // post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true); bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const; size_t argv_len_sum() const;
list *argsAsList(); std::vector<robj_sharedptr> args();
void freeArgList(list* args); bool asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, std::vector<robj_sharedptr>)> &&mainFn,
bool asyncCommand(std::function<void *(const redisDbPersistentDataSnapshot *)> &&preFn, std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn = nullptr);
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *, void *)> &&postFn);
}; };
struct saveparam { struct saveparam {

View File

@ -524,21 +524,19 @@ void getrangeCommand(client *c) {
} }
} }
void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnapshot *snapshot = nullptr) {
addReplyArrayLen(c,listLength(keys)); addReplyArrayLen(c,count);
listNode *ln = listFirst(keys); for (int i = 0; i < count; i++) {
while (ln != nullptr) {
robj_roptr o; robj_roptr o;
if (snapshot) if (snapshot)
o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); o = snapshot->find_cached_threadsafe(szFromObj(keys[i])).val();
else else
o = lookupKeyRead(c->db,(robj*)listNodeValue(ln)); o = lookupKeyRead(c->db,keys[i]);
if (o == nullptr || o->type != OBJ_STRING) { if (o == nullptr || o->type != OBJ_STRING) {
addReplyNull(c); addReplyNull(c);
} else { } else {
addReplyBulk(c,o); addReplyBulk(c,o);
} }
ln = ln->next;
} }
} }
@ -546,23 +544,15 @@ void mgetCommand(client *c) {
// Do async version for large number of arguments // Do async version for large number of arguments
if (c->argc > 100) { if (c->argc > 100) {
if (c->asyncCommand( if (c->asyncCommand(
[c] (const redisDbPersistentDataSnapshot *) { [c] (const redisDbPersistentDataSnapshot *snapshot, std::vector<robj_sharedptr> keys) {
return c->argsAsList(); mgetCore(c, (robj **)keys.data(), keys.size(), snapshot);
},
[c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) {
mgetCore(c, (list *)keys, snapshot);
},
[c] (const redisDbPersistentDataSnapshot *, void *keys) {
c->freeArgList((list *)keys);
} }
)) { )) {
return; return;
} }
} }
list *keys = c->argsAsList(); mgetCore(c, c->argv + 1, c->argc - 1);
mgetCore(c, keys);
c->freeArgList(keys);
} }
void msetGenericCommand(client *c, int nx) { void msetGenericCommand(client *c, int nx) {