diff --git a/src/Makefile b/src/Makefile index 9d4c22cf3..7ddd26db8 100644 --- a/src/Makefile +++ b/src/Makefile @@ -207,7 +207,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/blocked.cpp b/src/blocked.cpp index 19d7f2fb6..13cc2347f 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -190,6 +190,8 @@ void unblockClient(client *c) { unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { unblockClientFromModule(c); + } else if (c->btype == BLOCKED_ASYNC) { + } else { serverPanic("Unknown btype in unblockClient()."); } diff --git a/src/db.cpp b/src/db.cpp index 9cba03a2a..f76edb2af 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -704,20 +704,25 @@ bool redisDbPersistentData::iterate(std::function fn) return fResult; } -void keysCommand(client *c) { - sds pattern = szFromObj(c->argv[1]); +void keysCommandCore(client *c, redisDbPersistentData *db, sds pattern) +{ int plen = sdslen(pattern), allkeys; unsigned long numkeys = 0; - void *replylen = addReplyDeferredLen(c); + + aeAcquireLock(); + void *replylen = addReplyDeferredLenAsync(c); + aeReleaseLock(); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); - c->db->iterate([&](const char *key)->bool { + db->iterate([&](const char *key)->bool { robj *keyobj; if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { keyobj = createStringObject(key,sdslen(key)); if (!keyIsExpired(c->db,keyobj)) { - addReplyBulk(c,keyobj); + aeAcquireLock(); + addReplyBulkAsync(c,keyobj); + aeReleaseLock(); numkeys++; } decrRefCount(keyobj); @@ -725,7 +730,42 @@ void keysCommand(client *c) { return true; }); - setDeferredArrayLen(c,replylen,numkeys); + setDeferredArrayLenAsync(c,replylen,numkeys); +} + +int prepareClientToWrite(client *c, bool fAsync); +void keysCommand(client *c) { + sds pattern = szFromObj(c->argv[1]); + + redisDbPersistentData *snapshot = nullptr; + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) + snapshot = c->db->createSnapshot(c->mvccCheckpoint); + if (snapshot != nullptr) + { + sds patternCopy = sdsdup(pattern); + aeEventLoop *el = serverTL->el; + blockClient(c, BLOCKED_ASYNC); + redisDb *db = c->db; + g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{ + keysCommandCore(c, snapshot, patternCopy); + sdsfree(patternCopy); + aePostFunction(el, [c, db, snapshot]{ + aeReleaseLock(); // we need to lock with coordination of the client + + std::unique_locklock)> lock(c->lock); + AeLocker locker; + locker.arm(c); + + unblockClient(c); + db->endSnapshot(snapshot); + aeAcquireLock(); + }); + }); + } + else + { + keysCommandCore(c, c->db, pattern); + } } /* This callback is used by scanGenericCommand in order to collect elements diff --git a/src/networking.cpp b/src/networking.cpp index 905a64693..bf08c74a6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -530,6 +530,7 @@ void *addReplyDeferredLenAsync(client *c) { if (FCorrectThread(c)) return addReplyDeferredLen(c); + prepareClientToWrite(c, true); return (void*)((ssize_t)c->bufposAsync); } @@ -1316,7 +1317,11 @@ bool freeClient(client *c) { c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ - if (c->flags & CLIENT_BLOCKED) unblockClient(c); + if (c->flags & CLIENT_BLOCKED) + { + serverAssert(c->btype != BLOCKED_ASYNC); + unblockClient(c); + } dictRelease(c->bpop.keys); /* UNWATCH all the keys */ diff --git a/src/server.cpp b/src/server.cpp index 20a91a763..51ca9530d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3076,6 +3076,8 @@ void initServer(void) { latencyMonitorInit(); bioInit(); g_pserver->initial_memory_usage = zmalloc_used_memory(); + + g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); } /* Parse the flags string description 'strflags' and set them to the @@ -3831,6 +3833,7 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); + serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); return C_OK; diff --git a/src/server.h b/src/server.h index 77455228c..2e8bc3f73 100644 --- a/src/server.h +++ b/src/server.h @@ -92,6 +92,7 @@ typedef long long mstime_t; /* millisecond time type. */ #include "endianconv.h" #include "crc64.h" #include "IStorage.h" +#include "AsyncWorkQueue.h" extern int g_fTestMode; @@ -389,7 +390,8 @@ public: #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ #define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_NUM 6 /* Number of blocked states. */ +#define BLOCKED_ASYNC 6 +#define BLOCKED_NUM 7 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -2060,6 +2062,8 @@ struct redisServer { // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition uint64_t mvcc_tstamp; + AsyncWorkQueue *asyncworkqueue; + /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */