From 6f7283c06526fbcac8800ca4463f51e09cff927a Mon Sep 17 00:00:00 2001 From: malavan Date: Tue, 24 Aug 2021 21:44:17 +0000 Subject: [PATCH 01/11] add async mget Former-commit-id: c7bd2327f8e4330e2bcd857fc87ce7e86e075d20 --- src/t_string.cpp | 49 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/src/t_string.cpp b/src/t_string.cpp index c1f2e134a..7ffe24eff 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -29,6 +29,7 @@ #include "server.h" #include /* isnan(), isinf() */ +#include "aelocker.h" /* Forward declarations */ int getGenericCommand(client *c); @@ -524,10 +525,54 @@ void getrangeCommand(client *c) { } void mgetCommand(client *c) { - int j; + // Do async version for large number of arguments + if (c->argc > 100) { + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); + if (snapshot != nullptr) { + list *keys = listCreate(); + aeEventLoop *el = serverTL->el; + blockClient(c, BLOCKED_ASYNC); + redisDb *db = c->db; + g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] { + for (int j = 1; j < c->argc; j++) { + incrRefCount(c->argv[j]); + listAddNodeTail(keys, c->argv[j]); + } + aePostFunction(el, [c, keys, snapshot, db] { + aeReleaseLock(); + std::unique_locklock)> lock(c->lock); + AeLocker locker; + locker.arm(c); + unblockClient(c); + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); + } + ln = ln->next; + } + + locker.disarm(); + lock.unlock(); + db->endSnapshotAsync(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + aeAcquireLock(); + }); + }); + return; + } + } + addReplyArrayLen(c,c->argc-1); - for (j = 1; j < c->argc; j++) { + for (int j = 1; j < c->argc; j++) { robj_roptr o = lookupKeyRead(c->db,c->argv[j]); if (o == nullptr) { addReplyNull(c); From dcd3d47f7fe9a96db23c71fb1fc2869a9f51aa62 Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 25 Aug 2021 20:06:06 +0000 Subject: [PATCH 02/11] refactor async to offload common code to client Former-commit-id: 9a7547bfaa0ceff76e604262913fb11a64c627d8 --- src/server.cpp | 20 +++++++++++++++++++ src/server.h | 1 + src/t_string.cpp | 52 ++++++++++++++++++++---------------------------- 3 files changed, 43 insertions(+), 30 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 05ca231bd..7bedf8747 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,6 +4952,26 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } +void client::asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn) { + aeEventLoop *el = serverTL->el; + blockClient(this, BLOCKED_ASYNC); + g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] { + preFn(); + aePostFunction(el, [this, mainFn, postFn] { + aeReleaseLock(); + std::unique_locklock)> lock(this->lock); + AeLocker locker; + locker.arm(this); + unblockClient(this); + mainFn(); + locker.disarm(); + lock.unlock(); + postFn(); + aeAcquireLock(); + }); + }); +} + /* ====================== Error lookup and execution ===================== */ void incrementErrorCount(const char *fullerr, size_t namelen) { diff --git a/src/server.h b/src/server.h index b18989603..0911e860e 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,6 +1661,7 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; + void asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 7ffe24eff..3d40a2a62 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -526,47 +526,39 @@ void getrangeCommand(client *c) { void mgetCommand(client *c) { // Do async version for large number of arguments - if (c->argc > 100) { + if (c->argc > 1) { const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); if (snapshot != nullptr) { list *keys = listCreate(); - aeEventLoop *el = serverTL->el; - blockClient(c, BLOCKED_ASYNC); redisDb *db = c->db; - g_pserver->asyncworkqueue->AddWorkFunction([el, c, keys, snapshot, db] { + c->asyncCommand( + [c, keys] { for (int j = 1; j < c->argc; j++) { incrRefCount(c->argv[j]); listAddNodeTail(keys, c->argv[j]); } - aePostFunction(el, [c, keys, snapshot, db] { - aeReleaseLock(); - std::unique_locklock)> lock(c->lock); - AeLocker locker; - locker.arm(c); - unblockClient(c); - - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { - robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); - if (o == nullptr || o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - ln = ln->next; + }, + [c, keys, snapshot] { + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); } - - locker.disarm(); - lock.unlock(); - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); - aeAcquireLock(); - }); - }); + ln = ln->next; + } + }, + [keys, snapshot, db] { + db->endSnapshotAsync(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + } + ); return; } } From 7451f1738aa87544d693166a39f466f916ea8cca Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 25 Aug 2021 22:15:28 +0000 Subject: [PATCH 03/11] change indentation and increase async mget threshold Former-commit-id: ffaa176d5eafabfae372224c790e99dd9e520fef --- src/t_string.cpp | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/t_string.cpp b/src/t_string.cpp index 3d40a2a62..4f965e44f 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -526,7 +526,7 @@ void getrangeCommand(client *c) { void mgetCommand(client *c) { // Do async version for large number of arguments - if (c->argc > 1) { + if (c->argc > 100) { const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); @@ -534,30 +534,30 @@ void mgetCommand(client *c) { list *keys = listCreate(); redisDb *db = c->db; c->asyncCommand( - [c, keys] { - for (int j = 1; j < c->argc; j++) { - incrRefCount(c->argv[j]); - listAddNodeTail(keys, c->argv[j]); - } - }, - [c, keys, snapshot] { - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { - robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); - if (o == nullptr || o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); + [c, keys] { + for (int j = 1; j < c->argc; j++) { + incrRefCount(c->argv[j]); + listAddNodeTail(keys, c->argv[j]); } - ln = ln->next; + }, + [c, keys, snapshot] { + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); + } + ln = ln->next; + } + }, + [keys, snapshot, db] { + db->endSnapshotAsync(snapshot); + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); } - }, - [keys, snapshot, db] { - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); - } ); return; } From 6175535d5479fd34e0668efda87763c1e18f9704 Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 26 Aug 2021 13:10:59 +0000 Subject: [PATCH 04/11] refactor of asyncCommand to include snapshot creation Former-commit-id: c0908362162b5f2834b90cd9ce84fd1ee6768834 --- src/server.cpp | 23 +++++++++---- src/server.h | 4 ++- src/t_string.cpp | 87 ++++++++++++++++++++++++------------------------ 3 files changed, 64 insertions(+), 50 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 7bedf8747..3e8517ad8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,24 +4952,35 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } -void client::asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn) { +bool client::asyncCommand(std::function &&preFn, + std::function &&mainFn, + std::function &&postFn) +{ + const redisDbPersistentDataSnapshot *snapshot = nullptr; + if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */); + if (snapshot == nullptr) { + return false; + } aeEventLoop *el = serverTL->el; blockClient(this, BLOCKED_ASYNC); - g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn] { - preFn(); - aePostFunction(el, [this, mainFn, postFn] { + g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { + void *preData = preFn(snapshot); + aePostFunction(el, [this, mainFn, postFn, snapshot, preData] { aeReleaseLock(); std::unique_locklock)> lock(this->lock); AeLocker locker; locker.arm(this); unblockClient(this); - mainFn(); + mainFn(snapshot, preData); locker.disarm(); lock.unlock(); - postFn(); + postFn(snapshot, preData); + this->db->endSnapshotAsync(snapshot); aeAcquireLock(); }); }); + return true; } /* ====================== Error lookup and execution ===================== */ diff --git a/src/server.h b/src/server.h index 0911e860e..e617bafb6 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,7 +1661,9 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; - void asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn); + bool asyncCommand(std::function &&preFn, + std::function &&mainFn, + std::function &&postFn); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 4f965e44f..dc8adccda 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -524,58 +524,59 @@ void getrangeCommand(client *c) { } } +list *mgetKeysFromClient(client *c) { + list *keys = listCreate(); + for (int j = 1; j < c->argc; j++) { + incrRefCount(c->argv[j]); + listAddNodeTail(keys, c->argv[j]); + } + return keys; +} + +void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { + addReplyArrayLen(c,listLength(keys)); + listNode *ln = listFirst(keys); + while (ln != nullptr) { + robj_roptr o; + if (snapshot) + o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + else + o = lookupKeyRead(c->db,(robj*)listNodeValue(ln)); + if (o == nullptr || o->type != OBJ_STRING) { + addReplyNull(c); + } else { + addReplyBulk(c,o); + } + ln = ln->next; + } +} + +void mgetClearKeys(list *keys) { + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); +} + void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { - const redisDbPersistentDataSnapshot *snapshot = nullptr; - if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) - snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); - if (snapshot != nullptr) { - list *keys = listCreate(); - redisDb *db = c->db; - c->asyncCommand( - [c, keys] { - for (int j = 1; j < c->argc; j++) { - incrRefCount(c->argv[j]); - listAddNodeTail(keys, c->argv[j]); - } + if (c->asyncCommand( + [c] (const redisDbPersistentDataSnapshot *snapshot) { + return mgetKeysFromClient(c); }, - [c, keys, snapshot] { - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { - robj_roptr o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); - if (o == nullptr || o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - ln = ln->next; - } + [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + mgetCore(c, (list *)keys, snapshot); }, - [keys, snapshot, db] { - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); + [] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + mgetClearKeys((list *)keys); } - ); + )) { return; } } - - addReplyArrayLen(c,c->argc-1); - for (int j = 1; j < c->argc; j++) { - robj_roptr o = lookupKeyRead(c->db,c->argv[j]); - if (o == nullptr) { - addReplyNull(c); - } else { - if (o->type != OBJ_STRING) { - addReplyNull(c); - } else { - addReplyBulk(c,o); - } - } - } + + list *keys = mgetKeysFromClient(c); + mgetCore(c, keys); + mgetClearKeys(keys); } void msetGenericCommand(client *c, int nx) { From 1b074c39833dc80b04519502732655ef40b6a546 Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 26 Aug 2021 13:55:32 +0000 Subject: [PATCH 05/11] refactor scan to use client::asyncCommand Former-commit-id: 4de596631f48626b770d0217c7ff21001ea46bcf --- src/db.cpp | 53 ++++++++++++++++++----------------------------------- 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 4a31e8ef6..5eee7bd18 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1183,17 +1183,10 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { if (o == nullptr && count >= 100) { // Do an async version - const redisDbPersistentDataSnapshot *snapshot = nullptr; - if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) - snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); - if (snapshot != nullptr) - { - aeEventLoop *el = serverTL->el; - blockClient(c, BLOCKED_ASYNC); - redisDb *db = c->db; - sds patCopy = pat ? sdsdup(pat) : nullptr; - sds typeCopy = type ? sdsdup(type) : nullptr; - g_pserver->asyncworkqueue->AddWorkFunction([c, snapshot, cursor, count, keys, el, db, patCopy, typeCopy, use_pattern]{ + if (c->asyncCommand( + [c, cursor, keys, pat, type, use_pattern, count] (const redisDbPersistentDataSnapshot * snapshot) { + sds patCopy = pat ? sdsdup(pat) : nullptr; + sds typeCopy = type ? sdsdup(type) : nullptr; auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); if (use_pattern) { listNode *ln = listFirst(keys); @@ -1214,30 +1207,20 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sdsfree(patCopy); if (typeCopy != nullptr) sdsfree(typeCopy); - - aePostFunction(el, [c, snapshot, keys, db, cursorResult, use_pattern]{ - aeReleaseLock(); // we need to lock with coordination of the client - - std::unique_locklock)> lock(c->lock); - AeLocker locker; - locker.arm(c); - - unblockClient(c); - mstime_t timeScanFilter; - latencyStartMonitor(timeScanFilter); - scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); - latencyEndMonitor(timeScanFilter); - latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); - - locker.disarm(); - lock.unlock(); - - db->endSnapshotAsync(snapshot); - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); - aeAcquireLock(); - }); - }); + return (void *)cursorResult; + }, + [c, keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) { + mstime_t timeScanFilter; + latencyStartMonitor(timeScanFilter); + scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data); + latencyEndMonitor(timeScanFilter); + latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); + }, + [keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) { + listSetFreeMethod(keys,decrRefCountVoid); + listRelease(keys); + } + )) { return; } } From b92c0e8d5c0e9624df17c6cdcfca26f8604a636a Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 26 Aug 2021 13:58:12 +0000 Subject: [PATCH 06/11] add correct thread assert to client::asyncCommand Former-commit-id: 8970f832348a80db6c183d25c4a5342a258a6ba9 --- src/server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.cpp b/src/server.cpp index 3e8517ad8..94709b60f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4963,6 +4963,7 @@ bool client::asyncCommand(std::functionel; + serverAssert(FCorrectThread(this)); blockClient(this, BLOCKED_ASYNC); g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { void *preData = preFn(snapshot); From 0baa5438197a5e249798421821d3640d80740bf0 Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 26 Aug 2021 14:23:13 +0000 Subject: [PATCH 07/11] add correct thread assert to client::asyncCommand Former-commit-id: a892fb4d551fb58d619bc80c333a6b3a9ed34215 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 94709b60f..6aae2a2bc 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4956,6 +4956,7 @@ bool client::asyncCommand(std::function &&mainFn, std::function &&postFn) { + serverAssert(FCorrectThread(this)); const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(this->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) snapshot = this->db->createSnapshot(this->mvccCheckpoint, false /* fOptional */); @@ -4963,7 +4964,6 @@ bool client::asyncCommand(std::functionel; - serverAssert(FCorrectThread(this)); blockClient(this, BLOCKED_ASYNC); g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { void *preData = preFn(snapshot); From 7ee39396c0a5ec0d95902dc1a7bea3bb3e5f07c2 Mon Sep 17 00:00:00 2001 From: malavan Date: Mon, 30 Aug 2021 21:09:22 +0000 Subject: [PATCH 08/11] add method to get client args as list Former-commit-id: 576b8cd2153c48c0ca4dfc9ed7d12f77d6f76f7e --- src/server.cpp | 14 ++++++++++++++ src/server.h | 2 ++ src/t_string.cpp | 24 +++++------------------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 6aae2a2bc..94c95d500 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,6 +4952,20 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } +list *client::argsAsList() { + list *args = listCreate(); + for (int j = 1; j < this->argc; j++) { + incrRefCount(this->argv[j]); + listAddNodeTail(args, this->argv[j]); + } + return args; +} + +void client::freeArgList(list* args) { + listSetFreeMethod(args,decrRefCountVoid); + listRelease(args); +} + bool client::asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn) diff --git a/src/server.h b/src/server.h index e617bafb6..d1d22dbd1 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,6 +1661,8 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; + list *argsAsList(); + void freeArgList(list* args); bool asyncCommand(std::function &&preFn, std::function &&mainFn, std::function &&postFn); diff --git a/src/t_string.cpp b/src/t_string.cpp index dc8adccda..b015061d0 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -524,15 +524,6 @@ void getrangeCommand(client *c) { } } -list *mgetKeysFromClient(client *c) { - list *keys = listCreate(); - for (int j = 1; j < c->argc; j++) { - incrRefCount(c->argv[j]); - listAddNodeTail(keys, c->argv[j]); - } - return keys; -} - void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { addReplyArrayLen(c,listLength(keys)); listNode *ln = listFirst(keys); @@ -551,32 +542,27 @@ void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapsh } } -void mgetClearKeys(list *keys) { - listSetFreeMethod(keys,decrRefCountVoid); - listRelease(keys); -} - void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { if (c->asyncCommand( [c] (const redisDbPersistentDataSnapshot *snapshot) { - return mgetKeysFromClient(c); + return c->argsAsList(); }, [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { mgetCore(c, (list *)keys, snapshot); }, - [] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { - mgetClearKeys((list *)keys); + [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + c->freeArgList((list *)keys); } )) { return; } } - list *keys = mgetKeysFromClient(c); + list *keys = c->argsAsList(); mgetCore(c, keys); - mgetClearKeys(keys); + c->freeArgList(keys); } void msetGenericCommand(client *c, int nx) { From 3bd777bf5573b657c91f4505b032b90e0d7bf664 Mon Sep 17 00:00:00 2001 From: malavan Date: Tue, 31 Aug 2021 20:03:55 +0000 Subject: [PATCH 09/11] remove unused variable names Former-commit-id: 18d688c2f04a8ce67409bd4442c7635d426fc0ac --- src/db.cpp | 4 ++-- src/t_string.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 5eee7bd18..cdbeb3d38 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1209,14 +1209,14 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sdsfree(typeCopy); return (void *)cursorResult; }, - [c, keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) { + [c, keys] (const redisDbPersistentDataSnapshot *, void *data) { mstime_t timeScanFilter; latencyStartMonitor(timeScanFilter); scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data); latencyEndMonitor(timeScanFilter); latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); }, - [keys] (const redisDbPersistentDataSnapshot * snapshot, void *data) { + [keys] (const redisDbPersistentDataSnapshot *, void *) { listSetFreeMethod(keys,decrRefCountVoid); listRelease(keys); } diff --git a/src/t_string.cpp b/src/t_string.cpp index b015061d0..5823738c9 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -546,13 +546,13 @@ void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { if (c->asyncCommand( - [c] (const redisDbPersistentDataSnapshot *snapshot) { + [c] (const redisDbPersistentDataSnapshot *) { return c->argsAsList(); }, [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { mgetCore(c, (list *)keys, snapshot); }, - [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { + [c] (const redisDbPersistentDataSnapshot *, void *keys) { c->freeArgList((list *)keys); } )) { From b300551862565c7c9f8350aecbfd0b5f1951fbab Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 1 Sep 2021 20:18:41 +0000 Subject: [PATCH 10/11] refactor asyncCommand Former-commit-id: 6af5775e01872f130bc18791fdb4c0b22507b37f --- src/db.cpp | 9 +++------ src/server.cpp | 28 +++++++++++----------------- src/server.h | 8 +++----- src/t_string.cpp | 26 ++++++++------------------ 4 files changed, 25 insertions(+), 46 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index cdbeb3d38..357559a38 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1184,7 +1184,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { { // Do an async version if (c->asyncCommand( - [c, cursor, keys, pat, type, use_pattern, count] (const redisDbPersistentDataSnapshot * snapshot) { + [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, std::vector) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); @@ -1207,16 +1207,13 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { sdsfree(patCopy); if (typeCopy != nullptr) sdsfree(typeCopy); - return (void *)cursorResult; - }, - [c, keys] (const redisDbPersistentDataSnapshot *, void *data) { mstime_t timeScanFilter; latencyStartMonitor(timeScanFilter); - scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, (unsigned long)data); + scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); latencyEndMonitor(timeScanFilter); latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); }, - [keys] (const redisDbPersistentDataSnapshot *, void *) { + [keys] (const redisDbPersistentDataSnapshot *) { listSetFreeMethod(keys,decrRefCountVoid); listRelease(keys); } diff --git a/src/server.cpp b/src/server.cpp index 94c95d500..573fdcb3e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,23 +4952,16 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } -list *client::argsAsList() { - list *args = listCreate(); +std::vector client::args() { + std::vector args; for (int j = 1; j < this->argc; j++) { - incrRefCount(this->argv[j]); - listAddNodeTail(args, this->argv[j]); + args.push_back(robj_sharedptr(argv[j])); } return args; } -void client::freeArgList(list* args) { - listSetFreeMethod(args,decrRefCountVoid); - listRelease(args); -} - -bool client::asyncCommand(std::function &&preFn, - std::function &&mainFn, - std::function &&postFn) +bool client::asyncCommand(std::function)> &&mainFn, + std::function &&postFn) { serverAssert(FCorrectThread(this)); const redisDbPersistentDataSnapshot *snapshot = nullptr; @@ -4979,18 +4972,19 @@ bool client::asyncCommand(std::functionel; blockClient(this, BLOCKED_ASYNC); - g_pserver->asyncworkqueue->AddWorkFunction([el, this, preFn, mainFn, postFn, snapshot] { - void *preData = preFn(snapshot); - aePostFunction(el, [this, mainFn, postFn, snapshot, preData] { + g_pserver->asyncworkqueue->AddWorkFunction([el, this, mainFn, postFn, snapshot] { + std::vector args = this->args(); + aePostFunction(el, [this, mainFn, postFn, snapshot, args] { aeReleaseLock(); std::unique_locklock)> lock(this->lock); AeLocker locker; locker.arm(this); unblockClient(this); - mainFn(snapshot, preData); + mainFn(snapshot, args); locker.disarm(); lock.unlock(); - postFn(snapshot, preData); + if (postFn) + postFn(snapshot); this->db->endSnapshotAsync(snapshot); aeAcquireLock(); }); diff --git a/src/server.h b/src/server.h index d1d22dbd1..101ca34eb 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,11 +1661,9 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; - list *argsAsList(); - void freeArgList(list* args); - bool asyncCommand(std::function &&preFn, - std::function &&mainFn, - std::function &&postFn); + std::vector args(); + bool asyncCommand(std::function)> &&mainFn, + std::function &&postFn = nullptr); }; struct saveparam { diff --git a/src/t_string.cpp b/src/t_string.cpp index 5823738c9..0e555351e 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -524,21 +524,19 @@ void getrangeCommand(client *c) { } } -void mgetCore(client *c, list *keys, const redisDbPersistentDataSnapshot *snapshot = nullptr) { - addReplyArrayLen(c,listLength(keys)); - listNode *ln = listFirst(keys); - while (ln != nullptr) { +void mgetCore(client *c, robj **keys, int count, const redisDbPersistentDataSnapshot *snapshot = nullptr) { + addReplyArrayLen(c,count); + for (int i = 0; i < count; i++) { robj_roptr o; if (snapshot) - o = snapshot->find_cached_threadsafe(szFromObj((robj*)listNodeValue(ln))).val(); + o = snapshot->find_cached_threadsafe(szFromObj(keys[i])).val(); else - o = lookupKeyRead(c->db,(robj*)listNodeValue(ln)); + o = lookupKeyRead(c->db,keys[i]); if (o == nullptr || o->type != OBJ_STRING) { addReplyNull(c); } else { addReplyBulk(c,o); } - ln = ln->next; } } @@ -546,23 +544,15 @@ void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { if (c->asyncCommand( - [c] (const redisDbPersistentDataSnapshot *) { - return c->argsAsList(); - }, - [c] (const redisDbPersistentDataSnapshot *snapshot, void *keys) { - mgetCore(c, (list *)keys, snapshot); - }, - [c] (const redisDbPersistentDataSnapshot *, void *keys) { - c->freeArgList((list *)keys); + [c] (const redisDbPersistentDataSnapshot *snapshot, std::vector keys) { + mgetCore(c, (robj **)keys.data(), keys.size(), snapshot); } )) { return; } } - list *keys = c->argsAsList(); - mgetCore(c, keys); - c->freeArgList(keys); + mgetCore(c, c->argv + 1, c->argc - 1); } void msetGenericCommand(client *c, int nx) { From 286e9cccdc838dc74383cd1c90c011fa634b0865 Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 1 Sep 2021 21:00:27 +0000 Subject: [PATCH 11/11] updates from comments Former-commit-id: 852885f09e7df1d9570408546baffa8545707335 --- src/db.cpp | 2 +- src/server.cpp | 10 +++++----- src/server.h | 3 +-- src/t_string.cpp | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 357559a38..509673cbe 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1184,7 +1184,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { { // Do an async version if (c->asyncCommand( - [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, std::vector) { + [c, keys, pat, type, cursor, count, use_pattern] (const redisDbPersistentDataSnapshot *snapshot, const std::vector &) { sds patCopy = pat ? sdsdup(pat) : nullptr; sds typeCopy = type ? sdsdup(type) : nullptr; auto cursorResult = snapshot->scan_threadsafe(cursor, count, typeCopy, keys); diff --git a/src/server.cpp b/src/server.cpp index 573fdcb3e..a039b2863 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4952,15 +4952,15 @@ bool client::postFunction(std::function fn, bool fLock) { }, fLock) == AE_OK; } -std::vector client::args() { +std::vector clientArgs(client *c) { std::vector args; - for (int j = 1; j < this->argc; j++) { - args.push_back(robj_sharedptr(argv[j])); + for (int j = 1; j < c->argc; j++) { + args.push_back(robj_sharedptr(c->argv[j])); } return args; } -bool client::asyncCommand(std::function)> &&mainFn, +bool client::asyncCommand(std::function &)> &&mainFn, std::function &&postFn) { serverAssert(FCorrectThread(this)); @@ -4973,7 +4973,7 @@ bool client::asyncCommand(std::functionel; blockClient(this, BLOCKED_ASYNC); g_pserver->asyncworkqueue->AddWorkFunction([el, this, mainFn, postFn, snapshot] { - std::vector args = this->args(); + std::vector args = clientArgs(this); aePostFunction(el, [this, mainFn, postFn, snapshot, args] { aeReleaseLock(); std::unique_locklock)> lock(this->lock); diff --git a/src/server.h b/src/server.h index 101ca34eb..4b1a84bf1 100644 --- a/src/server.h +++ b/src/server.h @@ -1661,8 +1661,7 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; - std::vector args(); - bool asyncCommand(std::function)> &&mainFn, + bool asyncCommand(std::function &)> &&mainFn, std::function &&postFn = nullptr); }; diff --git a/src/t_string.cpp b/src/t_string.cpp index 0e555351e..a09cfd919 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -544,7 +544,7 @@ void mgetCommand(client *c) { // Do async version for large number of arguments if (c->argc > 100) { if (c->asyncCommand( - [c] (const redisDbPersistentDataSnapshot *snapshot, std::vector keys) { + [c] (const redisDbPersistentDataSnapshot *snapshot, const std::vector &keys) { mgetCore(c, (robj **)keys.data(), keys.size(), snapshot); } )) {