From b00e1891c04bd375bb7d9877397c675335f8b40c Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Tue, 15 Jan 2019 07:26:19 +0000 Subject: [PATCH 01/35] Fixed a rounding bug in geo.tcl --- tests/unit/geo.tcl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl index 604697be4..49e421ee9 100644 --- a/tests/unit/geo.tcl +++ b/tests/unit/geo.tcl @@ -61,6 +61,7 @@ set regression_vectors { {939895 151 59.149620271823181 65.204186651485145} {1412 156 149.29737817929004 15.95807862745508} {564862 149 84.062063109158544 -65.685403922426232} + {1546032440391 16751 -1.8175081637769495 20.665668878082954} } set rv_idx 0 @@ -274,8 +275,19 @@ start_server {tags {"geo"}} { foreach place $diff { set mydist [geo_distance $lon $lat $search_lon $search_lat] set mydist [expr $mydist/1000] - if {($mydist / $radius_km) > 0.999} {incr rounding_errors} + if {($mydist / $radius_km) > 0.999} { + incr rounding_errors + continue + } + if {$mydist < $radius_m} { + # This is a false positive for redis since given the + # same points the higher precision calculation provided + # by TCL shows the point within range + incr rounding_errors + continue + } } + # Make sure this is a real error and not a rounidng issue. if {[llength $diff] == $rounding_errors} { set res $res2; # Error silenced From 2d236d7aecbf12373618191b7ef4b2365d487c6f Mon Sep 17 00:00:00 2001 From: "zheng.ren01@mljr.com" Date: Tue, 25 Jun 2019 18:34:35 +0800 Subject: [PATCH 02/35] =?UTF-8?q?fix=20readme.md=EF=BC=8CRedis=20data=20ty?= =?UTF-8?q?pes=20should=20add=20`t=5Fstream.c`.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6c9435b53..3442659e6 100644 --- a/README.md +++ b/README.md @@ -406,7 +406,7 @@ replicas, or to continue the replication after a disconnection. Other C files --- -* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c` and `t_zset.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types. +* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types. * `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand. * `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information. * `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel. From 54c4e7f86c280f6a3050fdce17039c1572124007 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 12 Jul 2019 12:18:33 +0200 Subject: [PATCH 03/35] Vertically compact code in aofWrite(). --- src/aof.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/aof.c b/src/aof.c index 565ee8073..ae9c4bb68 100644 --- a/src/aof.c +++ b/src/aof.c @@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { nwritten = write(fd, buf, len); if (nwritten < 0) { - if (errno == EINTR) { - continue; - } + if (errno == EINTR) continue; return totwritten ? totwritten : -1; } From 6e07fac40542ae00ea0a3a2522482bbbd547a2da Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 17 Jul 2019 15:08:18 +0300 Subject: [PATCH 04/35] RM_Log - add support for logging without a context or context without module for instance detached thread safe contexts, or various callbacks that don't provide a context. --- src/module.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/module.c b/src/module.c index f4f753c00..442499b7c 100644 --- a/src/module.c +++ b/src/module.c @@ -3515,7 +3515,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li if (level < server.verbosity) return; - name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); + name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module"); vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); serverLogRaw(level,msg); } @@ -3533,13 +3533,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li * There is a fixed limit to the length of the log line this function is able * to emit, this limit is not specified but is guaranteed to be more than * a few lines of text. + * + * The ctx argument may be NULL if cannot be provided in the context of the + * caller for instance threads or callbacks, in which case a generic "module" + * will be used instead of the module name. */ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { - if (!ctx->module) return; /* Can only log if module is initialized */ - va_list ap; va_start(ap, fmt); - RM_LogRaw(ctx->module,levelstr,fmt,ap); + RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap); va_end(ap); } From 6191ea90a1ec8b3ebeab65e8b783b2cb6714c0d4 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 17 Jul 2019 20:33:52 +0800 Subject: [PATCH 05/35] Client side caching: implement trackingInvalidateKeysOnFlush() --- src/db.c | 1 + src/server.h | 1 + src/tracking.c | 81 +++++++++++++++++++++++++++++++------------------- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/db.c b/src/db.c index 51f5a12b4..568e0b8de 100644 --- a/src/db.c +++ b/src/db.c @@ -417,6 +417,7 @@ void signalModifiedKey(redisDb *db, robj *key) { void signalFlushedDb(int dbid) { touchWatchedKeysOnFlush(dbid); + if (server.tracking_clients) trackingInvalidateKeysOnFlush(dbid); } /*----------------------------------------------------------------------------- diff --git a/src/server.h b/src/server.h index f81b1010e..b200a6696 100644 --- a/src/server.h +++ b/src/server.h @@ -1638,6 +1638,7 @@ void enableTracking(client *c, uint64_t redirect_to); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); +void trackingInvalidateKeysOnFlush(int dbid); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index bbfc66a72..f3ff7ed03 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -117,6 +117,40 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } +void sendTrackingMessage(client *c, long long hash) { + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + return; + } + c = redir; + using_redirection = 1; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); + } +} + /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such . */ @@ -134,37 +168,7 @@ void trackingInvalidateKey(robj *keyobj) { memcpy(&id,ri.key,ri.key_len); client *c = lookupClientByID(id); if (c == NULL) continue; - int using_redirection = 0; - if (c->client_tracking_redirection) { - client *redir = lookupClientByID(c->client_tracking_redirection); - if (!redir) { - /* We need to signal to the original connection that we - * are unable to send invalidation messages to the redirected - * connection, because the client no longer exist. */ - if (c->resp > 2) { - addReplyPushLen(c,3); - addReplyBulkCBuffer(c,"tracking-redir-broken",21); - addReplyLongLong(c,c->client_tracking_redirection); - } - continue; - } - c = redir; - using_redirection = 1; - } - - /* Only send such info for clients in RESP version 3 or more. However - * if redirection is active, and the connection we redirect to is - * in Pub/Sub mode, we can support the feature with RESP 2 as well, - * by sending Pub/Sub messages in the __redis__:invalidate channel. */ - if (c->resp > 2) { - addReplyPushLen(c,2); - addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); - } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); - } + sendTrackingMessage(c,hash); } raxStop(&ri); @@ -173,3 +177,18 @@ void trackingInvalidateKey(robj *keyobj) { raxFree(TrackingTable[hash]); TrackingTable[hash] = NULL; } + +void trackingInvalidateKeysOnFlush(int dbid) { + UNUSED(dbid); + if (server.tracking_clients == 0) return; + + listNode *ln; + listIter li; + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_TRACKING) { + sendTrackingMessage(c,-1); + } + } +} From c56b4ddc6f216ff751f9e8e94d345eee30d9997c Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 16 Jul 2019 11:00:34 +0300 Subject: [PATCH 06/35] prevent diskless replica from terminating on short read now that replica can read rdb directly from the socket, it should avoid exiting on short read and instead try to re-sync. this commit tries to have minimal effects on non-diskless rdb reading. and includes a test that tries to trigger this scenario on various read cases. --- src/rdb.c | 190 +++++++++++++++++++++--------- src/rdb.h | 4 +- src/redis-check-rdb.c | 7 +- src/stream.h | 1 + tests/integration/replication.tcl | 97 +++++++++++++-- tests/support/util.tcl | 19 +++ 6 files changed, 250 insertions(+), 68 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index c566378fb..5111b6b88 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -42,31 +42,35 @@ #include #include -#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +/* This macro is called when the internal RDB stracture is corrupt */ +#define rdbExitReportCorruptRDB(...) rdbReportReadError(0, __LINE__,__VA_ARGS__) +/* This macro is called when RDB read failed (possibly a short read) */ +#define rdbReportReadError(...) rdbReportError(1, __LINE__,__VA_ARGS__) char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); -void rdbCheckThenExit(int linenum, char *reason, ...) { +void rdbReportError(int read_error, int linenum, char *reason, ...) { va_list ap; char msg[1024]; int len; len = snprintf(msg,sizeof(msg), - "Internal error in RDB reading function at rdb.c:%d -> ", linenum); + "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ", + (unsigned long long)server.loading_loaded_bytes, linenum); va_start(ap,reason); vsnprintf(msg+len,sizeof(msg)-len,reason,ap); va_end(ap); if (!rdbCheckMode) { - serverLog(LL_WARNING, "%s", msg); - if (rdbFileBeingLoaded) { + if (rdbFileBeingLoaded || !read_error) { + serverLog(LL_WARNING, "%s", msg); char *argv[2] = {"",rdbFileBeingLoaded}; redis_check_rdb_main(2,argv,NULL); } else { - serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg); return; } } else { @@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) { return len; } -/* This is just a wrapper for the low level function rioRead() that will - * automatically abort if it is not possible to read the specified amount - * of bytes. */ -void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { - if (rioRead(rdb,buf,len) == 0) { - rdbExitReportCorruptRDB( - "Impossible to read %llu bytes in rdbLoadRaw()", - (unsigned long long) len); - return; /* Not reached. */ - } -} - int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } @@ -110,10 +102,11 @@ int rdbLoadType(rio *rdb) { /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS * opcode. */ -time_t rdbLoadTime(rio *rdb) { +int rdbLoadTime(rio *rdb, time_t *t) { int32_t t32; - rdbLoadRaw(rdb,&t32,4); - return (time_t)t32; + if (rioRead(rdb,&t32,4) == 0) return C_ERR; + *t = (time_t)t32; + return C_OK;; } int rdbSaveMillisecondTime(rio *rdb, long long t) { @@ -133,12 +126,13 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { * own old RDB files. Because of that, we instead fix the function only for new * RDB versions, and load older RDB versions as we used to do in the past, * allowing big endian systems to load their own old RDB files. */ -long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { +int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver) { int64_t t64; - rdbLoadRaw(rdb,&t64,8); + if (rioRead(rdb,&t64,8) == 0) return C_ERR; if (rdbver >= 9) /* Check the top comment of this function. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ - return (long long)t64; + *t = (long long)t64; + return C_OK; } /* Saves an encoded length. The first two bits in the first byte are used to @@ -216,9 +210,8 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) { if (rioRead(rdb,&len,8) == 0) return -1; *lenptr = ntohu64(len); } else { - rdbExitReportCorruptRDB( - "Unknown length encoding %d in rdbLoadLen()",type); - return -1; /* Never reached. */ + serverLog(LL_WARNING, "Unknown length encoding %d in rdbLoadLen()",type); + return -1; } return 0; } @@ -284,8 +277,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); val = (int32_t)v; } else { - val = 0; /* anti-warning */ - rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); + serverLog(LL_WARNING, "Unknown RDB integer encoding type %d", enctype); + return NULL; } if (plain || sds) { char buf[LONG_STR_SIZE], *p; @@ -502,7 +495,8 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { case RDB_ENC_LZF: return rdbLoadLzfStringObject(rdb,flags,lenptr); default: - rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len); + serverLog(LL_WARNING, "Unknown RDB encoding type %llu", (unsigned long long)len); + return NULL; } } @@ -1644,6 +1638,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { hashTypeConvert(o, OBJ_ENCODING_HT); break; default: + /* totally unreachable */ rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } @@ -1651,6 +1646,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); + if (listpacks == RDB_LENERR) { + rdbReportReadError("Stream listpacks len loading failed."); + decrRefCount(o); + return NULL; + } while(listpacks--) { /* Get the master ID, the one we'll use as key of the radix tree @@ -1658,7 +1658,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * relatively to this ID. */ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (nodekey == NULL) { - rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error."); + decrRefCount(o); + return NULL; } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " @@ -1668,7 +1670,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Load the listpack. */ unsigned char *lp = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (lp == NULL) return NULL; + if (lp == NULL) { + rdbReportReadError("Stream listpacks loading failed."); + sdsfree(nodekey); + decrRefCount(o); + return NULL; + } unsigned char *first = lpFirst(lp); if (first == NULL) { /* Serialized listpacks should never be empty, since on @@ -1685,13 +1692,26 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { rdbExitReportCorruptRDB("Listpack re-added with existing key"); } /* Load total number of items inside the stream. */ - s->length = rdbLoadLen(rdb,NULL); + if (rdbLoadLenByRef(rdb,NULL,&s->length)) { + rdbReportReadError("Stream item count loading failed."); + decrRefCount(o); + return NULL; + } /* Load the last entry ID. */ - s->last_id.ms = rdbLoadLen(rdb,NULL); - s->last_id.seq = rdbLoadLen(rdb,NULL); + if (rdbLoadLenByRef(rdb,NULL,&s->last_id.ms) || + rdbLoadLenByRef(rdb,NULL,&s->last_id.seq)) { + rdbReportReadError("Stream last entry ID loading failed."); + decrRefCount(o); + return NULL; + } /* Consumer groups loading */ - size_t cgroups_count = rdbLoadLen(rdb,NULL); + uint64_t cgroups_count = rdbLoadLen(rdb,NULL); + if (cgroups_count == RDB_LENERR) { + rdbReportReadError("Stream cgroup count loading failed."); + decrRefCount(o); + return NULL; + } while(cgroups_count--) { /* Get the consumer group name and ID. We can then create the * consumer group ASAP and populate its structure as @@ -1699,11 +1719,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { streamID cg_id; sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cgname == NULL) { - rdbExitReportCorruptRDB( + rdbReportReadError( "Error reading the consumer group name from Stream"); + decrRefCount(o); + return NULL; + } + if (rdbLoadLenByRef(rdb,NULL,&cg_id.ms) || + rdbLoadLenByRef(rdb,NULL,&cg_id.seq)) { + rdbReportReadError("Stream cgroup ID loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; } - cg_id.ms = rdbLoadLen(rdb,NULL); - cg_id.seq = rdbLoadLen(rdb,NULL); streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); if (cgroup == NULL) rdbExitReportCorruptRDB("Duplicated consumer group name %s", @@ -1715,13 +1742,32 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * owner, since consumers for this group and their messages will * be read as a next step. So for now leave them not resolved * and later populate it. */ - size_t pel_size = rdbLoadLen(rdb,NULL); + uint64_t pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream PEL size loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream PEL ID loading failed."); + decrRefCount(o); + return NULL; + } streamNACK *nack = streamCreateNACK(NULL); - nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); - nack->delivery_count = rdbLoadLen(rdb,NULL); + if (rdbLoadMillisecondTime(rdb, &nack->delivery_time,RDB_VERSION) == C_ERR) { + rdbReportReadError("Stream PEL nack loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } + if ((nack->delivery_count = rdbLoadLen(rdb,NULL)) == RDB_LENERR) { + rdbReportReadError("Stream nack deliveries loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) rdbExitReportCorruptRDB("Duplicated gobal PEL entry " "loading stream consumer group"); @@ -1729,24 +1775,44 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Now that we loaded our global PEL, we need to load the * consumers and their local PELs. */ - size_t consumers_num = rdbLoadLen(rdb,NULL); + uint64_t consumers_num = rdbLoadLen(rdb,NULL); + if (consumers_num == RDB_LENERR) { + rdbReportReadError("Stream consumers num loading failed."); + decrRefCount(o); + return NULL; + } while(consumers_num--) { sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cname == NULL) { - rdbExitReportCorruptRDB( - "Error reading the consumer name from Stream group"); + rdbReportReadError( + "Error reading the consumer name from Stream group."); + decrRefCount(o); + return NULL; } streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1); sdsfree(cname); - consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rdbLoadMillisecondTime(rdb, &consumer->seen_time,RDB_VERSION) == C_ERR) { + rdbReportReadError("Stream short read reading seen time."); + decrRefCount(o); + return NULL; + } /* Load the PEL about entries owned by this specific * consumer. */ pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream consumer PEL num loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream short read reading PEL streamID."); + decrRefCount(o); + return NULL; + } streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); if (nack == raxNotFound) rdbExitReportCorruptRDB("Consumer entry not found in " @@ -1764,7 +1830,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { - uint64_t moduleid = rdbLoadLen(rdb,NULL); + uint64_t moduleid; + if (rdbLoadLenByRef(rdb,NULL, &moduleid)) { + return NULL; + } moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -1792,6 +1861,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Module v2 serialization has an EOF mark at the end. */ if (io.ver == 2) { uint64_t eof = rdbLoadLen(rdb,NULL); + if (eof == RDB_LENERR) { + o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */ + decrRefCount(o); + return NULL; + } if (eof != RDB_MODULE_OPCODE_EOF) { serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name); exit(1); @@ -1805,7 +1879,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } o = createModuleObject(mt,ptr); } else { - rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); + rdbReportReadError("Unknown RDB encoding type %d",rdbtype); + return NULL; } return o; } @@ -1902,13 +1977,14 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - expiretime = rdbLoadTime(rdb); - expiretime *= 1000; + time_t t; + if (rdbLoadTime(rdb, &t) == C_ERR) goto eoferr; + expiretime = t * 1000; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ - expiretime = rdbLoadMillisecondTime(rdb,rdbver); + if (rdbLoadMillisecondTime(rdb, &expiretime, rdbver) == C_ERR) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ @@ -2017,7 +2093,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * we have the ability to read a MODULE_AUX opcode followed by an * identifier of the module, and a serialized value in "MODULE V2" * format. */ - uint64_t moduleid = rdbLoadLen(rdb,NULL); + uint64_t moduleid; + if (rdbLoadLenByRef(rdb,NULL,&moduleid)) goto eoferr; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; moduleTypeNameByID(name,moduleid); @@ -2090,8 +2167,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { eoferr: /* unexpected end of file is handled here with a fatal exit */ serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); - rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); - return C_ERR; /* Just to avoid warning */ + rdbReportReadError("Unexpected EOF reading RDB file"); +err: + return C_ERR; } /* Like rdbLoadRio() but takes a filename instead of a rio stream. The diff --git a/src/rdb.h b/src/rdb.h index 0acddf9ab..2bec0171b 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -127,10 +127,10 @@ int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); -time_t rdbLoadTime(rio *rdb); +int rdbLoadTime(rio *rdb, time_t *t); int rdbSaveLen(rio *rdb, uint64_t len); int rdbSaveMillisecondTime(rio *rdb, long long t); -long long rdbLoadMillisecondTime(rio *rdb, int rdbver); +int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver); uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index e2d71b5a5..fd3f07b0a 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -212,18 +212,19 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { /* Handle special types. */ if (type == RDB_OPCODE_EXPIRETIME) { + time_t t; rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; - expiretime *= 1000; + if (rdbLoadTime(&rdb, &t) == C_ERR) goto eoferr; + expiretime = t * 1000; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; - if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr; + if (rdbLoadMillisecondTime(&rdb, &expiretime, rdbver) == C_ERR) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ diff --git a/src/stream.h b/src/stream.h index ef08753b5..8ae90ce77 100644 --- a/src/stream.h +++ b/src/stream.h @@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); +void streamFreeNACK(streamNACK *na); #endif diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index d69a1761a..5d32555b0 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -192,12 +192,6 @@ foreach mdl {no yes} { set master_host [srv 0 host] set master_port [srv 0 port] set slaves {} - set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] - set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] - set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork start_server {} { lappend slaves [srv 0 client] start_server {} { @@ -205,6 +199,14 @@ foreach mdl {no yes} { start_server {} { lappend slaves [srv 0 client] test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + # start load handles only inside the test, so that the test can be skipped + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] + after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork + # Send SLAVEOF commands to slaves [lindex $slaves 0] config set repl-diskless-load $sdl [lindex $slaves 1] config set repl-diskless-load $sdl @@ -278,9 +280,9 @@ start_server {tags {"repl"}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] - set load_handle0 [start_write_load $master_host $master_port 3] start_server {} { test "Master stream is correctly processed while the replica has a script in -BUSY state" { + set load_handle0 [start_write_load $master_host $master_port 3] set slave [srv 0 client] $slave config set lua-time-limit 500 $slave slaveof $master_host $master_port @@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} { } } } + +test {diskless loading short read} { + start_server {tags {"repl"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Set master and replica to use diskless replication + $master config set repl-diskless-sync yes + $master config set rdbcompression no + $replica config set repl-diskless-load swapdb + # Try to fill the master with all types of data types / encodings + for {set k 0} {$k < 3} {incr k} { + for {set i 0} {$i < 10} {incr i} { + r set "$k int_$i" [expr {int(rand()*10000)}] + r expire "$k int_$i" [expr {int(rand()*10000)}] + r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]] + r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]] + r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]] + r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]] + r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]] + r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]] + r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]] + r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]] + r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]] + for {set j 0} {$j < 10} {incr j} { + r xadd "$k stream" * foo "asdf" bar "1234" + } + r xgroup create "$k stream" "mygroup_$i" 0 + r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" > + } + } + + # Start the replication process... + $master config set repl-diskless-sync-delay 0 + $replica replicaof $master_host $master_port + + # kill the replication at various points + set attempts 3 + if {$::accurate} { set attempts 10 } + for {set i 0} {$i < $attempts} {incr i} { + # wait for the replica to start reading the rdb + # using the log file since the replica only responds to INFO once in 2mb + wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1 + + # add some additional random sleep so that we kill the master on a different place each time + after [expr {int(rand()*100)}] + + # kill the replica connection on the master + set killed [$master client kill type replica] + + if {[catch { + set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10] + if {$::verbose} { + puts $res + } + }]} { + puts "failed triggering short read" + # force the replica to try another full sync + $master client kill type replica + $master set asdf asdf + # the side effect of resizing the backlog is that it is flushed (16k is the min size) + $master config set repl-backlog-size [expr {16384 + $i}] + } + # wait for loading to stop (fail) + wait_for_condition 100 10 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + } + # enable fast shutdown + $master config set rdb-key-save-delay 0 + } + } +} + diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 41cc5612a..c2e76afad 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} { } } +proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} { + set retry $maxtries + set stdout [srv $srv_idx stdout] + while {$retry} { + set result [exec tail -$last_lines < $stdout] + set result [split $result "\n"] + foreach line $result { + if {[string match $pattern $line]} { + return $line + } + } + incr retry -1 + after $delay + } + if {$retry == 0} { + fail "log message of '$pattern' not found" + } +} + # Random integer between 0 and max (excluded). proc randomInt {max} { expr {int(rand()*$max)} From 48d91cf4cc53e820dafa3c6d5efdf4e5a06c2c29 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 17 Jul 2019 12:36:11 +0200 Subject: [PATCH 07/35] Rio: remember read/write error conditions. --- src/rdb.c | 1 - src/rio.c | 4 ++++ src/rio.h | 33 ++++++++++++++++++++++++++++----- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 5111b6b88..e94f2934a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2168,7 +2168,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { eoferr: /* unexpected end of file is handled here with a fatal exit */ serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); -err: return C_ERR; } diff --git a/src/rio.c b/src/rio.c index 5359bc3d6..bdbc5d0e9 100644 --- a/src/rio.c +++ b/src/rio.c @@ -92,6 +92,7 @@ static const rio rioBufferIO = { rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -145,6 +146,7 @@ static const rio rioFileIO = { rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -239,6 +241,7 @@ static const rio rioFdIO = { rioFdFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -374,6 +377,7 @@ static const rio rioFdsetIO = { rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ diff --git a/src/rio.h b/src/rio.h index beea06888..66036b5bd 100644 --- a/src/rio.h +++ b/src/rio.h @@ -1,6 +1,6 @@ /* * Copyright (c) 2009-2012, Pieter Noordhuis - * Copyright (c) 2009-2012, Salvatore Sanfilippo + * Copyright (c) 2009-2019, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,6 +36,9 @@ #include #include "sds.h" +#define RIO_FLAG_READ_ERROR (1<<0) +#define RIO_FLAG_WRITE_ERROR (1<<1) + struct _rio { /* Backend functions. * Since this functions do not tolerate short writes or reads the return @@ -51,8 +54,8 @@ struct _rio { * computation. */ void (*update_cksum)(struct _rio *, const void *buf, size_t len); - /* The current checksum */ - uint64_t cksum; + /* The current checksum and flags (see RIO_FLAG_*) */ + uint64_t cksum, flags; /* number of bytes read or written */ size_t processed_bytes; @@ -102,8 +105,10 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { while (len) { size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); - if (r->write(r,buf,bytes_to_write) == 0) + if (r->write(r,buf,bytes_to_write) == 0) { + r->flags |= RIO_FLAG_WRITE_ERROR; return 0; + } buf = (char*)buf + bytes_to_write; len -= bytes_to_write; r->processed_bytes += bytes_to_write; @@ -114,8 +119,10 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { static inline size_t rioRead(rio *r, void *buf, size_t len) { while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; - if (r->read(r,buf,bytes_to_read) == 0) + if (r->read(r,buf,bytes_to_read) == 0) { + r->flags |= RIO_FLAG_READ_ERROR; return 0; + } if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read); buf = (char*)buf + bytes_to_read; len -= bytes_to_read; @@ -132,6 +139,22 @@ static inline int rioFlush(rio *r) { return r->flush(r); } +/* This function allows to know if there was a read error in any past + * operation, since the rio stream was created or since the last call + * to rioClearError(). */ +static inline int rioGetReadError(rio *r) { + return (r->flags & RIO_FLAG_READ_ERROR) != 0; +} + +/* Like rioGetReadError() but for write errors. */ +static inline int rioGetWriteError(rio *r) { + return (r->flags & RIO_FLAG_READ_ERROR) != 0; +} + +static inline void rioClearErrors(rio *r) { + r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR); +} + void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithFd(rio *r, int fd, size_t read_limit); From 5189db3d818a7058283bad4f53fe88a59ef76ff1 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 17 Jul 2019 12:45:01 +0200 Subject: [PATCH 08/35] RDB: modify rdbReportError() var name for clarity. --- src/rdb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index e94f2934a..6d5c1f2e5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -43,16 +43,16 @@ #include /* This macro is called when the internal RDB stracture is corrupt */ -#define rdbExitReportCorruptRDB(...) rdbReportReadError(0, __LINE__,__VA_ARGS__) +#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__) /* This macro is called when RDB read failed (possibly a short read) */ -#define rdbReportReadError(...) rdbReportError(1, __LINE__,__VA_ARGS__) +#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__) char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); -void rdbReportError(int read_error, int linenum, char *reason, ...) { +void rdbReportError(int corruption_error, int linenum, char *reason, ...) { va_list ap; char msg[1024]; int len; @@ -65,7 +65,7 @@ void rdbReportError(int read_error, int linenum, char *reason, ...) { va_end(ap); if (!rdbCheckMode) { - if (rdbFileBeingLoaded || !read_error) { + if (rdbFileBeingLoaded || corruption_error) { serverLog(LL_WARNING, "%s", msg); char *argv[2] = {"",rdbFileBeingLoaded}; redis_check_rdb_main(2,argv,NULL); From 6b72b04a37e7a17bb1adc822d0685f6fd39886ff Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 17 Jul 2019 16:46:04 +0200 Subject: [PATCH 09/35] Rio: when in error condition avoid doing the operation. --- src/rio.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rio.h b/src/rio.h index 66036b5bd..6199bb039 100644 --- a/src/rio.h +++ b/src/rio.h @@ -102,6 +102,7 @@ typedef struct _rio rio; * if needed. */ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { + if (r->flags & RIO_FLAG_WRITE_ERROR) return 0; while (len) { size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); @@ -117,6 +118,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { } static inline size_t rioRead(rio *r, void *buf, size_t len) { + if (r->flags & RIO_FLAG_READ_ERROR) return 0; while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->read(r,buf,bytes_to_read) == 0) { From 42b6305964cb3a5461402652773ceacf5dec49fd Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 17 Jul 2019 17:30:02 +0200 Subject: [PATCH 10/35] RDB: try to make error handling code more readable. --- src/rdb.c | 86 ++++++++++++++++++++++--------------------- src/rdb.h | 4 +- src/redis-check-rdb.c | 9 +++-- 3 files changed, 52 insertions(+), 47 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 6d5c1f2e5..ac9973385 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -101,12 +101,13 @@ int rdbLoadType(rio *rdb) { /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS - * opcode. */ -int rdbLoadTime(rio *rdb, time_t *t) { + * opcode. On error -1 is returned, however this could be a valid time, so + * to check for loading errors the caller should call rioGetReadError() after + * calling this function. */ +time_t rdbLoadTime(rio *rdb) { int32_t t32; - if (rioRead(rdb,&t32,4) == 0) return C_ERR; - *t = (time_t)t32; - return C_OK;; + if (rioRead(rdb,&t32,4) == 0) return -1; + return (time_t)t32; } int rdbSaveMillisecondTime(rio *rdb, long long t) { @@ -125,14 +126,17 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { * after upgrading to Redis version 5 they will no longer be able to load their * own old RDB files. Because of that, we instead fix the function only for new * RDB versions, and load older RDB versions as we used to do in the past, - * allowing big endian systems to load their own old RDB files. */ -int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver) { + * allowing big endian systems to load their own old RDB files. + * + * On I/O error the function returns LLONG_MAX, however if this is also a + * valid stored value, the caller should use rioGetReadError() to check for + * errors after calling this function. */ +long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { int64_t t64; - if (rioRead(rdb,&t64,8) == 0) return C_ERR; + if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX; if (rdbver >= 9) /* Check the top comment of this function. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ - *t = (long long)t64; - return C_OK; + return (long long)t64; } /* Saves an encoded length. The first two bits in the first byte are used to @@ -1692,15 +1696,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { rdbExitReportCorruptRDB("Listpack re-added with existing key"); } /* Load total number of items inside the stream. */ - if (rdbLoadLenByRef(rdb,NULL,&s->length)) { - rdbReportReadError("Stream item count loading failed."); - decrRefCount(o); - return NULL; - } + s->length = rdbLoadLen(rdb,NULL); + /* Load the last entry ID. */ - if (rdbLoadLenByRef(rdb,NULL,&s->last_id.ms) || - rdbLoadLenByRef(rdb,NULL,&s->last_id.seq)) { - rdbReportReadError("Stream last entry ID loading failed."); + s->last_id.ms = rdbLoadLen(rdb,NULL); + s->last_id.seq = rdbLoadLen(rdb,NULL); + + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream object metadata loading failed."); decrRefCount(o); return NULL; } @@ -1724,13 +1727,16 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { decrRefCount(o); return NULL; } - if (rdbLoadLenByRef(rdb,NULL,&cg_id.ms) || - rdbLoadLenByRef(rdb,NULL,&cg_id.seq)) { + + cg_id.ms = rdbLoadLen(rdb,NULL); + cg_id.seq = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { rdbReportReadError("Stream cgroup ID loading failed."); sdsfree(cgname); decrRefCount(o); return NULL; } + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); if (cgroup == NULL) rdbExitReportCorruptRDB("Duplicated consumer group name %s", @@ -1756,14 +1762,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { return NULL; } streamNACK *nack = streamCreateNACK(NULL); - if (rdbLoadMillisecondTime(rdb, &nack->delivery_time,RDB_VERSION) == C_ERR) { - rdbReportReadError("Stream PEL nack loading failed."); - decrRefCount(o); - streamFreeNACK(nack); - return NULL; - } - if ((nack->delivery_count = rdbLoadLen(rdb,NULL)) == RDB_LENERR) { - rdbReportReadError("Stream nack deliveries loading failed."); + nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + nack->delivery_count = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream PEL NACK loading failed."); decrRefCount(o); streamFreeNACK(nack); return NULL; @@ -1792,7 +1794,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1); sdsfree(cname); - if (rdbLoadMillisecondTime(rdb, &consumer->seen_time,RDB_VERSION) == C_ERR) { + consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rioGetReadError(rdb)) { rdbReportReadError("Stream short read reading seen time."); decrRefCount(o); return NULL; @@ -1802,14 +1805,16 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * consumer. */ pel_size = rdbLoadLen(rdb,NULL); if (pel_size == RDB_LENERR) { - rdbReportReadError("Stream consumer PEL num loading failed."); + rdbReportReadError( + "Stream consumer PEL num loading failed."); decrRefCount(o); return NULL; } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { - rdbReportReadError("Stream short read reading PEL streamID."); + rdbReportReadError( + "Stream short read reading PEL streamID."); decrRefCount(o); return NULL; } @@ -1830,10 +1835,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { - uint64_t moduleid; - if (rdbLoadLenByRef(rdb,NULL, &moduleid)) { - return NULL; - } + uint64_t moduleid = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) return NULL; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -1977,14 +1980,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - time_t t; - if (rdbLoadTime(rdb, &t) == C_ERR) goto eoferr; - expiretime = t * 1000; + expiretime = rdbLoadTime(rdb); + expiretime *= 1000; + if (rioGetReadError(rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ - if (rdbLoadMillisecondTime(rdb, &expiretime, rdbver) == C_ERR) goto eoferr; + expiretime = rdbLoadMillisecondTime(rdb,rdbver); + if (rioGetReadError(rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ @@ -2093,8 +2097,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * we have the ability to read a MODULE_AUX opcode followed by an * identifier of the module, and a serialized value in "MODULE V2" * format. */ - uint64_t moduleid; - if (rdbLoadLenByRef(rdb,NULL,&moduleid)) goto eoferr; + uint64_t moduleid = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) goto eoferr; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; moduleTypeNameByID(name,moduleid); diff --git a/src/rdb.h b/src/rdb.h index 2bec0171b..0acddf9ab 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -127,10 +127,10 @@ int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); -int rdbLoadTime(rio *rdb, time_t *t); +time_t rdbLoadTime(rio *rdb); int rdbSaveLen(rio *rdb, uint64_t len); int rdbSaveMillisecondTime(rio *rdb, long long t); -int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver); +long long rdbLoadMillisecondTime(rio *rdb, int rdbver); uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index fd3f07b0a..5e7415046 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -212,19 +212,20 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { /* Handle special types. */ if (type == RDB_OPCODE_EXPIRETIME) { - time_t t; rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if (rdbLoadTime(&rdb, &t) == C_ERR) goto eoferr; - expiretime = t * 1000; + expiretime = rdbLoadTime(&rdb); + expiretime *= 1000; + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; - if (rdbLoadMillisecondTime(&rdb, &expiretime, rdbver) == C_ERR) goto eoferr; + expiretime = rdbLoadMillisecondTime(&rdb, rdbver); + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ From f59119aad68e92f7c08c37ab0dac9355be741198 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Thu, 18 Jul 2019 16:46:19 +0800 Subject: [PATCH 11/35] Client side caching: filter clients untracking --- src/tracking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index f3ff7ed03..84b29d28a 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -167,7 +167,7 @@ void trackingInvalidateKey(robj *keyobj) { uint64_t id; memcpy(&id,ri.key,ri.key_len); client *c = lookupClientByID(id); - if (c == NULL) continue; + if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; sendTrackingMessage(c,hash); } raxStop(&ri); From 947319cadd2d239eae9e269ed2f6ef6f240bee95 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 18 Jul 2019 12:37:55 +0200 Subject: [PATCH 12/35] RDB: update rdbLoadRio comment about EOF condition. --- src/rdb.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index ac9973385..e8d175ca2 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2169,8 +2169,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } return C_OK; -eoferr: /* unexpected end of file is handled here with a fatal exit */ - serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); + /* Unexpected end of file is handled here calling rdbReportReadError(): + * this will in turn either abort Redis in most cases, or if we are loading + * the RDB file from a socket during initial SYNC (diskless replica mode), + * we'll report the error to the caller, so that we can retry. */ +eoferr: + serverLog(LL_WARNING, + "Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); return C_ERR; } From bd0f06c18ccea62cb5e0fff018a5eaa876d3f90e Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 18 Jul 2019 18:51:45 +0200 Subject: [PATCH 13/35] RDB: handle encoding errors with rdbExitReportCorruptRDB(). Without such change, the diskless replicas, when loading RDB files from the socket will not abort when a broken RDB file gets loaded. This is potentially unsafe, because right now Redis is not able to guarantee that encoding errors are safe from the POV of memory corruptions (for instance the LZF library may not be safe against untrusted data?) so better to abort when the RDB file we are going to load is corrupted. Instead I/O errors are still returned to the caller without aborting, so that in case of short read the diskless replica can try again. --- src/rdb.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index e8d175ca2..226324627 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -214,8 +214,9 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) { if (rioRead(rdb,&len,8) == 0) return -1; *lenptr = ntohu64(len); } else { - serverLog(LL_WARNING, "Unknown length encoding %d in rdbLoadLen()",type); - return -1; + rdbExitReportCorruptRDB( + "Unknown length encoding %d in rdbLoadLen()",type); + return -1; /* Never reached. */ } return 0; } @@ -281,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); val = (int32_t)v; } else { - serverLog(LL_WARNING, "Unknown RDB integer encoding type %d", enctype); - return NULL; + rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); + return NULL; /* Never reached. */ } if (plain || sds) { char buf[LONG_STR_SIZE], *p; @@ -499,8 +500,8 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { case RDB_ENC_LZF: return rdbLoadLzfStringObject(rdb,flags,lenptr); default: - serverLog(LL_WARNING, "Unknown RDB encoding type %llu", (unsigned long long)len); - return NULL; + rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len); + return NULL; /* Never reached. */ } } From 5f450e49280ce01d5329798d87136f109761433a Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 18 Jul 2019 18:59:38 +0200 Subject: [PATCH 14/35] RDB: make sure to abort on LZF encoding error. --- src/rdb.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 226324627..97fad2573 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -386,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { /* Load the compressed representation and uncompress it to target. */ if (rioRead(rdb,c,clen) == 0) goto err; if (lzf_decompress(c,clen,val,len) == 0) { - if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string"); - goto err; + rdbExitReportCorruptRDB("Invalid LZF compressed string"); } zfree(c); From bc5cb168f504c188c7e67ca61853fd73c341fa62 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 19 Jul 2019 11:12:39 +0200 Subject: [PATCH 15/35] RDB: fix MODULE_AUX loading by continuing to next opcode. Thanks to @JohnSully for noticing this problem. --- src/rdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rdb.c b/src/rdb.c index 97fad2573..2118a00f4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2116,6 +2116,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* RDB check mode. */ robj *aux = rdbLoadCheckModuleValue(rdb,name); decrRefCount(aux); + continue; /* Read next opcode. */ } } From e00442eb139c833f35406440a956e03679842034 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 11:47:44 +0200 Subject: [PATCH 16/35] Clinet side caching: take count of used caching slots. --- src/tracking.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 84b29d28a..5938efade 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -60,6 +60,7 @@ * use the most significant bits instead of the full 24 bits. */ #define TRACKING_TABLE_SIZE (1<<24) rax **TrackingTable = NULL; +unsigned long TrackingTableUsedSlots = 0; robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much @@ -109,8 +110,10 @@ void trackingRememberKeys(client *c) { sds sdskey = c->argv[idx]->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) + if (TrackingTable[hash] == NULL) { TrackingTable[hash] = raxNew(); + TrackingTableUsedSlots++; + } raxTryInsert(TrackingTable[hash], (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); } @@ -176,6 +179,7 @@ void trackingInvalidateKey(robj *keyobj) { * again if more keys will be modified in this hash slot. */ raxFree(TrackingTable[hash]); TrackingTable[hash] = NULL; + TrackingTableUsedSlots--; } void trackingInvalidateKeysOnFlush(int dbid) { From f850201c6407d5504b08f2630f525a353a7ef740 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 12:10:51 +0200 Subject: [PATCH 17/35] Client side caching: don't hash the key if not needed. --- src/tracking.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index 5938efade..7ee4d0b2c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -156,12 +156,15 @@ void sendTrackingMessage(client *c, long long hash) { /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is - * to send a notification to every client that may have keys about such . */ + * to send a notification to every client that may have keys about such caching + * slot. */ void trackingInvalidateKey(robj *keyobj) { + if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return; + sds sdskey = keyobj->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable == NULL || TrackingTable[hash] == NULL) return; + if (TrackingTable[hash] == NULL) return; raxIterator ri; raxStart(&ri,TrackingTable[hash]); From 842b44dc46cc143fb28f8cadbff1fda9c81e09e2 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 12:29:54 +0200 Subject: [PATCH 18/35] Client side caching: call the invalidation functions always. Otherwise what happens is that the tracking table will never get garbage collected if there are no longer clients with tracking enabled. Now the invalidation function immediately checks if there is any table allocated, otherwise it returns ASAP, so the overhead when the feature is not used should be near zero. --- src/db.c | 4 ++-- src/expire.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db.c b/src/db.c index 568e0b8de..cc31a117b 100644 --- a/src/db.c +++ b/src/db.c @@ -412,12 +412,12 @@ long long dbTotalServerKeyCount() { void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); - if (server.tracking_clients) trackingInvalidateKey(key); + trackingInvalidateKey(key); } void signalFlushedDb(int dbid) { touchWatchedKeysOnFlush(dbid); - if (server.tracking_clients) trackingInvalidateKeysOnFlush(dbid); + trackingInvalidateKeysOnFlush(dbid); } /*----------------------------------------------------------------------------- diff --git a/src/expire.c b/src/expire.c index b23117a3c..598b27f96 100644 --- a/src/expire.c +++ b/src/expire.c @@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); - if (server.tracking_clients) trackingInvalidateKey(keyobj); + trackingInvalidateKey(keyobj); decrRefCount(keyobj); server.stat_expiredkeys++; return 1; From 7d3992a80aa7ba744aa446bc7292c7b7a5616e0b Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 12:31:46 +0200 Subject: [PATCH 19/35] Client side caching: reclaim the tracking table on FLUSHALL. --- src/tracking.c | 52 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index 7ee4d0b2c..dc2934831 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -185,17 +185,53 @@ void trackingInvalidateKey(robj *keyobj) { TrackingTableUsedSlots--; } +/* This function is called when one or all the Redis databases are flushed + * (dbid == -1 in case of FLUSHALL). Caching slots are not specific for + * each DB but are global: currently what we do is sending a special + * notification to clients with tracking enabled, invalidating the caching + * slot "-1", which means, "all the keys", in order to avoid flooding clients + * with many invalidation messages for all the keys they may hold. + * + * However trying to flush the tracking table here is very costly: + * we need scanning 16 million caching slots in the table to check + * if they are used, this introduces a big delay. So what we do is to really + * flush the table in the case of FLUSHALL. When a FLUSHDB is called instead + * we just send the invalidation message to all the clients, but don't + * flush the table: it will slowly get garbage collected as more keys + * are modified in the used caching slots. */ void trackingInvalidateKeysOnFlush(int dbid) { UNUSED(dbid); - if (server.tracking_clients == 0) return; - listNode *ln; - listIter li; - listRewind(server.clients,&li); - while ((ln = listNext(&li)) != NULL) { - client *c = listNodeValue(ln); - if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,-1); + if (server.tracking_clients) { + listNode *ln; + listIter li; + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_TRACKING) { + sendTrackingMessage(c,-1); + } + } + } + + /* In case of FLUSHALL, reclaim all the memory used by tracking. */ + if (dbid == -1 && TrackingTable) { + for (int j = 0; j < TRACKING_TABLE_SIZE; j++) { + if (TrackingTable[j] != NULL) { + raxFree(TrackingTable[j]); + TrackingTable[j] = NULL; + TrackingTableUsedSlots--; + if (TrackingTableUsedSlots == 0) break; + } + } + + /* If there are no clients with tracking enabled, we can even + * reclaim the memory used by the table itself. The code assumes + * the table is allocated only if there is at least one client alive + * with tracking enabled. */ + if (server.tracking_clients == 0) { + zfree(TrackingTable); + TrackingTable = NULL; } } } From 436d02dd7bbbacc3dc2132db30bb061ee108da8f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 12:37:43 +0200 Subject: [PATCH 20/35] Move signalFlushedDb() into a better place. Now that the call also invalidates client side caching slots, it is important that after an internal flush operation we both send the notifications to the clients and, at the same time, are able to reclaim the memory of the tracking table. This may even fix a few edge cases related to MULTI/EXEC + WATCH during resync, not sure, but in general looks more correct. --- src/db.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/db.c b/src/db.c index cc31a117b..3d8b844e8 100644 --- a/src/db.c +++ b/src/db.c @@ -378,6 +378,7 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( } } if (dbnum == -1) flushSlaveKeysWithExpireList(); + signalFlushedDb(dbnum); return removed; } @@ -453,7 +454,6 @@ void flushdbCommand(client *c) { int flags; if (getFlushCommandFlags(c,&flags) == C_ERR) return; - signalFlushedDb(c->db->id); server.dirty += emptyDb(c->db->id,flags,NULL); addReply(c,shared.ok); } @@ -465,7 +465,6 @@ void flushallCommand(client *c) { int flags; if (getFlushCommandFlags(c,&flags) == C_ERR) return; - signalFlushedDb(-1); server.dirty += emptyDb(-1,flags,NULL); addReply(c,shared.ok); if (server.rdb_child_pid != -1) killRDBChild(); From 09c06698e99e2745d93e8d4595fba58b3763b22f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 18:45:47 +0200 Subject: [PATCH 21/35] Client side caching: redis-cli ability to enable tracking. This is extremely useful in order to simulate an high load of requests about different keys, and force Redis to track a lot of informations about several clients, to simulate real world workloads. --- src/redis-benchmark.c | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 1d16fa4ee..2df41580b 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -104,6 +104,7 @@ static struct config { int is_fetching_slots; int is_updating_slots; int slots_last_update; + int enable_tracking; /* Thread mutexes to be used as fallbacks by atomicvar.h */ pthread_mutex_t requests_issued_mutex; pthread_mutex_t requests_finished_mutex; @@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port, goto fail; } - if(config.auth){ + if(config.auth) { void *authReply = NULL; redisAppendCommand(c, "AUTH %s", config.auth); if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; @@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) { c->prefix_pending++; } + if (config.enable_tracking) { + char *buf = NULL; + int len = redisFormatCommand(&buf, "CLIENT TRACKING on"); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + /* If a DB number different than zero is selected, prefix our request * buffer with the SELECT command, that will be discarded the first * time the replies are received, so if the client is reused the @@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) { } else if (config.num_threads < 0) config.num_threads = 0; } else if (!strcmp(argv[i],"--cluster")) { config.cluster_mode = 1; + } else if (!strcmp(argv[i],"--enable-tracking")) { + config.enable_tracking = 1; } else if (!strcmp(argv[i],"--help")) { exit_status = 0; goto usage; @@ -1380,6 +1391,7 @@ usage: " --dbnum SELECT the specified db number (default 0)\n" " --threads Enable multi-thread mode.\n" " --cluster Enable cluster mode.\n" +" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD\n" " Using this option the benchmark will expand the string __rand_int__\n" @@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) { config.is_fetching_slots = 0; config.is_updating_slots = 0; config.slots_last_update = 0; + config.enable_tracking = 0; i = parseOptions(argc,argv); argc -= i; From c41f94d2a3d9ca33ceefcdf67cc50c949d3a9657 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 22 Jul 2019 18:59:53 +0200 Subject: [PATCH 22/35] Client side caching: split invalidation into key / slot. --- src/server.c | 4 ++++ src/server.h | 1 + src/tracking.c | 58 +++++++++++++++++++++++++++++++++----------------- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/src/server.c b/src/server.c index 4337b8f01..5e18b5465 100644 --- a/src/server.c +++ b/src/server.c @@ -3401,6 +3401,10 @@ int processCommand(client *c) { } } + /* Make sure to use a reasonable amount of memory for client side + * caching metadata. */ + if (server.tracking_clients) trackingLimitUsedSlots(); + /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); diff --git a/src/server.h b/src/server.h index b200a6696..fdab53830 100644 --- a/src/server.h +++ b/src/server.h @@ -1639,6 +1639,7 @@ void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); +void trackingLimitUsedSlots(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index dc2934831..1189e82d5 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -154,6 +154,30 @@ void sendTrackingMessage(client *c, long long hash) { } } +/* Invalidates a caching slot: this is actually the low level implementation + * of the API that Redis calls externally, that is trackingInvalidateKey(). */ +void trackingInvalidateSlot(uint64_t slot) { + if (TrackingTable == NULL || TrackingTable[slot] == NULL) return; + + raxIterator ri; + raxStart(&ri,TrackingTable[slot]); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + uint64_t id; + memcpy(&id,ri.key,ri.key_len); + client *c = lookupClientByID(id); + if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; + sendTrackingMessage(c,slot); + } + raxStop(&ri); + + /* Free the tracking table: we'll create the radix tree and populate it + * again if more keys will be modified in this caching slot. */ + raxFree(TrackingTable[slot]); + TrackingTable[slot] = NULL; + TrackingTableUsedSlots--; +} + /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching @@ -164,25 +188,7 @@ void trackingInvalidateKey(robj *keyobj) { sds sdskey = keyobj->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) return; - - raxIterator ri; - raxStart(&ri,TrackingTable[hash]); - raxSeek(&ri,"^",NULL,0); - while(raxNext(&ri)) { - uint64_t id; - memcpy(&id,ri.key,ri.key_len); - client *c = lookupClientByID(id); - if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,hash); - } - raxStop(&ri); - - /* Free the tracking table: we'll create the radix tree and populate it - * again if more keys will be modified in this hash slot. */ - raxFree(TrackingTable[hash]); - TrackingTable[hash] = NULL; - TrackingTableUsedSlots--; + trackingInvalidateSlot(hash); } /* This function is called when one or all the Redis databases are flushed @@ -235,3 +241,17 @@ void trackingInvalidateKeysOnFlush(int dbid) { } } } + +/* Tracking forces Redis to remember information about which client may have + * keys about certian caching slots. In workloads where there are a lot of + * reads, but keys are hardly modified, the amount of information we have + * to remember server side could be a lot: for each 16 millions of caching + * slots we may end with a radix tree containing many entries. + * + * So Redis allows the user to configure a maximum fill rate for the + * invalidation table. This function makes sure that we don't go over the + * specified fill rate: if we are over, we can just evict informations about + * random caching slots, and send invalidation messages to clients like if + * the key was modified. */ +void trackingLimitUsedSlots(void) { +} From 3b6aeea44cf8bdc64214a5f145da55453722a9a2 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 21 Jul 2019 17:41:03 +0300 Subject: [PATCH 23/35] Implement module api for aux data in rdb Other changes: * fix memory leak in error handling of rdb loading of type OBJ_MODULE --- runtest-moduleapi | 2 +- src/module.c | 41 ++++++ src/rdb.c | 87 ++++++++++-- src/rdb.h | 1 + src/redismodule.h | 11 +- src/server.h | 10 ++ tests/modules/Makefile | 6 +- tests/modules/testrdb.c | 229 +++++++++++++++++++++++++++++++ tests/unit/moduleapi/testrdb.tcl | 62 +++++++++ 9 files changed, 431 insertions(+), 18 deletions(-) create mode 100644 tests/modules/testrdb.c create mode 100644 tests/unit/moduleapi/testrdb.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index 84cdb9bb8..8e1c0cb23 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -13,4 +13,4 @@ then fi make -C tests/modules && \ -$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/testrdb "${@}" diff --git a/src/module.c b/src/module.c index f4f753c00..812a54e04 100644 --- a/src/module.c +++ b/src/module.c @@ -29,6 +29,7 @@ #include "server.h" #include "cluster.h" +#include "rdb.h" #include #define REDISMODULE_CORE 1 @@ -3078,6 +3079,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeMemUsageFunc mem_usage; moduleTypeDigestFunc digest; moduleTypeFreeFunc free; + struct { + moduleTypeAuxLoadFunc aux_load; + moduleTypeAuxSaveFunc aux_save; + int aux_save_triggers; + } v2; } *tms = (struct typemethods*) typemethods_ptr; moduleType *mt = zcalloc(sizeof(*mt)); @@ -3089,6 +3095,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, mt->mem_usage = tms->mem_usage; mt->digest = tms->digest; mt->free = tms->free; + if (tms->version >= 2) { + mt->aux_load = tms->v2.aux_load; + mt->aux_save = tms->v2.aux_save; + mt->aux_save_triggers = tms->v2.aux_save_triggers; + } memcpy(mt->name,name,sizeof(mt->name)); listAddNodeTail(ctx->module->types,mt); return mt; @@ -3355,6 +3366,36 @@ loaderr: return 0; /* Never reached. */ } +/* Iterate over modules, and trigger rdb aux saving for the ones modules types + * who asked for it. */ +ssize_t rdbSaveModulesAux(rio *rdb, int when) { + size_t total_written = 0; + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + listIter li; + listNode *ln; + + listRewind(module->types,&li); + while((ln = listNext(&li))) { + moduleType *mt = ln->value; + if (!mt->aux_save || !(mt->aux_save_triggers & when)) + continue; + ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt); + if (ret==-1) { + dictReleaseIterator(di); + return -1; + } + total_written += ret; + } + } + + dictReleaseIterator(di); + return total_written; +} + /* -------------------------------------------------------------------------- * Key digest API (DEBUG DIGEST interface for modules types) * -------------------------------------------------------------------------- */ diff --git a/src/rdb.c b/src/rdb.c index 2118a00f4..4e00fad67 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -971,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,rdb,key); /* Write the "module" identifier as prefix, so that we'll be able * to call the right module during loading. */ @@ -980,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { io.bytes += retval; /* Then write the module-specific representation + EOF marker. */ + moduleInitIOContext(io,mt,rdb,key); mt->rdb_save(&io,mv->value); retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); - if (retval == -1) return -1; - io.bytes += retval; + if (retval == -1) + io.error = 1; + else + io.bytes += retval; if (io.ctx) { moduleFreeContext(io.ctx); @@ -1101,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { return 1; } +ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { + /* Save a module-specific aux value. */ + RedisModuleIO io; + int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX); + + /* Write the "module" identifier as prefix, so that we'll be able + * to call the right module during loading. */ + retval = rdbSaveLen(rdb,mt->id); + if (retval == -1) return -1; + io.bytes += retval; + + /* write the 'when' so that we can provide it on loading */ + retval = rdbSaveLen(rdb,when); + if (retval == -1) return -1; + io.bytes += retval; + + /* Then write the module-specific representation + EOF marker. */ + moduleInitIOContext(io,mt,rdb,NULL); + mt->aux_save(&io,when); + retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); + if (retval == -1) + io.error = 1; + else + io.bytes += retval; + + if (io.ctx) { + moduleFreeContext(io.ctx); + zfree(io.ctx); + } + if (io.error) + return -1; + return io.bytes; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1122,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -1183,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { di = NULL; /* So that we don't release it again on error. */ } + if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; + /* EOF opcode */ if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; @@ -2089,15 +2128,11 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(auxval); continue; /* Read type again. */ } else if (type == RDB_OPCODE_MODULE_AUX) { - /* This is just for compatibility with the future: we have plans - * to add the ability for modules to store anything in the RDB - * file, like data that is not related to the Redis key space. - * Such data will potentially be stored both before and after the - * RDB keys-values section. For this reason since RDB version 9, - * we have the ability to read a MODULE_AUX opcode followed by an - * identifier of the module, and a serialized value in "MODULE V2" - * format. */ + /* Load module data that is not related to the Redis key space. + * Such data can be potentially be stored both before and after the + * RDB keys-values section. */ uint64_t moduleid = rdbLoadLen(rdb,NULL); + int when = rdbLoadLen(rdb,NULL); if (rioGetReadError(rdb)) goto eoferr; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -2108,10 +2143,32 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name); exit(1); } else if (!rdbCheckMode && mt != NULL) { - /* This version of Redis actually does not know what to do - * with modules AUX data... */ - serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name); - exit(1); + if (!mt->aux_load) { + /* Module doesn't support AUX. */ + serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name); + exit(1); + } + + RedisModuleIO io; + moduleInitIOContext(io,mt,rdb,NULL); + io.ver = 2; + /* Call the rdb_load method of the module providing the 10 bit + * encoding version in the lower 10 bits of the module ID. */ + if (mt->aux_load(&io,moduleid&1023, when) || io.error) { + moduleTypeNameByID(name,moduleid); + serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name); + exit(1); + } + if (io.ctx) { + moduleFreeContext(io.ctx); + zfree(io.ctx); + } + uint64_t eof = rdbLoadLen(rdb,NULL); + if (eof != RDB_MODULE_OPCODE_EOF) { + serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name); + exit(1); + } + continue; } else { /* RDB check mode. */ robj *aux = rdbLoadCheckModuleValue(rdb,name); diff --git a/src/rdb.h b/src/rdb.h index 0acddf9ab..40a50f7ba 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb, robj *key); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); +ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); robj *rdbLoadStringObject(rio *rdb); ssize_t rdbSaveStringObject(rio *rdb, robj *obj); ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len); diff --git a/src/redismodule.h b/src/redismodule.h index b9c73957b..21227fe3a 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -129,6 +129,10 @@ #define REDISMODULE_NOT_USED(V) ((void) V) +/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + /* This type represents a timer handle, and is returned when a timer is * registered and used in order to invalidate a timer. It's just a 64 bit * number, because this is how each timer is represented inside the radix tree @@ -166,6 +170,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); +typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); +typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); @@ -174,7 +180,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); -#define REDISMODULE_TYPE_METHOD_VERSION 1 +#define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { uint64_t version; RedisModuleTypeLoadFunc rdb_load; @@ -183,6 +189,9 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeMemUsageFunc mem_usage; RedisModuleTypeDigestFunc digest; RedisModuleTypeFreeFunc free; + RedisModuleTypeAuxLoadFunc aux_load; + RedisModuleTypeAuxSaveFunc aux_save; + int aux_save_triggers; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ diff --git a/src/server.h b/src/server.h index b200a6696..33522dd69 100644 --- a/src/server.h +++ b/src/server.h @@ -536,6 +536,10 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK) #define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS) +/* Bit flags for moduleTypeAuxSaveFunc */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + struct RedisModule; struct RedisModuleIO; struct RedisModuleDigest; @@ -548,6 +552,8 @@ struct redisObject; * is deleted. */ typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver); typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value); +typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when); +typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when); typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value); typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value); typedef size_t (*moduleTypeMemUsageFunc)(const void *value); @@ -564,6 +570,9 @@ typedef struct RedisModuleType { moduleTypeMemUsageFunc mem_usage; moduleTypeDigestFunc digest; moduleTypeFreeFunc free; + moduleTypeAuxLoadFunc aux_load; + moduleTypeAuxSaveFunc aux_save; + int aux_save_triggers; char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ } moduleType; @@ -1528,6 +1537,7 @@ void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +ssize_t rdbSaveModulesAux(rio *rdb, int when); /* Utils */ long long ustime(void); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 014d20afa..6e4574747 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -13,12 +13,16 @@ endif .SUFFIXES: .c .so .xo .o -all: commandfilter.so +all: commandfilter.so testrdb.so .c.xo: $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ commandfilter.xo: ../../src/redismodule.h +testrdb.xo: ../../src/redismodule.h commandfilter.so: commandfilter.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + +testrdb.so: testrdb.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c new file mode 100644 index 000000000..415497a2f --- /dev/null +++ b/tests/modules/testrdb.c @@ -0,0 +1,229 @@ +#include "redismodule.h" + +#include +#include + +/* Module configuration, save aux or not? */ +long long conf_aux_count = 0; + +/* Registered type */ +RedisModuleType *testrdb_type = NULL; + +/* Global values to store and persist to aux */ +RedisModuleString *before_str = NULL; +RedisModuleString *after_str = NULL; + +void *testrdb_type_load(RedisModuleIO *rdb, int encver) { + int count = RedisModule_LoadSigned(rdb); + assert(count==1); + assert(encver==1); + RedisModuleString *str = RedisModule_LoadString(rdb); + return str; +} + +void testrdb_type_save(RedisModuleIO *rdb, void *value) { + RedisModuleString *str = (RedisModuleString*)value; + RedisModule_SaveSigned(rdb, 1); + RedisModule_SaveString(rdb, str); +} + +void testrdb_aux_save(RedisModuleIO *rdb, int when) { + if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB); + if (conf_aux_count==0) assert(0); + if (when == REDISMODULE_AUX_BEFORE_RDB) { + if (before_str) { + RedisModule_SaveSigned(rdb, 1); + RedisModule_SaveString(rdb, before_str); + } else { + RedisModule_SaveSigned(rdb, 0); + } + } else { + if (after_str) { + RedisModule_SaveSigned(rdb, 1); + RedisModule_SaveString(rdb, after_str); + } else { + RedisModule_SaveSigned(rdb, 0); + } + } +} + +int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { + assert(encver == 1); + if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB); + if (conf_aux_count==0) assert(0); + RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); + if (when == REDISMODULE_AUX_BEFORE_RDB) { + if (before_str) + RedisModule_FreeString(ctx, before_str); + before_str = NULL; + int count = RedisModule_LoadSigned(rdb); + if (count) + before_str = RedisModule_LoadString(rdb); + } else { + if (after_str) + RedisModule_FreeString(ctx, after_str); + after_str = NULL; + int count = RedisModule_LoadSigned(rdb); + if (count) + after_str = RedisModule_LoadString(rdb); + } + return REDISMODULE_OK; +} + +void testrdb_type_free(void *value) { + RedisModule_FreeString(NULL, (RedisModuleString*)value); +} + +int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + if (before_str) + RedisModule_FreeString(ctx, before_str); + before_str = argv[1]; + RedisModule_RetainString(ctx, argv[1]); + RedisModule_ReplyWithLongLong(ctx, 1); + return REDISMODULE_OK; +} + +int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + if (argc != 1){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + if (before_str) + RedisModule_ReplyWithString(ctx, before_str); + else + RedisModule_ReplyWithStringBuffer(ctx, "", 0); + return REDISMODULE_OK; +} + +int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + if (after_str) + RedisModule_FreeString(ctx, after_str); + after_str = argv[1]; + RedisModule_RetainString(ctx, argv[1]); + RedisModule_ReplyWithLongLong(ctx, 1); + return REDISMODULE_OK; +} + +int testrdb_get_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + if (argc != 1){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + if (after_str) + RedisModule_ReplyWithString(ctx, after_str); + else + RedisModule_ReplyWithStringBuffer(ctx, "", 0); + return REDISMODULE_OK; +} + +int testrdb_set_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 3){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModuleString *str = RedisModule_ModuleTypeGetValue(key); + if (str) + RedisModule_FreeString(ctx, str); + RedisModule_ModuleTypeSetValue(key, testrdb_type, argv[2]); + RedisModule_RetainString(ctx, argv[2]); + RedisModule_CloseKey(key); + RedisModule_ReplyWithLongLong(ctx, 1); + return REDISMODULE_OK; +} + +int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModuleString *str = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + RedisModule_ReplyWithString(ctx, str); + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (argc > 0) + RedisModule_StringToLongLong(argv[0], &conf_aux_count); + + if (conf_aux_count==0) { + RedisModuleTypeMethods datatype_methods = { + .version = 1, + .rdb_load = testrdb_type_load, + .rdb_save = testrdb_type_save, + .aof_rewrite = NULL, + .digest = NULL, + .free = testrdb_type_free, + }; + + testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods); + if (testrdb_type == NULL) + return REDISMODULE_ERR; + } else { + RedisModuleTypeMethods datatype_methods = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = testrdb_type_load, + .rdb_save = testrdb_type_save, + .aof_rewrite = NULL, + .digest = NULL, + .free = testrdb_type_free, + .aux_load = testrdb_aux_load, + .aux_save = testrdb_aux_save, + .aux_save_triggers = (conf_aux_count == 1 ? + REDISMODULE_AUX_AFTER_RDB : + REDISMODULE_AUX_BEFORE_RDB | REDISMODULE_AUX_AFTER_RDB) + }; + + testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods); + if (testrdb_type == NULL) + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"testrdb.set.before", testrdb_set_before,"deny-oom",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"testrdb.get.after", testrdb_get_after,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"testrdb.set.key", testrdb_set_key,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"testrdb.get.key", testrdb_get_key,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl new file mode 100644 index 000000000..22201a08e --- /dev/null +++ b/tests/unit/moduleapi/testrdb.tcl @@ -0,0 +1,62 @@ +set testmodule [file normalize tests/modules/testrdb.so] + +proc restart_and_wait {} { + catch { + r debug restart + } + + # wait for the server to come back up + set retry 50 + while {$retry} { + if {[catch { r ping }]} { + after 100 + } else { + break + } + incr retry -1 + } +} + +tags "modules" { + start_server [list overrides [list loadmodule "$testmodule"]] { + test {modules are able to persist types} { + r testrdb.set.key key1 value1 + assert_equal "value1" [r testrdb.get.key key1] + r debug reload + assert_equal "value1" [r testrdb.get.key key1] + } + + test {modules global are lost without aux} { + r testrdb.set.before global1 + assert_equal "global1" [r testrdb.get.before] + restart_and_wait + assert_equal "" [r testrdb.get.before] + } + } + + start_server [list overrides [list loadmodule "$testmodule 2"]] { + test {modules are able to persist globals before and after} { + r testrdb.set.before global1 + r testrdb.set.after global2 + assert_equal "global1" [r testrdb.get.before] + assert_equal "global2" [r testrdb.get.after] + restart_and_wait + assert_equal "global1" [r testrdb.get.before] + assert_equal "global2" [r testrdb.get.after] + } + + } + + start_server [list overrides [list loadmodule "$testmodule 1"]] { + test {modules are able to persist globals just after} { + r testrdb.set.after global2 + assert_equal "global2" [r testrdb.get.after] + restart_and_wait + assert_equal "global2" [r testrdb.get.after] + } + } + + + # TODO: test short read handling + +} From 8fe173452ab28c062ecc38e31a7b51f93c6150c7 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Sat, 6 Jul 2019 18:32:58 -0700 Subject: [PATCH 24/35] Removed unecessary creation of Redis objects --- src/cluster.c | 13 +++---------- src/config.c | 6 ++---- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index c85e3791d..e22222700 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4251,12 +4251,7 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { /* CLUSTER NODES */ - robj *o; - sds ci = clusterGenNodesDescription(0); - - o = createObject(OBJ_STRING,ci); - addReplyBulk(c,o); - decrRefCount(o); + addReplyBulkSds(c,clusterGenNodesDescription(0)); } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { /* CLUSTER MYID */ addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN); @@ -4832,7 +4827,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) { * DUMP is actually not used by Redis Cluster but it is the obvious * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { - robj *o, *dumpobj; + robj *o; rio payload; /* Check if the key is here. */ @@ -4845,9 +4840,7 @@ void dumpCommand(client *c) { createDumpPayload(&payload,o,c->argv[1]); /* Transfer to the client */ - dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); - addReplyBulk(c,dumpobj); - decrRefCount(dumpobj); + addReplyBulkSds(c,payload.io.buffer.ptr); return; } diff --git a/src/config.c b/src/config.c index 7f0e9af89..ddf31d168 100644 --- a/src/config.c +++ b/src/config.c @@ -1602,12 +1602,10 @@ void configGetCommand(client *c) { matches++; } if (stringmatch(pattern,"notify-keyspace-events",1)) { - robj *flagsobj = createObject(OBJ_STRING, - keyspaceEventsFlagsToString(server.notify_keyspace_events)); + sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events); addReplyBulkCString(c,"notify-keyspace-events"); - addReplyBulk(c,flagsobj); - decrRefCount(flagsobj); + addReplyBulkSds(c,flags); matches++; } if (stringmatch(pattern,"bind",1)) { From 9012e587ca6f7c7ef54e86a782910625fe3dc3e9 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 23 Jul 2019 15:25:00 +0800 Subject: [PATCH 25/35] Client side caching: do not reclaim tracking table if it's empty --- src/tracking.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/tracking.c b/src/tracking.c index 1189e82d5..badec69a7 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -206,8 +206,6 @@ void trackingInvalidateKey(robj *keyobj) { * flush the table: it will slowly get garbage collected as more keys * are modified in the used caching slots. */ void trackingInvalidateKeysOnFlush(int dbid) { - UNUSED(dbid); - if (server.tracking_clients) { listNode *ln; listIter li; @@ -222,12 +220,11 @@ void trackingInvalidateKeysOnFlush(int dbid) { /* In case of FLUSHALL, reclaim all the memory used by tracking. */ if (dbid == -1 && TrackingTable) { - for (int j = 0; j < TRACKING_TABLE_SIZE; j++) { + for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) { if (TrackingTable[j] != NULL) { raxFree(TrackingTable[j]); TrackingTable[j] = NULL; TrackingTableUsedSlots--; - if (TrackingTableUsedSlots == 0) break; } } From 9268493e8d9a67bddff9b27f0f7b3f2cd020a227 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 23 Jul 2019 10:57:22 +0200 Subject: [PATCH 26/35] Client side caching: implement full slot limit function. --- src/server.c | 3 +++ src/server.h | 2 ++ src/tracking.c | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/src/server.c b/src/server.c index 5e18b5465..1f502e4ff 100644 --- a/src/server.c +++ b/src/server.c @@ -2403,6 +2403,9 @@ void initServerConfig(void) { /* Latency monitor */ server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; + /* Tracking. */ + server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_MAX_FILL; + /* Debugging */ server.assert_failed = ""; server.assert_file = ""; diff --git a/src/server.h b/src/server.h index fdab53830..172b1cf64 100644 --- a/src/server.h +++ b/src/server.h @@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ +#define CONFIG_DEFAULT_TRACKING_MAX_FILL 10 /* 10% tracking table max fill. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -1316,6 +1317,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ + int tracking_table_max_fill; /* Max fill percentage. */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/src/tracking.c b/src/tracking.c index badec69a7..1a5fbe47c 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -251,4 +251,40 @@ void trackingInvalidateKeysOnFlush(int dbid) { * random caching slots, and send invalidation messages to clients like if * the key was modified. */ void trackingLimitUsedSlots(void) { + static unsigned int timeout_counter = 0; + + if (server.tracking_table_max_fill == 0) return; /* No limits set. */ + unsigned int max_slots = + (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill; + if (TrackingTableUsedSlots <= max_slots) { + timeout_counter = 0; + return; /* Limit not reached. */ + } + + /* We have to invalidate a few slots to reach the limit again. The effort + * we do here is proportional to the number of times we entered this + * function and found that we are still over the limit. */ + int effort = 100 * (timeout_counter+1); + + /* Let's start at a random position, and perform linear probing, in order + * to improve cache locality. However once we are able to find an used + * slot, jump again randomly, in order to avoid creating big holes in the + * table (that will make this funciton use more resourced later). */ + while(effort > 0) { + unsigned int idx = rand() % TRACKING_TABLE_SIZE; + do { + effort--; + idx = (idx+1) % TRACKING_TABLE_SIZE; + if (TrackingTable[idx] != NULL) { + trackingInvalidateSlot(idx); + if (TrackingTableUsedSlots <= max_slots) { + timeout_counter = 0; + return; /* Return ASAP: we are again under the limit. */ + } else { + break; /* Jump to next random position. */ + } + } + } while(effort > 0); + } + timeout_counter++; } From c98e7717bb741016b87a69901327e1fafac17425 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 23 Jul 2019 11:02:14 +0200 Subject: [PATCH 27/35] Client side caching: show tracking slots usage in INFO. --- src/server.c | 6 ++++-- src/server.h | 1 + src/tracking.c | 6 ++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/server.c b/src/server.c index 1f502e4ff..0acf8545c 100644 --- a/src/server.c +++ b/src/server.c @@ -4147,7 +4147,8 @@ sds genRedisInfoString(char *section) { "active_defrag_hits:%lld\r\n" "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" - "active_defrag_key_misses:%lld\r\n", + "active_defrag_key_misses:%lld\r\n" + "tracking_used_slots:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4173,7 +4174,8 @@ sds genRedisInfoString(char *section) { server.stat_active_defrag_hits, server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, - server.stat_active_defrag_key_misses); + server.stat_active_defrag_key_misses, + trackingGetUsedSlots()); } /* Replication */ diff --git a/src/server.h b/src/server.h index 172b1cf64..e363f03f4 100644 --- a/src/server.h +++ b/src/server.h @@ -1642,6 +1642,7 @@ void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); +unsigned long long trackingGetUsedSlots(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index 1a5fbe47c..f7f0fc755 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -288,3 +288,9 @@ void trackingLimitUsedSlots(void) { } timeout_counter++; } + +/* This is just used in order to access the amount of used slots in the + * tracking table. */ +unsigned long long trackingGetUsedSlots(void) { + return TrackingTableUsedSlots; +} From 48f4cfb6016ebc9cc08963305d316516ac84aa73 Mon Sep 17 00:00:00 2001 From: wubostc <913721086@qq.com> Date: Wed, 24 Jul 2019 16:22:26 +0800 Subject: [PATCH 28/35] Reduce the calling stack --- src/t_zset.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/t_zset.c b/src/t_zset.c index fb7078abd..2680e76a9 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); - if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); - if (sdslen(ele) > server.zset_max_ziplist_value) + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || + sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; From a67d0411e7273d9a5619db1889d1143228b0d577 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 24 Jul 2019 11:33:53 +0200 Subject: [PATCH 29/35] Remove experimental warning from diskless replication. --- redis.conf | 4 ---- 1 file changed, 4 deletions(-) diff --git a/redis.conf b/redis.conf index 74b6c018f..a3c98341f 100644 --- a/redis.conf +++ b/redis.conf @@ -336,10 +336,6 @@ replica-read-only yes # Replication SYNC strategy: disk or socket. # -# ------------------------------------------------------- -# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY -# ------------------------------------------------------- -# # New replicas and reconnecting replicas that are not able to continue the replication # process just receiving differences, need to do what is called a "full # synchronization". An RDB file is transmitted from the master to the replicas. From 32efd9adf8d9ea2005495433f77fdbd4fac09c3a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 24 Jul 2019 11:35:01 +0200 Subject: [PATCH 30/35] Client side caching: config option for table fill rate. --- src/config.c | 18 ++++++++++++++++-- src/server.c | 2 +- src/server.h | 2 +- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/config.c b/src/config.c index fde00ddf5..7ffc94b35 100644 --- a/src/config.c +++ b/src/config.c @@ -686,6 +686,17 @@ void loadServerConfigFromString(char *config) { } } else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) { server.slowlog_max_len = strtoll(argv[1],NULL,10); + } else if (!strcasecmp(argv[0],"tracking-table-max-fill") && + argc == 2) + { + server.tracking_table_max_fill = strtoll(argv[1],NULL,10); + if (server.tracking_table_max_fill > 100 || + server.tracking_table_max_fill < 0) + { + err = "The tracking table fill percentage must be an " + "integer between 0 and 100"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"client-output-buffer-limit") && argc == 5) { @@ -1133,6 +1144,8 @@ void configSetCommand(client *c) { "slowlog-max-len",ll,0,LONG_MAX) { /* Cast to unsigned. */ server.slowlog_max_len = (unsigned long)ll; + } config_set_numerical_field( + "tracking-table-max-fill",server.tracking_table_max_fill,0,100) { } config_set_numerical_field( "latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){ } config_set_numerical_field( @@ -1338,8 +1351,8 @@ void configGetCommand(client *c) { server.slowlog_log_slower_than); config_get_numerical_field("latency-monitor-threshold", server.latency_monitor_threshold); - config_get_numerical_field("slowlog-max-len", - server.slowlog_max_len); + config_get_numerical_field("slowlog-max-len", server.slowlog_max_len); + config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill); config_get_numerical_field("port",server.port); config_get_numerical_field("cluster-announce-port",server.cluster_announce_port); config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port); @@ -2167,6 +2180,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN); rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD); rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN); + rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL); rewriteConfigNotifykeyspaceeventsOption(state); rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE); diff --git a/src/server.c b/src/server.c index 0acf8545c..eb5cef386 100644 --- a/src/server.c +++ b/src/server.c @@ -2404,7 +2404,7 @@ void initServerConfig(void) { server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; /* Tracking. */ - server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_MAX_FILL; + server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL; /* Debugging */ server.assert_failed = ""; diff --git a/src/server.h b/src/server.h index e363f03f4..399a0bbb6 100644 --- a/src/server.h +++ b/src/server.h @@ -171,7 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ -#define CONFIG_DEFAULT_TRACKING_MAX_FILL 10 /* 10% tracking table max fill. */ +#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ From 0b780a54771155ba477007ecb027e534f32376ff Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 24 Jul 2019 11:38:11 +0200 Subject: [PATCH 31/35] Example redis.conf: stay under 80 cols. --- redis.conf | 64 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/redis.conf b/redis.conf index a3c98341f..28a9889ea 100644 --- a/redis.conf +++ b/redis.conf @@ -336,9 +336,11 @@ replica-read-only yes # Replication SYNC strategy: disk or socket. # -# New replicas and reconnecting replicas that are not able to continue the replication -# process just receiving differences, need to do what is called a "full -# synchronization". An RDB file is transmitted from the master to the replicas. +# New replicas and reconnecting replicas that are not able to continue the +# replication process just receiving differences, need to do what is called a +# "full synchronization". An RDB file is transmitted from the master to the +# replicas. +# # The transmission can happen in two different ways: # # 1) Disk-backed: The Redis master creates a new process that writes the RDB @@ -348,14 +350,14 @@ replica-read-only yes # RDB file to replica sockets, without touching the disk at all. # # With disk-backed replication, while the RDB file is generated, more replicas -# can be queued and served with the RDB file as soon as the current child producing -# the RDB file finishes its work. With diskless replication instead once -# the transfer starts, new replicas arriving will be queued and a new transfer -# will start when the current one terminates. +# can be queued and served with the RDB file as soon as the current child +# producing the RDB file finishes its work. With diskless replication instead +# once the transfer starts, new replicas arriving will be queued and a new +# transfer will start when the current one terminates. # # When diskless replication is used, the master waits a configurable amount of -# time (in seconds) before starting the transfer in the hope that multiple replicas -# will arrive and the transfer can be parallelized. +# time (in seconds) before starting the transfer in the hope that multiple +# replicas will arrive and the transfer can be parallelized. # # With slow disks and fast (large bandwidth) networks, diskless replication # works better. @@ -366,8 +368,8 @@ repl-diskless-sync no # to the replicas. # # This is important since once the transfer starts, it is not possible to serve -# new replicas arriving, that will be queued for the next RDB transfer, so the server -# waits a delay in order to let more replicas arrive. +# new replicas arriving, that will be queued for the next RDB transfer, so the +# server waits a delay in order to let more replicas arrive. # # The delay is specified in seconds, and by default is 5 seconds. To disable # it entirely just set it to 0 seconds and the transfer will start ASAP. @@ -389,9 +391,9 @@ repl-diskless-sync-delay 5 # sufficient memory, if you don't have it, you risk an OOM kill. repl-diskless-load disabled -# Replicas send PINGs to server in a predefined interval. It's possible to change -# this interval with the repl_ping_replica_period option. The default value is 10 -# seconds. +# Replicas send PINGs to server in a predefined interval. It's possible to +# change this interval with the repl_ping_replica_period option. The default +# value is 10 seconds. # # repl-ping-replica-period 10 @@ -423,10 +425,10 @@ repl-diskless-load disabled repl-disable-tcp-nodelay no # Set the replication backlog size. The backlog is a buffer that accumulates -# replica data when replicas are disconnected for some time, so that when a replica -# wants to reconnect again, often a full resync is not needed, but a partial -# resync is enough, just passing the portion of data the replica missed while -# disconnected. +# replica data when replicas are disconnected for some time, so that when a +# replica wants to reconnect again, often a full resync is not needed, but a +# partial resync is enough, just passing the portion of data the replica +# missed while disconnected. # # The bigger the replication backlog, the longer the time the replica can be # disconnected and later be able to perform a partial resynchronization. @@ -448,13 +450,13 @@ repl-disable-tcp-nodelay no # # repl-backlog-ttl 3600 -# The replica priority is an integer number published by Redis in the INFO output. -# It is used by Redis Sentinel in order to select a replica to promote into a -# master if the master is no longer working correctly. +# The replica priority is an integer number published by Redis in the INFO +# output. It is used by Redis Sentinel in order to select a replica to promote +# into a master if the master is no longer working correctly. # # A replica with a low priority number is considered better for promotion, so -# for instance if there are three replicas with priority 10, 100, 25 Sentinel will -# pick the one with priority 10, that is the lowest. +# for instance if there are three replicas with priority 10, 100, 25 Sentinel +# will pick the one with priority 10, that is the lowest. # # However a special priority of 0 marks the replica as not able to perform the # role of master, so a replica with priority of 0 will never be selected by @@ -743,17 +745,17 @@ replica-priority 100 # DEL commands to the replica as keys evict in the master side. # # This behavior ensures that masters and replicas stay consistent, and is usually -# what you want, however if your replica is writable, or you want the replica to have -# a different memory setting, and you are sure all the writes performed to the -# replica are idempotent, then you may change this default (but be sure to understand -# what you are doing). +# what you want, however if your replica is writable, or you want the replica +# to have a different memory setting, and you are sure all the writes performed +# to the replica are idempotent, then you may change this default (but be sure +# to understand what you are doing). # # Note that since the replica by default does not evict, it may end using more # memory than the one set via maxmemory (there are certain buffers that may -# be larger on the replica, or data structures may sometimes take more memory and so -# forth). So make sure you monitor your replicas and make sure they have enough -# memory to never hit a real out-of-memory condition before the master hits -# the configured maxmemory setting. +# be larger on the replica, or data structures may sometimes take more memory +# and so forth). So make sure you monitor your replicas and make sure they +# have enough memory to never hit a real out-of-memory condition before the +# master hits the configured maxmemory setting. # # replica-ignore-maxmemory yes From e53a26b5d8cf8529064f841a4587a54b1f77c4e4 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 24 Jul 2019 11:45:10 +0200 Subject: [PATCH 32/35] Client side caching: document tracking-table-max-fill in redis.conf. --- redis.conf | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/redis.conf b/redis.conf index 28a9889ea..c242609ec 100644 --- a/redis.conf +++ b/redis.conf @@ -516,6 +516,39 @@ replica-priority 100 # replica-announce-ip 5.5.5.5 # replica-announce-port 1234 +############################### KEYS TRACKING ################################# + +# Redis implements server assisted support for client side caching of values. +# This is implemented using an invalidation table that remembers, using +# 16 millions of slots, what clients may have certain subsets of keys. In turn +# this is used in order to send invalidation messages to clients. Please +# to understand more about the feature check this page: +# +# https://redis.io/topics/client-side-caching +# +# When tracking is enabled for a client, all the read only queries are assumed +# to be cached: this will force Redis to store information in the invalidation +# table. When keys are modified, such information is flushed away, and +# invalidation messages are sent to the clients. However if the workload is +# heavily dominated by reads, Redis could use more and more memory in order +# to track the keys fetched by many clients. +# +# For this reason it is possible to configure a maximum fill value for the +# invalidation table. By default it is set to 10%, and once this limit is +# reached, Redis will start to evict caching slots in the invalidation table +# even if keys are not modified, just to reclaim memory: this will in turn +# force the clients to invalidate the cached values. Basically the table +# maximum fill rate is a trade off between the memory you want to spend server +# side to track information about who cached what, and the ability of clients +# to retain cached objects in memory. +# +# If you set the value to 0, it means there are no limits, and all the 16 +# millions of caching slots can be used at the same time. In the "stats" +# INFO section, you can find information about the amount of caching slots +# used at every given moment. +# +# tracking-table-max-fill 10 + ################################## SECURITY ################################### # Warning: since Redis is pretty fast an outside user can try up to From 0714581e0cca93c0034d6ad1c1d72847c1e57ffe Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 25 Jul 2019 19:17:58 +0200 Subject: [PATCH 33/35] Mark diskless loads as experimental in redis.conf. --- redis.conf | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/redis.conf b/redis.conf index c242609ec..50ba823ac 100644 --- a/redis.conf +++ b/redis.conf @@ -375,15 +375,25 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. repl-diskless-sync-delay 5 -# Replica can load the rdb it reads from the replication link directly from the -# socket, or store the rdb to a file and read that file after it was completely +# ----------------------------------------------------------------------------- +# WARNING: RDB diskless load is experimental. Since in this setup the replica +# does not immediately store an RDB on disk, it may cause data loss during +# failovers. RDB diskless load + Redis modules not handling I/O reads may also +# cause Redis to abort in case of I/O errors during the initial synchronization +# stage with the master. Use only if your do what you are doing. +# ----------------------------------------------------------------------------- +# +# Replica can load the RDB it reads from the replication link directly from the +# socket, or store the RDB to a file and read that file after it was completely # recived from the master. +# # In many cases the disk is slower than the network, and storing and loading -# the rdb file may increase replication time (and even increase the master's +# the RDB file may increase replication time (and even increase the master's # Copy on Write memory and salve buffers). -# However, parsing the rdb file directly from the socket may mean that we have -# to flush the contents of the current database before the full rdb was received. -# for this reason we have the following options: +# However, parsing the RDB file directly from the socket may mean that we have +# to flush the contents of the current database before the full rdb was +# received. For this reason we have the following options: +# # "disabled" - Don't use diskless load (store the rdb file to the disk first) # "on-empty-db" - Use diskless load only when it is completely safe. # "swapdb" - Keep a copy of the current db contents in RAM while parsing From d659654f53c276a5a96e8559793ffdb9051957fd Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 29 Jul 2019 18:11:52 -0400 Subject: [PATCH 34/35] Fix HLL corruption bug --- src/hyperloglog.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/hyperloglog.c b/src/hyperloglog.c index e01ea6042..e0557f985 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -701,6 +701,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) { first += span; } if (span == 0) return -1; /* Invalid format. */ + if (span >= end) return -1; /* Invalid format. */ next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1; if (next >= end) next = NULL; From e4b3c8bbc3d01176dee434a6ae8e8e10bb74703a Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 30 Jul 2019 11:20:51 +0200 Subject: [PATCH 35/35] emptyDbGeneric(): call signalFlushDb() before deleting the keys. This was broken since a refactoring performed recently by myself. --- src/db.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 3d8b844e8..95eaf04e9 100644 --- a/src/db.c +++ b/src/db.c @@ -353,6 +353,11 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( return -1; } + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum); + int startdb, enddb; if (dbnum == -1) { startdb = 0; @@ -378,7 +383,6 @@ long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)( } } if (dbnum == -1) flushSlaveKeysWithExpireList(); - signalFlushedDb(dbnum); return removed; }