refactor scan to use client::asyncCommand
Former-commit-id: 4de596631f48626b770d0217c7ff21001ea46bcf
This commit is contained in:
parent
d658891bf0
commit
7e10c9e9c3
53
src/db.cpp
53
src/db.cpp
@ -1183,17 +1183,10 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
if (o == nullptr && count >= 100)
|
if (o == nullptr && count >= 100)
|
||||||
{
|
{
|
||||||
// Do an async version
|
// Do an async version
|
||||||
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
if (c->asyncCommand(
|
||||||
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
[c, cursor, keys, pat, type, use_pattern, count] (const redisDbPersistentDataSnapshot * snapshot) {
|
||||||
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
|
sds patCopy = pat ? sdsdup(pat) : nullptr;
|
||||||
if (snapshot != nullptr)
|
sds typeCopy = type ? sdsdup(type) : nullptr;
|
||||||
{
|
|
||||||
aeEventLoop *el = serverTL->el;
|
|
||||||
blockClient(c, BLOCKED_ASYNC);
|
|
||||||
redisDb *db = c->db;
|
|
||||||
sds patCopy = pat ? sdsdup(pat) : nullptr;
|
|
||||||
sds typeCopy = type ? sdsdup(type) : nullptr;
|
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{
|
|
||||||
auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys);
|
auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys);
|
||||||
if (use_pattern) {
|
if (use_pattern) {
|
||||||
listNode *ln = listFirst(keys);
|
listNode *ln = listFirst(keys);
|
||||||
@ -1214,30 +1207,20 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
sdsfree(patCopy);
|
sdsfree(patCopy);
|
||||||
if (typeCopy != nullptr)
|
if (typeCopy != nullptr)
|
||||||
sdsfree(typeCopy);
|
sdsfree(typeCopy);
|
||||||
|
return (void *)cursorResult;
|
||||||
aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{
|
},
|
||||||
aeReleaseLock(); // we need to lock with coordination of the client
|
[c, keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) {
|
||||||
|
mstime_t timeScanFilter;
|
||||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
latencyStartMonitor(timeScanFilter);
|
||||||
AeLocker locker;
|
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data);
|
||||||
locker.arm(c);
|
latencyEndMonitor(timeScanFilter);
|
||||||
|
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
|
||||||
unblockClient(c);
|
},
|
||||||
mstime_t timeScanFilter;
|
[keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) {
|
||||||
latencyStartMonitor(timeScanFilter);
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
|
listRelease(keys);
|
||||||
latencyEndMonitor(timeScanFilter);
|
}
|
||||||
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
|
)) {
|
||||||
|
|
||||||
locker.disarm();
|
|
||||||
lock.unlock();
|
|
||||||
|
|
||||||
db->endSnapshotAsync(snapshot);
|
|
||||||
listSetFreeMethod(keys,decrRefCountVoid);
|
|
||||||
listRelease(keys);
|
|
||||||
aeAcquireLock();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user