From 6bb1429a460b42eba6ed1ac4ba0f13ba7298076a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Feb 2020 01:22:25 -0500 Subject: [PATCH 1/3] module fixes Former-commit-id: ef4e11ecb8a6f1a05bb21f014120b0ef9e771b60 --- src/module.cpp | 83 ++++++++++++++++++++++++++++++++++++---------- src/networking.cpp | 42 +++++++++++++++++------ src/server.h | 3 ++ 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index c9cc29bfd..b93ad36ad 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include "rdb.h" +#include "aelocker.h" #include #include #include @@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyLongLong(c,ll); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyLongLongAsync(c,ll); return REDISMODULE_OK; } @@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyProto(c,prefix,strlen(prefix)); - addReplyProto(c,msg,strlen(msg)); - addReplyProto(c,"\r\n",2); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyProtoAsync(c,prefix,strlen(prefix)); + addReplyProtoAsync(c,msg,strlen(msg)); + addReplyProtoAsync(c,"\r\n",2); return REDISMODULE_OK; } @@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) { * The function always returns REDISMODULE_OK. */ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); + AeLocker locker; + if (c == NULL) return REDISMODULE_OK; + std::unique_lock lock(c->lock); + locker.arm(c); if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* (ctx->postponed_arrays_count+1), MALLOC_LOCAL); ctx->postponed_arrays[ctx->postponed_arrays_count] = - addReplyDeferredLen(c); + addReplyDeferredLenAsync(c); ctx->postponed_arrays_count++; } else { - addReplyArrayLen(c,len); + addReplyArrayLenAsync(c,len); } return REDISMODULE_OK; } @@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNullArray(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullArrayAsync(c); return REDISMODULE_OK; } @@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptyarray); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptyarray); return REDISMODULE_OK; } @@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); if (ctx->postponed_arrays_count == 0) { serverLog(LL_WARNING, "API misuse detected in module %s: " @@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { return; } ctx->postponed_arrays_count--; - setDeferredArrayLen(c, + setDeferredArrayLenAsync(c, ctx->postponed_arrays[ctx->postponed_arrays_count], len); if (ctx->postponed_arrays_count == 0) { @@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c,(char*)buf,len); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCBufferAsync(c,(char*)buf,len); return REDISMODULE_OK; } @@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCString(c,(char*)buf); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCStringAsync(c,(char*)buf); return REDISMODULE_OK; } @@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulk(c,str); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkAsync(c,str); return REDISMODULE_OK; } @@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptybulk); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyVerbatim(c, buf, len, "txt"); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyVerbatimAsync(c, buf, len, "txt"); return REDISMODULE_OK; } @@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) int RM_ReplyWithNull(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNull(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullAsync(c); return REDISMODULE_OK; } @@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); sds proto = sdsnewlen(reply->proto, reply->protolen); - addReplySds(c,proto); + addReplySdsAsync(c,proto); return REDISMODULE_OK; } @@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyDouble(c,d); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyDoubleAsync(c,d); return REDISMODULE_OK; } @@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyHumanLongDouble(c, ld); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyHumanLongDoubleAsync(c, ld); return REDISMODULE_OK; } diff --git a/src/networking.cpp b/src/networking.cpp index 8624c4e2c..746e48628 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -661,23 +661,33 @@ void addReplyDoubleAsync(client *c, double d) { addReplyDoubleCore(c, d, true); } +void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync); + /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ -void addReplyHumanLongDouble(client *c, long double d) { +void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) { if (c->resp == 2) { robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulk(c,o); + addReplyBulkCore(c,o,fAsync); decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); - addReplyProto(c,",",1); - addReplyProto(c,buf,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,",",1,fAsync); + addReplyProtoCore(c,buf,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyHumanLongDouble(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, false); +} + +void addReplyHumanLongDoubleAsync(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, true); +} + /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { @@ -912,6 +922,10 @@ void addReplyBulkCString(client *c, const char *s) { addReplyBulkCStringCore(c, s, false); } +void addReplyBulkCStringAsync(client *c, const char *s) { + addReplyBulkCStringCore(c, s, true); +} + /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -930,9 +944,9 @@ void addReplyBulkLongLong(client *c, long long ll) { * three first characters of the extension are used, and if the * provided one is shorter than that, the remaining is filled with * spaces. */ -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { +void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) { if (c->resp == 2) { - addReplyBulkCBuffer(c,s,len); + addReplyBulkCBufferCore(c,s,len,fAsync); } else { char buf[32]; size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); @@ -944,12 +958,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { p[i] = *ext++; } } - addReplyProto(c,buf,preflen); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,buf,preflen,fAsync); + addReplyProtoCore(c,s,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, false); +} + +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, true); +} + /* Add an array of C strings as status replies with a heading. * This function is typically invoked by from commands that support * subcommands in response to the 'help' subcommand. The help array diff --git a/src/server.h b/src/server.h index c3a59db8a..8d976253d 100644 --- a/src/server.h +++ b/src/server.h @@ -2121,10 +2121,12 @@ void addReplyNullArray(client *c); void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); +void addReplyBulkCStringAsync(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj_roptr obj); @@ -2134,6 +2136,7 @@ void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); +void addReplyHumanLongDoubleAsync(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); From 8800a516d152190c46e547bd69da3aca6515944c Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:12 -0500 Subject: [PATCH 2/3] Add test to detect issue #137 and #132 Former-commit-id: 49d86746edef497a568c6f3a64695d420305cca8 --- src/debug.cpp | 20 +++++++++++++++++++- tests/integration/replication.tcl | 2 ++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 6 +++--- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 234f197be..1916dc79b 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -685,8 +685,26 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); - } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) { + } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) { c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY; + if (!strcasecmp(szFromObj(c->argv[2]), "yes")) + { + redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + mi->master = c; + listAddNodeHead(g_pserver->masters, mi); + } + else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset + { + serverAssert(c->flags & CLIENT_MASTER); + if (listLength(g_pserver->masters)) + { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + serverAssert(mi->master == c); + listDelNode(g_pserver->masters, listFirst(g_pserver->masters)); + zfree(mi); + } + c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY); + } addReply(c, shared.ok); } else { addReplySubcommandSyntaxError(c); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..5ed1ffc52 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -309,3 +309,5 @@ start_server {tags {"repl"}} { } } } + + diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index ec1d7cd31..0711d4cc2 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -37,6 +37,7 @@ set ::all_tests { unit/acl unit/rreplay unit/cron + unit/replication integration/block-repl integration/replication integration/replication-2 diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2029f521d..2fd1d3714 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ -start_server {tags {"rreplay"}} { +start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master + r debug force-master flagonly r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} { reconnect test {RREPLAY db different} { - r debug force-master + r debug force-master flagonly r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eabc436814c58c112d56bda275d70b0622843168 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:31 -0500 Subject: [PATCH 3/3] Fix issue #137 and #132 Former-commit-id: 050d58007f84e4f71b0ae8b053ae4d6fd5bb4ec7 --- src/db.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 605baaf40..9190d22f1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -177,9 +177,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { + expireIfNeeded(db,key); robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC); - if (expireIfNeeded(db,key)) - o = NULL; return o; }