Merge branch 'async_mget' into 'keydbpro'

add async mget

See merge request keydb-dev/KeyDB-Pro!38

Former-commit-id: e315b39f4779d530f5ab93e1f15915211e6706e2
This commit is contained in:
jsully 2021-09-08 19:08:23 +00:00
commit d1e0684393
4 changed files with 83 additions and 47 deletions

View File

@ -1183,17 +1183,10 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
if (o == nullptr && count >= 100) if (o == nullptr && count >= 100)
{ {
// Do an async version // Do an async version
const redisDbPersistentDataSnapshot *snapshot = nullptr; if (c->asyncCommand(
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, const std::vector<robj_sharedptr> &) {
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); sds patCopy = pat ? sdsdup(pat) : nullptr;
if (snapshot != nullptr) sds typeCopy = type ? sdsdup(type) : nullptr;
{
aeEventLoop *el = serverTL->el;
blockClient(c, BLOCKED_ASYNC);
redisDb *db = c->db;
sds patCopy = pat ? sdsdup(pat) : nullptr;
sds typeCopy = type ? sdsdup(type) : nullptr;
g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys);
if (use_pattern) { if (use_pattern) {
listNode *ln = listFirst(keys); listNode *ln = listFirst(keys);
@ -1214,30 +1207,17 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
sdsfree(patCopy); sdsfree(patCopy);
if (typeCopy != nullptr) if (typeCopy != nullptr)
sdsfree(typeCopy); sdsfree(typeCopy);
mstime_t timeScanFilter;
aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{ latencyStartMonitor(timeScanFilter);
aeReleaseLock(); // we need to lock with coordination of the client scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
latencyEndMonitor(timeScanFilter);
std::unique_lock<decltype(c->lock)> lock(c->lock); latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
AeLocker locker; },
locker.arm(c); [keys] (const redisDbPersistentDataSnapshot *) {
listSetFreeMethod(keys,decrRefCountVoid);
unblockClient(c); listRelease(keys);
mstime_t timeScanFilter; }
latencyStartMonitor(timeScanFilter); )) {
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
latencyEndMonitor(timeScanFilter);
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
locker.disarm();
lock.unlock();
db->endSnapshotAsync(snapshot);
listSetFreeMethod(keys,decrRefCountVoid);
listRelease(keys);
aeAcquireLock();
});
});
return; return;
} }
} }

View File

@ -4952,6 +4952,46 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
}, fLock) == AE_OK; }, fLock) == AE_OK;
} }
std::vector<robj_sharedptr> clientArgs(client *c) {
std::vector<robj_sharedptr> args;
for (int j = 1; j < c->argc; j++) {
args.push_back(robj_sharedptr(c->argv[j]));
}
return args;
}
bool client::asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, const std::vector<robj_sharedptr> &)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn)
{
serverAssert(FCorrectThread(this));
const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */);
if (snapshot == nullptr) {
return false;
}
aeEventLoop *el = serverTL->el;
blockClient(this, BLOCKED_ASYNC);
g_pserver->asyncworkqueue->AddWorkFunction([el, this, mainFn, postFn, snapshot] {
std::vector<robj_sharedptr> args = clientArgs(this);
aePostFunction(el, [this, mainFn, postFn, snapshot, args] {
aeReleaseLock();
std::unique_lock<decltype(this->lock)> lock(this->lock);
AeLocker locker;
locker.arm(this);
unblockClient(this);
mainFn(snapshot, args);
locker.disarm();
lock.unlock();
if (postFn)
postFn(snapshot);
this->db->endSnapshotAsync(snapshot);
aeAcquireLock();
});
});
return true;
}
/* ====================== Error lookup and execution ===================== */ /* ====================== Error lookup and execution ===================== */
void incrementErrorCount(const char *fullerr, size_t namelen) { void incrementErrorCount(const char *fullerr, size_t namelen) {

View File

@ -1661,6 +1661,8 @@ struct client {
// post a function from a non-client thread to run on its client thread // post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true); bool postFunction(std::function<void(client *)> fn, bool fLock = true);
size_t argv_len_sum() const; size_t argv_len_sum() const;
bool asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, const std::vector<robj_sharedptr> &)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn = nullptr);
}; };
struct saveparam { struct saveparam {

View File

@ -29,6 +29,7 @@
#include "server.h" #include "server.h"
#include <cmath> /* isnan(), isinf() */ #include <cmath> /* isnan(), isinf() */
#include "aelocker.h"
/* Forward declarations */ /* Forward declarations */
int getGenericCommand(client *c); int getGenericCommand(client *c);
@ -523,24 +524,37 @@ void getrangeCommand(client *c) {
} }
} }
void mgetCommand(client *c) { void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnapshot *snapshot = nullptr) {
int j; addReplyArrayLen(c,count);
for (int i = 0; i < count; i++) {
addReplyArrayLen(c,c->argc-1); robj_roptr o;
for (j = 1; j < c->argc; j++) { if (snapshot)
robj_roptr o = lookupKeyRead(c->db,c->argv[j]); o = snapshot->find_cached_threadsafe(szFromObj(keys[i])).val();
if (o == nullptr) { else
o = lookupKeyRead(c->db,keys[i]);
if (o == nullptr || o->type != OBJ_STRING) {
addReplyNull(c); addReplyNull(c);
} else { } else {
if (o->type != OBJ_STRING) { addReplyBulk(c,o);
addReplyNull(c);
} else {
addReplyBulk(c,o);
}
} }
} }
} }
void mgetCommand(client *c) {
// Do async version for large number of arguments
if (c->argc > 100) {
if (c->asyncCommand(
[c] (const redisDbPersistentDataSnapshot *snapshot, const std::vector<robj_sharedptr> &keys) {
mgetCore(c, (robj **)keys.data(), keys.size(), snapshot);
}
)) {
return;
}
}
mgetCore(c, c->argv + 1, c->argc - 1);
}
void msetGenericCommand(client *c, int nx) { void msetGenericCommand(client *c, int nx) {
int j; int j;