diff --git a/redis.conf b/redis.conf index 0ec3321a5..39e21b5e7 100644 --- a/redis.conf +++ b/redis.conf @@ -813,11 +813,11 @@ replica-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: # -# volatile-lru -> Evict using approximated LRU among the keys with an expire set. +# volatile-lru -> Evict using approximated LRU, only keys with an expire set. # allkeys-lru -> Evict any key using approximated LRU. -# volatile-lfu -> Evict using approximated LFU among the keys with an expire set. +# volatile-lfu -> Evict using approximated LFU, only keys with an expire set. # allkeys-lfu -> Evict any key using approximated LFU. -# volatile-random -> Remove a random key among the ones with an expire set. +# volatile-random -> Remove a random key having an expire set. # allkeys-random -> Remove a random key, any key. # volatile-ttl -> Remove the key with the nearest expire time (minor TTL) # noeviction -> Don't evict anything, just return an error on write operations. @@ -872,6 +872,23 @@ replica-priority 100 # # replica-ignore-maxmemory yes +# Redis reclaims expired keys in two ways: upon access when those keys are +# found to be expired, and also in background, in what is called the +# "active expire key". The key space is slowly and interactively scanned +# looking for expired keys to reclaim, so that it is possible to free memory +# of keys that are expired and will never be accessed again in a short time. +# +# The default effort of the expire cycle will try to avoid having more than +# ten percent of expired keys still in memory, and will try to avoid consuming +# more than 25% of total memory and to add latency to the system. However +# it is possible to increase the expire "effort" that is normally set to +# "1", to a greater value, up to the value "10". At its maximum value the +# system will use more CPU, longer cycles (and technically may introduce +# more latency), and will tollerate less already expired keys still present +# in the system. It's a tradeoff betweeen memory, CPU and latecy. +# +# active-expire-effort 1 + ############################# LAZY FREEING #################################### # Redis has two primitives to delete keys. One is called DEL and is a blocking diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..32d5c2b8d 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,6 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/scan \ +--single unit/moduleapi/datatype \ "${@}" diff --git a/src/adlist.h b/src/adlist.h index c954fac87..28b9016ce 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -66,7 +66,7 @@ typedef struct list { #define listSetMatchMethod(l,m) ((l)->match = (m)) #define listGetDupMethod(l) ((l)->dup) -#define listGetFree(l) ((l)->free) +#define listGetFreeMethod(l) ((l)->free) #define listGetMatchMethod(l) ((l)->match) /* Prototypes */ diff --git a/src/aof.c b/src/aof.c index dda9579e3..0ef59cfb6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1766,7 +1766,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; - server.aof_current_size = server.aof_current_size; + server.aof_fsync_offset = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ diff --git a/src/blocked.c b/src/blocked.c index 14c2ff830..20c0e760a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -514,6 +514,16 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + /* Even if we are not inside call(), increment the call depth + * in order to make sure that keys are expired against a fixed + * reference time, and not against the wallclock time. This + * way we can lookup an object multiple times (BRPOPLPUSH does + * that) without the risk of it being freed in the second + * lookup, invalidating the first one. + * See https://github.com/antirez/redis/pull/6554. */ + server.fixed_time_expire++; + updateCachedTime(0); + /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); @@ -529,6 +539,7 @@ void handleClientsBlockedOnKeys(void) { * module is trying to accomplish right now. */ serveClientsBlockedOnKeyByModule(rl); } + server.fixed_time_expire--; /* Free this item. */ decrRefCount(rl->key); diff --git a/src/cluster.c b/src/cluster.c index a7d8a02c3..9e6ddb2c4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4966,7 +4966,7 @@ void restoreCommand(client *c) { if (!absttl) ttl+=mstime(); setExpire(c,c->db,c->argv[1],ttl); } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; diff --git a/src/config.c b/src/config.c index 505dabc9c..de8e8d8cf 100644 --- a/src/config.c +++ b/src/config.c @@ -256,7 +256,7 @@ void loadServerConfigFromString(char *config) { for (configYesNo *config = configs_yesno; config->name != NULL; config++) { if ((!strcasecmp(argv[0],config->name) || (config->alias && !strcasecmp(argv[0],config->alias))) && - (argc == 2)) + (argc == 2)) { if ((*(config->config) = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -580,6 +580,14 @@ void loadServerConfigFromString(char *config) { err = "active-defrag-max-scan-fields must be positive"; goto loaderr; } + } else if (!strcasecmp(argv[0],"active-expire-effort") && argc == 2) { + server.active_expire_effort = atoi(argv[1]); + if (server.active_expire_effort < 1 || + server.active_expire_effort > 10) + { + err = "active-expire-effort must be between 1 and 10"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { @@ -1165,6 +1173,8 @@ void configSetCommand(client *c) { "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { } config_set_numerical_field( "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) { + } config_set_numerical_field( + "active-expire-effort",server.active_expire_effort,1,10) { } config_set_numerical_field( "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){ } config_set_numerical_field( @@ -1478,6 +1488,7 @@ void configGetCommand(client *c) { config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min); config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max); config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields); + config_get_numerical_field("active-expire-effort",server.active_expire_effort); config_get_numerical_field("auto-aof-rewrite-percentage", server.aof_rewrite_perc); config_get_numerical_field("auto-aof-rewrite-min-size", @@ -2327,6 +2338,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); + rewriteConfigNumericalOption(state,"active-expire-effort",server.active_expire_effort,CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT); rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); diff --git a/src/db.c b/src/db.c index e75a4d429..aac049eb7 100644 --- a/src/db.c +++ b/src/db.c @@ -1077,10 +1077,12 @@ int dbSwapDatabases(long id1, long id2) { db1->dict = db2->dict; db1->expires = db2->expires; db1->avg_ttl = db2->avg_ttl; + db1->expires_cursor = db2->expires_cursor; db2->dict = aux.dict; db2->expires = aux.expires; db2->avg_ttl = aux.avg_ttl; + db2->expires_cursor = aux.expires_cursor; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1196,6 +1198,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { /* Check if the key is expired. */ int keyIsExpired(redisDb *db, robj *key) { mstime_t when = getExpire(db,key); + mstime_t now; if (when < 0) return 0; /* No expire for this key */ @@ -1207,8 +1210,26 @@ int keyIsExpired(redisDb *db, robj *key) { * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); + if (server.lua_caller) { + now = server.lua_time_start; + } + /* If we are in the middle of a command execution, we still want to use + * a reference time that does not change: in that case we just use the + * cached time, that we update before each call in the call() function. + * This way we avoid that commands such as RPOPLPUSH or similar, that + * may re-open the same key multiple times, can invalidate an already + * open object in a next call, if the next call will see the key expired, + * while the first did not. */ + else if (server.fixed_time_expire > 0) { + now = server.mstime; + } + /* For the other cases, we want to use the most fresh time we have. */ + else { + now = mstime(); + } + /* The key expired if the current (virtual or real) time is greater + * than the expire time of the key. */ return now > when; } diff --git a/src/expire.c b/src/expire.c index 598b27f96..b4ab9ab18 100644 --- a/src/expire.c +++ b/src/expire.c @@ -78,24 +78,63 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * it will get more aggressive to avoid that too much memory is used by * keys that can be removed from the keyspace. * - * No more than CRON_DBS_PER_CALL databases are tested at every - * iteration. + * Every expire cycle tests multiple databases: the next call will start + * again from the next db, with the exception of exists for time limit: in that + * case we restart again from the last database we were processing. Anyway + * no more than CRON_DBS_PER_CALL databases are tested at every iteration. * - * This kind of call is used when Redis detects that timelimit_exit is - * true, so there is more work to do, and we do it more incrementally from - * the beforeSleep() function of the event loop. + * The function can perform more or less work, depending on the "type" + * argument. It can execute a "fast cycle" or a "slow cycle". The slow + * cycle is the main way we collect expired cycles: this happens with + * the "server.hz" frequency (usually 10 hertz). * - * Expire cycle type: + * However the slow cycle can exit for timeout, since it used too much time. + * For this reason the function is also invoked to perform a fast cycle + * at every event loop cycle, in the beforeSleep() function. The fast cycle + * will try to perform less work, but will do it much more often. + * + * The following are the details of the two expire cycles and their stop + * conditions: * * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION * microseconds, and is not repeated again before the same amount of time. + * The cycle will also refuse to run at all if the latest slow cycle did not + * terminate because of a time limit condition. * * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is * executed, where the time limit is a percentage of the REDIS_HZ period - * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ + * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the + * fast cycle, the check of every database is interrupted once the number + * of already expired keys in the database is estimated to be lower than + * a given percentage, in order to avoid doing too much work to gain too + * little memory. + * + * The configured expire "effort" will modify the baseline parameters in + * order to do more work in both the fast and slow expire cycles. + */ + +#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ +#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which + we do extra efforts. */ void activeExpireCycle(int type) { + /* Adjust the running parameters according to the configured expire + * effort. The default effort is 1, and the maximum configurable effort + * is 10. */ + unsigned long + effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */ + config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, + config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + + 2*effort, + config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- + effort; + /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Last DB tested. */ @@ -113,10 +152,16 @@ void activeExpireCycle(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limit. Also don't repeat a fast cycle for the same period + * for time limit, unless the percentage of estimated stale keys is + * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit) return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; + if (!timelimit_exit && + server.stat_expired_stale_perc < config_cycle_acceptable_stale) + return; + + if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2) + return; + last_fast_cycle = start; } @@ -130,16 +175,16 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time - * per iteration. Since this function gets called with a frequency of + /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU + * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ - timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; + timelimit = config_cycle_slow_time_perc*1000000/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; if (type == ACTIVE_EXPIRE_CYCLE_FAST) - timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ + timelimit = config_cycle_fast_duration; /* in microseconds. */ /* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still @@ -148,7 +193,9 @@ void activeExpireCycle(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; + /* Expired and checked in a single loop. */ + unsigned long expired, sampled; + redisDb *db = server.db+(current_db % server.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -172,8 +219,8 @@ void activeExpireCycle(int type) { slots = dictSlots(db->expires); now = mstime(); - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... + /* When there are less than 1% filled slots, sampling the key + * space is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; @@ -181,27 +228,58 @@ void activeExpireCycle(int type) { /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. */ expired = 0; + sampled = 0; ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; + if (num > config_keys_per_loop) + num = config_keys_per_loop; - while (num--) { - dictEntry *de; - long long ttl; + /* Here we access the low level representation of the hash table + * for speed concerns: this makes this code coupled with dict.c, + * but it hardly changed in ten years. + * + * Note that certain places of the hash table may be empty, + * so we want also a stop condition about the number of + * buckets that we scanned. However scanning for free buckets + * is very fast: we are in the cache line scanning a sequential + * array of NULL pointers, so we can scan a lot more buckets + * than keys in the same time. */ + long max_buckets = num*20; + long checked_buckets = 0; - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; + while (sampled < num && checked_buckets < max_buckets) { + for (int table = 0; table < 2; table++) { + if (table == 1 && !dictIsRehashing(db->expires)) break; + + unsigned long idx = db->expires_cursor; + idx &= db->expires->ht[table].sizemask; + dictEntry *de = db->expires->ht[table].table[idx]; + long long ttl; + + /* Scan the current bucket of the current table. */ + checked_buckets++; + while(de) { + /* Get the next entry now since this entry may get + * deleted. */ + dictEntry *e = de; + de = de->next; + + ttl = dictGetSignedIntegerVal(e)-now; + if (activeExpireCycleTryExpire(db,e,now)) expired++; + if (ttl > 0) { + /* We want the average TTL of keys yet + * not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + sampled++; + } } - total_sampled++; + db->expires_cursor++; } total_expired += expired; + total_sampled += sampled; /* Update the average TTL stats for this database. */ if (ttl_samples) { @@ -225,12 +303,14 @@ void activeExpireCycle(int type) { break; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + /* We don't repeat the cycle for the current database if there are + * an acceptable amount of stale keys (logically expired but yet + * not reclained). */ + } while ((expired*100/sampled) > config_cycle_acceptable_stale); } elapsed = ustime()-start; + server.stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); /* Update our estimate of keys existing but yet to be expired. diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh index e6d558b17..236c26c2b 100755 --- a/src/mkreleasehdr.sh +++ b/src/mkreleasehdr.sh @@ -3,7 +3,7 @@ GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l` BUILD_ID=`uname -n`"-"`date +%s` if [ -n "$SOURCE_DATE_EPOCH" ]; then - BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u %s) + BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u +%s) fi test -f release.h || touch release.h (cat release.h | grep SHA1 | grep $GIT_SHA1) && \ diff --git a/src/module.c b/src/module.c index 0ee38a5eb..d2a5248ff 100644 --- a/src/module.c +++ b/src/module.c @@ -1412,7 +1412,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c, "", 0); + addReply(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1427,8 +1427,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) return REDISMODULE_OK; } -/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded - * as the string "$-1\r\n". +/* Reply to the client with a NULL. * * The function always returns REDISMODULE_OK. */ int RM_ReplyWithNull(RedisModuleCtx *ctx) { @@ -1787,6 +1786,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before * reaching the maxmemory level. * + * * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF + * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master. * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to @@ -1886,6 +1887,18 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +/* Initialize a RedisModuleKey struct */ +static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ + kp->ctx = ctx; + kp->db = ctx->client->db; + kp->key = keyname; + incrRefCount(keyname); + kp->value = value; + kp->iter = NULL; + kp->mode = mode; + zsetKeyReset(kp); +} + /* Return an handle representing a Redis key, so that it is possible * to call other APIs with the key handle as argument to perform * operations on the key. @@ -1916,27 +1929,25 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - kp->ctx = ctx; - kp->db = ctx->client->db; - kp->key = keyname; - incrRefCount(keyname); - kp->value = value; - kp->iter = NULL; - kp->mode = mode; - zsetKeyReset(kp); + moduleInitKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -/* Close a key handle. */ -void RM_CloseKey(RedisModuleKey *key) { - if (key == NULL) return; +/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */ +static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); +} + +/* Close a key handle. */ +void RM_CloseKey(RedisModuleKey *key) { + if (key == NULL) return; + moduleCloseKey(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -1988,7 +1999,7 @@ int RM_DeleteKey(RedisModuleKey *key) { return REDISMODULE_OK; } -/* If the key is open for writing, unlink it (that is delete it in a +/* If the key is open for writing, unlink it (that is delete it in a * non-blocking way, not reclaiming memory immediately) and setup the key to * accept new writes as an empty key (that will be created on demand). * On success REDISMODULE_OK is returned. If the key is not open for @@ -3148,7 +3159,9 @@ fmterr: * On success a RedisModuleCallReply object is returned, otherwise * NULL is returned and errno is set to the following values: * - * EINVAL: command non existing, wrong arity, wrong format specifier. + * EBADF: wrong format specifier. + * EINVAL: wrong command arity. + * ENOENT: command does not exist. * EPERM: operation in Cluster instance with key in non local slot. * * This API is documented here: https://redis.io/topics/modules-intro @@ -3180,7 +3193,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) { - errno = EINVAL; + errno = EBADF; goto cleanup; } @@ -3192,7 +3205,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - errno = EINVAL; + errno = ENOENT; goto cleanup; } c->cmd = c->lastcmd = cmd; @@ -3922,6 +3935,59 @@ void RM_DigestEndSequence(RedisModuleDigest *md) { memset(md->o,0,sizeof(md->o)); } +/* Decode a serialized representation of a module data type 'mt' from string + * 'str' and return a newly allocated value, or NULL if decoding failed. + * + * This call basically reuses the 'rdb_load' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + * + * Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and + * make sure the de-serialization code properly checks and handles IO errors + * (freeing allocated buffers and returning a NULL). + * + * If this is NOT done, Redis will handle corrupted (or just truncated) serialized + * data by producing an error message and terminating the process. + */ + +void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload, str->ptr); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + + /* All RM_Save*() calls always write a version 2 compatible format, so we + * need to make sure we read the same. + */ + io.ver = 2; + return mt->rdb_load(&io,0); +} + +/* Encode a module data type 'mt' value 'data' into serialized form, and return it + * as a newly allocated RedisModuleString. + * + * This call basically reuses the 'rdb_save' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + */ + +RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload,sdsempty()); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + mt->rdb_save(&io,data); + if (io.error) { + return NULL; + } else { + robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str); + return str; + } +} + /* -------------------------------------------------------------------------- * AOF API for modules data types * -------------------------------------------------------------------------- */ @@ -5929,6 +5995,239 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/* -------------------------------------------------------------------------- + * Scanning keyspace and hashes + * -------------------------------------------------------------------------- */ + +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); +typedef struct { + RedisModuleCtx *ctx; + void* user_data; + RedisModuleScanCB fn; +} ScanCBData; + +typedef struct RedisModuleScanCursor{ + int cursor; + int done; +}RedisModuleScanCursor; + +static void moduleScanCallback(void *privdata, const dictEntry *de) { + ScanCBData *data = privdata; + sds key = dictGetKey(de); + robj* val = dictGetVal(de); + RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key)); + + /* Setup the key handle. */ + RedisModuleKey kp = {0}; + moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + + data->fn(data->ctx, keyname, &kp, data->user_data); + + moduleCloseKey(&kp); + decrRefCount(keyname); +} + +/* Create a new cursor to be used with RedisModule_Scan */ +RedisModuleScanCursor *RM_ScanCursorCreate() { + RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor)); + cursor->cursor = 0; + cursor->done = 0; + return cursor; +} + +/* Restart an existing cursor. The keys will be rescanned. */ +void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) { + cursor->cursor = 0; + cursor->done = 0; +} + +/* Destroy the cursor struct. */ +void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) { + zfree(cursor); +} + +/* Scan api that allows a module to scan all the keys and value in the selected db. + * + * Callback for scan implementation. + * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); + * - ctx - the redis module context provided to for the scan. + * - keyname - owned by the caller and need to be retained if used after this function. + * - key - holds info on the key and value, it is provided as best effort, in some cases it might + * be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too). + * when it is provided, it is owned by the caller and will be free when the callback returns. + * - privdata - the user data provided to RedisModule_Scan. + * + * The way it should be used: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * while(RedisModule_Scan(ctx, c, callback, privateData)); + * RedisModule_ScanCursorDestroy(c); + * + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); + * while(RedisModule_Scan(ctx, c, callback, privateData)){ + * RedisModule_ThreadSafeContextUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeContextLock(ctx); + * } + * RedisModule_ScanCursorDestroy(c); + * + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) { + if (cursor->done) { + errno = ENOENT; + return 0; + } + int ret = 1; + ScanCBData data = { ctx, privdata, fn }; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; + ret = 0; + } + errno = 0; + return ret; +} + +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); +typedef struct { + RedisModuleKey *key; + void* user_data; + RedisModuleScanKeyCB fn; +} ScanKeyCBData; + +static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { + ScanKeyCBData *data = privdata; + sds key = dictGetKey(de); + robj *o = data->key->value; + robj *field = createStringObject(key, sdslen(key)); + robj *value = NULL; + if (o->type == OBJ_SET) { + value = NULL; + } else if (o->type == OBJ_HASH) { + sds val = dictGetVal(de); + value = createStringObject(val, sdslen(val)); + } else if (o->type == OBJ_ZSET) { + double *val = (double*)dictGetVal(de); + value = createStringObjectFromLongDouble(*val, 0); + } + + data->fn(data->key, field, value, data->user_data); + decrRefCount(field); + if (value) decrRefCount(value); +} + +/* Scan api that allows a module to scan the elements in a hash, set or sorted set key + * + * Callback for scan implementation. + * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata); + * - key - the redis key context provided to for the scan. + * - field - field name, owned by the caller and need to be retained if used + * after this function. + * - value - value string or NULL for set type, owned by the caller and need to + * be retained if used after this function. + * - privdata - the user data provided to RedisModule_ScanKey. + * + * The way it should be used: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(key, c, callback, privateData)); + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan, and re-opening the key each time: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(ctx, c, callback, privateData)){ + * RedisModule_CloseKey(key); + * RedisModule_ThreadSafeContextUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * } + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) { + if (key == NULL || key->value == NULL) { + errno = EINVAL; + return 0; + } + dict *ht = NULL; + robj *o = key->value; + if (o->type == OBJ_SET) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_HASH) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_ZSET) { + if (o->encoding == OBJ_ENCODING_SKIPLIST) + ht = ((zset *)o->ptr)->dict; + } else { + errno = EINVAL; + return 0; + } + if (cursor->done) { + errno = ENOENT; + return 0; + } + int ret = 1; + if (ht) { + ScanKeyCBData data = { key, privdata, fn }; + cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; + ret = 0; + } + } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) { + int pos = 0; + int64_t ll; + while(intsetGet(o->ptr,pos++,&ll)) { + robj *field = createStringObjectFromLongLong(ll); + fn(key, field, NULL, privdata); + decrRefCount(field); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) { + unsigned char *p = ziplistIndex(o->ptr,0); + unsigned char *vstr; + unsigned int vlen; + long long vll; + while(p) { + ziplistGet(p,&vstr,&vlen,&vll); + robj *field = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + p = ziplistNext(o->ptr,p); + ziplistGet(p,&vstr,&vlen,&vll); + robj *value = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + fn(key, field, value, privdata); + p = ziplistNext(o->ptr,p); + decrRefCount(field); + decrRefCount(value); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } + errno = 0; + return ret; +} + + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6774,38 +7073,82 @@ size_t moduleCount(void) { return dictSize(modules); } -/* Set the key LRU/LFU depending on server.maxmemory_policy. - * The lru_idle arg is idle time in seconds, and is only relevant if the - * eviction policy is LRU based. - * The lfu_freq arg is a logarithmic counter that provides an indication of - * the access frequencyonly (must be <= 255) and is only relevant if the - * eviction policy is LFU based. - * Either or both of them may be <0, in that case, nothing is set. */ -/* return value is an indication if the lru field was updated or not. */ -int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) { +/* Set the key last access time for LRU based eviction. not relevent if the + * servers's maxmemory policy is LFU based. Value is idle time in milliseconds. + * returns REDISMODULE_OK if the LRU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLRU(RedisModuleKey *key, mstime_t lru_idle) { if (!key->value) return REDISMODULE_ERR; - if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0)) + if (objectSetLRUOrLFU(key->value, -1, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0, 1)) return REDISMODULE_OK; return REDISMODULE_ERR; } -/* Gets the key LRU or LFU (depending on the current eviction policy). - * One will be set to the appropiate return value, and the other will be set to -1. - * see RedisModule_SetLRUOrLFU for units and ranges. - * return value is an indication of success. */ -int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) { - *lru_idle = *lfu_freq = -1; +/* Gets the key last access time. + * Value is idletime in milliseconds or -1 if the server's eviction policy is + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLRU(RedisModuleKey *key, mstime_t *lru_idle) { + *lru_idle = -1; if (!key->value) return REDISMODULE_ERR; - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - *lfu_freq = LFUDecrAndReturn(key->value); - } else { - *lru_idle = estimateObjectIdleTime(key->value)/1000; - } + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) + return REDISMODULE_OK; + *lru_idle = estimateObjectIdleTime(key->value); return REDISMODULE_OK; } +/* Set the key access frequency. only relevant if the server's maxmemory policy + * is LFU based. + * The frequency is a logarithmic counter that provides an indication of + * the access frequencyonly (must be <= 255). + * returns REDISMODULE_OK if the LFU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLFU(RedisModuleKey *key, long long lfu_freq) { + if (!key->value) + return REDISMODULE_ERR; + if (objectSetLRUOrLFU(key->value, lfu_freq, -1, 0, 1)) + return REDISMODULE_OK; + return REDISMODULE_ERR; +} + +/* Gets the key access frequency or -1 if the server's eviction policy is not + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { + *lfu_freq = -1; + if (!key->value) + return REDISMODULE_ERR; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) + *lfu_freq = LFUDecrAndReturn(key->value); + return REDISMODULE_OK; +} + +/* Replace the value assigned to a module type. + * + * The key must be open for writing, have an existing value, and have a moduleType + * that matches the one specified by the caller. + * + * Unlike RM_ModuleTypeSetValue() which will free the old value, this function + * simply swaps the old value with the new value. + * + * The function returns the old value, or NULL if any of the above conditions is + * not met. + */ +void *RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_value) { + if (!(key->mode & REDISMODULE_WRITE) || key->iter) + return NULL; + if (!key->value || key->value->type != OBJ_MODULE) + return NULL; + + moduleValue *mv = key->value->ptr; + if (mv->type != mt) + return NULL; + + void *old_val = mv->value; + mv->value = new_value; + return old_val; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -6898,6 +7241,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(PoolAlloc); REGISTER_API(CreateDataType); REGISTER_API(ModuleTypeSetValue); + REGISTER_API(ModuleTypeReplaceValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); REGISTER_API(IsIOError); @@ -6917,6 +7261,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(LoadFloat); REGISTER_API(SaveLongDouble); REGISTER_API(LoadLongDouble); + REGISTER_API(SaveDataTypeToString); + REGISTER_API(LoadDataTypeFromString); REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); @@ -7007,9 +7353,16 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetClientInfoById); REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); - REGISTER_API(SetLRUOrLFU); - REGISTER_API(GetLRUOrLFU); + REGISTER_API(SetLRU); + REGISTER_API(GetLRU); + REGISTER_API(SetLFU); + REGISTER_API(GetLFU); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); + REGISTER_API(ScanCursorCreate); + REGISTER_API(ScanCursorDestroy); + REGISTER_API(ScanCursorRestart); + REGISTER_API(Scan); + REGISTER_API(ScanKey); } diff --git a/src/object.c b/src/object.c index 5e9a99dec..c6d89bfa7 100644 --- a/src/object.c +++ b/src/object.c @@ -1210,7 +1210,7 @@ sds getMemoryDoctorReport(void) { * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock) { + long long lru_clock, int lru_multiplier) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); @@ -1222,7 +1222,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, * according to the LRU clock resolution this Redis * instance was compiled with (normally 1000 ms, so the * below statement will expand to lru_idle*1000/1000. */ - lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION; + lru_idle = lru_idle*lru_multiplier/LRU_CLOCK_RESOLUTION; long lru_abs = lru_clock - lru_idle; /* Absolute access time. */ /* If the LRU field underflows (since LRU it is a wrapping * clock), the best we can do is to provide a large enough LRU diff --git a/src/rax.c b/src/rax.c index be474b058..29b74ae90 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1673,6 +1673,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { * node, but will be our match, representing the key "f". * * So in that case, we don't seek backward. */ + it->data = raxGetData(it->node); } else { if (gt && !raxIteratorNextStep(it,0)) return 0; if (lt && !raxIteratorPrevStep(it,0)) return 0; @@ -1791,7 +1792,7 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key if (eq && key_len == iter->key_len) return 1; else if (lt) return iter->key_len < key_len; else if (gt) return iter->key_len > key_len; - return 0; + else return 0; /* Avoid warning, just 'eq' is handled before. */ } else if (cmp > 0) { return gt ? 1 : 0; } else /* (cmp < 0) */ { diff --git a/src/rdb.c b/src/rdb.c index b569edfea..27e1b3135 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2006,7 +2006,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* The DB can take some non trivial amount of time to load. Update * our cached time since it is used to create and update the last * interaction time with clients and for other important things. */ - updateCachedTime(); + updateCachedTime(0); if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); @@ -2239,7 +2239,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (expiretime != -1) setExpire(NULL,db,key,expiretime); /* Set usage information (for eviction). */ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); /* Decrement the key refcount since dbAdd() will take its * own reference. */ diff --git a/src/redismodule.h b/src/redismodule.h index 98939e93c..1d14ba799 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; +typedef struct RedisModuleScanCursor RedisModuleScanCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -409,6 +410,8 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -520,6 +523,7 @@ int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); +void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeReplaceValue)(RedisModuleKey *key, RedisModuleType *mt, void *new_value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); @@ -540,6 +544,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value) float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value); long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); +void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); @@ -586,11 +592,18 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); -int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); -int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLRU)(RedisModuleKey *key, mstime_t lru_idle); +int REDISMODULE_API_FUNC(RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq); +int REDISMODULE_API_FUNC(RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq); RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); +RedisModuleScanCursor *REDISMODULE_API_FUNC(RedisModule_ScanCursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata); +int REDISMODULE_API_FUNC(RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -732,6 +745,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(PoolAlloc); REDISMODULE_GET_API(CreateDataType); REDISMODULE_GET_API(ModuleTypeSetValue); + REDISMODULE_GET_API(ModuleTypeReplaceValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); REDISMODULE_GET_API(IsIOError); @@ -751,6 +765,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(LoadFloat); REDISMODULE_GET_API(SaveLongDouble); REDISMODULE_GET_API(LoadLongDouble); + REDISMODULE_GET_API(SaveDataTypeToString); + REDISMODULE_GET_API(LoadDataTypeFromString); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); @@ -800,11 +816,18 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); - REDISMODULE_GET_API(SetLRUOrLFU); - REDISMODULE_GET_API(GetLRUOrLFU); + REDISMODULE_GET_API(SetLRU); + REDISMODULE_GET_API(GetLRU); + REDISMODULE_GET_API(SetLFU); + REDISMODULE_GET_API(GetLFU); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); + REDISMODULE_GET_API(ScanCursorCreate); + REDISMODULE_GET_API(ScanCursorRestart); + REDISMODULE_GET_API(ScanCursorDestroy); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(ScanKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/sentinel.c b/src/sentinel.c index 0490db4e9..42c4d2467 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -3993,11 +3993,14 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - retval = redisAsyncCommand(ri->link->cc, - sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal", - sentinelInstanceMapCommand(ri,"CLIENT")); - if (retval == C_ERR) return retval; - ri->link->pending_commands++; + for (int type = 0; type < 2; type++) { + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", + sentinelInstanceMapCommand(ri,"CLIENT"), + type == 0 ? "normal" : "pubsub"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + } retval = redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s", diff --git a/src/server.c b/src/server.c index f42924764..2e9a329dd 100644 --- a/src/server.c +++ b/src/server.c @@ -1736,20 +1736,29 @@ void databasesCron(void) { /* We take a cached value of the unix time in the global state because with * virtual memory and aging there is to store the current time in objects at * every object access, and accuracy is not needed. To access a global var is - * a lot faster than calling time(NULL) */ -void updateCachedTime(void) { - server.unixtime = time(NULL); - server.mstime = mstime(); + * a lot faster than calling time(NULL). + * + * This function should be fast because it is called at every command execution + * in call(), so it is possible to decide if to update the daylight saving + * info or not using the 'update_daylight_info' argument. Normally we update + * such info only when calling this function from serverCron() but not when + * calling it from call(). */ +void updateCachedTime(int update_daylight_info) { + server.ustime = ustime(); + server.mstime = server.ustime / 1000; + server.unixtime = server.mstime / 1000; /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this * context is safe since we will never fork() while here, in the main * thread. The logging function will call a thread safe version of * localtime that has no locks. */ - struct tm tm; - time_t ut = server.unixtime; - localtime_r(&ut,&tm); - server.daylight_active = tm.tm_isdst; + if (update_daylight_info) { + struct tm tm; + time_t ut = server.unixtime; + localtime_r(&ut,&tm); + server.daylight_active = tm.tm_isdst; + } } void checkChildrenDone(void) { @@ -1838,7 +1847,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* Update the time cache. */ - updateCachedTime(); + updateCachedTime(1); server.hz = server.config_hz; /* Adapt the server.hz value to the number of configured clients. If we have @@ -2258,7 +2267,7 @@ void createSharedObjects(void) { void initServerConfig(void) { int j; - updateCachedTime(); + updateCachedTime(1); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); @@ -2285,6 +2294,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.active_expire_effort = CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT; server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; @@ -2736,6 +2746,7 @@ void resetServerStats(void) { server.stat_expiredkeys = 0; server.stat_expired_stale_perc = 0; server.stat_expired_time_cap_reached_count = 0; + server.stat_expire_cycle_time_used = 0; server.stat_evictedkeys = 0; server.stat_keyspace_misses = 0; server.stat_keyspace_hits = 0; @@ -2777,6 +2788,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; + server.fixed_time_expire = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -2838,6 +2850,7 @@ void initServer(void) { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); + server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); @@ -3244,10 +3257,13 @@ void preventCommandReplication(client *c) { * */ void call(client *c, int flags) { - long long dirty, start, duration; + long long dirty; + ustime_t start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; + server.fixed_time_expire++; + /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && @@ -3265,7 +3281,8 @@ void call(client *c, int flags) { /* Call the command. */ dirty = server.dirty; - start = ustime(); + updateCachedTime(0); + start = server.ustime; c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; @@ -3372,6 +3389,7 @@ void call(client *c, int flags) { trackingRememberKeys(caller); } + server.fixed_time_expire--; server.stat_numcommands++; } @@ -4253,6 +4271,7 @@ sds genRedisInfoString(char *section) { "expired_keys:%lld\r\n" "expired_stale_perc:%.2f\r\n" "expired_time_cap_reached_count:%lld\r\n" + "expire_cycle_cpu_milliseconds:%lld\r\n" "evicted_keys:%lld\r\n" "keyspace_hits:%lld\r\n" "keyspace_misses:%lld\r\n" @@ -4280,6 +4299,7 @@ sds genRedisInfoString(char *section) { server.stat_expiredkeys, server.stat_expired_stale_perc*100, server.stat_expired_time_cap_reached_count, + server.stat_expire_cycle_time_used/1000, server.stat_evictedkeys, server.stat_keyspace_hits, server.stat_keyspace_misses, diff --git a/src/server.h b/src/server.h index 8063dc101..ad47fe80b 100644 --- a/src/server.h +++ b/src/server.h @@ -50,6 +50,7 @@ #include typedef long long mstime_t; /* millisecond time type. */ +typedef long long ustime_t; /* microsecond time type. */ #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -178,10 +179,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ +#define CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT 1 /* From 1 to 10. */ -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ -#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ -#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -720,6 +719,7 @@ typedef struct redisDb { dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ + unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; @@ -1133,7 +1133,8 @@ struct redisServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ - client *current_client; /* Current client, only used on crash report */ + client *current_client; /* Current client executing the command. */ + long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ @@ -1165,6 +1166,7 @@ struct redisServer { long long stat_expiredkeys; /* Number of expired keys */ double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/ + long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ long long stat_keyspace_misses; /* Number of failed lookups of keys */ @@ -1203,6 +1205,7 @@ struct redisServer { int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ + int active_expire_effort; /* From 1 (default) to 10, active effort. */ int active_defrag_enabled; int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ @@ -1400,7 +1403,8 @@ struct redisServer { _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* 'unixtime' with milliseconds resolution. */ + mstime_t mstime; /* 'unixtime' in milliseconds. */ + ustime_t ustime; /* 'unixtime' in microseconds. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1999,7 +2003,7 @@ void populateCommandTable(void); void resetCommandTableStats(void); void adjustOpenFilesLimit(void); void closeListeningSockets(int unlink_unix_socket); -void updateCachedTime(void); +void updateCachedTime(int update_daylight_info); void resetServerStats(void); void activeDefragCycle(void); unsigned int getLRUClock(void); @@ -2089,7 +2093,7 @@ robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags); robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock); + long long lru_clock, int lru_multiplier); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) void dbAdd(redisDb *db, robj *key, robj *val); diff --git a/src/t_stream.c b/src/t_stream.c index 58b59f521..0b07d7110 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1220,6 +1220,14 @@ void xaddCommand(client *c) { return; } + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + * a new stream and have streamAppendItem fail, leaving an empty key in the + * database. */ + if (id_given && id.ms == 0 && id.seq == 0) { + addReplyError(c,"The ID specified in XADD must be greater than 0-0"); + return; + } + /* Lookup the stream at key. */ robj *o; stream *s; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..f33f9e80e 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,9 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + scan.so \ + datatype.so .PHONY: all diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c new file mode 100644 index 000000000..7c39ab457 --- /dev/null +++ b/tests/modules/datatype.c @@ -0,0 +1,161 @@ +/* This module current tests a small subset but should be extended in the future + * for general ModuleDataType coverage. + */ + +#include "redismodule.h" + +static RedisModuleType *datatype = NULL; + +typedef struct { + long long intval; + RedisModuleString *strval; +} DataType; + +static void *datatype_load(RedisModuleIO *io, int encver) { + (void) encver; + + int intval = RedisModule_LoadSigned(io); + if (RedisModule_IsIOError(io)) return NULL; + + RedisModuleString *strval = RedisModule_LoadString(io); + if (RedisModule_IsIOError(io)) return NULL; + + DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType)); + dt->intval = intval; + dt->strval = strval; + return dt; +} + +static void datatype_save(RedisModuleIO *io, void *value) { + DataType *dt = (DataType *) value; + RedisModule_SaveSigned(io, dt->intval); + RedisModule_SaveString(io, dt->strval); +} + +static void datatype_free(void *value) { + if (value) { + DataType *dt = (DataType *) value; + + if (dt->strval) RedisModule_FreeString(NULL, dt->strval); + RedisModule_Free(dt); + } +} + +static int datatype_set(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + long long intval; + + if (RedisModule_StringToLongLong(argv[2], &intval) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integr value"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + DataType *dt = RedisModule_Calloc(sizeof(DataType), 1); + dt->intval = intval; + dt->strval = argv[3]; + RedisModule_RetainString(ctx, dt->strval); + + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + DataType *dt = RedisModule_LoadDataTypeFromString(argv[2], datatype); + if (!dt) { + RedisModule_ReplyWithError(ctx, "Invalid data"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_get(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, dt->intval); + RedisModule_ReplyWithString(ctx, dt->strval); + return REDISMODULE_OK; +} + +static int datatype_dump(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModuleString *reply = RedisModule_SaveDataTypeToString(ctx, dt, datatype); + if (!reply) { + RedisModule_ReplyWithError(ctx, "Failed to save"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithString(ctx, reply); + RedisModule_FreeString(ctx, reply); + return REDISMODULE_OK; +} + + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"datatype",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + + RedisModuleTypeMethods datatype_methods = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = datatype_load, + .rdb_save = datatype_save, + .free = datatype_free, + }; + + datatype = RedisModule_CreateDataType(ctx, "test___dt", 1, &datatype_methods); + if (datatype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.set", datatype_set,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.get", datatype_get,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.restore", datatype_restore,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.dump", datatype_dump,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/modules/misc.c b/tests/modules/misc.c index ba0710538..b5a032f60 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -81,6 +81,107 @@ final: return REDISMODULE_OK; } +int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ResetDataset(1, 0); + RedisModule_ReplyWithCString(ctx, "Ok"); + return REDISMODULE_OK; +} + +int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + long long ll = RedisModule_DbSize(ctx); + RedisModule_ReplyWithLongLong(ctx, ll); + return REDISMODULE_OK; +} + +int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *str = RedisModule_RandomKey(ctx); + RedisModule_ReplyWithString(ctx, str); + RedisModule_FreeString(ctx, str); + return REDISMODULE_OK; +} + +RedisModuleKey *open_key_or_reply(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + if (!key) { + RedisModule_ReplyWithError(ctx, "key not found"); + return NULL; + } + return key; +} + +int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + RedisModule_GetLRU(key, &lru); + RedisModule_ReplyWithLongLong(ctx, lru); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + if (RedisModule_StringToLongLong(argv[2], &lru) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid idle time"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLRU(key, lru)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_getlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + RedisModule_GetLFU(key, &lfu); + RedisModule_ReplyWithLongLong(ctx, lfu); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + if (RedisModule_StringToLongLong(argv[2], &lfu) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid freq"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLFU(key, lfu)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -93,5 +194,20 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.ld_conversion", test_ld_conv, "",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlfu", test_setlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlfu", test_getlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/modules/scan.c b/tests/modules/scan.c new file mode 100644 index 000000000..afede244b --- /dev/null +++ b/tests/modules/scan.c @@ -0,0 +1,109 @@ +#include "redismodule.h" + +#include +#include +#include + +typedef struct { + size_t nkeys; +} scan_strings_pd; + +void scan_strings_callback(RedisModuleCtx *ctx, RedisModuleString* keyname, RedisModuleKey* key, void *privdata) { + scan_strings_pd* pd = privdata; + int was_opened = 0; + if (!key) { + key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ); + was_opened = 1; + } + + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING) { + size_t len; + char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithString(ctx, keyname); + RedisModule_ReplyWithStringBuffer(ctx, data, len); + pd->nkeys++; + } + if (was_opened) + RedisModule_CloseKey(key); +} + +int scan_strings(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + scan_strings_pd pd = { + .nkeys = 0, + }; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_strings_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nkeys); + return REDISMODULE_OK; +} + +typedef struct { + RedisModuleCtx *ctx; + size_t nreplies; +} scan_key_pd; + +void scan_key_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata) { + REDISMODULE_NOT_USED(key); + scan_key_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + RedisModule_ReplyWithString(pd->ctx, field); + if (value) + RedisModule_ReplyWithString(pd->ctx, value); + else + RedisModule_ReplyWithNull(pd->ctx); + pd->nreplies++; +} + +int scan_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + scan_key_pd pd = { + .ctx = ctx, + .nreplies = 0, + }; + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (!key) { + RedisModule_ReplyWithError(ctx, "not found"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_ScanKey(key, cursor, scan_key_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nreplies); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scan_strings", scan_strings, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scan_key", scan_key, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} + + diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 8a262e8a7..7c04bb4ef 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -18,8 +18,11 @@ void *testrdb_type_load(RedisModuleIO *rdb, int encver) { RedisModuleString *str = RedisModule_LoadString(rdb); float f = RedisModule_LoadFloat(rdb); long double ld = RedisModule_LoadLongDouble(rdb); - if (RedisModule_IsIOError(rdb)) + if (RedisModule_IsIOError(rdb)) { + RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); + RedisModule_FreeString(ctx, str); return NULL; + } /* Using the values only after checking for io errors. */ assert(count==1); assert(encver==1); diff --git a/tests/support/test.tcl b/tests/support/test.tcl index 2646acecd..5e8916236 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -11,28 +11,55 @@ proc fail {msg} { proc assert {condition} { if {![uplevel 1 [list expr $condition]]} { - error "assertion:Expected condition '$condition' to be true ([uplevel 1 [list subst -nocommands $condition]])" + set context "(context: [info frame -1])" + error "assertion:Expected [uplevel 1 [list subst -nocommands $condition]] $context" } } proc assert_no_match {pattern value} { if {[string match $pattern $value]} { - error "assertion:Expected '$value' to not match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to not match '$pattern' $context" } } proc assert_match {pattern value} { if {![string match $pattern $value]} { - error "assertion:Expected '$value' to match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to match '$pattern' $context" } } -proc assert_equal {expected value {detail ""}} { +proc assert_equal {value expected {detail ""}} { if {$expected ne $value} { if {$detail ne ""} { - set detail " (detail: $detail)" + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" } - error "assertion:Expected '$value' to be equal to '$expected'$detail" + error "assertion:Expected '$value' to be equal to '$expected' $detail" + } +} + +proc assert_lessthan {value expected {detail ""}} { + if {!($value < $expected)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be lessthan to '$expected' $detail" + } +} + +proc assert_range {value min max {detail ""}} { + if {!($value <= $max && $value >= $min)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be between to '$min' and '$max' $detail" } } diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl new file mode 100644 index 000000000..c1da696d3 --- /dev/null +++ b/tests/unit/moduleapi/datatype.tcl @@ -0,0 +1,27 @@ +set testmodule [file normalize tests/modules/datatype.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {DataType: Test module is sane, GET/SET work.} { + r datatype.set dtkey 100 stringval + assert {[r datatype.get dtkey] eq {100 stringval}} + } + + test {DataType: RM_SaveDataTypeToString(), RM_LoadDataTypeFromString() work} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + + r datatype.restore dtkeycopy $encoded + assert {[r datatype.get dtkeycopy] eq {-1111 MyString}} + } + + test {DataType: Handle truncated RM_LoadDataTypeFromString()} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + set truncated [string range $encoded 0 end-1] + + catch {r datatype.restore dtkeycopy $truncated} e + set e + } {*Invalid*} +} diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index 21529747b..748016f1a 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -21,4 +21,50 @@ start_server {tags {"modules"}} { assert {[string match $ld "0.00000000000000001"]} } + test {test module db commands} { + r set x foo + set key [r test.randomkey] + assert_equal $key "x" + assert_equal [r test.dbsize] 1 + r test.flushall + assert_equal [r test.dbsize] 0 + } + + test {test modle lru api} { + r config set maxmemory-policy allkeys-lru + r set x foo + set lru [r test.getlru x] + assert { $lru <= 1000 } + set was_set [r test.setlru x 100000] + assert { $was_set == 1 } + set idle [r object idletime x] + assert { $idle >= 100 } + set lru [r test.getlru x] + assert { $lru >= 100000 } + r config set maxmemory-policy allkeys-lfu + set lru [r test.getlru x] + assert { $lru == -1 } + set was_set [r test.setlru x 100000] + assert { $was_set == 0 } + } + r config set maxmemory-policy allkeys-lru + + test {test modle lfu api} { + r config set maxmemory-policy allkeys-lfu + r set x foo + set lfu [r test.getlfu x] + assert { $lfu >= 1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 1 } + set freq [r object freq x] + assert { $freq <= 100 } + set lfu [r test.getlfu x] + assert { $lfu <= 100 } + r config set maxmemory-policy allkeys-lru + set lfu [r test.getlfu x] + assert { $lfu == -1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 0 } + } + } diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl new file mode 100644 index 000000000..de1672e0a --- /dev/null +++ b/tests/unit/moduleapi/scan.tcl @@ -0,0 +1,47 @@ +set testmodule [file normalize tests/modules/scan.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module scan keyspace} { + # the module create a scan command with filtering which also return values + r set x 1 + r set y 2 + r set z 3 + r hset h f v + lsort [r scan.scan_strings] + } {{x 1} {y 2} {z 3}} + + test {Module scan hash ziplist} { + r hmset hh f1 v1 f2 v2 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2}} + + test {Module scan hash dict} { + r config set hash-max-ziplist-entries 2 + r hmset hh f3 v3 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2} {f3 v3}} + + test {Module scan zset ziplist} { + r zadd zz 1 f1 2 f2 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2}} + + test {Module scan zset dict} { + r config set zset-max-ziplist-entries 2 + r zadd zz 3 f3 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2} {f3 3}} + + test {Module scan set intset} { + r sadd ss 1 2 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}}} + + test {Module scan set dict} { + r config set set-max-intset-entries 2 + r sadd ss 3 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}} {3 {}}} +} diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index aa9c5f3a9..656bac5de 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -123,6 +123,12 @@ start_server { assert {[r xlen mystream] == $j} } + test {XADD with ID 0-0} { + r DEL otherstream + catch {r XADD otherstream 0-0 k v} err + assert {[r EXISTS otherstream] == 0} + } + test {XRANGE COUNT works as expected} { assert {[llength [r xrange mystream - + COUNT 10]] == 10} }