diff --git a/README.md b/README.md index 2b4eeb19b..6c9435b53 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,8 @@ for Ubuntu and Debian systems: % cd utils % ./install_server.sh +_Note_: `install_server.sh` will not work on Mac OSX; it is built for Linux only. + The script will ask you a few questions and will setup everything you need to run Redis properly as a background daemon that will start again on system reboots. diff --git a/src/Makefile b/src/Makefile index 9da1da8d3..93cfdc28e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -310,3 +310,6 @@ install: all $(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN) $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN) @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME) + +uninstall: + rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)} diff --git a/src/aof.c b/src/aof.c index 46ae58324..cafcf961c 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1611,6 +1611,9 @@ void aofRemoveTempFile(pid_t childpid) { snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid); unlink(tmpfile); + + snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) childpid); + unlink(tmpfile); } /* Update the server.aof_current_size field explicitly using stat(2) diff --git a/src/blocked.c b/src/blocked.c index f9e196626..1db657869 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -77,10 +77,18 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb * is zero. */ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { long long tval; + long double ftval; - if (getLongLongFromObjectOrReply(c,object,&tval, - "timeout is not an integer or out of range") != C_OK) - return C_ERR; + if (unit == UNIT_SECONDS) { + if (getLongDoubleFromObjectOrReply(c,object,&ftval, + "timeout is not an float or out of range") != C_OK) + return C_ERR; + tval = (long long) (ftval * 1000.0); + } else { + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != C_OK) + return C_ERR; + } if (tval < 0) { addReplyError(c,"timeout is negative"); @@ -88,7 +96,6 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int } if (tval > 0) { - if (unit == UNIT_SECONDS) tval *= 1000; tval += mstime(); } *timeout = tval; diff --git a/src/cluster.c b/src/cluster.c index 1a3a348b5..50a9ae687 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -3031,6 +3031,7 @@ void clusterHandleSlaveFailover(void) { if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } serverLog(LL_WARNING, "Start of election delayed for %lld milliseconds " diff --git a/src/module.c b/src/module.c index 81982ba76..d3d122638 100644 --- a/src/module.c +++ b/src/module.c @@ -1359,6 +1359,9 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction * + * * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication + * link by the MASTER + * * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master * * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave @@ -1391,6 +1394,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { flags |= REDISMODULE_CTX_FLAGS_LUA; if (ctx->client->flags & CLIENT_MULTI) flags |= REDISMODULE_CTX_FLAGS_MULTI; + /* Module command recieved from MASTER, is replicated. */ + if (ctx->client->flags & CLIENT_MASTER) + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; } if (server.cluster_enabled) diff --git a/src/rdb.c b/src/rdb.c index b800ee481..52dddf210 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1965,7 +1965,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } } else if (!strcasecmp(auxkey->ptr,"redis-ver")) { serverLog(LL_NOTICE,"Loading RDB produced by version %s", - auxval->ptr); + (char*)auxval->ptr); } else if (!strcasecmp(auxkey->ptr,"ctime")) { time_t age = time(NULL)-strtol(auxval->ptr,NULL,10); if (age < 0) age = 0; diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index e0c62f9c1..2c53bc936 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -247,11 +247,11 @@ static redisConfig *getRedisConfig(const char *ip, int port, c = redisConnect(ip, port); else c = redisConnectUnix(hostsocket); - if (c->err) { + if (c == NULL || c->err) { fprintf(stderr,"Could not connect to Redis at "); - if (hostsocket == NULL) - fprintf(stderr,"%s:%d: %s\n",ip,port,c->errstr); - else fprintf(stderr,"%s: %s\n",hostsocket,c->errstr); + char *err = (c != NULL ? c->errstr : ""); + if (hostsocket == NULL) fprintf(stderr,"%s:%d: %s\n",ip,port,err); + else fprintf(stderr,"%s: %s\n",hostsocket,err); goto fail; } redisAppendCommand(c, "CONFIG GET %s", "save"); @@ -276,18 +276,16 @@ static redisConfig *getRedisConfig(const char *ip, int port, case 1: cfg->appendonly = sdsnew(value); break; } } - if (reply) freeReplyObject(reply); - if (c) redisFree(c); + freeReplyObject(reply); + redisFree(c); return cfg; fail: - if (reply) freeReplyObject(reply); - if (c) redisFree(c); - zfree(cfg); fprintf(stderr, "ERROR: failed to fetch CONFIG from "); - if (c->connection_type == REDIS_CONN_TCP) - fprintf(stderr, "%s:%d\n", c->tcp.host, c->tcp.port); - else if (c->connection_type == REDIS_CONN_UNIX) - fprintf(stderr, "%s\n", c->unix_sock.path); + if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port); + else fprintf(stderr, "%s\n", hostsocket); + freeReplyObject(reply); + redisFree(c); + zfree(cfg); return NULL; } static void freeRedisConfig(redisConfig *cfg) { @@ -345,7 +343,9 @@ static void randomizeClientKey(client c) { for (i = 0; i < c->randlen; i++) { char *p = c->randptr[i]+11; - size_t r = random() % config.randomkeys_keyspacelen; + size_t r = 0; + if (config.randomkeys_keyspacelen != 0) + r = random() % config.randomkeys_keyspacelen; size_t j; for (j = 0; j < 12; j++) { @@ -453,27 +453,27 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } - if (config.cluster_mode && is_err && c->cluster_node && - (!strncmp(r->str,"MOVED",5) || - !strcmp(r->str,"ASK"))) - { - /* Try to update slots configuration if the key of the - * command is using the slot hash tag. */ - if (c->staglen && !fetchClusterSlotsConfiguration(c)) - exit(1); - /* - clusterNode *node = c->cluster_node; - assert(node); - if (++node->current_slot_index >= node->slots_count) { - if (config.showerrors) { - fprintf(stderr, "WARN: No more available slots in " - "node %s:%d\n", node->ip, node->port); - } - freeReplyObject(reply); - freeClient(c); - return; + /* Try to update slots configuration if reply error is + * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command + * contain(s) the slot hash tag. */ + if (is_err && c->cluster_node && c->staglen) { + int fetch_slots = 0, do_wait = 0; + if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3)) + fetch_slots = 1; + else if (!strncmp(r->str,"CLUSTERDOWN",11)) { + /* Usually the cluster is able to recover itself after + * a CLUSTERDOWN error, so try to sleep one second + * before requesting the new configuration. */ + fetch_slots = 1; + do_wait = 1; + printf("Error from server %s:%d: %s\n", + c->cluster_node->ip, + c->cluster_node->port, + r->str); } - */ + if (do_wait) sleep(1); + if (fetch_slots && !fetchClusterSlotsConfiguration(c)) + exit(1); } freeReplyObject(reply); @@ -823,22 +823,37 @@ static void showLatencyReport(void) { } } -static void benchmark(char *title, char *cmd, int len) { +static void initBenchmarkThreads() { int i; + if (config.threads) freeBenchmarkThreads(); + config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*)); + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *thread = createBenchmarkThread(i); + config.threads[i] = thread; + } +} + +static void startBenchmarkThreads() { + int i; + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *t = config.threads[i]; + if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ + fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); + exit(1); + } + } + for (i = 0; i < config.num_threads; i++) + pthread_join(config.threads[i]->thread, NULL); +} + +static void benchmark(char *title, char *cmd, int len) { client c; config.title = title; config.requests_issued = 0; config.requests_finished = 0; - if (config.num_threads) { - if (config.threads) freeBenchmarkThreads(); - config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*)); - for (i = 0; i < config.num_threads; i++) { - benchmarkThread *thread = createBenchmarkThread(i); - config.threads[i] = thread; - } - } + if (config.num_threads) initBenchmarkThreads(); int thread_id = config.num_threads > 0 ? 0 : -1; c = createClient(cmd,len,NULL,thread_id); @@ -846,17 +861,7 @@ static void benchmark(char *title, char *cmd, int len) { config.start = mstime(); if (!config.num_threads) aeMain(config.el); - else { - for (i = 0; i < config.num_threads; i++) { - benchmarkThread *t = config.threads[i]; - if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ - fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); - exit(1); - } - } - for (i = 0; i < config.num_threads; i++) - pthread_join(config.threads[i]->thread, NULL); - } + else startBenchmarkThreads(); config.totlatency = mstime()-config.start; showLatencyReport(); @@ -1289,6 +1294,11 @@ int parseOptions(int argc, const char **argv) { if (config.pipeline <= 0) config.pipeline=1; } else if (!strcmp(argv[i],"-r")) { if (lastarg) goto invalid; + const char *next = argv[++i], *p = next; + if (*p == '-') { + p++; + if (*p < '0' || *p > '9') goto invalid; + } config.randomkeys = 1; config.randomkeys_keyspacelen = atoi(argv[++i]); if (config.randomkeys_keyspacelen < 0) @@ -1552,9 +1562,15 @@ int main(int argc, const char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - c = createClient("",0,NULL,-1); /* will never receive a reply */ + int thread_id = -1, use_threads = (config.num_threads > 0); + if (use_threads) { + thread_id = 0; + initBenchmarkThreads(); + } + c = createClient("",0,NULL,thread_id); /* will never receive a reply */ createMissingClients(c); - aeMain(config.el); + if (use_threads) startBenchmarkThreads(); + else aeMain(config.el); /* and will wait for every */ } diff --git a/src/redis-cli.c b/src/redis-cli.c index 0e52c16be..e363a2795 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2268,7 +2268,7 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) { static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) { assert(config.cluster_manager_command.backup_dir); sds filename = sdsnew(config.cluster_manager_command.backup_dir); - if (filename[sdslen(filename)] - 1 != '/') + if (filename[sdslen(filename) - 1] != '/') filename = sdscat(filename, "/"); filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip, node->port, node->name); @@ -2726,12 +2726,42 @@ static sds clusterManagerNodeGetJSON(clusterManagerNode *node, slots, node->slots_count, flags, - node->current_epoch + (unsigned long long)node->current_epoch ); if (error_count > 0) { json = sdscatprintf(json, ",\n \"cluster_errors\": %lu", error_count); } + if (node->migrating_count > 0 && node->migrating != NULL) { + int i = 0; + sds migrating = sdsempty(); + for (; i < node->migrating_count; i += 2) { + sds slot = node->migrating[i]; + sds dest = node->migrating[i + 1]; + if (slot && dest) { + if (sdslen(migrating) > 0) migrating = sdscat(migrating, ","); + migrating = sdscatfmt(migrating, "\"%S\": \"%S\"", slot, dest); + } + } + if (sdslen(migrating) > 0) + json = sdscatfmt(json, ",\n \"migrating\": {%S}", migrating); + sdsfree(migrating); + } + if (node->importing_count > 0 && node->importing != NULL) { + int i = 0; + sds importing = sdsempty(); + for (; i < node->importing_count; i += 2) { + sds slot = node->importing[i]; + sds from = node->importing[i + 1]; + if (slot && from) { + if (sdslen(importing) > 0) importing = sdscat(importing, ","); + importing = sdscatfmt(importing, "\"%S\": \"%S\"", slot, from); + } + } + if (sdslen(importing) > 0) + json = sdscatfmt(json, ",\n \"importing\": {%S}", importing); + sdsfree(importing); + } json = sdscat(json, "\n }"); sdsfree(replicate); sdsfree(slots); @@ -2841,7 +2871,7 @@ static void clusterManagerShowClusterInfo(void) { replicas++; } redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE"); - if (reply != NULL || reply->type == REDIS_REPLY_INTEGER) + if (reply != NULL && reply->type == REDIS_REPLY_INTEGER) dbsize = reply->integer; if (dbsize < 0) { char *err = ""; diff --git a/src/redismodule.h b/src/redismodule.h index d18c38881..540f8e3db 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -85,6 +85,9 @@ #define REDISMODULE_CTX_FLAGS_OOM (1<<10) /* Less than 25% of memory available according to maxmemory. */ #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11) +/* The command was sent over the replication link. */ +#define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12) + #define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ #define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */ diff --git a/src/server.c b/src/server.c index 9de579815..712cda1bd 100644 --- a/src/server.c +++ b/src/server.c @@ -480,11 +480,11 @@ struct redisCommand redisCommandTable[] = { "write fast @sortedset", 0,NULL,1,1,1,0,0,0}, - {"bzpopmin",bzpopminCommand,-2, + {"bzpopmin",bzpopminCommand,-3, "write no-script fast @sortedset @blocking", 0,NULL,1,-2,1,0,0,0}, - {"bzpopmax",bzpopmaxCommand,-2, + {"bzpopmax",bzpopmaxCommand,-3, "write no-script fast @sortedset @blocking", 0,NULL,1,-2,1,0,0,0}, diff --git a/src/t_set.c b/src/t_set.c index 290a83e6d..cbe55aaa4 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1064,7 +1064,8 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, sdsfree(ele); } setTypeReleaseIterator(si); - decrRefCount(dstset); + server.lazyfree_lazy_server_del ? freeObjAsync(dstset) : + decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ diff --git a/src/t_stream.c b/src/t_stream.c index 1a5acac42..f4ace87a2 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1737,14 +1737,17 @@ NULL /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { o = lookupKeyWrite(c->db,c->argv[2]); - if (o) s = o->ptr; + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; + } grpname = c->argv[3]->ptr; } /* Check for missing key/group. */ if (c->argc >= 4 && !mkstream) { /* At this point key must exist, or there is an error. */ - if (o == NULL) { + if (s == NULL) { addReplyError(c, "The XGROUP subcommand requires the key to exist. " "Note that for CREATE you may want to use the MKSTREAM " @@ -1752,8 +1755,6 @@ NULL return; } - if (checkType(c,o,OBJ_STREAM)) return; - /* Certain subcommands require the group to exist. */ if ((cg = streamLookupCG(s,grpname)) == NULL && (!strcasecmp(opt,"SETID") || @@ -1781,7 +1782,8 @@ NULL } /* Handle the MKSTREAM option now that the command can no longer fail. */ - if (s == NULL && mkstream) { + if (s == NULL) { + serverAssert(mkstream); o = createStreamObject(); dbAdd(c->db,c->argv[2],o); s = o->ptr; @@ -2279,8 +2281,13 @@ void xclaimCommand(client *c) { /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given. */ - if (retrycount >= 0) nack->delivery_count = retrycount; + /* Set the delivery attempts counter if given, otherwise + * autoincrement unless JUSTID option provided */ + if (retrycount >= 0) { + nack->delivery_count = retrycount; + } else if (!justid) { + nack->delivery_count++; + } /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */ diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 1557082a2..676896a75 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -436,8 +436,11 @@ start_server { test "$pop: with non-integer timeout" { set rd [redis_deferring_client] - $rd $pop blist1 1.1 - assert_error "ERR*not an integer*" {$rd read} + r del blist1 + $rd $pop blist1 0.1 + r rpush blist1 foo + assert_equal {blist1 foo} [$rd read] + assert_equal 0 [r exists blist1] } test "$pop: with zero timeout should block indefinitely" { diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 13981cc22..34d4061c2 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -195,6 +195,49 @@ start_server { assert_equal "" [lindex $reply 0] } + test {XCLAIM without JUSTID increments delivery count} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. + # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + + # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client3 10 $id1 JUSTID + ] + assert {[llength $reply] == 1} + assert {[lindex $reply 0] eq $id1} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] diff --git a/utils/install_server.sh b/utils/install_server.sh index 7eb341417..8e5753bc6 100755 --- a/utils/install_server.sh +++ b/utils/install_server.sh @@ -43,6 +43,9 @@ # # /!\ This script should be run as root # +# NOTE: This script will not work on Mac OSX. +# It supports Debian and Ubuntu Linux. +# ################################################################################ die () {