diff --git a/README.md b/README.md
index 6c9435b53..3442659e6 100644
--- a/README.md
+++ b/README.md
@@ -406,7 +406,7 @@ replicas, or to continue the replication after a disconnection.
 Other C files
 ---
-* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c` and `t_zset.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types.
+* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contain the implementation of the Redis data types. They implement both an API to access a given data type, and the client command implementations for these data types.
 * `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand.
 * `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information.
 * `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel.
diff --git a/redis.conf b/redis.conf
index 74b6c018f..50ba823ac 100644
--- a/redis.conf
+++ b/redis.conf
@@ -336,13 +336,11 @@ replica-read-only yes
 # Replication SYNC strategy: disk or socket.
 #
-# -------------------------------------------------------
-# WARNING: DISKLESS REPLICATION IS EXPERIMENTAL CURRENTLY
-# -------------------------------------------------------
+# New replicas and reconnecting replicas that are not able to continue the
+# replication process just receiving differences, need to do what is called a
+# "full synchronization". An RDB file is transmitted from the master to the
+# replicas.
 #
-# New replicas and reconnecting replicas that are not able to continue the replication
-# process just receiving differences, need to do what is called a "full
-# synchronization". An RDB file is transmitted from the master to the replicas.
 # The transmission can happen in two different ways:
 #
 # 1) Disk-backed: The Redis master creates a new process that writes the RDB
@@ -352,14 +350,14 @@ replica-read-only yes
 #                 RDB file to replica sockets, without touching the disk at all.
 #
 # With disk-backed replication, while the RDB file is generated, more replicas
-# can be queued and served with the RDB file as soon as the current child producing
-# the RDB file finishes its work. With diskless replication instead once
-# the transfer starts, new replicas arriving will be queued and a new transfer
-# will start when the current one terminates.
+# can be queued and served with the RDB file as soon as the current child
+# producing the RDB file finishes its work. With diskless replication instead
+# once the transfer starts, new replicas arriving will be queued and a new
+# transfer will start when the current one terminates.
 #
 # When diskless replication is used, the master waits a configurable amount of
-# time (in seconds) before starting the transfer in the hope that multiple replicas
-# will arrive and the transfer can be parallelized.
+# time (in seconds) before starting the transfer in the hope that multiple
+# replicas will arrive and the transfer can be parallelized.
 #
 # With slow disks and fast (large bandwidth) networks, diskless replication
 # works better.
@@ -370,22 +368,32 @@ repl-diskless-sync no
 # to the replicas.
 #
 # This is important since once the transfer starts, it is not possible to serve
-# new replicas arriving, that will be queued for the next RDB transfer, so the server
-# waits a delay in order to let more replicas arrive.
+# new replicas arriving, that will be queued for the next RDB transfer, so the
+# server waits a delay in order to let more replicas arrive.
 #
 # The delay is specified in seconds, and by default is 5 seconds. To disable
 # it entirely just set it to 0 seconds and the transfer will start ASAP.
 repl-diskless-sync-delay 5
 
-# Replica can load the rdb it reads from the replication link directly from the
-# socket, or store the rdb to a file and read that file after it was completely
+# -----------------------------------------------------------------------------
+# WARNING: RDB diskless load is experimental. Since in this setup the replica
+# does not immediately store an RDB on disk, it may cause data loss during
+# failovers. RDB diskless load + Redis modules not handling I/O reads may also
+# cause Redis to abort in case of I/O errors during the initial synchronization
+# stage with the master. Use only if you know what you are doing.
+# -----------------------------------------------------------------------------
+#
+# Replica can load the RDB it reads from the replication link directly from the
+# socket, or store the RDB to a file and read that file after it was completely
 # recived from the master.
+#
 # In many cases the disk is slower than the network, and storing and loading
-# the rdb file may increase replication time (and even increase the master's
+# the RDB file may increase replication time (and even increase the master's
 # Copy on Write memory and salve buffers).
-# However, parsing the rdb file directly from the socket may mean that we have
-# to flush the contents of the current database before the full rdb was received.
-# for this reason we have the following options:
+# However, parsing the RDB file directly from the socket may mean that we have
+# to flush the contents of the current database before the full rdb was
+# received. For this reason we have the following options:
+#
 # "disabled"    - Don't use diskless load (store the rdb file to the disk first)
 # "on-empty-db" - Use diskless load only when it is completely safe.
 # "swapdb"      - Keep a copy of the current db contents in RAM while parsing
@@ -393,9 +401,9 @@ repl-diskless-sync-delay 5
 #            sufficient memory, if you don't have it, you risk an OOM kill.
 repl-diskless-load disabled
 
-# Replicas send PINGs to server in a predefined interval. It's possible to change
-# this interval with the repl_ping_replica_period option. The default value is 10
-# seconds.
+# Replicas send PINGs to server in a predefined interval. It's possible to
+# change this interval with the repl_ping_replica_period option. The default
+# value is 10 seconds.
 #
 # repl-ping-replica-period 10
 
@@ -427,10 +435,10 @@ repl-diskless-load disabled
 repl-disable-tcp-nodelay no
 
 # Set the replication backlog size. The backlog is a buffer that accumulates
-# replica data when replicas are disconnected for some time, so that when a replica
-# wants to reconnect again, often a full resync is not needed, but a partial
-# resync is enough, just passing the portion of data the replica missed while
-# disconnected.
+# replica data when replicas are disconnected for some time, so that when a
+# replica wants to reconnect again, often a full resync is not needed, but a
+# partial resync is enough, just passing the portion of data the replica
+# missed while disconnected.
 #
 # The bigger the replication backlog, the longer the time the replica can be
 # disconnected and later be able to perform a partial resynchronization.
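[Editor's illustration, not part of the patch: the backlog paragraph above implies a simple sizing rule. The sketch below, in C with purely assumed workload figures, shows how one might estimate a repl-backlog-size value: the backlog must still hold every byte written on the master during the longest replica disconnection that a partial resync should survive.]

    #include <stdio.h>

    int main(void) {
        /* Assumed workload figures, illustrative only. */
        long long write_rate_bytes_per_sec = 512 * 1024; /* master write traffic */
        long long max_disconnect_secs = 60;              /* outage to survive */

        /* A partial resync is only possible while the backlog still holds
         * everything the replica missed, so size it for the worst outage. */
        long long backlog = write_rate_bytes_per_sec * max_disconnect_secs;
        printf("repl-backlog-size should be at least %lld bytes\n", backlog);
        return 0;
    }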
@@ -452,13 +460,13 @@ repl-disable-tcp-nodelay no
 #
 # repl-backlog-ttl 3600
 
-# The replica priority is an integer number published by Redis in the INFO output.
-# It is used by Redis Sentinel in order to select a replica to promote into a
-# master if the master is no longer working correctly.
+# The replica priority is an integer number published by Redis in the INFO
+# output. It is used by Redis Sentinel in order to select a replica to promote
+# into a master if the master is no longer working correctly.
 #
 # A replica with a low priority number is considered better for promotion, so
-# for instance if there are three replicas with priority 10, 100, 25 Sentinel will
-# pick the one with priority 10, that is the lowest.
+# for instance if there are three replicas with priority 10, 100, 25 Sentinel
+# will pick the one with priority 10, that is the lowest.
 #
 # However a special priority of 0 marks the replica as not able to perform the
 # role of master, so a replica with priority of 0 will never be selected by
@@ -518,6 +526,39 @@ replica-priority 100
 # replica-announce-ip 5.5.5.5
 # replica-announce-port 1234
 
+############################### KEYS TRACKING #################################
+
+# Redis implements server assisted support for client side caching of values.
+# This is implemented using an invalidation table that remembers, using
+# 16 million slots, what clients may have certain subsets of keys. In turn
+# this is used in order to send invalidation messages to clients. To
+# understand more about the feature please check this page:
+#
+# https://redis.io/topics/client-side-caching
+#
+# When tracking is enabled for a client, all the read only queries are assumed
+# to be cached: this will force Redis to store information in the invalidation
+# table. When keys are modified, such information is flushed away, and
+# invalidation messages are sent to the clients. However if the workload is
+# heavily dominated by reads, Redis could use more and more memory in order
+# to track the keys fetched by many clients.
+#
+# For this reason it is possible to configure a maximum fill value for the
+# invalidation table. By default it is set to 10%, and once this limit is
+# reached, Redis will start to evict caching slots in the invalidation table
+# even if keys are not modified, just to reclaim memory: this will in turn
+# force the clients to invalidate the cached values. Basically the table
+# maximum fill rate is a trade-off between the memory you want to spend server
+# side to track information about who cached what, and the ability of clients
+# to retain cached objects in memory.
+#
+# If you set the value to 0, it means there are no limits, and all the 16
+# million caching slots can be used at the same time. In the "stats"
+# INFO section, you can find information about the amount of caching slots
+# used at every given moment.
+#
+# tracking-table-max-fill 10
+
 ################################## SECURITY ###################################
 
 # Warning: since Redis is pretty fast an outside user can try up to
@@ -747,17 +788,17 @@ replica-priority 100
 # DEL commands to the replica as keys evict in the master side.
 #
 # This behavior ensures that masters and replicas stay consistent, and is usually
-# what you want, however if your replica is writable, or you want the replica to have
-# a different memory setting, and you are sure all the writes performed to the
-# replica are idempotent, then you may change this default (but be sure to understand
-# what you are doing).
+# what you want, however if your replica is writable, or you want the replica +# to have a different memory setting, and you are sure all the writes performed +# to the replica are idempotent, then you may change this default (but be sure +# to understand what you are doing). # # Note that since the replica by default does not evict, it may end using more # memory than the one set via maxmemory (there are certain buffers that may -# be larger on the replica, or data structures may sometimes take more memory and so -# forth). So make sure you monitor your replicas and make sure they have enough -# memory to never hit a real out-of-memory condition before the master hits -# the configured maxmemory setting. +# be larger on the replica, or data structures may sometimes take more memory +# and so forth). So make sure you monitor your replicas and make sure they +# have enough memory to never hit a real out-of-memory condition before the +# master hits the configured maxmemory setting. # # replica-ignore-maxmemory yes diff --git a/runtest-moduleapi b/runtest-moduleapi index 84cdb9bb8..8e1c0cb23 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -13,4 +13,4 @@ then fi make -C tests/modules && \ -$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/testrdb "${@}" diff --git a/src/aof.c b/src/aof.c index 565ee8073..ae9c4bb68 100644 --- a/src/aof.c +++ b/src/aof.c @@ -303,9 +303,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) { nwritten = write(fd, buf, len); if (nwritten < 0) { - if (errno == EINTR) { - continue; - } + if (errno == EINTR) continue; return totwritten ? totwritten : -1; } diff --git a/src/cluster.c b/src/cluster.c index c85e3791d..e22222700 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4251,12 +4251,7 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { /* CLUSTER NODES */ - robj *o; - sds ci = clusterGenNodesDescription(0); - - o = createObject(OBJ_STRING,ci); - addReplyBulk(c,o); - decrRefCount(o); + addReplyBulkSds(c,clusterGenNodesDescription(0)); } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { /* CLUSTER MYID */ addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN); @@ -4832,7 +4827,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) { * DUMP is actually not used by Redis Cluster but it is the obvious * complement of RESTORE and can be useful for different applications. */ void dumpCommand(client *c) { - robj *o, *dumpobj; + robj *o; rio payload; /* Check if the key is here. 
*/ @@ -4845,9 +4840,7 @@ void dumpCommand(client *c) { createDumpPayload(&payload,o,c->argv[1]); /* Transfer to the client */ - dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); - addReplyBulk(c,dumpobj); - decrRefCount(dumpobj); + addReplyBulkSds(c,payload.io.buffer.ptr); return; } diff --git a/src/config.c b/src/config.c index fde00ddf5..a72df2e78 100644 --- a/src/config.c +++ b/src/config.c @@ -686,6 +686,17 @@ void loadServerConfigFromString(char *config) { } } else if (!strcasecmp(argv[0],"slowlog-max-len") && argc == 2) { server.slowlog_max_len = strtoll(argv[1],NULL,10); + } else if (!strcasecmp(argv[0],"tracking-table-max-fill") && + argc == 2) + { + server.tracking_table_max_fill = strtoll(argv[1],NULL,10); + if (server.tracking_table_max_fill > 100 || + server.tracking_table_max_fill < 0) + { + err = "The tracking table fill percentage must be an " + "integer between 0 and 100"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"client-output-buffer-limit") && argc == 5) { @@ -1133,6 +1144,8 @@ void configSetCommand(client *c) { "slowlog-max-len",ll,0,LONG_MAX) { /* Cast to unsigned. */ server.slowlog_max_len = (unsigned long)ll; + } config_set_numerical_field( + "tracking-table-max-fill",server.tracking_table_max_fill,0,100) { } config_set_numerical_field( "latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){ } config_set_numerical_field( @@ -1338,8 +1351,8 @@ void configGetCommand(client *c) { server.slowlog_log_slower_than); config_get_numerical_field("latency-monitor-threshold", server.latency_monitor_threshold); - config_get_numerical_field("slowlog-max-len", - server.slowlog_max_len); + config_get_numerical_field("slowlog-max-len", server.slowlog_max_len); + config_get_numerical_field("tracking-table-max-fill", server.tracking_table_max_fill); config_get_numerical_field("port",server.port); config_get_numerical_field("cluster-announce-port",server.cluster_announce_port); config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port); @@ -1470,12 +1483,10 @@ void configGetCommand(client *c) { matches++; } if (stringmatch(pattern,"notify-keyspace-events",1)) { - robj *flagsobj = createObject(OBJ_STRING, - keyspaceEventsFlagsToString(server.notify_keyspace_events)); + sds flags = keyspaceEventsFlagsToString(server.notify_keyspace_events); addReplyBulkCString(c,"notify-keyspace-events"); - addReplyBulk(c,flagsobj); - decrRefCount(flagsobj); + addReplyBulkSds(c,flags); matches++; } if (stringmatch(pattern,"bind",1)) { @@ -2167,6 +2178,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"slowlog-log-slower-than",server.slowlog_log_slower_than,CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN); rewriteConfigNumericalOption(state,"latency-monitor-threshold",server.latency_monitor_threshold,CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD); rewriteConfigNumericalOption(state,"slowlog-max-len",server.slowlog_max_len,CONFIG_DEFAULT_SLOWLOG_MAX_LEN); + rewriteConfigNumericalOption(state,"tracking-table-max-fill",server.tracking_table_max_fill,CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL); rewriteConfigNotifykeyspaceeventsOption(state); rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE); diff --git a/src/db.c b/src/db.c index 51f5a12b4..95eaf04e9 100644 --- a/src/db.c +++ b/src/db.c @@ -353,6 +353,11 @@ long long emptyDbGeneric(redisDb *dbarray, 
int dbnum, int flags, void(callback)(
         return -1;
     }
 
+    /* Make sure the WATCHed keys are affected by the FLUSH* commands.
+     * Note that we need to call the function while the keys are still
+     * there. */
+    signalFlushedDb(dbnum);
+
     int startdb, enddb;
     if (dbnum == -1) {
         startdb = 0;
@@ -412,11 +417,12 @@ long long dbTotalServerKeyCount() {
 
 void signalModifiedKey(redisDb *db, robj *key) {
     touchWatchedKey(db,key);
-    if (server.tracking_clients) trackingInvalidateKey(key);
+    trackingInvalidateKey(key);
 }
 
 void signalFlushedDb(int dbid) {
     touchWatchedKeysOnFlush(dbid);
+    trackingInvalidateKeysOnFlush(dbid);
 }
 
 /*-----------------------------------------------------------------------------
@@ -452,7 +458,6 @@ void flushdbCommand(client *c) {
     int flags;
 
     if (getFlushCommandFlags(c,&flags) == C_ERR) return;
-    signalFlushedDb(c->db->id);
     server.dirty += emptyDb(c->db->id,flags,NULL);
     addReply(c,shared.ok);
 }
@@ -464,7 +469,6 @@ void flushallCommand(client *c) {
     int flags;
 
     if (getFlushCommandFlags(c,&flags) == C_ERR) return;
-    signalFlushedDb(-1);
     server.dirty += emptyDb(-1,flags,NULL);
     addReply(c,shared.ok);
     if (server.rdb_child_pid != -1) killRDBChild();
diff --git a/src/expire.c b/src/expire.c
index b23117a3c..598b27f96 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -64,7 +64,7 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
         dbSyncDelete(db,keyobj);
         notifyKeyspaceEvent(NOTIFY_EXPIRED,
             "expired",keyobj,db->id);
-        if (server.tracking_clients) trackingInvalidateKey(keyobj);
+        trackingInvalidateKey(keyobj);
         decrRefCount(keyobj);
         server.stat_expiredkeys++;
         return 1;
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index e01ea6042..e0557f985 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -701,6 +701,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) {
             first += span;
         }
         if (span == 0) return -1; /* Invalid format. */
+        if (p >= end) return -1; /* Invalid format. */
         next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1;
         if (next >= end) next = NULL;
diff --git a/src/module.c b/src/module.c
index 36384c018..e5a1ea1ed 100644
--- a/src/module.c
+++ b/src/module.c
@@ -29,6 +29,7 @@
 
 #include "server.h"
 #include "cluster.h"
+#include "rdb.h"
 #include <dlfcn.h>
 
 #define REDISMODULE_CORE 1
@@ -3092,6 +3093,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
         moduleTypeMemUsageFunc mem_usage;
         moduleTypeDigestFunc digest;
         moduleTypeFreeFunc free;
+        struct {
+            moduleTypeAuxLoadFunc aux_load;
+            moduleTypeAuxSaveFunc aux_save;
+            int aux_save_triggers;
+        } v2;
     } *tms = (struct typemethods*) typemethods_ptr;
 
     moduleType *mt = zcalloc(sizeof(*mt));
@@ -3103,6 +3109,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
     mt->mem_usage = tms->mem_usage;
     mt->digest = tms->digest;
     mt->free = tms->free;
+    if (tms->version >= 2) {
+        mt->aux_load = tms->v2.aux_load;
+        mt->aux_save = tms->v2.aux_save;
+        mt->aux_save_triggers = tms->v2.aux_save_triggers;
+    }
     memcpy(mt->name,name,sizeof(mt->name));
     listAddNodeTail(ctx->module->types,mt);
     return mt;
@@ -3405,6 +3416,36 @@ loaderr:
     return 0;
 }
 
+/* Iterate over modules, and trigger RDB aux saving for the module types
+ * that asked for it.
 */
+ssize_t rdbSaveModulesAux(rio *rdb, int when) {
+    size_t total_written = 0;
+    dictIterator *di = dictGetIterator(modules);
+    dictEntry *de;
+
+    while ((de = dictNext(di)) != NULL) {
+        struct RedisModule *module = dictGetVal(de);
+        listIter li;
+        listNode *ln;
+
+        listRewind(module->types,&li);
+        while((ln = listNext(&li))) {
+            moduleType *mt = ln->value;
+            if (!mt->aux_save || !(mt->aux_save_triggers & when))
+                continue;
+            ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
+            if (ret==-1) {
+                dictReleaseIterator(di);
+                return -1;
+            }
+            total_written += ret;
+        }
+    }
+
+    dictReleaseIterator(di);
+    return total_written;
+}
+
 /* --------------------------------------------------------------------------
  * Key digest API (DEBUG DIGEST interface for modules types)
  * -------------------------------------------------------------------------- */
@@ -3565,7 +3606,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
 
     if (level < server.verbosity) return;
 
-    name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
+    name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
     vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
     serverLogRaw(level,msg);
 }
@@ -3583,13 +3624,15 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
 * There is a fixed limit to the length of the log line this function is able
 * to emit, this limit is not specified but is guaranteed to be more than
 * a few lines of text.
+ *
+ * The ctx argument may be NULL if it cannot be provided in the context of the
+ * caller, for instance threads or callbacks, in which case a generic "module"
+ * will be used instead of the module name.
 */
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
-    if (!ctx->module) return;   /* Can only log if module is initialized */
-
     va_list ap;
     va_start(ap, fmt);
-    RM_LogRaw(ctx->module,levelstr,fmt,ap);
+    RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
     va_end(ap);
 }
 
diff --git a/src/rdb.c b/src/rdb.c
index c566378fb..4e00fad67 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -42,31 +42,35 @@
 #include <sys/stat.h>
 #include <sys/param.h>
 
-#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
+/* This macro is called when the internal RDB structure is corrupt */
+#define rdbExitReportCorruptRDB(...) rdbReportError(1, __LINE__,__VA_ARGS__)
+/* This macro is called when RDB read failed (possibly a short read) */
+#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
 
 char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
 extern int rdbCheckMode;
 void rdbCheckError(const char *fmt, ...);
 void rdbCheckSetError(const char *fmt, ...);
 
-void rdbCheckThenExit(int linenum, char *reason, ...) {
+void rdbReportError(int corruption_error, int linenum, char *reason, ...)
{ va_list ap; char msg[1024]; int len; len = snprintf(msg,sizeof(msg), - "Internal error in RDB reading function at rdb.c:%d -> ", linenum); + "Internal error in RDB reading offset %llu, function at rdb.c:%d -> ", + (unsigned long long)server.loading_loaded_bytes, linenum); va_start(ap,reason); vsnprintf(msg+len,sizeof(msg)-len,reason,ap); va_end(ap); if (!rdbCheckMode) { - serverLog(LL_WARNING, "%s", msg); - if (rdbFileBeingLoaded) { + if (rdbFileBeingLoaded || corruption_error) { + serverLog(LL_WARNING, "%s", msg); char *argv[2] = {"",rdbFileBeingLoaded}; redis_check_rdb_main(2,argv,NULL); } else { - serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + serverLog(LL_WARNING, "%s. Failure loading rdb format from socket, assuming connection error, resuming operation.", msg); return; } } else { @@ -82,18 +86,6 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) { return len; } -/* This is just a wrapper for the low level function rioRead() that will - * automatically abort if it is not possible to read the specified amount - * of bytes. */ -void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { - if (rioRead(rdb,buf,len) == 0) { - rdbExitReportCorruptRDB( - "Impossible to read %llu bytes in rdbLoadRaw()", - (unsigned long long) len); - return; /* Not reached. */ - } -} - int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } @@ -109,10 +101,12 @@ int rdbLoadType(rio *rdb) { /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS - * opcode. */ + * opcode. On error -1 is returned, however this could be a valid time, so + * to check for loading errors the caller should call rioGetReadError() after + * calling this function. */ time_t rdbLoadTime(rio *rdb) { int32_t t32; - rdbLoadRaw(rdb,&t32,4); + if (rioRead(rdb,&t32,4) == 0) return -1; return (time_t)t32; } @@ -132,10 +126,14 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { * after upgrading to Redis version 5 they will no longer be able to load their * own old RDB files. Because of that, we instead fix the function only for new * RDB versions, and load older RDB versions as we used to do in the past, - * allowing big endian systems to load their own old RDB files. */ + * allowing big endian systems to load their own old RDB files. + * + * On I/O error the function returns LLONG_MAX, however if this is also a + * valid stored value, the caller should use rioGetReadError() to check for + * errors after calling this function. */ long long rdbLoadMillisecondTime(rio *rdb, int rdbver) { int64_t t64; - rdbLoadRaw(rdb,&t64,8); + if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX; if (rdbver >= 9) /* Check the top comment of this function. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ return (long long)t64; @@ -284,8 +282,8 @@ void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) { v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); val = (int32_t)v; } else { - val = 0; /* anti-warning */ rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype); + return NULL; /* Never reached. */ } if (plain || sds) { char buf[LONG_STR_SIZE], *p; @@ -388,8 +386,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) { /* Load the compressed representation and uncompress it to target. 
 */
             if (rioRead(rdb,c,clen) == 0) goto err;
             if (lzf_decompress(c,clen,val,len) == 0) {
-                if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
-                goto err;
+                rdbExitReportCorruptRDB("Invalid LZF compressed string");
             }
             zfree(c);
 
@@ -503,6 +500,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
         return rdbLoadLzfStringObject(rdb,flags,lenptr);
     default:
         rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
+        return NULL; /* Never reached. */
     }
 }
 
@@ -973,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
         RedisModuleIO io;
         moduleValue *mv = o->ptr;
         moduleType *mt = mv->type;
-        moduleInitIOContext(io,mt,rdb,key);
 
         /* Write the "module" identifier as prefix, so that we'll be able
          * to call the right module during loading. */
@@ -982,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
         io.bytes += retval;
 
         /* Then write the module-specific representation + EOF marker. */
+        moduleInitIOContext(io,mt,rdb,key);
         mt->rdb_save(&io,mv->value);
         retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
-        if (retval == -1) return -1;
-        io.bytes += retval;
+        if (retval == -1)
+            io.error = 1;
+        else
+            io.bytes += retval;
 
         if (io.ctx) {
             moduleFreeContext(io.ctx);
@@ -1103,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
     return 1;
 }
 
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
+    /* Save a module-specific aux value. Initialize the I/O context first,
+     * so that the bytes written below are accounted for correctly. */
+    RedisModuleIO io;
+    moduleInitIOContext(io,mt,rdb,NULL);
+    int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+
+    /* Write the "module" identifier as prefix, so that we'll be able
+     * to call the right module during loading. */
+    retval = rdbSaveLen(rdb,mt->id);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+
+    /* write the 'when' so that we can provide it on loading */
+    retval = rdbSaveLen(rdb,when);
+    if (retval == -1) return -1;
+    io.bytes += retval;
+
+    /* Then write the module-specific representation + EOF marker. */
+    mt->aux_save(&io,when);
+    retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
+    if (retval == -1)
+        io.error = 1;
+    else
+        io.bytes += retval;
+
+    if (io.ctx) {
+        moduleFreeContext(io.ctx);
+        zfree(io.ctx);
+    }
+    if (io.error)
+        return -1;
+    return io.bytes;
+}
+
 /* Produces a dump of the database in RDB format sending it to the specified
  * Redis I/O channel. On success C_OK is returned, otherwise C_ERR
  * is returned and part of the output, or all the output, can be
@@ -1124,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
     snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
     if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
     if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
 
     for (j = 0; j < server.dbnum; j++) {
         redisDb *db = server.db+j;
@@ -1185,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
         di = NULL; /* So that we don't release it again on error.
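[Editor's illustration, not part of the patch: rdbSaveSingleModuleAux() above serializes an aux record as the RDB_OPCODE_MODULE_AUX type byte, then the length-encoded module ID, the length-encoded 'when' flag, the payload written by the module's aux_save() callback, and finally a RDB_MODULE_OPCODE_EOF terminator. A minimal sketch of reading the header back with the helpers this patch uses; the function name is hypothetical.]

    /* Hypothetical helper: read the MODULE_AUX header fields that follow the
     * RDB_OPCODE_MODULE_AUX opcode, mirroring the writer above. */
    static int readModuleAuxHeader(rio *rdb, uint64_t *moduleid, uint64_t *when) {
        *moduleid = rdbLoadLen(rdb,NULL); /* length-encoded module ID */
        *when = rdbLoadLen(rdb,NULL);     /* REDISMODULE_AUX_BEFORE_RDB or _AFTER_RDB */
        /* Both reads share a single error check thanks to the sticky rio flags. */
        return rioGetReadError(rdb) ? -1 : 0;
    }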
*/ } + if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; + /* EOF opcode */ if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; @@ -1644,6 +1681,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { hashTypeConvert(o, OBJ_ENCODING_HT); break; default: + /* totally unreachable */ rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } @@ -1651,6 +1689,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); + if (listpacks == RDB_LENERR) { + rdbReportReadError("Stream listpacks len loading failed."); + decrRefCount(o); + return NULL; + } while(listpacks--) { /* Get the master ID, the one we'll use as key of the radix tree @@ -1658,7 +1701,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * relatively to this ID. */ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (nodekey == NULL) { - rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + rdbReportReadError("Stream master ID loading failed: invalid encoding or I/O error."); + decrRefCount(o); + return NULL; } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " @@ -1668,7 +1713,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Load the listpack. */ unsigned char *lp = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (lp == NULL) return NULL; + if (lp == NULL) { + rdbReportReadError("Stream listpacks loading failed."); + sdsfree(nodekey); + decrRefCount(o); + return NULL; + } unsigned char *first = lpFirst(lp); if (first == NULL) { /* Serialized listpacks should never be empty, since on @@ -1686,12 +1736,24 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } /* Load total number of items inside the stream. */ s->length = rdbLoadLen(rdb,NULL); + /* Load the last entry ID. */ s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream object metadata loading failed."); + decrRefCount(o); + return NULL; + } + /* Consumer groups loading */ - size_t cgroups_count = rdbLoadLen(rdb,NULL); + uint64_t cgroups_count = rdbLoadLen(rdb,NULL); + if (cgroups_count == RDB_LENERR) { + rdbReportReadError("Stream cgroup count loading failed."); + decrRefCount(o); + return NULL; + } while(cgroups_count--) { /* Get the consumer group name and ID. We can then create the * consumer group ASAP and populate its structure as @@ -1699,11 +1761,21 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { streamID cg_id; sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cgname == NULL) { - rdbExitReportCorruptRDB( + rdbReportReadError( "Error reading the consumer group name from Stream"); + decrRefCount(o); + return NULL; } + cg_id.ms = rdbLoadLen(rdb,NULL); cg_id.seq = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream cgroup ID loading failed."); + sdsfree(cgname); + decrRefCount(o); + return NULL; + } + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); if (cgroup == NULL) rdbExitReportCorruptRDB("Duplicated consumer group name %s", @@ -1715,13 +1787,28 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { * owner, since consumers for this group and their messages will * be read as a next step. So for now leave them not resolved * and later populate it. 
*/ - size_t pel_size = rdbLoadLen(rdb,NULL); + uint64_t pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError("Stream PEL size loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError("Stream PEL ID loading failed."); + decrRefCount(o); + return NULL; + } streamNACK *nack = streamCreateNACK(NULL); nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); nack->delivery_count = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream PEL NACK loading failed."); + decrRefCount(o); + streamFreeNACK(nack); + return NULL; + } if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) rdbExitReportCorruptRDB("Duplicated gobal PEL entry " "loading stream consumer group"); @@ -1729,24 +1816,47 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Now that we loaded our global PEL, we need to load the * consumers and their local PELs. */ - size_t consumers_num = rdbLoadLen(rdb,NULL); + uint64_t consumers_num = rdbLoadLen(rdb,NULL); + if (consumers_num == RDB_LENERR) { + rdbReportReadError("Stream consumers num loading failed."); + decrRefCount(o); + return NULL; + } while(consumers_num--) { sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); if (cname == NULL) { - rdbExitReportCorruptRDB( - "Error reading the consumer name from Stream group"); + rdbReportReadError( + "Error reading the consumer name from Stream group."); + decrRefCount(o); + return NULL; } streamConsumer *consumer = streamLookupConsumer(cgroup,cname, 1); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream short read reading seen time."); + decrRefCount(o); + return NULL; + } /* Load the PEL about entries owned by this specific * consumer. */ pel_size = rdbLoadLen(rdb,NULL); + if (pel_size == RDB_LENERR) { + rdbReportReadError( + "Stream consumer PEL num loading failed."); + decrRefCount(o); + return NULL; + } while(pel_size--) { unsigned char rawid[sizeof(streamID)]; - rdbLoadRaw(rdb,rawid,sizeof(rawid)); + if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { + rdbReportReadError( + "Stream short read reading PEL streamID."); + decrRefCount(o); + return NULL; + } streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); if (nack == raxNotFound) rdbExitReportCorruptRDB("Consumer entry not found in " @@ -1765,6 +1875,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); + if (rioGetReadError(rdb)) return NULL; moduleType *mt = moduleTypeLookupModuleByID(moduleid); char name[10]; @@ -1792,6 +1903,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Module v2 serialization has an EOF mark at the end. 
 */
         if (io.ver == 2) {
             uint64_t eof = rdbLoadLen(rdb,NULL);
+            if (eof == RDB_LENERR) {
+                o = createModuleObject(mt,ptr); /* creating just in order to easily destroy */
+                decrRefCount(o);
+                return NULL;
+            }
             if (eof != RDB_MODULE_OPCODE_EOF) {
                 serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
                 exit(1);
@@ -1805,7 +1921,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
         }
         o = createModuleObject(mt,ptr);
     } else {
-        rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
+        rdbReportReadError("Unknown RDB encoding type %d",rdbtype);
+        return NULL;
     }
     return o;
 }
@@ -1904,11 +2021,13 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
              * load the actual type, and continue. */
             expiretime = rdbLoadTime(rdb);
             expiretime *= 1000;
+            if (rioGetReadError(rdb)) goto eoferr;
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
             /* EXPIRETIME_MS: milliseconds precision expire times introduced
              * with RDB v3. Like EXPIRETIME but no with more precision. */
             expiretime = rdbLoadMillisecondTime(rdb,rdbver);
+            if (rioGetReadError(rdb)) goto eoferr;
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_FREQ) {
             /* FREQ: LFU frequency. */
@@ -2009,15 +2128,12 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
             decrRefCount(auxval);
             continue; /* Read type again. */
         } else if (type == RDB_OPCODE_MODULE_AUX) {
-            /* This is just for compatibility with the future: we have plans
-             * to add the ability for modules to store anything in the RDB
-             * file, like data that is not related to the Redis key space.
-             * Such data will potentially be stored both before and after the
-             * RDB keys-values section. For this reason since RDB version 9,
-             * we have the ability to read a MODULE_AUX opcode followed by an
-             * identifier of the module, and a serialized value in "MODULE V2"
-             * format. */
+            /* Load module data that is not related to the Redis key space.
+             * Such data can potentially be stored both before and after the
+             * RDB keys-values section. */
             uint64_t moduleid = rdbLoadLen(rdb,NULL);
+            int when = rdbLoadLen(rdb,NULL);
+            if (rioGetReadError(rdb)) goto eoferr;
             moduleType *mt = moduleTypeLookupModuleByID(moduleid);
             char name[10];
             moduleTypeNameByID(name,moduleid);
@@ -2027,14 +2143,37 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
                 serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
                 exit(1);
             } else if (!rdbCheckMode && mt != NULL) {
-                /* This version of Redis actually does not know what to do
-                 * with modules AUX data... */
-                serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
-                exit(1);
+                if (!mt->aux_load) {
+                    /* Module doesn't support AUX. */
+                    serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
+                    exit(1);
+                }
+
+                RedisModuleIO io;
+                moduleInitIOContext(io,mt,rdb,NULL);
+                io.ver = 2;
+                /* Call the aux_load method of the module providing the 10 bit
+                 * encoding version in the lower 10 bits of the module ID. */
+                if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
+                    moduleTypeNameByID(name,moduleid);
+                    serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check the modules log above for additional clues.", name);
+                    exit(1);
+                }
+                if (io.ctx) {
+                    moduleFreeContext(io.ctx);
+                    zfree(io.ctx);
+                }
+                uint64_t eof = rdbLoadLen(rdb,NULL);
+                if (eof != RDB_MODULE_OPCODE_EOF) {
+                    serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
+                    exit(1);
+                }
+                continue;
             } else {
                 /* RDB check mode. */
                 robj *aux = rdbLoadCheckModuleValue(rdb,name);
                 decrRefCount(aux);
+                continue; /* Read next opcode. */
             }
         }
 
@@ -2088,10 +2227,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
     }
     return C_OK;
 
-eoferr: /* unexpected end of file is handled here with a fatal exit */
-    serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
-    rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
-    return C_ERR; /* Just to avoid warning */
+    /* Unexpected end of file is handled here calling rdbReportReadError():
+     * this will in turn either abort Redis in most cases, or if we are loading
+     * the RDB file from a socket during initial SYNC (diskless replica mode),
+     * we'll report the error to the caller, so that we can retry. */
+eoferr:
+    serverLog(LL_WARNING,
+        "Short read or OOM loading DB. Unrecoverable error, aborting now.");
+    rdbReportReadError("Unexpected EOF reading RDB file");
+    return C_ERR;
 }
 
 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
diff --git a/src/rdb.h b/src/rdb.h
index 0acddf9ab..40a50f7ba 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
 robj *rdbLoadObject(int type, rio *rdb, robj *key);
 void backgroundSaveDoneHandler(int exitcode, int bysignal);
 int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
 robj *rdbLoadStringObject(rio *rdb);
 ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
 ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 1d16fa4ee..2df41580b 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -104,6 +104,7 @@ static struct config {
     int is_fetching_slots;
     int is_updating_slots;
     int slots_last_update;
+    int enable_tracking;
     /* Thread mutexes to be used as fallbacks by atomicvar.h */
     pthread_mutex_t requests_issued_mutex;
     pthread_mutex_t requests_finished_mutex;
@@ -255,7 +256,7 @@ static redisConfig *getRedisConfig(const char *ip, int port,
         goto fail;
     }
 
-    if(config.auth){
+    if(config.auth) {
         void *authReply = NULL;
         redisAppendCommand(c, "AUTH %s", config.auth);
         if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
@@ -633,6 +634,14 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
         c->prefix_pending++;
     }
 
+    if (config.enable_tracking) {
+        char *buf = NULL;
+        int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
+        c->obuf = sdscatlen(c->obuf, buf, len);
+        free(buf);
+        c->prefix_pending++;
+    }
+
     /* If a DB number different than zero is selected, prefix our request
      * buffer with the SELECT command, that will be discarded the first
      * time the replies are received, so if the client is reused the
@@ -1350,6 +1359,8 @@ int parseOptions(int argc, const char **argv) {
         } else if (config.num_threads < 0) config.num_threads = 0;
     } else if (!strcmp(argv[i],"--cluster")) {
         config.cluster_mode = 1;
+    } else if (!strcmp(argv[i],"--enable-tracking")) {
+        config.enable_tracking = 1;
     } else if (!strcmp(argv[i],"--help")) {
        exit_status
= 0; goto usage; @@ -1380,6 +1391,7 @@ usage: " --dbnum SELECT the specified db number (default 0)\n" " --threads Enable multi-thread mode.\n" " --cluster Enable cluster mode.\n" +" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n" " -k 1=keep alive 0=reconnect (default 1)\n" " -r Use random keys for SET/GET/INCR, random values for SADD\n" " Using this option the benchmark will expand the string __rand_int__\n" @@ -1504,6 +1516,7 @@ int main(int argc, const char **argv) { config.is_fetching_slots = 0; config.is_updating_slots = 0; config.slots_last_update = 0; + config.enable_tracking = 0; i = parseOptions(argc,argv); argc -= i; diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index e2d71b5a5..5e7415046 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -216,14 +216,16 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; + expiretime = rdbLoadTime(&rdb); expiretime *= 1000; + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; - if ((expiretime = rdbLoadMillisecondTime(&rdb, rdbver)) == -1) goto eoferr; + expiretime = rdbLoadMillisecondTime(&rdb, rdbver); + if (rioGetReadError(&rdb)) goto eoferr; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_FREQ) { /* FREQ: LFU frequency. */ diff --git a/src/redismodule.h b/src/redismodule.h index f0f27c067..e08aa16d4 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -129,6 +129,10 @@ #define REDISMODULE_NOT_USED(V) ((void) V) +/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + /* This type represents a timer handle, and is returned when a timer is * registered and used in order to invalidate a timer. 
It's just a 64 bit * number, because this is how each timer is represented inside the radix tree @@ -169,6 +173,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); +typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); +typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); @@ -177,7 +183,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); -#define REDISMODULE_TYPE_METHOD_VERSION 1 +#define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { uint64_t version; RedisModuleTypeLoadFunc rdb_load; @@ -186,6 +192,9 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeMemUsageFunc mem_usage; RedisModuleTypeDigestFunc digest; RedisModuleTypeFreeFunc free; + RedisModuleTypeAuxLoadFunc aux_load; + RedisModuleTypeAuxSaveFunc aux_save; + int aux_save_triggers; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ diff --git a/src/rio.c b/src/rio.c index 5359bc3d6..bdbc5d0e9 100644 --- a/src/rio.c +++ b/src/rio.c @@ -92,6 +92,7 @@ static const rio rioBufferIO = { rioBufferFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -145,6 +146,7 @@ static const rio rioFileIO = { rioFileFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -239,6 +241,7 @@ static const rio rioFdIO = { rioFdFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ @@ -374,6 +377,7 @@ static const rio rioFdsetIO = { rioFdsetFlush, NULL, /* update_checksum */ 0, /* current checksum */ + 0, /* flags */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ diff --git a/src/rio.h b/src/rio.h index beea06888..6199bb039 100644 --- a/src/rio.h +++ b/src/rio.h @@ -1,6 +1,6 @@ /* * Copyright (c) 2009-2012, Pieter Noordhuis - * Copyright (c) 2009-2012, Salvatore Sanfilippo + * Copyright (c) 2009-2019, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,6 +36,9 @@ #include #include "sds.h" +#define RIO_FLAG_READ_ERROR (1<<0) +#define RIO_FLAG_WRITE_ERROR (1<<1) + struct _rio { /* Backend functions. * Since this functions do not tolerate short writes or reads the return @@ -51,8 +54,8 @@ struct _rio { * computation. 
 */
     void (*update_cksum)(struct _rio *, const void *buf, size_t len);
 
-    /* The current checksum */
-    uint64_t cksum;
+    /* The current checksum and flags (see RIO_FLAG_*) */
+    uint64_t cksum, flags;
 
     /* number of bytes read or written */
     size_t processed_bytes;
@@ -99,11 +102,14 @@ typedef struct _rio rio;
 * if needed. */
 static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
+    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
     while (len) {
         size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
-        if (r->write(r,buf,bytes_to_write) == 0)
+        if (r->write(r,buf,bytes_to_write) == 0) {
+            r->flags |= RIO_FLAG_WRITE_ERROR;
             return 0;
+        }
         buf = (char*)buf + bytes_to_write;
         len -= bytes_to_write;
         r->processed_bytes += bytes_to_write;
@@ -112,10 +118,13 @@ static inline size_t rioRead(rio *r, void *buf, size_t len) {
+    if (r->flags & RIO_FLAG_READ_ERROR) return 0;
     while (len) {
         size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
-        if (r->read(r,buf,bytes_to_read) == 0)
+        if (r->read(r,buf,bytes_to_read) == 0) {
+            r->flags |= RIO_FLAG_READ_ERROR;
             return 0;
+        }
         if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
         buf = (char*)buf + bytes_to_read;
         len -= bytes_to_read;
@@ -132,6 +141,22 @@ static inline int rioFlush(rio *r) {
     return r->flush(r);
 }
 
+/* This function can be used to know if there was a read error in any past
+ * operation, since the rio stream was created or since the last call
+ * to rioClearErrors(). */
+static inline int rioGetReadError(rio *r) {
+    return (r->flags & RIO_FLAG_READ_ERROR) != 0;
+}
+
+/* Like rioGetReadError() but for write errors. */
+static inline int rioGetWriteError(rio *r) {
+    return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
+}
+
+static inline void rioClearErrors(rio *r) {
+    r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
+}
+
 void rioInitWithFile(rio *r, FILE *fp);
 void rioInitWithBuffer(rio *r, sds s);
 void rioInitWithFd(rio *r, int fd, size_t read_limit);
diff --git a/src/server.c b/src/server.c
index 4337b8f01..eb5cef386 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2403,6 +2403,9 @@ void initServerConfig(void) {
     /* Latency monitor */
     server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
 
+    /* Tracking. */
+    server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL;
+
     /* Debugging */
     server.assert_failed = "<no assertion failed>";
     server.assert_file = "<no file>";
@@ -3401,6 +3404,10 @@ int processCommand(client *c) {
         }
     }
 
+    /* Make sure to use a reasonable amount of memory for client side
+     * caching metadata. */
+    if (server.tracking_clients) trackingLimitUsedSlots();
+
     /* Don't accept write commands if there are problems persisting on disk
      * and if this is a master instance.
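[Editor's illustration, not part of the patch: the sticky RIO_FLAG_* bits added to rio.h above let a loader issue several reads and check for a short read once at the end, the pattern the rdb.c hunks in this patch adopt when loading stream IDs and metadata. A minimal sketch; the helper name is hypothetical.]

    /* Hypothetical helper: load two length-encoded fields, checking for a
     * short read only once at the end. Any failed rioRead() latches
     * RIO_FLAG_READ_ERROR, and all later reads return 0 immediately. */
    static int loadStreamIDPair(rio *rdb, uint64_t *ms, uint64_t *seq) {
        *ms = rdbLoadLen(rdb,NULL);
        *seq = rdbLoadLen(rdb,NULL);
        return rioGetReadError(rdb) ? C_ERR : C_OK;
    }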
*/ int deny_write_type = writeCommandsDeniedByDiskError(); @@ -4140,7 +4147,8 @@ sds genRedisInfoString(char *section) { "active_defrag_hits:%lld\r\n" "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" - "active_defrag_key_misses:%lld\r\n", + "active_defrag_key_misses:%lld\r\n" + "tracking_used_slots:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4166,7 +4174,8 @@ sds genRedisInfoString(char *section) { server.stat_active_defrag_hits, server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, - server.stat_active_defrag_key_misses); + server.stat_active_defrag_key_misses, + trackingGetUsedSlots()); } /* Replication */ diff --git a/src/server.h b/src/server.h index 5991cfa6c..a7f38a7a9 100644 --- a/src/server.h +++ b/src/server.h @@ -171,6 +171,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ +#define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -536,6 +537,10 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK) #define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS) +/* Bit flags for moduleTypeAuxSaveFunc */ +#define REDISMODULE_AUX_BEFORE_RDB (1<<0) +#define REDISMODULE_AUX_AFTER_RDB (1<<1) + struct RedisModule; struct RedisModuleIO; struct RedisModuleDigest; @@ -548,6 +553,8 @@ struct redisObject; * is deleted. */ typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver); typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value); +typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when); +typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when); typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value); typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value); typedef size_t (*moduleTypeMemUsageFunc)(const void *value); @@ -564,6 +571,9 @@ typedef struct RedisModuleType { moduleTypeMemUsageFunc mem_usage; moduleTypeDigestFunc digest; moduleTypeFreeFunc free; + moduleTypeAuxLoadFunc aux_load; + moduleTypeAuxSaveFunc aux_save; + int aux_save_triggers; char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ } moduleType; @@ -1316,6 +1326,7 @@ struct redisServer { list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Client side caching. */ unsigned int tracking_clients; /* # of clients with tracking enabled.*/ + int tracking_table_max_fill; /* Max fill percentage. 
*/ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -1528,6 +1539,7 @@ void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); /* Utils */ @@ -1639,6 +1651,9 @@ void enableTracking(client *c, uint64_t redirect_to); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); +void trackingInvalidateKeysOnFlush(int dbid); +void trackingLimitUsedSlots(void); +unsigned long long trackingGetUsedSlots(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/stream.h b/src/stream.h index ef08753b5..8ae90ce77 100644 --- a/src/stream.h +++ b/src/stream.h @@ -109,5 +109,6 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); +void streamFreeNACK(streamNACK *na); #endif diff --git a/src/t_zset.c b/src/t_zset.c index fb7078abd..2680e76a9 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1357,9 +1357,8 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); - if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); - if (sdslen(ele) > server.zset_max_ziplist_value) + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || + sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (newscore) *newscore = score; *flags |= ZADD_ADDED; diff --git a/src/tracking.c b/src/tracking.c index bbfc66a72..f7f0fc755 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -60,6 +60,7 @@ * use the most significant bits instead of the full 24 bits. */ #define TRACKING_TABLE_SIZE (1<<24) rax **TrackingTable = NULL; +unsigned long TrackingTableUsedSlots = 0; robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much @@ -109,67 +110,187 @@ void trackingRememberKeys(client *c) { sds sdskey = c->argv[idx]->ptr; uint64_t hash = crc64(0, (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1); - if (TrackingTable[hash] == NULL) + if (TrackingTable[hash] == NULL) { TrackingTable[hash] = raxNew(); + TrackingTableUsedSlots++; + } raxTryInsert(TrackingTable[hash], (unsigned char*)&c->id,sizeof(c->id),NULL,NULL); } getKeysFreeResult(keys); } -/* This function is called from signalModifiedKey() or other places in Redis - * when a key changes value. In the context of keys tracking, our task here is - * to send a notification to every client that may have keys about such . 
-void trackingInvalidateKey(robj *keyobj) {
-    sds sdskey = keyobj->ptr;
-    uint64_t hash = crc64(0,
-        (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
-    if (TrackingTable == NULL || TrackingTable[hash] == NULL) return;
+void sendTrackingMessage(client *c, long long hash) {
+    int using_redirection = 0;
+    if (c->client_tracking_redirection) {
+        client *redir = lookupClientByID(c->client_tracking_redirection);
+        if (!redir) {
+            /* We need to signal to the original connection that we
+             * are unable to send invalidation messages to the redirected
+             * connection, because the client no longer exists. */
+            if (c->resp > 2) {
+                addReplyPushLen(c,3);
+                addReplyBulkCBuffer(c,"tracking-redir-broken",21);
+                addReplyLongLong(c,c->client_tracking_redirection);
+            }
+            return;
+        }
+        c = redir;
+        using_redirection = 1;
+    }
+
+    /* Only send such info for clients in RESP version 3 or more. However
+     * if redirection is active, and the connection we redirect to is
+     * in Pub/Sub mode, we can support the feature with RESP 2 as well,
+     * by sending Pub/Sub messages in the __redis__:invalidate channel. */
+    if (c->resp > 2) {
+        addReplyPushLen(c,2);
+        addReplyBulkCBuffer(c,"invalidate",10);
+        addReplyLongLong(c,hash);
+    } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
+        robj *msg = createStringObjectFromLongLong(hash);
+        addReplyPubsubMessage(c,TrackingChannelName,msg);
+        decrRefCount(msg);
+    }
+}
+
+/* Invalidates a caching slot: this is the low level implementation
+ * behind the API that the rest of Redis calls, that is,
+ * trackingInvalidateKey(). */
+void trackingInvalidateSlot(uint64_t slot) {
+    if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
     raxIterator ri;
-    raxStart(&ri,TrackingTable[hash]);
+    raxStart(&ri,TrackingTable[slot]);
     raxSeek(&ri,"^",NULL,0);
     while(raxNext(&ri)) {
         uint64_t id;
         memcpy(&id,ri.key,ri.key_len);
         client *c = lookupClientByID(id);
-        if (c == NULL) continue;
-        int using_redirection = 0;
-        if (c->client_tracking_redirection) {
-            client *redir = lookupClientByID(c->client_tracking_redirection);
-            if (!redir) {
-                /* We need to signal to the original connection that we
-                 * are unable to send invalidation messages to the redirected
-                 * connection, because the client no longer exist. */
-                if (c->resp > 2) {
-                    addReplyPushLen(c,3);
-                    addReplyBulkCBuffer(c,"tracking-redir-broken",21);
-                    addReplyLongLong(c,c->client_tracking_redirection);
-                }
-                continue;
-            }
-            c = redir;
-            using_redirection = 1;
-        }
-
-        /* Only send such info for clients in RESP version 3 or more. However
-         * if redirection is active, and the connection we redirect to is
-         * in Pub/Sub mode, we can support the feature with RESP 2 as well,
-         * by sending Pub/Sub messages in the __redis__:invalidate channel. */
-        if (c->resp > 2) {
-            addReplyPushLen(c,2);
-            addReplyBulkCBuffer(c,"invalidate",10);
-            addReplyLongLong(c,hash);
-        } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
-            robj *msg = createStringObjectFromLongLong(hash);
-            addReplyPubsubMessage(c,TrackingChannelName,msg);
-            decrRefCount(msg);
-        }
+        if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
+        sendTrackingMessage(c,slot);
     }
     raxStop(&ri);

     /* Free the tracking table: we'll create the radix tree and populate it
-     * again if more keys will be modified in this hash slot. */
-    raxFree(TrackingTable[hash]);
-    TrackingTable[hash] = NULL;
+     * again if more keys will be modified in this caching slot. */
+    raxFree(TrackingTable[slot]);
+    TrackingTable[slot] = NULL;
+    TrackingTableUsedSlots--;
+}
+
+/* This function is called from signalModifiedKey() or other places in Redis
+ * when a key changes value. In the context of keys tracking, our task here is
+ * to send a notification to every client that may have keys about such
+ * caching slot. */
+void trackingInvalidateKey(robj *keyobj) {
+    if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
+
+    sds sdskey = keyobj->ptr;
+    uint64_t hash = crc64(0,
+        (unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
+    trackingInvalidateSlot(hash);
+}
+
+/* This function is called when one or all the Redis databases are flushed
+ * (dbid == -1 in case of FLUSHALL). Caching slots are not specific to
+ * each DB but are global: currently what we do is sending a special
+ * notification to clients with tracking enabled, invalidating the caching
+ * slot "-1", which means, "all the keys", in order to avoid flooding clients
+ * with many invalidation messages for all the keys they may hold.
+ *
+ * However trying to flush the tracking table here is very costly: we
+ * would need to scan all 16 million caching slots in the table to check
+ * if they are used, and this introduces a big delay. So we really flush
+ * the table only in the case of FLUSHALL. When FLUSHDB is called instead
+ * we just send the invalidation message to all the clients, but don't
+ * flush the table: it will slowly be garbage collected as more keys
+ * are modified in the used caching slots. */
+void trackingInvalidateKeysOnFlush(int dbid) {
+    if (server.tracking_clients) {
+        listNode *ln;
+        listIter li;
+        listRewind(server.clients,&li);
+        while ((ln = listNext(&li)) != NULL) {
+            client *c = listNodeValue(ln);
+            if (c->flags & CLIENT_TRACKING) {
+                sendTrackingMessage(c,-1);
+            }
+        }
+    }
+
+    /* In case of FLUSHALL, reclaim all the memory used by tracking. */
+    if (dbid == -1 && TrackingTable) {
+        for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
+            if (TrackingTable[j] != NULL) {
+                raxFree(TrackingTable[j]);
+                TrackingTable[j] = NULL;
+                TrackingTableUsedSlots--;
+            }
+        }
+
+        /* If there are no clients with tracking enabled, we can even
+         * reclaim the memory used by the table itself. The code assumes
+         * the table is allocated only if there is at least one client alive
+         * with tracking enabled. */
+        if (server.tracking_clients == 0) {
+            zfree(TrackingTable);
+            TrackingTable = NULL;
+        }
+    }
+}
+
+/* Tracking forces Redis to remember information about which client may have
+ * keys about certain caching slots. In workloads where there are a lot of
+ * reads, but keys are hardly modified, the amount of information we have
+ * to remember server side could be a lot: each of the 16 million caching
+ * slots may end up with a radix tree containing many entries.
+ *
+ * So Redis allows the user to configure a maximum fill rate for the
+ * invalidation table. This function makes sure that we don't go over the
+ * specified fill rate: if we are over, we can just evict information about
+ * random caching slots, and send invalidation messages to clients as if
+ * the keys were modified. */
+void trackingLimitUsedSlots(void) {
+    static unsigned int timeout_counter = 0;
+
+    if (server.tracking_table_max_fill == 0) return; /* No limits set. */
+    unsigned int max_slots =
+        (TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
+    if (TrackingTableUsedSlots <= max_slots) {
+        timeout_counter = 0;
+        return; /* Limit not reached. */
+    }
+
+    /* We have to invalidate a few slots to reach the limit again. The effort
+     * we make here is proportional to the number of times we entered this
+     * function and found that we are still over the limit. */
+    int effort = 100 * (timeout_counter+1);
+
+    /* Let's start at a random position, and perform linear probing, in order
+     * to improve cache locality. However once we are able to find a used
+     * slot, jump again randomly, in order to avoid creating big holes in the
+     * table (that would make this function use more resources later). */
+    while(effort > 0) {
+        unsigned int idx = rand() % TRACKING_TABLE_SIZE;
+        do {
+            effort--;
+            idx = (idx+1) % TRACKING_TABLE_SIZE;
+            if (TrackingTable[idx] != NULL) {
+                trackingInvalidateSlot(idx);
+                if (TrackingTableUsedSlots <= max_slots) {
+                    timeout_counter = 0;
+                    return; /* Return ASAP: we are again under the limit. */
+                } else {
+                    break; /* Jump to next random position. */
+                }
+            }
+        } while(effort > 0);
+    }
+    timeout_counter++;
+}
+
+/* Read-only accessor for the number of used slots in the
+ * tracking table. */
+unsigned long long trackingGetUsedSlots(void) {
+    return TrackingTableUsedSlots;
+}
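The invalidation table above is indexed by a hash of the key name, not by the key itself, so a single slot can cover many keys and one invalidation message may over-invalidate. A minimal sketch of the mapping (the keyToSlot() helper is illustrative and not part of this patch; crc64() is the function from src/crc64.c):

    #include <stdint.h>
    #include <stddef.h>

    /* Same constant as src/tracking.c: the table has 2^24 caching slots. */
    #define TRACKING_TABLE_SIZE (1<<24)

    uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l); /* src/crc64.c */

    /* Map a key name to its caching slot, exactly as trackingRememberKeys()
     * and trackingInvalidateKey() do in the patch above. */
    static uint64_t keyToSlot(const char *key, size_t len) {
        return crc64(0,(const unsigned char*)key,len) & (TRACKING_TABLE_SIZE-1);
    }

With the default CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL of 10, trackingLimitUsedSlots() starts evicting random slots once more than (2^24/100)*10 = 1,677,720 of them are in use.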
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index d69a1761a..5d32555b0 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -192,12 +192,6 @@ foreach mdl {no yes} {
             set master_host [srv 0 host]
             set master_port [srv 0 port]
             set slaves {}
-            set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
-            set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
-            set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
-            set load_handle3 [start_write_load $master_host $master_port 8]
-            set load_handle4 [start_write_load $master_host $master_port 4]
-            after 5000 ;# wait for some data to accumulate so that we have an RDB part for the fork
             start_server {} {
                 lappend slaves [srv 0 client]
                 start_server {} {
@@ -205,6 +199,14 @@ foreach mdl {no yes} {
                     start_server {} {
                         lappend slaves [srv 0 client]
                         test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
+                            # start load handles only inside the test, so that the test can be skipped
+                            set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
+                            set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
+                            set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
+                            set load_handle3 [start_write_load $master_host $master_port 8]
+                            set load_handle4 [start_write_load $master_host $master_port 4]
+                            after 5000 ;# wait for some data to accumulate so that we have an RDB part for the fork
+
                             # Send SLAVEOF commands to slaves
                             [lindex $slaves 0] config set repl-diskless-load $sdl
                             [lindex $slaves 1] config set repl-diskless-load $sdl
@@ -278,9 +280,9 @@ start_server {tags {"repl"}} {
     set master [srv 0 client]
     set master_host [srv 0 host]
     set master_port [srv 0 port]
-    set load_handle0 [start_write_load $master_host $master_port 3]
     start_server {} {
         test "Master stream is correctly processed while the replica has a script in -BUSY state" {
+            set load_handle0 [start_write_load $master_host $master_port 3]
             set slave [srv 0 client]
             $slave config set lua-time-limit 500
             $slave slaveof $master_host $master_port
@@ -383,3 +385,84 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
         }
     }
 }
+
+test {diskless loading short read} {
+    start_server {tags {"repl"}} {
+        set replica [srv 0 client]
+        set replica_host [srv 0 host]
+        set replica_port [srv 0 port]
+        start_server {} {
+            set master [srv 0 client]
+            set master_host [srv 0 host]
+            set master_port [srv 0 port]
+
+            # Set master and replica to use diskless replication
+            $master config set repl-diskless-sync yes
+            $master config set rdbcompression no
+            $replica config set repl-diskless-load swapdb
+            # Try to fill the master with all data types / encodings
+            for {set k 0} {$k < 3} {incr k} {
+                for {set i 0} {$i < 10} {incr i} {
+                    r set "$k int_$i" [expr {int(rand()*10000)}]
+                    r expire "$k int_$i" [expr {int(rand()*10000)}]
+                    r set "$k string_$i" [string repeat A [expr {int(rand()*1000000)}]]
+                    r hset "$k hash_small" [string repeat A [expr {int(rand()*10)}]] 0[string repeat A [expr {int(rand()*10)}]]
+                    r hset "$k hash_large" [string repeat A [expr {int(rand()*10000)}]] [string repeat A [expr {int(rand()*1000000)}]]
+                    r sadd "$k set_small" [string repeat A [expr {int(rand()*10)}]]
+                    r sadd "$k set_large" [string repeat A [expr {int(rand()*1000000)}]]
+                    r zadd "$k zset_small" [expr {rand()}] [string repeat A [expr {int(rand()*10)}]]
+                    r zadd "$k zset_large" [expr {rand()}] [string repeat A [expr {int(rand()*1000000)}]]
+                    r lpush "$k list_small" [string repeat A [expr {int(rand()*10)}]]
+                    r lpush "$k list_large" [string repeat A [expr {int(rand()*1000000)}]]
+                    for {set j 0} {$j < 10} {incr j} {
+                        r xadd "$k stream" * foo "asdf" bar "1234"
+                    }
+                    r xgroup create "$k stream" "mygroup_$i" 0
+                    r xreadgroup GROUP "mygroup_$i" Alice COUNT 1 STREAMS "$k stream" >
+                }
+            }
+
+            # Start the replication process...
+            $master config set repl-diskless-sync-delay 0
+            $replica replicaof $master_host $master_port
+
+            # kill the replication at various points
+            set attempts 3
+            if {$::accurate} { set attempts 10 }
+            for {set i 0} {$i < $attempts} {incr i} {
+                # wait for the replica to start reading the rdb,
+                # using the log file since the replica only responds to INFO once in 2mb
+                wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1
+
+                # add some additional random sleep so that we kill the master at a different point each time
+                after [expr {int(rand()*100)}]
+
+                # kill the replica connection on the master
+                set killed [$master client kill type replica]
+
+                if {[catch {
+                    set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10]
+                    if {$::verbose} {
+                        puts $res
+                    }
+                }]} {
+                    puts "failed triggering short read"
+                    # force the replica to try another full sync
+                    $master client kill type replica
+                    $master set asdf asdf
+                    # the side effect of resizing the backlog is that it is flushed (16k is the min size)
+                    $master config set repl-backlog-size [expr {16384 + $i}]
+                }
+                # wait for loading to stop (fail)
+                wait_for_condition 100 10 {
+                    [s -1 loading] eq 0
+                } else {
+                    fail "Replica didn't stop loading"
+                }
+            }
+            # enable fast shutdown
+            $master config set rdb-key-save-delay 0
+        }
+    }
+}
+
diff --git a/tests/modules/Makefile b/tests/modules/Makefile
index 014d20afa..6e4574747 100644
--- a/tests/modules/Makefile
+++ b/tests/modules/Makefile
@@ -13,12 +13,16 @@ endif

 .SUFFIXES: .c .so .xo .o

-all: commandfilter.so
+all: commandfilter.so testrdb.so

 .c.xo:
	$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@

 commandfilter.xo: ../../src/redismodule.h
+testrdb.xo: ../../src/redismodule.h

 commandfilter.so: commandfilter.xo
	$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+
+testrdb.so: testrdb.xo
+	$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c
new file mode 100644
index 000000000..415497a2f
--- /dev/null
+++ b/tests/modules/testrdb.c
@@ -0,0 +1,229 @@
+#include "redismodule.h"
+
+#include <string.h>
+#include <assert.h>
+
+/* Module configuration: save aux fields or not? */
+long long conf_aux_count = 0;
+
+/* Registered type */
+RedisModuleType *testrdb_type = NULL;
+
+/* Global values to store and persist to aux */
+RedisModuleString *before_str = NULL;
+RedisModuleString *after_str = NULL;
+
+void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
+    int count = RedisModule_LoadSigned(rdb);
+    assert(count==1);
+    assert(encver==1);
+    RedisModuleString *str = RedisModule_LoadString(rdb);
+    return str;
+}
+
+void testrdb_type_save(RedisModuleIO *rdb, void *value) {
+    RedisModuleString *str = (RedisModuleString*)value;
+    RedisModule_SaveSigned(rdb, 1);
+    RedisModule_SaveString(rdb, str);
+}
+
+void testrdb_aux_save(RedisModuleIO *rdb, int when) {
+    if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
+    if (conf_aux_count==0) assert(0);
+    if (when == REDISMODULE_AUX_BEFORE_RDB) {
+        if (before_str) {
+            RedisModule_SaveSigned(rdb, 1);
+            RedisModule_SaveString(rdb, before_str);
+        } else {
+            RedisModule_SaveSigned(rdb, 0);
+        }
+    } else {
+        if (after_str) {
+            RedisModule_SaveSigned(rdb, 1);
+            RedisModule_SaveString(rdb, after_str);
+        } else {
+            RedisModule_SaveSigned(rdb, 0);
+        }
+    }
+}
+
+int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
+    assert(encver == 1);
+    if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB);
+    if (conf_aux_count==0) assert(0);
+    RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
+    if (when == REDISMODULE_AUX_BEFORE_RDB) {
+        if (before_str)
+            RedisModule_FreeString(ctx, before_str);
+        before_str = NULL;
+        int count = RedisModule_LoadSigned(rdb);
+        if (count)
+            before_str = RedisModule_LoadString(rdb);
+    } else {
+        if (after_str)
+            RedisModule_FreeString(ctx, after_str);
+        after_str = NULL;
+        int count = RedisModule_LoadSigned(rdb);
+        if (count)
+            after_str = RedisModule_LoadString(rdb);
+    }
+    return REDISMODULE_OK;
+}
+
+void testrdb_type_free(void *value) {
+    RedisModule_FreeString(NULL, (RedisModuleString*)value);
+}
+
+int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc != 2) {
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+
+    if (before_str)
+        RedisModule_FreeString(ctx, before_str);
+    before_str = argv[1];
+    RedisModule_RetainString(ctx, argv[1]);
+    RedisModule_ReplyWithLongLong(ctx, 1);
+    return REDISMODULE_OK;
+}
+
+int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    REDISMODULE_NOT_USED(argv);
+    if (argc != 1){
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+    if (before_str)
+        RedisModule_ReplyWithString(ctx, before_str);
+    else
+        RedisModule_ReplyWithStringBuffer(ctx, "", 0);
+    return REDISMODULE_OK;
+}
+
+int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc != 2){
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+
+    if (after_str)
+        RedisModule_FreeString(ctx, after_str);
+    after_str = argv[1];
+    RedisModule_RetainString(ctx, argv[1]);
+    RedisModule_ReplyWithLongLong(ctx, 1);
+    return REDISMODULE_OK;
+}
+
+int testrdb_get_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    REDISMODULE_NOT_USED(argv);
+    if (argc != 1){
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+    if (after_str)
+        RedisModule_ReplyWithString(ctx, after_str);
+    else
+        RedisModule_ReplyWithStringBuffer(ctx, "", 0);
+    return REDISMODULE_OK;
+}
+
+int testrdb_set_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc != 3){
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+
+    RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+    RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
+    if (str)
+        RedisModule_FreeString(ctx, str);
+    RedisModule_ModuleTypeSetValue(key, testrdb_type, argv[2]);
+    RedisModule_RetainString(ctx, argv[2]);
+    RedisModule_CloseKey(key);
+    RedisModule_ReplyWithLongLong(ctx, 1);
+    return REDISMODULE_OK;
+}
+
+int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+    if (argc != 2){
+        RedisModule_WrongArity(ctx);
+        return REDISMODULE_OK;
+    }
+
+    RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
+    RedisModuleString *str = RedisModule_ModuleTypeGetValue(key);
+    RedisModule_CloseKey(key);
+    RedisModule_ReplyWithString(ctx, str);
+    return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+    REDISMODULE_NOT_USED(argv);
+    REDISMODULE_NOT_USED(argc);
+
+    if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (argc > 0)
+        RedisModule_StringToLongLong(argv[0], &conf_aux_count);
+
+    if (conf_aux_count==0) {
+        RedisModuleTypeMethods datatype_methods = {
+            .version = 1,
+            .rdb_load = testrdb_type_load,
+            .rdb_save = testrdb_type_save,
+            .aof_rewrite = NULL,
+            .digest = NULL,
+            .free = testrdb_type_free,
+        };
+
+        testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
+        if (testrdb_type == NULL)
+            return REDISMODULE_ERR;
+    } else {
+        RedisModuleTypeMethods datatype_methods = {
+            .version = REDISMODULE_TYPE_METHOD_VERSION,
+            .rdb_load = testrdb_type_load,
+            .rdb_save = testrdb_type_save,
+            .aof_rewrite = NULL,
+            .digest = NULL,
+            .free = testrdb_type_free,
+            .aux_load = testrdb_aux_load,
+            .aux_save = testrdb_aux_save,
+            .aux_save_triggers = (conf_aux_count == 1 ?
+                                  REDISMODULE_AUX_AFTER_RDB :
+                                  REDISMODULE_AUX_BEFORE_RDB | REDISMODULE_AUX_AFTER_RDB)
+        };
+
+        testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods);
+        if (testrdb_type == NULL)
+            return REDISMODULE_ERR;
+    }
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.set.before", testrdb_set_before,"deny-oom",0,0,0) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.get.after", testrdb_get_after,"",0,0,0) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.set.key", testrdb_set_key,"deny-oom",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    if (RedisModule_CreateCommand(ctx,"testrdb.get.key", testrdb_get_key,"",1,1,1) == REDISMODULE_ERR)
+        return REDISMODULE_ERR;
+
+    return REDISMODULE_OK;
+}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 41cc5612a..c2e76afad 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -99,6 +99,25 @@ proc wait_for_ofs_sync {r1 r2} {
     }
 }

+proc wait_for_log_message {srv_idx pattern last_lines maxtries delay} {
+    set retry $maxtries
+    set stdout [srv $srv_idx stdout]
+    while {$retry} {
+        set result [exec tail -$last_lines < $stdout]
+        set result [split $result "\n"]
+        foreach line $result {
+            if {[string match $pattern $line]} {
+                return $line
+            }
+        }
+        incr retry -1
+        after $delay
+    }
+    if {$retry == 0} {
+        fail "log message of '$pattern' not found"
+    }
+}
+
 # Random integer between 0 and max (excluded).
 proc randomInt {max} {
     expr {int(rand()*$max)}
diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl
index 604697be4..49e421ee9 100644
--- a/tests/unit/geo.tcl
+++ b/tests/unit/geo.tcl
@@ -61,6 +61,7 @@ set regression_vectors {
     {939895 151 59.149620271823181 65.204186651485145}
     {1412 156 149.29737817929004 15.95807862745508}
     {564862 149 84.062063109158544 -65.685403922426232}
+    {1546032440391 16751 -1.8175081637769495 20.665668878082954}
 }
 set rv_idx 0

@@ -274,8 +275,19 @@ start_server {tags {"geo"}} {
                 foreach place $diff {
                     set mydist [geo_distance $lon $lat $search_lon $search_lat]
                     set mydist [expr $mydist/1000]
-                    if {($mydist / $radius_km) > 0.999} {incr rounding_errors}
+                    if {($mydist / $radius_km) > 0.999} {
+                        incr rounding_errors
+                        continue
+                    }
+                    if {$mydist < $radius_m} {
+                        # This is a false positive for Redis since, given the
+                        # same points, the higher precision calculation
+                        # provided by Tcl shows the point within range.
+                        incr rounding_errors
+                        continue
+                    }
                 }
+                # Make sure this is a real error and not a rounding issue.
                 if {[llength $diff] == $rounding_errors} {
                     set res $res2; # Error silenced
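The testrdb.tcl tests below exercise the test module above. For module authors, adopting the new aux hooks amounts to three extra RedisModuleTypeMethods fields; a minimal sketch under the API introduced in this patch (my_aux_save/my_aux_load and the persisted counter are illustrative, not part of the patch):

    #include "redismodule.h"

    /* Illustrative: persist one global counter outside of any key. */
    static long long my_global = 0;

    void my_aux_save(RedisModuleIO *rdb, int when) {
        /* Called once per configured trigger point. */
        if (when == REDISMODULE_AUX_BEFORE_RDB) return;
        RedisModule_SaveSigned(rdb, my_global);
    }

    int my_aux_load(RedisModuleIO *rdb, int encver, int when) {
        (void)encver;
        if (when == REDISMODULE_AUX_AFTER_RDB)
            my_global = RedisModule_LoadSigned(rdb);
        return REDISMODULE_OK;
    }

    /* Inside RedisModule_OnLoad(), alongside the usual
     * rdb_load/rdb_save/free callbacks: */
    RedisModuleTypeMethods tm = {
        .version = REDISMODULE_TYPE_METHOD_VERSION,
        .aux_load = my_aux_load,
        .aux_save = my_aux_save,
        /* Save aux only at the after-keys point; OR the two flags
         * together to be called at both points. */
        .aux_save_triggers = REDISMODULE_AUX_AFTER_RDB,
    };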
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
new file mode 100644
index 000000000..22201a08e
--- /dev/null
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -0,0 +1,62 @@
+set testmodule [file normalize tests/modules/testrdb.so]
+
+proc restart_and_wait {} {
+    catch {
+        r debug restart
+    }
+
+    # wait for the server to come back up
+    set retry 50
+    while {$retry} {
+        if {[catch { r ping }]} {
+            after 100
+        } else {
+            break
+        }
+        incr retry -1
+    }
+}
+
+tags "modules" {
+    start_server [list overrides [list loadmodule "$testmodule"]] {
+        test {modules are able to persist types} {
+            r testrdb.set.key key1 value1
+            assert_equal "value1" [r testrdb.get.key key1]
+            r debug reload
+            assert_equal "value1" [r testrdb.get.key key1]
+        }
+
+        test {module globals are lost without aux} {
+            r testrdb.set.before global1
+            assert_equal "global1" [r testrdb.get.before]
+            restart_and_wait
+            assert_equal "" [r testrdb.get.before]
+        }
+    }
+
+    start_server [list overrides [list loadmodule "$testmodule 2"]] {
+        test {modules are able to persist globals before and after} {
+            r testrdb.set.before global1
+            r testrdb.set.after global2
+            assert_equal "global1" [r testrdb.get.before]
+            assert_equal "global2" [r testrdb.get.after]
+            restart_and_wait
+            assert_equal "global1" [r testrdb.get.before]
+            assert_equal "global2" [r testrdb.get.after]
+        }
+
+    }
+
+    start_server [list overrides [list loadmodule "$testmodule 1"]] {
+        test {modules are able to persist globals with the after hook only} {
+            r testrdb.set.after global2
+            assert_equal "global2" [r testrdb.get.after]
+            restart_and_wait
+            assert_equal "global2" [r testrdb.get.after]
+        }
+    }
+
+
+    # TODO: test short read handling
+
+}
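On the save side, the only hook visible in this excerpt is the rdbSaveModulesAux() declaration added to server.h; the corresponding rdb.c changes are not shown. A hypothetical sketch of how the two trigger points presumably bracket the keyspace during an RDB save (the function name and error handling here are assumptions, not the actual implementation):

    /* Hypothetical call order only; not the real rdb.c code. */
    int rdbSaveAllSketch(rio *rdb) {
        /* Modules registered with REDISMODULE_AUX_BEFORE_RDB write their
         * aux fields before any key is saved... */
        if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) return C_ERR;

        /* ...the whole keyspace is saved here as usual... */

        /* ...and REDISMODULE_AUX_AFTER_RDB modules write their aux fields
         * once every key, including module-type values, is already saved. */
        if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) return C_ERR;
        return C_OK;
    }

This is consistent with testrdb.c above, which asserts that a module registering only the AFTER_RDB trigger is never called at the BEFORE_RDB point.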