diff --git a/CONTRIBUTING b/CONTRIBUTING index f57de3fd9..5b0f02953 100644 --- a/CONTRIBUTING +++ b/CONTRIBUTING @@ -8,7 +8,9 @@ each source file that you contribute. # IMPORTANT: HOW TO USE REDIS GITHUB ISSUES * Github issues SHOULD ONLY BE USED to report bugs, and for DETAILED feature - requests. Everything else belongs to the Redis Google Group. + requests. Everything else belongs to the Redis Google Group: + + https://groups.google.com/forum/m/#!forum/Redis-db PLEASE DO NOT POST GENERAL QUESTIONS that are not about bugs or suspected bugs in the Github issues system. We'll be very happy to help you and provide diff --git a/README.md b/README.md index 42ab47853..8dbad7dbf 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ top comment inside `server.c`. After the command operates in some way, it returns a reply to the client, usually using `addReply()` or a similar function defined inside `networking.c`. -There are tons of commands implementations inside th Redis source code +There are tons of commands implementations inside the Redis source code that can serve as examples of actual commands implementations. To write a few toy commands can be a good exercise to familiarize with the code base. 
diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index c829ac60c..540c168e5 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -215,4 +215,32 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra, return arena_ralloc_no_move(tsdn, ptr, oldsize, size, extra, zero); } +JEMALLOC_ALWAYS_INLINE int +iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) { + int defrag = 0; + rtree_ctx_t rtree_ctx_fallback; + rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback); + szind_t szind; + bool is_slab; + rtree_szind_slab_read(tsdn, &extents_rtree, rtree_ctx, (uintptr_t)ptr, true, &szind, &is_slab); + if (likely(is_slab)) { + /* Small allocation. */ + extent_t *slab = iealloc(tsdn, ptr); + arena_t *arena = extent_arena_get(slab); + szind_t binind = extent_szind_get(slab); + bin_t *bin = &arena->bins[binind]; + malloc_mutex_lock(tsdn, &bin->lock); + /* don't bother moving allocations from the slab currently used for new allocations */ + if (slab != bin->slabcur) { + const bin_info_t *bin_info = &bin_infos[binind]; + size_t availregs = bin_info->nregs * bin->stats.curslabs; + *bin_util = (bin->stats.curregs<<16) / availregs; + *run_util = ((bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs; + defrag = 1; + } + malloc_mutex_unlock(tsdn, &bin->lock); + } + return defrag; +} + #endif /* JEMALLOC_INTERNAL_INLINES_C_H */ diff --git a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in index aee55438c..daf9e571b 100644 --- a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in +++ b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in @@ -120,3 +120,7 @@ # define JEMALLOC_RESTRICT_RETURN # define JEMALLOC_ALLOCATOR #endif + +/* This version of Jemalloc, 
modified for Redis, has the je_get_defrag_hint() + * function. */ +#define JEMALLOC_FRAG_HINT diff --git a/deps/jemalloc/src/jemalloc.c b/deps/jemalloc/src/jemalloc.c index f93c16fa3..5b936cb48 100644 --- a/deps/jemalloc/src/jemalloc.c +++ b/deps/jemalloc/src/jemalloc.c @@ -3324,3 +3324,14 @@ jemalloc_postfork_child(void) { } /******************************************************************************/ + +/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation. + * returns 0 if the allocation is in the currently active run, + * or when it is not causing any frag issue (large or huge bin) + * returns the bin utilization and run utilization both in fixed point 16:16. + * If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */ +JEMALLOC_EXPORT int JEMALLOC_NOTHROW +get_defrag_hint(void* ptr, int *bin_util, int *run_util) { + assert(ptr != NULL); + return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util); +} diff --git a/deps/lua/src/lua_cmsgpack.c b/deps/lua/src/lua_cmsgpack.c index 90a388f3f..892154793 100644 --- a/deps/lua/src/lua_cmsgpack.c +++ b/deps/lua/src/lua_cmsgpack.c @@ -385,6 +385,7 @@ void mp_encode_lua_table_as_array(lua_State *L, mp_buf *buf, int level) { #endif mp_encode_array(L,buf,len); + luaL_checkstack(L, 1, "in function mp_encode_lua_table_as_array"); for (j = 1; j <= len; j++) { lua_pushnumber(L,j); lua_gettable(L,-2); @@ -400,6 +401,7 @@ void mp_encode_lua_table_as_map(lua_State *L, mp_buf *buf, int level) { * Lua API, we need to iterate a first time. Note that an alternative * would be to do a single run, and then hack the buffer to insert the * map opcodes for message pack. Too hackish for this lib. */ + luaL_checkstack(L, 3, "in function mp_encode_lua_table_as_map"); lua_pushnil(L); while(lua_next(L,-2)) { lua_pop(L,1); /* remove value, keep key for next iteration. 
*/ @@ -515,10 +517,14 @@ int mp_pack(lua_State *L) { if (nargs == 0) return luaL_argerror(L, 0, "MessagePack pack needs input."); + if (!lua_checkstack(L, nargs)) + return luaL_argerror(L, 0, "Too many arguments for MessagePack pack."); + buf = mp_buf_new(L); for(i = 1; i <= nargs; i++) { /* Copy argument i to top of stack for _encode processing; * the encode function pops it from the stack when complete. */ + luaL_checkstack(L, 1, "in function mp_check"); lua_pushvalue(L, i); mp_encode_lua_type(L,buf,0); @@ -547,6 +553,7 @@ void mp_decode_to_lua_array(lua_State *L, mp_cur *c, size_t len) { int index = 1; lua_newtable(L); + luaL_checkstack(L, 1, "in function mp_decode_to_lua_array"); while(len--) { lua_pushnumber(L,index++); mp_decode_to_lua_type(L,c); @@ -821,6 +828,9 @@ int mp_unpack_full(lua_State *L, int limit, int offset) { * subtract the entire buffer size from the unprocessed size * to get our next start offset */ int offset = len - c.left; + + luaL_checkstack(L, 1, "in function mp_unpack_full"); + /* Return offset -1 when we have have processed the entire buffer. */ lua_pushinteger(L, c.left == 0 ? -1 : offset); /* Results are returned with the arg elements still diff --git a/deps/lua/src/lua_struct.c b/deps/lua/src/lua_struct.c index a602bb430..4d5f027b8 100644 --- a/deps/lua/src/lua_struct.c +++ b/deps/lua/src/lua_struct.c @@ -1,7 +1,7 @@ /* ** {====================================================== ** Library for packing/unpacking structures. 
-** $Id: struct.c,v 1.4 2012/07/04 18:54:29 roberto Exp $ +** $Id: struct.c,v 1.7 2018/05/11 22:04:31 roberto Exp $ ** See Copyright Notice at the end of this file ** ======================================================= */ @@ -15,8 +15,8 @@ ** h/H - signed/unsigned short ** l/L - signed/unsigned long ** T - size_t -** i/In - signed/unsigned integer with size `n' (default is size of int) -** cn - sequence of `n' chars (from/to a string); when packing, n==0 means +** i/In - signed/unsigned integer with size 'n' (default is size of int) +** cn - sequence of 'n' chars (from/to a string); when packing, n==0 means the whole string; when unpacking, n==0 means use the previous read number as the string length ** s - zero-terminated string @@ -89,14 +89,12 @@ typedef struct Header { } Header; -static int getnum (lua_State *L, const char **fmt, int df) { +static int getnum (const char **fmt, int df) { if (!isdigit(**fmt)) /* no number? */ return df; /* return default value */ else { int a = 0; do { - if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0'))) - luaL_error(L, "integral size overflow"); a = a*10 + *((*fmt)++) - '0'; } while (isdigit(**fmt)); return a; @@ -117,9 +115,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) { case 'f': return sizeof(float); case 'd': return sizeof(double); case 'x': return 1; - case 'c': return getnum(L, fmt, 1); + case 'c': return getnum(fmt, 1); case 'i': case 'I': { - int sz = getnum(L, fmt, sizeof(int)); + int sz = getnum(fmt, sizeof(int)); if (sz > MAXINTSIZE) luaL_error(L, "integral size %d is larger than limit of %d", sz, MAXINTSIZE); @@ -152,7 +150,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt, case '>': h->endian = BIG; return; case '<': h->endian = LITTLE; return; case '!': { - int a = getnum(L, fmt, MAXALIGN); + int a = getnum(fmt, MAXALIGN); if (!isp2(a)) luaL_error(L, "alignment %d is not a power of 2", a); h->align = a; @@ -295,21 +293,26 @@ static int b_unpack 
(lua_State *L) { const char *fmt = luaL_checkstring(L, 1); size_t ld; const char *data = luaL_checklstring(L, 2, &ld); - size_t pos = luaL_optinteger(L, 3, 1) - 1; + size_t pos = luaL_optinteger(L, 3, 1); + luaL_argcheck(L, pos > 0, 3, "offset must be 1 or greater"); + pos--; /* Lua indexes are 1-based, but here we want 0-based for C + * pointer math. */ + int n = 0; /* number of results */ defaultoptions(&h); - lua_settop(L, 2); while (*fmt) { int opt = *fmt++; size_t size = optsize(L, opt, &fmt); pos += gettoalign(pos, &h, opt, size); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); - luaL_checkstack(L, 1, "too many results"); + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); + /* stack space for item + next position */ + luaL_checkstack(L, 2, "too many results"); switch (opt) { case 'b': case 'B': case 'h': case 'H': case 'l': case 'L': case 'T': case 'i': case 'I': { /* integer types */ int issigned = islower(opt); lua_Number res = getinteger(data+pos, h.endian, issigned, size); - lua_pushnumber(L, res); + lua_pushnumber(L, res); n++; break; } case 'x': { @@ -319,25 +322,26 @@ static int b_unpack (lua_State *L) { float f; memcpy(&f, data+pos, size); correctbytes((char *)&f, sizeof(f), h.endian); - lua_pushnumber(L, f); + lua_pushnumber(L, f); n++; break; } case 'd': { double d; memcpy(&d, data+pos, size); correctbytes((char *)&d, sizeof(d), h.endian); - lua_pushnumber(L, d); + lua_pushnumber(L, d); n++; break; } case 'c': { if (size == 0) { - if (!lua_isnumber(L, -1)) - luaL_error(L, "format `c0' needs a previous size"); + if (n == 0 || !lua_isnumber(L, -1)) + luaL_error(L, "format 'c0' needs a previous size"); size = lua_tonumber(L, -1); - lua_pop(L, 1); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); + lua_pop(L, 1); n--; + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); } - lua_pushlstring(L, data+pos, size); + lua_pushlstring(L, data+pos, size); n++; break; } case 
's': { @@ -345,15 +349,15 @@ static int b_unpack (lua_State *L) { if (e == NULL) luaL_error(L, "unfinished string in data"); size = (e - (data+pos)) + 1; - lua_pushlstring(L, data+pos, size - 1); + lua_pushlstring(L, data+pos, size - 1); n++; break; } default: controloptions(L, opt, &fmt, &h); } pos += size; } - lua_pushinteger(L, pos + 1); - return lua_gettop(L) - 2; + lua_pushinteger(L, pos + 1); /* next position */ + return n + 1; } @@ -399,7 +403,7 @@ LUALIB_API int luaopen_struct (lua_State *L) { /****************************************************************************** -* Copyright (C) 2010-2012 Lua.org, PUC-Rio. All rights reserved. +* Copyright (C) 2010-2018 Lua.org, PUC-Rio. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the diff --git a/redis.conf b/redis.conf index f5b7d5fed..7c550a532 100644 --- a/redis.conf +++ b/redis.conf @@ -1106,6 +1106,17 @@ zset-max-ziplist-value 64 # composed of many HyperLogLogs with cardinality in the 0 - 15000 range. hll-sparse-max-bytes 3000 +# Streams macro node max size / items. The stream data structure is a radix +# tree of big nodes that encode multiple items inside. Using this configuration +# it is possible to configure how big a single node can be in bytes, and the +# maximum number of items it may contain before switching to a new node when +# appending new stream entries. If any of the following settings are set to +# zero, the limit is ignored, so for instance it is possible to set just a +# max entries limit by setting max-bytes to 0 and max-entries to the desired +# value. +stream-node-max-bytes 4096 +stream-node-max-entries 100 + # Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in # order to help rehashing the main Redis hash table (the one mapping top-level # keys to values). 
The hash table implementation Redis uses (see dict.c) @@ -1200,6 +1211,12 @@ hz 10 # big latency spikes. aof-rewrite-incremental-fsync yes +# When redis saves RDB file, if the following option is enabled +# the file will be fsync-ed every 32 MB of data generated. This is useful +# in order to commit the file to the disk more incrementally and avoid +# big latency spikes. +rdb-save-incremental-fsync yes + # Redis LFU eviction (see maxmemory setting) can be tuned. However it is a good # idea to start with the default settings and only change them after investigating # how to improve the performances and how the keys LFU change over time, which diff --git a/sentinel.conf b/sentinel.conf index 0e1b266ed..3703c7394 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -194,3 +194,31 @@ sentinel failover-timeout mymaster 180000 # # sentinel client-reconfig-script mymaster /var/redis/reconfig.sh +# SECURITY +# +# By default SENTINEL SET will not be able to change the notification-script +# and client-reconfig-script at runtime. This avoids a trivial security issue +# where clients can set the script to anything and trigger a failover in order +# to get the program executed. + +sentinel deny-scripts-reconfig yes + +# REDIS COMMANDS RENAMING +# +# Sometimes the Redis server has certain commands, that are needed for Sentinel +# to work correctly, renamed to unguessable strings. This is often the case +# of CONFIG and SLAVEOF in the context of providers that provide Redis as +# a service, and don't want the customers to reconfigure the instances outside +# of the administration console. +# +# In such case it is possible to tell Sentinel to use different command names +# instead of the normal ones. For example if the master "mymaster", and the +# associated slaves, have "CONFIG" all renamed to "GUESSME", I could use: +# +# sentinel rename-command mymaster CONFIG GUESSME +# +# After such configuration is set, every time Sentinel would use CONFIG it will +# use GUESSME instead. 
Note that there is no actual need to respect the command +# case, so writing "config guessme" is the same in the example above. +# +# SENTINEL SET can also be used in order to perform this configuration at runtime. diff --git a/src/aof.c b/src/aof.c index 0aa081f00..594c3f666 100644 --- a/src/aof.c +++ b/src/aof.c @@ -228,7 +228,7 @@ static void killAppendOnlyChild(void) { void stopAppendOnly(void) { serverAssert(server.aof_state != AOF_OFF); flushAppendOnlyFile(1); - aof_fsync(server.aof_fd); + redis_fsync(server.aof_fd); close(server.aof_fd); server.aof_fd = -1; @@ -476,10 +476,10 @@ void flushAppendOnlyFile(int force) { /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { - /* aof_fsync is defined as fdatasync() for Linux in order to avoid + /* redis_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); - aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ + redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; @@ -1221,7 +1221,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; dictEntry *de; size_t processed = 0; - long long now = mstime(); int j; for (j = 0; j < server.dbnum; j++) { @@ -1247,9 +1246,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { expiretime = getExpire(db,&key); - /* If this key is already expired skip it */ - if (expiretime != -1 && expiretime < now) continue; - /* Save the key and associated value */ if (o->type == OBJ_STRING) { /* Emit a SET command */ @@ -1322,7 +1318,7 @@ int rewriteAppendOnlyFile(char *filename) { rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) - rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); + rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); if (server.aof_use_rdb_preamble) { int error; @@ -1690,7 +1686,7 @@ void backgroundRewriteDoneHandler(int 
exitcode, int bysignal) { oldfd = server.aof_fd; server.aof_fd = newfd; if (server.aof_fsync == AOF_FSYNC_ALWAYS) - aof_fsync(newfd); + redis_fsync(newfd); else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) aof_background_fsync(newfd); server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ diff --git a/src/bio.c b/src/bio.c index da11f7b86..0c92d053b 100644 --- a/src/bio.c +++ b/src/bio.c @@ -187,7 +187,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { - aof_fsync((long)job->arg1); + redis_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. diff --git a/src/blocked.c b/src/blocked.c index 023fba0cd..e0dd56724 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -314,8 +314,9 @@ void handleClientsBlockedOnKeys(void) { if (de) { list *clients = dictGetVal(de); int numclients = listLength(clients); + unsigned long zcard = zsetLength(o); - while(numclients--) { + while(numclients-- && zcard) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; @@ -332,6 +333,7 @@ void handleClientsBlockedOnKeys(void) { ? ZSET_MIN : ZSET_MAX; unblockClient(receiver); genericZpopCommand(receiver,&rl->key,1,where,1,NULL); + zcard--; /* Replicate the command. */ robj *argv[2]; @@ -396,12 +398,6 @@ void handleClientsBlockedOnKeys(void) { 1); } - /* Note that after we unblock the client, 'gt' - * and other receiver->bpop stuff are no longer - * valid, so we must do the setup above before - * this call. */ - unblockClient(receiver); - /* Emit the two elements sub-array consisting of * the name of the stream and the data we * extracted from it. 
Wrapped in a single-item @@ -417,6 +413,12 @@ void handleClientsBlockedOnKeys(void) { streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, 0, group, consumer, 0, &pi); + + /* Note that after we unblock the client, 'gt' + * and other receiver->bpop stuff are no longer + * valid, so we must do the setup above before + * this call. */ + unblockClient(receiver); } } } diff --git a/src/cluster.c b/src/cluster.c index 0635d7c07..0ee63a6cd 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2120,7 +2120,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf)); if (nwritten <= 0) { serverLog(LL_DEBUG,"I/O error writing to node link: %s", - strerror(errno)); + (nwritten == -1) ? strerror(errno) : "short write"); handleLinkIOError(link); return; } @@ -2377,7 +2377,7 @@ void clusterSendPing(clusterLink *link, int type) { * same time. * * Since we have non-voting slaves that lower the probability of an entry - * to feature our node, we set the number of entires per packet as + * to feature our node, we set the number of entries per packet as * 10% of the total nodes we have. */ wanted = floor(dictSize(server.cluster->nodes)/10); if (wanted < 3) wanted = 3; @@ -4178,27 +4178,27 @@ void clusterCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"addslots [slot ...] -- Assign slots to current node.", -"bumpepoch -- Advance the cluster config epoch.", -"count-failure-reports -- Return number of failure reports for .", -"countkeysinslot - Return the number of keys in .", -"delslots [slot ...] 
-- Delete slots information from current node.", -"failover [force|takeover] -- Promote current slave node to being a master.", -"forget -- Remove a node from the cluster.", -"getkeysinslot -- Return key names stored by current node in a slot.", -"flushslots -- Delete current node own slots information.", -"info - Return onformation about the cluster.", -"keyslot -- Return the hash slot for .", -"meet [bus-port] -- Connect nodes into a working cluster.", -"myid -- Return the node id.", -"nodes -- Return cluster configuration seen by node. Output format:", +"ADDSLOTS [slot ...] -- Assign slots to current node.", +"BUMPEPOCH -- Advance the cluster config epoch.", +"COUNT-failure-reports -- Return number of failure reports for .", +"COUNTKEYSINSLOT - Return the number of keys in .", +"DELSLOTS [slot ...] -- Delete slots information from current node.", +"FAILOVER [force|takeover] -- Promote current slave node to being a master.", +"FORGET -- Remove a node from the cluster.", +"GETKEYSINSLOT -- Return key names stored by current node in a slot.", +"FLUSHSLOTS -- Delete current node own slots information.", +"INFO - Return onformation about the cluster.", +"KEYSLOT -- Return the hash slot for .", +"MEET [bus-port] -- Connect nodes into a working cluster.", +"MYID -- Return the node id.", +"NODES -- Return cluster configuration seen by node. Output format:", " ... ", -"replicate -- Configure current node as slave to .", -"reset [hard|soft] -- Reset current node (default: soft).", -"set-config-epoch - Set config epoch of current node.", -"setslot (importing|migrating|stable|node ) -- Set slot state.", -"slaves -- Return slaves.", -"slots -- Return information about slots range mappings. 
Each range is made of:", +"REPLICATE -- Configure current node as slave to .", +"RESET [hard|soft] -- Reset current node (default: soft).", +"SET-config-epoch - Set config epoch of current node.", +"SETSLOT (importing|migrating|stable|node ) -- Set slot state.", +"SLAVES -- Return slaves.", +"SLOTS -- Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL }; @@ -4746,8 +4746,7 @@ NULL clusterReset(hard); addReply(c,shared.ok); } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLUSTER HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } @@ -4835,15 +4834,39 @@ void dumpCommand(client *c) { /* RESTORE key ttl serialized-value [REPLACE] */ void restoreCommand(client *c) { - long long ttl; + long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; rio payload; - int j, type, replace = 0; + int j, type, replace = 0, absttl = 0; robj *obj; /* Parse additional options */ for (j = 4; j < c->argc; j++) { + int additional = c->argc-j-1; if (!strcasecmp(c->argv[j]->ptr,"replace")) { replace = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) { + absttl = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 && + lfu_freq == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL) + != C_OK) return; + if (lru_idle < 0) { + addReplyError(c,"Invalid IDLETIME value, must be >= 0"); + return; + } + lru_clock = LRU_CLOCK(); + j++; /* Consume additional arg. */ + } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 && + lru_idle == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL) + != C_OK) return; + if (lfu_freq < 0 || lfu_freq > 255) { + addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255"); + return; + } + j++; /* Consume additional arg. 
*/ } else { addReply(c,shared.syntaxerr); return; @@ -4884,7 +4907,11 @@ void restoreCommand(client *c) { /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); - if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl); + if (ttl) { + if (!absttl) ttl+=mstime(); + setExpire(c,c->db,c->argv[1],ttl); + } + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; @@ -5589,7 +5616,11 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * longer handles, the client is sent a redirection error, and the function * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { - if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_LIST) { + if (c->flags & CLIENT_BLOCKED && + (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM)) + { dictEntry *de; dictIterator *di; diff --git a/src/config.c b/src/config.c index 5bc6aa2ed..aedef4888 100644 --- a/src/config.c +++ b/src/config.c @@ -390,7 +390,7 @@ void loadServerConfigFromString(char *config) { } } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { zfree(server.masterauth); - server.masterauth = zstrdup(argv[1]); + server.masterauth = argv[1][0] ? 
zstrdup(argv[1]) : NULL; } else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) { if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -431,6 +431,11 @@ void loadServerConfigFromString(char *config) { if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + if (server.active_defrag_enabled) { +#ifndef HAVE_DEFRAG + err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr; +#endif + } } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -483,6 +488,13 @@ void loadServerConfigFromString(char *config) { yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"rdb-save-incremental-fsync") && + argc == 2) + { + if ((server.rdb_save_incremental_fsync = + yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"aof-load-truncated") && argc == 2) { if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -496,7 +508,7 @@ void loadServerConfigFromString(char *config) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; goto loaderr; } - server.requirepass = zstrdup(argv[1]); + server.requirepass = argv[1][0] ? 
zstrdup(argv[1]) : NULL; } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) { zfree(server.pidfile); server.pidfile = zstrdup(argv[1]); @@ -509,14 +521,16 @@ void loadServerConfigFromString(char *config) { server.rdb_filename = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) { server.active_defrag_threshold_lower = atoi(argv[1]); - if (server.active_defrag_threshold_lower < 0) { - err = "active-defrag-threshold-lower must be 0 or greater"; + if (server.active_defrag_threshold_lower < 0 || + server.active_defrag_threshold_lower > 1000) { + err = "active-defrag-threshold-lower must be between 0 and 1000"; goto loaderr; } } else if (!strcasecmp(argv[0],"active-defrag-threshold-upper") && argc == 2) { server.active_defrag_threshold_upper = atoi(argv[1]); - if (server.active_defrag_threshold_upper < 0) { - err = "active-defrag-threshold-upper must be 0 or greater"; + if (server.active_defrag_threshold_upper < 0 || + server.active_defrag_threshold_upper > 1000) { + err = "active-defrag-threshold-upper must be between 0 and 1000"; goto loaderr; } } else if (!strcasecmp(argv[0],"active-defrag-ignore-bytes") && argc == 2) { @@ -547,6 +561,10 @@ void loadServerConfigFromString(char *config) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { server.hash_max_ziplist_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"stream-node-max-bytes") && argc == 2) { + server.stream_node_max_bytes = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"stream-node-max-entries") && argc == 2) { + server.stream_node_max_entries = atoi(argv[1]); } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ /* DEAD OPTION */ } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) { @@ -1015,6 +1033,8 @@ void configSetCommand(client *c) { "cluster-slave-no-failover",server.cluster_slave_no_failover) { } 
config_set_bool_field( "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) { + } config_set_bool_field( + "rdb-save-incremental-fsync",server.rdb_save_incremental_fsync) { } config_set_bool_field( "aof-load-truncated",server.aof_load_truncated) { } config_set_bool_field( @@ -1056,15 +1076,15 @@ void configSetCommand(client *c) { /* Numerical fields. * config_set_numerical_field(name,var,min,max) */ } config_set_numerical_field( - "tcp-keepalive",server.tcpkeepalive,0,LLONG_MAX) { + "tcp-keepalive",server.tcpkeepalive,0,INT_MAX) { } config_set_numerical_field( - "maxmemory-samples",server.maxmemory_samples,1,LLONG_MAX) { + "maxmemory-samples",server.maxmemory_samples,1,INT_MAX) { } config_set_numerical_field( - "lfu-log-factor",server.lfu_log_factor,0,LLONG_MAX) { + "lfu-log-factor",server.lfu_log_factor,0,INT_MAX) { } config_set_numerical_field( - "lfu-decay-time",server.lfu_decay_time,0,LLONG_MAX) { + "lfu-decay-time",server.lfu_decay_time,0,INT_MAX) { } config_set_numerical_field( - "timeout",server.maxidletime,0,LONG_MAX) { + "timeout",server.maxidletime,0,INT_MAX) { } config_set_numerical_field( "active-defrag-threshold-lower",server.active_defrag_threshold_lower,0,1000) { } config_set_numerical_field( @@ -1076,52 +1096,56 @@ void configSetCommand(client *c) { } config_set_numerical_field( "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { } config_set_numerical_field( - "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LLONG_MAX) { + "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) { } config_set_numerical_field( - "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){ + "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){ } config_set_numerical_field( - "hash-max-ziplist-entries",server.hash_max_ziplist_entries,0,LLONG_MAX) { + "hash-max-ziplist-entries",server.hash_max_ziplist_entries,0,LONG_MAX) { } config_set_numerical_field( - 
"hash-max-ziplist-value",server.hash_max_ziplist_value,0,LLONG_MAX) { + "hash-max-ziplist-value",server.hash_max_ziplist_value,0,LONG_MAX) { + } config_set_numerical_field( + "stream-node-max-bytes",server.stream_node_max_bytes,0,LONG_MAX) { + } config_set_numerical_field( + "stream-node-max-entries",server.stream_node_max_entries,0,LLONG_MAX) { } config_set_numerical_field( "list-max-ziplist-size",server.list_max_ziplist_size,INT_MIN,INT_MAX) { } config_set_numerical_field( "list-compress-depth",server.list_compress_depth,0,INT_MAX) { } config_set_numerical_field( - "set-max-intset-entries",server.set_max_intset_entries,0,LLONG_MAX) { + "set-max-intset-entries",server.set_max_intset_entries,0,LONG_MAX) { } config_set_numerical_field( - "zset-max-ziplist-entries",server.zset_max_ziplist_entries,0,LLONG_MAX) { + "zset-max-ziplist-entries",server.zset_max_ziplist_entries,0,LONG_MAX) { } config_set_numerical_field( - "zset-max-ziplist-value",server.zset_max_ziplist_value,0,LLONG_MAX) { + "zset-max-ziplist-value",server.zset_max_ziplist_value,0,LONG_MAX) { } config_set_numerical_field( - "hll-sparse-max-bytes",server.hll_sparse_max_bytes,0,LLONG_MAX) { + "hll-sparse-max-bytes",server.hll_sparse_max_bytes,0,LONG_MAX) { } config_set_numerical_field( - "lua-time-limit",server.lua_time_limit,0,LLONG_MAX) { + "lua-time-limit",server.lua_time_limit,0,LONG_MAX) { } config_set_numerical_field( "slowlog-log-slower-than",server.slowlog_log_slower_than,0,LLONG_MAX) { } config_set_numerical_field( - "slowlog-max-len",ll,0,LLONG_MAX) { + "slowlog-max-len",ll,0,LONG_MAX) { /* Cast to unsigned. 
*/ - server.slowlog_max_len = (unsigned)ll; + server.slowlog_max_len = (unsigned long)ll; } config_set_numerical_field( "latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){ } config_set_numerical_field( - "repl-ping-slave-period",server.repl_ping_slave_period,1,LLONG_MAX) { + "repl-ping-slave-period",server.repl_ping_slave_period,1,INT_MAX) { } config_set_numerical_field( - "repl-timeout",server.repl_timeout,1,LLONG_MAX) { + "repl-timeout",server.repl_timeout,1,INT_MAX) { } config_set_numerical_field( - "repl-backlog-ttl",server.repl_backlog_time_limit,0,LLONG_MAX) { + "repl-backlog-ttl",server.repl_backlog_time_limit,0,LONG_MAX) { } config_set_numerical_field( - "repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,LLONG_MAX) { + "repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,INT_MAX) { } config_set_numerical_field( - "slave-priority",server.slave_priority,0,LLONG_MAX) { + "slave-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( - "min-slaves-to-write",server.repl_min_slaves_to_write,0,LLONG_MAX) { + "min-slaves-to-write",server.repl_min_slaves_to_write,0,INT_MAX) { refreshGoodSlavesCount(); } config_set_numerical_field( - "min-slaves-max-lag",server.repl_min_slaves_max_lag,0,LLONG_MAX) { + "min-slaves-max-lag",server.repl_min_slaves_max_lag,0,INT_MAX) { refreshGoodSlavesCount(); } config_set_numerical_field( "cluster-node-timeout",server.cluster_node_timeout,0,LLONG_MAX) { @@ -1130,17 +1154,17 @@ void configSetCommand(client *c) { } config_set_numerical_field( "cluster-announce-bus-port",server.cluster_announce_bus_port,0,65535) { } config_set_numerical_field( - "cluster-migration-barrier",server.cluster_migration_barrier,0,LLONG_MAX){ + "cluster-migration-barrier",server.cluster_migration_barrier,0,INT_MAX){ } config_set_numerical_field( - 
"cluster-slave-validity-factor",server.cluster_slave_validity_factor,0,LLONG_MAX) { + "cluster-slave-validity-factor",server.cluster_slave_validity_factor,0,INT_MAX) { } config_set_numerical_field( - "hz",server.hz,0,LLONG_MAX) { + "hz",server.hz,0,INT_MAX) { /* Hz is more an hint from the user, so we accept values out of range * but cap them to reasonable values. */ if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ; if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ; } config_set_numerical_field( - "watchdog-period",ll,0,LLONG_MAX) { + "watchdog-period",ll,0,INT_MAX) { if (ll) enableWatchdog(ll); else @@ -1267,6 +1291,10 @@ void configGetCommand(client *c) { server.hash_max_ziplist_entries); config_get_numerical_field("hash-max-ziplist-value", server.hash_max_ziplist_value); + config_get_numerical_field("stream-node-max-bytes", + server.stream_node_max_bytes); + config_get_numerical_field("stream-node-max-entries", + server.stream_node_max_entries); config_get_numerical_field("list-max-ziplist-size", server.list_max_ziplist_size); config_get_numerical_field("list-compress-depth", @@ -1333,6 +1361,8 @@ void configGetCommand(client *c) { server.repl_diskless_sync); config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); + config_get_bool_field("rdb-save-incremental-fsync", + server.rdb_save_incremental_fsync); config_get_bool_field("aof-load-truncated", server.aof_load_truncated); config_get_bool_field("aof-use-rdb-preamble", @@ -2056,6 +2086,8 @@ int rewriteConfig(char *path) { rewriteConfigNotifykeyspaceeventsOption(state); rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE); + rewriteConfigNumericalOption(state,"stream-node-max-bytes",server.stream_node_max_bytes,OBJ_STREAM_NODE_MAX_BYTES); + 
rewriteConfigNumericalOption(state,"stream-node-max-entries",server.stream_node_max_entries,OBJ_STREAM_NODE_MAX_ENTRIES); rewriteConfigNumericalOption(state,"list-max-ziplist-size",server.list_max_ziplist_size,OBJ_LIST_MAX_ZIPLIST_SIZE); rewriteConfigNumericalOption(state,"list-compress-depth",server.list_compress_depth,OBJ_LIST_COMPRESS_DEPTH); rewriteConfigNumericalOption(state,"set-max-intset-entries",server.set_max_intset_entries,OBJ_SET_MAX_INTSET_ENTRIES); @@ -2068,6 +2100,7 @@ int rewriteConfig(char *path) { rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); + rewriteConfigYesNoOption(state,"rdb-save-incremental-fsync",server.rdb_save_incremental_fsync,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC); rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); @@ -2107,10 +2140,10 @@ void configCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"get -- Return parameters matching the glob-like and their values.", -"set -- Set parameter to value.", -"resetstat -- Reset statistics reported by INFO.", -"rewrite -- Rewrite the configuration file.", +"GET -- Return parameters matching the glob-like and their values.", +"SET -- Set parameter to value.", +"RESETSTAT -- Reset statistics reported by INFO.", +"REWRITE -- Rewrite the configuration file.", NULL }; addReplyHelp(c, help); @@ -2135,8 +2168,7 @@ NULL addReply(c,shared.ok); } } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. 
Try CONFIG HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } diff --git a/src/config.h b/src/config.h index c23f1c789..ee3ad508e 100644 --- a/src/config.h +++ b/src/config.h @@ -87,11 +87,11 @@ #endif #endif -/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */ +/* Define redis_fsync to fdatasync() in Linux and fsync() for all the rest */ #ifdef __linux__ -#define aof_fsync fdatasync +#define redis_fsync fdatasync #else -#define aof_fsync fsync +#define redis_fsync fsync #endif /* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use diff --git a/src/db.c b/src/db.c index 6c039e129..151e948c9 100644 --- a/src/db.c +++ b/src/db.c @@ -90,7 +90,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * LOOKUP_NONE (or zero): no special flags are passed. * LOOKUP_NOTOUCH: don't alter the last access time of the key. * - * Note: this function also returns NULL is the key is logically expired + * Note: this function also returns NULL if the key is logically expired * but still existing, in case this is a slave, since this API is called only * for read operations. Even if the key expiry is master-driven, we can * correctly report a key is expired on slaves even if the master is lagging @@ -223,6 +223,8 @@ int dbExists(redisDb *db, robj *key) { * The function makes sure to return keys not already expired. 
*/ robj *dbRandomKey(redisDb *db) { dictEntry *de; + int maxtries = 100; + int allvolatile = dictSize(db->dict) == dictSize(db->expires); while(1) { sds key; @@ -234,6 +236,17 @@ robj *dbRandomKey(redisDb *db) { key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); if (dictFind(db->expires,key)) { + if (allvolatile && server.masterhost && --maxtries == 0) { + /* If the DB is composed only of keys with an expire set, + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ + return keyobj; + } if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); continue; /* search for another key. This expired. */ @@ -467,8 +480,7 @@ void existsCommand(client *c) { int j; for (j = 1; j < c->argc; j++) { - expireIfNeeded(c->db,c->argv[j]); - if (dbExists(c->db,c->argv[j])) count++; + if (lookupKeyRead(c->db,c->argv[j])) count++; } addReplyLongLong(c,count); } @@ -942,16 +954,18 @@ void moveCommand(client *c) { } /* Helper function for dbSwapDatabases(): scans the list of keys that have - * one or more blocked clients for B[LR]POP or other list blocking commands - * and signal the keys are ready if they are lists. See the comment where - * the function is used for more info. */ + * one or more blocked clients for B[LR]POP or other blocking commands + * and signal the keys as ready if they are of the right type. See the comment + * where the function is used for more info. 
*/ void scanDatabaseForReadyLists(redisDb *db) { dictEntry *de; dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); - if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM)) + if (value && (value->type == OBJ_LIST || + value->type == OBJ_STREAM || + value->type == OBJ_ZSET)) signalKeyAsReady(db, key); } dictReleaseIterator(di); @@ -1226,7 +1240,7 @@ int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *nu num = atoi(argv[2]->ptr); /* Sanity check. Don't return any key if the command is going to * reply with syntax error. */ - if (num > (argc-3)) { + if (num < 1 || num > (argc-3)) { *numkeys = 0; return NULL; } @@ -1255,7 +1269,7 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) num = atoi(argv[2]->ptr); /* Sanity check. Don't return any key if the command is going to * reply with syntax error. */ - if (num > (argc-3)) { + if (num <= 0 || num > (argc-3)) { *numkeys = 0; return NULL; } @@ -1384,23 +1398,37 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk } /* XREAD [BLOCK ] [COUNT ] [GROUP ] - * [RETRY ] STREAMS key_1 key_2 ... key_N - * ID_1 ID_2 ... ID_N */ + * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { - int i, num, *keys; + int i, num = 0, *keys; UNUSED(cmd); - /* We need to seek the last argument that contains "STREAMS", because other - * arguments before may contain it (for example the group name). */ + /* We need to parse the options of the command in order to seek the first + * "STREAMS" string which is actually the option. This is needed because + * "STREAMS" could also be the name of the consumer group and even the + * name of the stream key. 
*/ int streams_pos = -1; for (i = 1; i < argc; i++) { char *arg = argv[i]->ptr; - if (!strcasecmp(arg, "streams")) streams_pos = i; + if (!strcasecmp(arg, "block")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "count")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "group")) { + i += 2; /* Skip option argument. */ + } else if (!strcasecmp(arg, "noack")) { + /* Nothing to do. */ + } else if (!strcasecmp(arg, "streams")) { + streams_pos = i; + break; + } else { + break; /* Syntax error. */ + } } if (streams_pos != -1) num = argc - streams_pos - 1; /* Syntax error. */ - if (streams_pos == -1 || num % 2 != 0) { + if (streams_pos == -1 || num == 0 || num % 2 != 0) { *numkeys = 0; return NULL; } @@ -1408,7 +1436,7 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) there are also the IDs, one per key. */ keys = zmalloc(sizeof(int) * num); - for (i = streams_pos+1; i < argc; i++) keys[i-streams_pos-1] = i; + for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i; *numkeys = num; return keys; } diff --git a/src/debug.c b/src/debug.c index f239eea5a..32be3c59c 100644 --- a/src/debug.c +++ b/src/debug.c @@ -285,25 +285,26 @@ void computeDatasetDigest(unsigned char *final) { void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"assert -- Crash by assertion failed.", -"change-repl-id -- Change the replication IDs of the instance. 
Dangerous, should be used only for testing the replication subsystem.", -"crash-and-recovery -- Hard crash and restart after delay.", -"digest -- Outputs an hex signature representing the current DB content.", -"htstats -- Return hash table statistics of the specified Redis database.", -"loadaof -- Flush the AOF buffers on disk and reload the AOF in memory.", -"lua-always-replicate-commands (0|1) -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", -"object -- Show low level info about key and associated value.", -"panic -- Crash the server simulating a panic.", -"populate [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", -"reload -- Save the RDB on disk and reload it back in memory.", -"restart -- Graceful restart: save config, db, restart.", -"sdslen -- Show low level SDS string info representing key and value.", -"segfault -- Crash the server with sigsegv.", -"set-active-expire (0|1) -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", -"sleep -- Stop the server for . Decimals allowed.", -"structsize -- Return the size of different Redis core C structures.", -"ziplist -- Show low level info about the ziplist encoding.", -"error -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", +"ASSERT -- Crash by assertion failed.", +"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", +"CRASH-AND-RECOVER -- Hard crash and restart after delay.", +"DIGEST -- Output a hex signature representing the current DB content.", +"ERROR -- Return a Redis protocol error with as message. 
Useful for clients unit tests to simulate Redis errors.", +"HTSTATS -- Return hash table statistics of the specified Redis database.", +"HTSTATS-KEY -- Like htstats but for the hash table stored as key's value.", +"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", +"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", +"OBJECT -- Show low level info about key and associated value.", +"PANIC -- Crash the server simulating a panic.", +"POPULATE [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", +"RELOAD -- Save the RDB on disk and reload it back in memory.", +"RESTART -- Graceful restart: save config, db, restart.", +"SDSLEN -- Show low level SDS string info representing key and value.", +"SEGFAULT -- Crash the server with sigsegv.", +"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", +"SLEEP -- Stop the server for . 
Decimals allowed.", +"STRUCTSIZE -- Return the size of different Redis core C structures.", +"ZIPLIST -- Show low level info about the ziplist encoding.", NULL }; addReplyHelp(c, help); @@ -347,7 +348,7 @@ NULL serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) { - if (server.aof_state == AOF_ON) flushAppendOnlyFile(1); + if (server.aof_state != AOF_OFF) flushAppendOnlyFile(1); emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); if (loadAppendOnlyFile(server.aof_filename) != C_OK) { addReply(c,shared.err); @@ -547,14 +548,41 @@ NULL stats = sdscat(stats,buf); addReplyBulkSds(c,stats); + } else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) { + robj *o; + dict *ht = NULL; + + if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) + == NULL) return; + + /* Get the hash table reference from the object, if possible. */ + switch (o->encoding) { + case OBJ_ENCODING_SKIPLIST: + { + zset *zs = o->ptr; + ht = zs->dict; + } + break; + case OBJ_ENCODING_HT: + ht = o->ptr; + break; + } + + if (ht == NULL) { + addReplyError(c,"The value stored at the specified key is not " + "represented using an hash table"); + } else { + char buf[4096]; + dictGetStats(buf,sizeof(buf),ht); + addReplyBulkCString(c,buf); + } } else if (!strcasecmp(c->argv[1]->ptr,"change-repl-id") && c->argc == 2) { serverLog(LL_WARNING,"Changing replication IDs after receiving DEBUG change-repl-id"); changeReplicationId(); clearReplicationId2(); addReply(c,shared.ok); } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. 
Try DEBUG HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } @@ -1048,7 +1076,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) { infostring = genRedisInfoString("all"); serverLogRaw(LL_WARNING|LL_RAW, infostring); serverLogRaw(LL_WARNING|LL_RAW, "\n------ CLIENT LIST OUTPUT ------\n"); - clients = getAllClientsInfoString(); + clients = getAllClientsInfoString(-1); serverLogRaw(LL_WARNING|LL_RAW, clients); sdsfree(infostring); sdsfree(clients); diff --git a/src/defrag.c b/src/defrag.c index aae72adcb..b25fceb1e 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) { return defragged; } +/* Defrag callback for radix tree iterator, called for each node, + * used in order to defrag the nodes allocations. */ +int defragRaxNode(raxNode **noderef) { + raxNode *newnode = activeDefragAlloc(*noderef); + if (newnode) { + *noderef = newnode; + return 1; + } + return 0; +} + +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ +int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { + static unsigned char last[sizeof(streamID)]; + raxIterator ri; + long iterations = 0; + if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) { + *cursor = 0; + return 0; + } + + stream *s = ob->ptr; + raxStart(&ri,s->rax); + if (*cursor == 0) { + /* if cursor is 0, we start new iteration */ + defragRaxNode(&s->rax->head); + /* assign the iterator node callback before the seek, so that the + * initial nodes that are processed till the first item are covered */ + ri.node_cb = defragRaxNode; + raxSeek(&ri,"^",NULL,0); + } else { + /* if cursor is non-zero, we seek to the static 'last' */ + if (!raxSeek(&ri,">", last, sizeof(last))) { + *cursor = 0; + return 0; + } + /* assign the iterator node callback after the seek, so that the + * initial nodes that are processed till now aren't covered */ + ri.node_cb = defragRaxNode; + } + + (*cursor)++; + while (raxNext(&ri)) { + void *newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), (*defragged)++; + if (++iterations > 16) { + if (ustime() > endtime) { + serverAssert(ri.key_len==sizeof(last)); + memcpy(last,ri.key,ri.key_len); + raxStop(&ri); + return 1; + } + iterations = 0; + } + } + raxStop(&ri); + *cursor = 0; + return 0; +} + +/* optional callback used defrag each rax element (not including the element pointer itself) */ +typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); + +/* defrag radix tree including: + * 1) rax struct + * 2) rax nodes + * 3) rax entry data (only if defrag_data is specified) + * 4) call a callback per element, and allow the callback to return a new pointer for the element */ +long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { + long defragged = 0; + raxIterator ri; + rax* rax; + if ((rax = activeDefragAlloc(*raxref))) + defragged++, *raxref = 
rax; + rax = *raxref; + raxStart(&ri,rax); + ri.node_cb = defragRaxNode; + defragRaxNode(&rax->head); + raxSeek(&ri,"^",NULL,0); + while (raxNext(&ri)) { + void *newdata = NULL; + if (element_cb) + newdata = element_cb(&ri, element_cb_data, &defragged); + if (defrag_data && !newdata) + newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), defragged++; + } + raxStop(&ri); + return defragged; +} + +typedef struct { + streamCG *cg; + streamConsumer *c; +} PendingEntryContext; + +void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) { + UNUSED(defragged); + PendingEntryContext *ctx = privdata; + streamNACK *nack = ri->data, *newnack; + nack->consumer = ctx->c; /* update nack pointer to consumer */ + newnack = activeDefragAlloc(nack); + if (newnack) { + /* update consumer group pointer to the nack */ + void *prev; + raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); + serverAssert(prev==nack); + /* note: we don't increment 'defragged' that's done by the caller */ + } + return newnack; +} + +void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) { + streamConsumer *c = ri->data; + streamCG *cg = privdata; + void *newc = activeDefragAlloc(c); + if (newc) { + /* note: we don't increment 'defragged' that's done by the caller */ + c = newc; + } + sds newsds = activeDefragSds(c->name); + if (newsds) + (*defragged)++, c->name = newsds; + if (c->pel) { + PendingEntryContext pel_ctx = {cg, c}; + *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); + } + return newc; /* returns NULL if c was not defragged */ +} + +void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) { + streamCG *cg = ri->data; + UNUSED(privdata); + if (cg->consumers) + *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); + if (cg->pel) + *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL); + return NULL; +} + +long 
defragStream(redisDb *db, dictEntry *kde) { + long defragged = 0; + robj *ob = dictGetVal(kde); + serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); + stream *s = ob->ptr, *news; + + /* handle the main struct */ + if ((news = activeDefragAlloc(s))) + defragged++, ob->ptr = s = news; + + if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { + rax *newrax = activeDefragAlloc(s->rax); + if (newrax) + defragged++, s->rax = newrax; + defragLater(db, kde); + } else + defragged += defragRadixTree(&s->rax, 1, NULL, NULL); + + if (s->cgroups) + defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); + return defragged; +} + /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ @@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown hash encoding"); } + } else if (ob->type == OBJ_STREAM) { + defragged += defragStream(db, de); } else if (ob->type == OBJ_MODULE) { /* Currently defragmenting modules private data types * is not supported. */ @@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { server.stat_active_defrag_scanned++; } -/* Defrag scan callback for for each hash table bicket, +/* Defrag scan callback for each hash table bicket, * used in order to defrag the dictEntry allocations. */ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ @@ -728,27 +895,29 @@ long defragOtherGlobals() { return defragged; } -unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) { - long defragged = 0; +/* returns 0 more work may or may not be needed (see non-zero cursor), + * and 1 if time is up and more work is needed. 
*/ +int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - defragged += scanLaterList(ob); - cursor = 0; /* list has no scan, we must finish it in one go */ + server.stat_active_defrag_hits += scanLaterList(ob); + *cursor = 0; /* list has no scan, we must finish it in one go */ } else if (ob->type == OBJ_SET) { - defragged += scanLaterSet(ob, &cursor); + server.stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { - defragged += scanLaterZset(ob, &cursor); + server.stat_active_defrag_hits += scanLaterZset(ob, cursor); } else if (ob->type == OBJ_HASH) { - defragged += scanLaterHash(ob, &cursor); + server.stat_active_defrag_hits += scanLaterHash(ob, cursor); + } else if (ob->type == OBJ_STREAM) { + return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); } else { - cursor = 0; /* object type may have changed since we schedule it for later */ + *cursor = 0; /* object type may have changed since we schedule it for later */ } } else { - cursor = 0; /* object may have been deleted already */ + *cursor = 0; /* object may have been deleted already */ } - server.stat_active_defrag_hits += defragged; - return cursor; + return 0; } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ @@ -788,17 +957,22 @@ int defragLaterStep(redisDb *db, long long endtime) { dictEntry *de = dictFind(db->dict, current_key); key_defragged = server.stat_active_defrag_hits; do { - cursor = defragLaterItem(de, cursor); + int quit = 0; + if (defragLaterItem(de, &cursor, endtime)) + quit = 1; /* time is up, we didn't finish all the work */ + + /* Don't start a new BIG key in this loop, this is because the + * next key can be a list, and scanLaterList must be done in once cycle */ + if (!cursor) + quit = 1; /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!cursor || (++iterations > 16 || + * check if we reached the time limit. */ + if (quit || (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (!cursor || ustime() > endtime) { + if (quit || ustime() > endtime) { if(key_defragged != server.stat_active_defrag_hits) server.stat_active_defrag_key_hits++; else diff --git a/src/dict.c b/src/dict.c index 97e636805..c52f5bf8d 100644 --- a/src/dict.c +++ b/src/dict.c @@ -146,14 +146,14 @@ int dictResize(dict *d) /* Expand or create the hash table */ int dictExpand(dict *d, unsigned long size) { - dictht n; /* the new hash table */ - unsigned long realsize = _dictNextPower(size); - /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; + dictht n; /* the new hash table */ + unsigned long realsize = _dictNextPower(size); + /* Rehashing to the same table size is not useful. 
*/ if (realsize == d->ht[0].size) return DICT_ERR; @@ -858,6 +858,15 @@ unsigned long dictScan(dict *d, de = next; } + /* Set unmasked bits so incrementing the reversed cursor + * operates on the masked bits */ + v |= ~m0; + + /* Increment the reverse cursor */ + v = rev(v); + v++; + v = rev(v); + } else { t0 = &d->ht[0]; t1 = &d->ht[1]; @@ -892,22 +901,16 @@ unsigned long dictScan(dict *d, de = next; } - /* Increment bits not covered by the smaller mask */ - v = (((v | m0) + 1) & ~m0) | (v & m0); + /* Increment the reverse cursor not covered by the smaller mask.*/ + v |= ~m1; + v = rev(v); + v++; + v = rev(v); /* Continue while bits covered by mask difference is non-zero */ } while (v & (m0 ^ m1)); } - /* Set unmasked bits so incrementing the reversed cursor - * operates on the masked bits of the smaller table */ - v |= ~m0; - - /* Increment the reverse cursor */ - v = rev(v); - v++; - v = rev(v); - return v; } diff --git a/src/endianconv.h b/src/endianconv.h index 08f553136..d0c8e5b38 100644 --- a/src/endianconv.h +++ b/src/endianconv.h @@ -46,9 +46,9 @@ uint64_t intrev64(uint64_t v); /* variants of the function doing the actual convertion only if the target * host is big endian */ #if (BYTE_ORDER == LITTLE_ENDIAN) -#define memrev16ifbe(p) -#define memrev32ifbe(p) -#define memrev64ifbe(p) +#define memrev16ifbe(p) ((void)(0)) +#define memrev32ifbe(p) ((void)(0)) +#define memrev64ifbe(p) ((void)(0)) #define intrev16ifbe(v) (v) #define intrev32ifbe(v) (v) #define intrev64ifbe(v) (v) diff --git a/src/expire.c b/src/expire.c index ce7882e4c..0b92ee3fe 100644 --- a/src/expire.c +++ b/src/expire.c @@ -112,7 +112,7 @@ void activeExpireCycle(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limt. Also don't repeat a fast cycle for the same period + * for time limit. Also don't repeat a fast cycle for the same period * as the fast cycle total duration itself. 
*/ if (!timelimit_exit) return; if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; diff --git a/src/geohash.c b/src/geohash.c index 1ae7a7e05..b40282e76 100644 --- a/src/geohash.c +++ b/src/geohash.c @@ -144,8 +144,8 @@ int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range, (longitude - long_range->min) / (long_range->max - long_range->min); /* convert to fixed point based on the step size */ - lat_offset *= (1 << step); - long_offset *= (1 << step); + lat_offset *= (1ULL << step); + long_offset *= (1ULL << step); hash->bits = interleave64(lat_offset, long_offset); return 1; } diff --git a/src/help.h b/src/help.h index 5f927c303..c89f1f44b 100644 --- a/src/help.h +++ b/src/help.h @@ -1,4 +1,4 @@ -/* Automatically generated by utils/generate-command-help.rb, do not edit. */ +/* Automatically generated by generate-command-help.rb, do not edit. */ #ifndef __REDIS_HELP_H #define __REDIS_HELP_H @@ -17,7 +17,8 @@ static char *commandGroups[] = { "scripting", "hyperloglog", "cluster", - "geo" + "geo", + "stream" }; struct commandHelp { @@ -82,6 +83,16 @@ struct commandHelp { "Pop a value from a list, push it to another list and return it; or block until one is available", 2, "2.2.0" }, + { "BZPOPMAX", + "key [key ...] timeout", + "Remove and return the member with the highest score from one or more sorted sets, or block until one is available", + 4, + "5.0.0" }, + { "BZPOPMIN", + "key [key ...] 
timeout", + "Remove and return the member with the lowest score from one or more sorted sets, or block until one is available", + 4, + "5.0.0" }, { "CLIENT GETNAME", "-", "Get the current connection name", @@ -318,12 +329,12 @@ struct commandHelp { 0, "1.2.0" }, { "FLUSHALL", - "-", + "[ASYNC]", "Remove all keys from all databases", 9, "1.0.0" }, { "FLUSHDB", - "-", + "[ASYNC]", "Remove all keys from the current database", 9, "1.0.0" }, @@ -532,6 +543,36 @@ struct commandHelp { "Trim a list to the specified range", 2, "1.0.0" }, + { "MEMORY DOCTOR", + "-", + "Outputs memory problems report", + 9, + "4.0.0" }, + { "MEMORY HELP", + "-", + "Show helpful text about the different subcommands", + 9, + "4.0.0" }, + { "MEMORY MALLOC-STATS", + "-", + "Show allocator internal stats", + 9, + "4.0.0" }, + { "MEMORY PURGE", + "-", + "Ask the allocator to release memory", + 9, + "4.0.0" }, + { "MEMORY STATS", + "-", + "Show memory usage details", + 9, + "4.0.0" }, + { "MEMORY USAGE", + "key [SAMPLES count]", + "Estimate the memory usage of a key", + 9, + "4.0.0" }, { "MGET", "key [key ...]", "Get the values of all the given keys", @@ -723,7 +764,7 @@ struct commandHelp { 10, "3.2.0" }, { "SCRIPT EXISTS", - "script [script ...]", + "sha1 [sha1 ...]", "Check existence of scripts in the script cache.", 10, "2.6.0" }, @@ -758,7 +799,7 @@ struct commandHelp { 8, "1.0.0" }, { "SET", - "key value [EX seconds] [PX milliseconds] [NX|XX]", + "key value [expiration EX seconds|PX milliseconds] [NX|XX]", "Set the string value of a key", 1, "1.0.0" }, @@ -867,6 +908,11 @@ struct commandHelp { "Add multiple sets and store the resulting set in a key", 3, "1.0.0" }, + { "SWAPDB", + "index index", + "Swaps two Redis databases", + 8, + "4.0.0" }, { "SYNC", "-", "Internal command used for replication", @@ -877,6 +923,11 @@ struct commandHelp { "Return the current server time", 9, "2.6.0" }, + { "TOUCH", + "key [key ...]", + "Alters the last access time of a key(s). 
Returns the number of existing keys specified.", + 0, + "3.2.1" }, { "TTL", "key", "Get the time to live for a key", @@ -887,6 +938,11 @@ struct commandHelp { "Determine the type stored at key", 0, "1.0.0" }, + { "UNLINK", + "key [key ...]", + "Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.", + 0, + "4.0.0" }, { "UNSUBSCRIBE", "[channel [channel ...]]", "Stop listening for messages posted to the given channels", @@ -907,6 +963,41 @@ struct commandHelp { "Watch the given keys to determine execution of the MULTI/EXEC block", 7, "2.2.0" }, + { "XADD", + "key ID field string [field string ...]", + "Appends a new entry to a stream", + 14, + "5.0.0" }, + { "XLEN", + "key", + "Return the number of entires in a stream", + 14, + "5.0.0" }, + { "XPENDING", + "key group [start end count] [consumer]", + "Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.", + 14, + "5.0.0" }, + { "XRANGE", + "key start end [COUNT count]", + "Return a range of elements in a stream, with IDs matching the specified IDs interval", + 14, + "5.0.0" }, + { "XREAD", + "[COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]", + "Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.", + 14, + "5.0.0" }, + { "XREADGROUP", + "GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]", + "Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. 
Can block.", + 14, + "5.0.0" }, + { "XREVRANGE", + "key end start [COUNT count]", + "Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE", + 14, + "5.0.0" }, { "ZADD", "key [NX|XX] [CH] [INCR] score member [score member ...]", "Add one or more members to a sorted set, or update its score if it already exists", @@ -937,6 +1028,16 @@ struct commandHelp { "Count the number of members in a sorted set between a given lexicographical range", 4, "2.8.9" }, + { "ZPOPMAX", + "key [count]", + "Remove and return members with the highest scores in a sorted set", + 4, + "5.0.0" }, + { "ZPOPMIN", + "key [count]", + "Remove and return members with the lowest scores in a sorted set", + 4, + "5.0.0" }, { "ZRANGE", "key start stop [WITHSCORES]", "Return a range of members in a sorted set, by index", diff --git a/src/hyperloglog.c b/src/hyperloglog.c index 0670c1cf5..01a409bb5 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -429,14 +429,14 @@ uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) { } switch(len & 7) { - case 7: h ^= (uint64_t)data[6] << 48; - case 6: h ^= (uint64_t)data[5] << 40; - case 5: h ^= (uint64_t)data[4] << 32; - case 4: h ^= (uint64_t)data[3] << 24; - case 3: h ^= (uint64_t)data[2] << 16; - case 2: h ^= (uint64_t)data[1] << 8; + case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */ + case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */ + case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */ + case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */ + case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */ + case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */ case 1: h ^= (uint64_t)data[0]; - h *= m; + h *= m; /* fall-thru */ }; h ^= h >> r; diff --git a/src/lzf_d.c b/src/lzf_d.c index c32be8e87..93f43c27c 100644 --- a/src/lzf_d.c +++ b/src/lzf_d.c @@ -86,6 +86,8 @@ lzf_decompress (const void *const in_data, unsigned int in_len, #ifdef 
lzf_movsb lzf_movsb (op, ip, ctrl); #else +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wimplicit-fallthrough" switch (ctrl) { case 32: *op++ = *ip++; case 31: *op++ = *ip++; case 30: *op++ = *ip++; case 29: *op++ = *ip++; @@ -97,6 +99,7 @@ lzf_decompress (const void *const in_data, unsigned int in_len, case 8: *op++ = *ip++; case 7: *op++ = *ip++; case 6: *op++ = *ip++; case 5: *op++ = *ip++; case 4: *op++ = *ip++; case 3: *op++ = *ip++; case 2: *op++ = *ip++; case 1: *op++ = *ip++; } +#pragma GCC diagnostic pop #endif } else /* back reference */ @@ -163,17 +166,17 @@ lzf_decompress (const void *const in_data, unsigned int in_len, break; - case 9: *op++ = *ref++; - case 8: *op++ = *ref++; - case 7: *op++ = *ref++; - case 6: *op++ = *ref++; - case 5: *op++ = *ref++; - case 4: *op++ = *ref++; - case 3: *op++ = *ref++; - case 2: *op++ = *ref++; - case 1: *op++ = *ref++; + case 9: *op++ = *ref++; /* fall-thru */ + case 8: *op++ = *ref++; /* fall-thru */ + case 7: *op++ = *ref++; /* fall-thru */ + case 6: *op++ = *ref++; /* fall-thru */ + case 5: *op++ = *ref++; /* fall-thru */ + case 4: *op++ = *ref++; /* fall-thru */ + case 3: *op++ = *ref++; /* fall-thru */ + case 2: *op++ = *ref++; /* fall-thru */ + case 1: *op++ = *ref++; /* fall-thru */ case 0: *op++ = *ref++; /* two octets more */ - *op++ = *ref++; + *op++ = *ref++; /* fall-thru */ } #endif } diff --git a/src/module.c b/src/module.c index cb03ad2cd..fc88e6a28 100644 --- a/src/module.c +++ b/src/module.c @@ -2239,6 +2239,9 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * to avoid a useless copy. 
*/ if (flags & REDISMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD; + + robj *argv[2] = {field,value}; + hashTypeTryConversion(key->value,argv,0,1); updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags); /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(), @@ -3396,7 +3399,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li * * If the specified log level is invalid, verbose is used by default. * There is a fixed limit to the length of the log line this function is able - * to emit, this limti is not specified but is guaranteed to be more than + * to emit, this limit is not specified but is guaranteed to be more than * a few lines of text. */ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { @@ -4499,7 +4502,15 @@ int moduleUnload(sds name) { * MODULE LOAD [args...] */ void moduleCommand(client *c) { char *subcmd = c->argv[1]->ptr; - + if (c->argc == 2 && !strcasecmp(subcmd,"help")) { + const char *help[] = { +"LIST -- Return a list of loaded modules.", +"LOAD [arg ...] -- Load a module library from .", +"UNLOAD -- Unload a module.", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(subcmd,"load") && c->argc >= 3) { robj **argv = NULL; int argc = 0; @@ -4548,7 +4559,8 @@ void moduleCommand(client *c) { } dictReleaseIterator(di); } else { - addReply(c,shared.syntaxerr); + addReplySubcommandSyntaxError(c); + return; } } diff --git a/src/networking.c b/src/networking.c index 00558974e..8aec27280 100644 --- a/src/networking.c +++ b/src/networking.c @@ -75,6 +75,8 @@ void linkClient(client *c) { * this way removing the client in unlinkClient() will not require * a linear scan, but just a constant time operation. 
*/ c->client_list_node = listLast(server.clients); + uint64_t id = htonu64(c->id); + raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); } client *createClient(int fd) { @@ -247,7 +249,7 @@ void _addReplyStringToList(client *c, const char *s, size_t len) { /* Append to this object when possible. If tail == NULL it was * set via addDeferredMultiBulkLength(). */ - if (tail && sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES) { + if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) { tail = sdscatlen(tail,s,len); listNodeValue(ln) = tail; c->reply_bytes += len; @@ -560,6 +562,18 @@ void addReplyHelp(client *c, const char **help) { setDeferredMultiBulkLength(c,blenp,blen); } +/* Add a suggestive error reply. + * This function is typically invoked by from commands that support + * subcommands in response to an unknown subcommand or argument error. */ +void addReplySubcommandSyntaxError(client *c) { + sds cmd = sdsnew((char*) c->argv[0]->ptr); + sdstoupper(cmd); + addReplyErrorFormat(c, + "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.", + c->argv[1]->ptr,cmd); + sdsfree(cmd); +} + /* Copy 'src' client output buffers into 'dst' client output buffers. * The function takes care of freeing the old output buffers of the * destination client. */ @@ -720,6 +734,8 @@ void unlinkClient(client *c) { if (c->fd != -1) { /* Remove from the list of active clients. */ if (c->client_list_node) { + uint64_t id = htonu64(c->id); + raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL); listDelNode(server.clients,c->client_list_node); c->client_list_node = NULL; } @@ -864,6 +880,15 @@ void freeClientsInAsyncFreeQueue(void) { } } +/* Return a client by ID, or NULL if the client ID is not in the set + * of registered clients. Note that "fake clients", created with -1 as FD, + * are not registered clients. 
*/ +client *lookupClientByID(uint64_t id) { + id = htonu64(id); + client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); + return (c == raxNotFound) ? NULL : c; +} + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed. */ int writeToClient(int fd, client *c, int handler_installed) { @@ -1493,6 +1518,7 @@ sds catClientInfoString(sds s, client *client) { *p++ = 'S'; } if (client->flags & CLIENT_MASTER) *p++ = 'M'; + if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; @@ -1531,7 +1557,7 @@ sds catClientInfoString(sds s, client *client) { client->lastcmd ? client->lastcmd->name : "NULL"); } -sds getAllClientsInfoString(void) { +sds getAllClientsInfoString(int type) { listNode *ln; listIter li; client *client; @@ -1540,6 +1566,7 @@ sds getAllClientsInfoString(void) { listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { client = listNodeValue(ln); + if (type != -1 && getClientType(client) != type) continue; o = catClientInfoString(o,client); o = sdscatlen(o,"\n",1); } @@ -1553,22 +1580,40 @@ void clientCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"getname -- Return the name of the current connection.", -"kill -- Kill connection made from .", +"id -- Return the ID of the current connection.", +"getname -- Return the name of the current connection.", +"kill -- Kill connection made from .", "kill