Merge branch 'unstable' of github.com:antirez/redis into unstable
This commit is contained in:
commit
48ce543a18
@ -166,6 +166,8 @@ for Ubuntu and Debian systems:
|
|||||||
% cd utils
|
% cd utils
|
||||||
% ./install_server.sh
|
% ./install_server.sh
|
||||||
|
|
||||||
|
_Note_: `install_server.sh` will not work on Mac OSX; it is built for Linux only.
|
||||||
|
|
||||||
The script will ask you a few questions and will setup everything you need
|
The script will ask you a few questions and will setup everything you need
|
||||||
to run Redis properly as a background daemon that will start again on
|
to run Redis properly as a background daemon that will start again on
|
||||||
system reboots.
|
system reboots.
|
||||||
|
@ -310,3 +310,6 @@ install: all
|
|||||||
$(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN)
|
$(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN)
|
||||||
$(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
|
$(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
|
||||||
@ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME)
|
@ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME)
|
||||||
|
|
||||||
|
uninstall:
|
||||||
|
rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)}
|
||||||
|
@ -1611,6 +1611,9 @@ void aofRemoveTempFile(pid_t childpid) {
|
|||||||
|
|
||||||
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
|
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
|
||||||
unlink(tmpfile);
|
unlink(tmpfile);
|
||||||
|
|
||||||
|
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) childpid);
|
||||||
|
unlink(tmpfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Update the server.aof_current_size field explicitly using stat(2)
|
/* Update the server.aof_current_size field explicitly using stat(2)
|
||||||
|
@ -77,10 +77,18 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
* is zero. */
|
* is zero. */
|
||||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
||||||
long long tval;
|
long long tval;
|
||||||
|
long double ftval;
|
||||||
|
|
||||||
|
if (unit == UNIT_SECONDS) {
|
||||||
|
if (getLongDoubleFromObjectOrReply(c,object,&ftval,
|
||||||
|
"timeout is not an float or out of range") != C_OK)
|
||||||
|
return C_ERR;
|
||||||
|
tval = (long long) (ftval * 1000.0);
|
||||||
|
} else {
|
||||||
if (getLongLongFromObjectOrReply(c,object,&tval,
|
if (getLongLongFromObjectOrReply(c,object,&tval,
|
||||||
"timeout is not an integer or out of range") != C_OK)
|
"timeout is not an integer or out of range") != C_OK)
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
if (tval < 0) {
|
if (tval < 0) {
|
||||||
addReplyError(c,"timeout is negative");
|
addReplyError(c,"timeout is negative");
|
||||||
@ -88,7 +96,6 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (tval > 0) {
|
if (tval > 0) {
|
||||||
if (unit == UNIT_SECONDS) tval *= 1000;
|
|
||||||
tval += mstime();
|
tval += mstime();
|
||||||
}
|
}
|
||||||
*timeout = tval;
|
*timeout = tval;
|
||||||
|
@ -3031,6 +3031,7 @@ void clusterHandleSlaveFailover(void) {
|
|||||||
if (server.cluster->mf_end) {
|
if (server.cluster->mf_end) {
|
||||||
server.cluster->failover_auth_time = mstime();
|
server.cluster->failover_auth_time = mstime();
|
||||||
server.cluster->failover_auth_rank = 0;
|
server.cluster->failover_auth_rank = 0;
|
||||||
|
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
|
||||||
}
|
}
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Start of election delayed for %lld milliseconds "
|
"Start of election delayed for %lld milliseconds "
|
||||||
|
@ -1359,6 +1359,9 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
|||||||
*
|
*
|
||||||
* * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
|
* * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
|
||||||
*
|
*
|
||||||
|
* * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication
|
||||||
|
* link by the MASTER
|
||||||
|
*
|
||||||
* * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
|
* * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
|
||||||
*
|
*
|
||||||
* * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
|
* * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
|
||||||
@ -1391,6 +1394,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
|||||||
flags |= REDISMODULE_CTX_FLAGS_LUA;
|
flags |= REDISMODULE_CTX_FLAGS_LUA;
|
||||||
if (ctx->client->flags & CLIENT_MULTI)
|
if (ctx->client->flags & CLIENT_MULTI)
|
||||||
flags |= REDISMODULE_CTX_FLAGS_MULTI;
|
flags |= REDISMODULE_CTX_FLAGS_MULTI;
|
||||||
|
/* Module command recieved from MASTER, is replicated. */
|
||||||
|
if (ctx->client->flags & CLIENT_MASTER)
|
||||||
|
flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.cluster_enabled)
|
if (server.cluster_enabled)
|
||||||
|
@ -1965,7 +1965,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
}
|
}
|
||||||
} else if (!strcasecmp(auxkey->ptr,"redis-ver")) {
|
} else if (!strcasecmp(auxkey->ptr,"redis-ver")) {
|
||||||
serverLog(LL_NOTICE,"Loading RDB produced by version %s",
|
serverLog(LL_NOTICE,"Loading RDB produced by version %s",
|
||||||
auxval->ptr);
|
(char*)auxval->ptr);
|
||||||
} else if (!strcasecmp(auxkey->ptr,"ctime")) {
|
} else if (!strcasecmp(auxkey->ptr,"ctime")) {
|
||||||
time_t age = time(NULL)-strtol(auxval->ptr,NULL,10);
|
time_t age = time(NULL)-strtol(auxval->ptr,NULL,10);
|
||||||
if (age < 0) age = 0;
|
if (age < 0) age = 0;
|
||||||
|
@ -247,11 +247,11 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
|||||||
c = redisConnect(ip, port);
|
c = redisConnect(ip, port);
|
||||||
else
|
else
|
||||||
c = redisConnectUnix(hostsocket);
|
c = redisConnectUnix(hostsocket);
|
||||||
if (c->err) {
|
if (c == NULL || c->err) {
|
||||||
fprintf(stderr,"Could not connect to Redis at ");
|
fprintf(stderr,"Could not connect to Redis at ");
|
||||||
if (hostsocket == NULL)
|
char *err = (c != NULL ? c->errstr : "");
|
||||||
fprintf(stderr,"%s:%d: %s\n",ip,port,c->errstr);
|
if (hostsocket == NULL) fprintf(stderr,"%s:%d: %s\n",ip,port,err);
|
||||||
else fprintf(stderr,"%s: %s\n",hostsocket,c->errstr);
|
else fprintf(stderr,"%s: %s\n",hostsocket,err);
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
redisAppendCommand(c, "CONFIG GET %s", "save");
|
redisAppendCommand(c, "CONFIG GET %s", "save");
|
||||||
@ -276,18 +276,16 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
|||||||
case 1: cfg->appendonly = sdsnew(value); break;
|
case 1: cfg->appendonly = sdsnew(value); break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (reply) freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
if (c) redisFree(c);
|
redisFree(c);
|
||||||
return cfg;
|
return cfg;
|
||||||
fail:
|
fail:
|
||||||
if (reply) freeReplyObject(reply);
|
|
||||||
if (c) redisFree(c);
|
|
||||||
zfree(cfg);
|
|
||||||
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
|
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
|
||||||
if (c->connection_type == REDIS_CONN_TCP)
|
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
|
||||||
fprintf(stderr, "%s:%d\n", c->tcp.host, c->tcp.port);
|
else fprintf(stderr, "%s\n", hostsocket);
|
||||||
else if (c->connection_type == REDIS_CONN_UNIX)
|
freeReplyObject(reply);
|
||||||
fprintf(stderr, "%s\n", c->unix_sock.path);
|
redisFree(c);
|
||||||
|
zfree(cfg);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
static void freeRedisConfig(redisConfig *cfg) {
|
static void freeRedisConfig(redisConfig *cfg) {
|
||||||
@ -345,7 +343,9 @@ static void randomizeClientKey(client c) {
|
|||||||
|
|
||||||
for (i = 0; i < c->randlen; i++) {
|
for (i = 0; i < c->randlen; i++) {
|
||||||
char *p = c->randptr[i]+11;
|
char *p = c->randptr[i]+11;
|
||||||
size_t r = random() % config.randomkeys_keyspacelen;
|
size_t r = 0;
|
||||||
|
if (config.randomkeys_keyspacelen != 0)
|
||||||
|
r = random() % config.randomkeys_keyspacelen;
|
||||||
size_t j;
|
size_t j;
|
||||||
|
|
||||||
for (j = 0; j < 12; j++) {
|
for (j = 0; j < 12; j++) {
|
||||||
@ -453,27 +453,27 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.cluster_mode && is_err && c->cluster_node &&
|
/* Try to update slots configuration if reply error is
|
||||||
(!strncmp(r->str,"MOVED",5) ||
|
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
|
||||||
!strcmp(r->str,"ASK")))
|
* contain(s) the slot hash tag. */
|
||||||
{
|
if (is_err && c->cluster_node && c->staglen) {
|
||||||
/* Try to update slots configuration if the key of the
|
int fetch_slots = 0, do_wait = 0;
|
||||||
* command is using the slot hash tag. */
|
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
|
||||||
if (c->staglen && !fetchClusterSlotsConfiguration(c))
|
fetch_slots = 1;
|
||||||
|
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
|
||||||
|
/* Usually the cluster is able to recover itself after
|
||||||
|
* a CLUSTERDOWN error, so try to sleep one second
|
||||||
|
* before requesting the new configuration. */
|
||||||
|
fetch_slots = 1;
|
||||||
|
do_wait = 1;
|
||||||
|
printf("Error from server %s:%d: %s\n",
|
||||||
|
c->cluster_node->ip,
|
||||||
|
c->cluster_node->port,
|
||||||
|
r->str);
|
||||||
|
}
|
||||||
|
if (do_wait) sleep(1);
|
||||||
|
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
|
||||||
exit(1);
|
exit(1);
|
||||||
/*
|
|
||||||
clusterNode *node = c->cluster_node;
|
|
||||||
assert(node);
|
|
||||||
if (++node->current_slot_index >= node->slots_count) {
|
|
||||||
if (config.showerrors) {
|
|
||||||
fprintf(stderr, "WARN: No more available slots in "
|
|
||||||
"node %s:%d\n", node->ip, node->port);
|
|
||||||
}
|
|
||||||
freeReplyObject(reply);
|
|
||||||
freeClient(c);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
freeReplyObject(reply);
|
freeReplyObject(reply);
|
||||||
@ -823,15 +823,8 @@ static void showLatencyReport(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void benchmark(char *title, char *cmd, int len) {
|
static void initBenchmarkThreads() {
|
||||||
int i;
|
int i;
|
||||||
client c;
|
|
||||||
|
|
||||||
config.title = title;
|
|
||||||
config.requests_issued = 0;
|
|
||||||
config.requests_finished = 0;
|
|
||||||
|
|
||||||
if (config.num_threads) {
|
|
||||||
if (config.threads) freeBenchmarkThreads();
|
if (config.threads) freeBenchmarkThreads();
|
||||||
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
|
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
|
||||||
for (i = 0; i < config.num_threads; i++) {
|
for (i = 0; i < config.num_threads; i++) {
|
||||||
@ -840,13 +833,8 @@ static void benchmark(char *title, char *cmd, int len) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int thread_id = config.num_threads > 0 ? 0 : -1;
|
static void startBenchmarkThreads() {
|
||||||
c = createClient(cmd,len,NULL,thread_id);
|
int i;
|
||||||
createMissingClients(c);
|
|
||||||
|
|
||||||
config.start = mstime();
|
|
||||||
if (!config.num_threads) aeMain(config.el);
|
|
||||||
else {
|
|
||||||
for (i = 0; i < config.num_threads; i++) {
|
for (i = 0; i < config.num_threads; i++) {
|
||||||
benchmarkThread *t = config.threads[i];
|
benchmarkThread *t = config.threads[i];
|
||||||
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
|
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
|
||||||
@ -857,6 +845,23 @@ static void benchmark(char *title, char *cmd, int len) {
|
|||||||
for (i = 0; i < config.num_threads; i++)
|
for (i = 0; i < config.num_threads; i++)
|
||||||
pthread_join(config.threads[i]->thread, NULL);
|
pthread_join(config.threads[i]->thread, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void benchmark(char *title, char *cmd, int len) {
|
||||||
|
client c;
|
||||||
|
|
||||||
|
config.title = title;
|
||||||
|
config.requests_issued = 0;
|
||||||
|
config.requests_finished = 0;
|
||||||
|
|
||||||
|
if (config.num_threads) initBenchmarkThreads();
|
||||||
|
|
||||||
|
int thread_id = config.num_threads > 0 ? 0 : -1;
|
||||||
|
c = createClient(cmd,len,NULL,thread_id);
|
||||||
|
createMissingClients(c);
|
||||||
|
|
||||||
|
config.start = mstime();
|
||||||
|
if (!config.num_threads) aeMain(config.el);
|
||||||
|
else startBenchmarkThreads();
|
||||||
config.totlatency = mstime()-config.start;
|
config.totlatency = mstime()-config.start;
|
||||||
|
|
||||||
showLatencyReport();
|
showLatencyReport();
|
||||||
@ -1289,6 +1294,11 @@ int parseOptions(int argc, const char **argv) {
|
|||||||
if (config.pipeline <= 0) config.pipeline=1;
|
if (config.pipeline <= 0) config.pipeline=1;
|
||||||
} else if (!strcmp(argv[i],"-r")) {
|
} else if (!strcmp(argv[i],"-r")) {
|
||||||
if (lastarg) goto invalid;
|
if (lastarg) goto invalid;
|
||||||
|
const char *next = argv[++i], *p = next;
|
||||||
|
if (*p == '-') {
|
||||||
|
p++;
|
||||||
|
if (*p < '0' || *p > '9') goto invalid;
|
||||||
|
}
|
||||||
config.randomkeys = 1;
|
config.randomkeys = 1;
|
||||||
config.randomkeys_keyspacelen = atoi(argv[++i]);
|
config.randomkeys_keyspacelen = atoi(argv[++i]);
|
||||||
if (config.randomkeys_keyspacelen < 0)
|
if (config.randomkeys_keyspacelen < 0)
|
||||||
@ -1552,9 +1562,15 @@ int main(int argc, const char **argv) {
|
|||||||
|
|
||||||
if (config.idlemode) {
|
if (config.idlemode) {
|
||||||
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
|
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
|
||||||
c = createClient("",0,NULL,-1); /* will never receive a reply */
|
int thread_id = -1, use_threads = (config.num_threads > 0);
|
||||||
|
if (use_threads) {
|
||||||
|
thread_id = 0;
|
||||||
|
initBenchmarkThreads();
|
||||||
|
}
|
||||||
|
c = createClient("",0,NULL,thread_id); /* will never receive a reply */
|
||||||
createMissingClients(c);
|
createMissingClients(c);
|
||||||
aeMain(config.el);
|
if (use_threads) startBenchmarkThreads();
|
||||||
|
else aeMain(config.el);
|
||||||
/* and will wait for every */
|
/* and will wait for every */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2268,7 +2268,7 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
|
|||||||
static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) {
|
static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) {
|
||||||
assert(config.cluster_manager_command.backup_dir);
|
assert(config.cluster_manager_command.backup_dir);
|
||||||
sds filename = sdsnew(config.cluster_manager_command.backup_dir);
|
sds filename = sdsnew(config.cluster_manager_command.backup_dir);
|
||||||
if (filename[sdslen(filename)] - 1 != '/')
|
if (filename[sdslen(filename) - 1] != '/')
|
||||||
filename = sdscat(filename, "/");
|
filename = sdscat(filename, "/");
|
||||||
filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip,
|
filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip,
|
||||||
node->port, node->name);
|
node->port, node->name);
|
||||||
@ -2726,12 +2726,42 @@ static sds clusterManagerNodeGetJSON(clusterManagerNode *node,
|
|||||||
slots,
|
slots,
|
||||||
node->slots_count,
|
node->slots_count,
|
||||||
flags,
|
flags,
|
||||||
node->current_epoch
|
(unsigned long long)node->current_epoch
|
||||||
);
|
);
|
||||||
if (error_count > 0) {
|
if (error_count > 0) {
|
||||||
json = sdscatprintf(json, ",\n \"cluster_errors\": %lu",
|
json = sdscatprintf(json, ",\n \"cluster_errors\": %lu",
|
||||||
error_count);
|
error_count);
|
||||||
}
|
}
|
||||||
|
if (node->migrating_count > 0 && node->migrating != NULL) {
|
||||||
|
int i = 0;
|
||||||
|
sds migrating = sdsempty();
|
||||||
|
for (; i < node->migrating_count; i += 2) {
|
||||||
|
sds slot = node->migrating[i];
|
||||||
|
sds dest = node->migrating[i + 1];
|
||||||
|
if (slot && dest) {
|
||||||
|
if (sdslen(migrating) > 0) migrating = sdscat(migrating, ",");
|
||||||
|
migrating = sdscatfmt(migrating, "\"%S\": \"%S\"", slot, dest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sdslen(migrating) > 0)
|
||||||
|
json = sdscatfmt(json, ",\n \"migrating\": {%S}", migrating);
|
||||||
|
sdsfree(migrating);
|
||||||
|
}
|
||||||
|
if (node->importing_count > 0 && node->importing != NULL) {
|
||||||
|
int i = 0;
|
||||||
|
sds importing = sdsempty();
|
||||||
|
for (; i < node->importing_count; i += 2) {
|
||||||
|
sds slot = node->importing[i];
|
||||||
|
sds from = node->importing[i + 1];
|
||||||
|
if (slot && from) {
|
||||||
|
if (sdslen(importing) > 0) importing = sdscat(importing, ",");
|
||||||
|
importing = sdscatfmt(importing, "\"%S\": \"%S\"", slot, from);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sdslen(importing) > 0)
|
||||||
|
json = sdscatfmt(json, ",\n \"importing\": {%S}", importing);
|
||||||
|
sdsfree(importing);
|
||||||
|
}
|
||||||
json = sdscat(json, "\n }");
|
json = sdscat(json, "\n }");
|
||||||
sdsfree(replicate);
|
sdsfree(replicate);
|
||||||
sdsfree(slots);
|
sdsfree(slots);
|
||||||
@ -2841,7 +2871,7 @@ static void clusterManagerShowClusterInfo(void) {
|
|||||||
replicas++;
|
replicas++;
|
||||||
}
|
}
|
||||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE");
|
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE");
|
||||||
if (reply != NULL || reply->type == REDIS_REPLY_INTEGER)
|
if (reply != NULL && reply->type == REDIS_REPLY_INTEGER)
|
||||||
dbsize = reply->integer;
|
dbsize = reply->integer;
|
||||||
if (dbsize < 0) {
|
if (dbsize < 0) {
|
||||||
char *err = "";
|
char *err = "";
|
||||||
|
@ -85,6 +85,9 @@
|
|||||||
#define REDISMODULE_CTX_FLAGS_OOM (1<<10)
|
#define REDISMODULE_CTX_FLAGS_OOM (1<<10)
|
||||||
/* Less than 25% of memory available according to maxmemory. */
|
/* Less than 25% of memory available according to maxmemory. */
|
||||||
#define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
|
#define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11)
|
||||||
|
/* The command was sent over the replication link. */
|
||||||
|
#define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12)
|
||||||
|
|
||||||
|
|
||||||
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
|
#define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */
|
||||||
#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */
|
#define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */
|
||||||
|
@ -480,11 +480,11 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
"write fast @sortedset",
|
"write fast @sortedset",
|
||||||
0,NULL,1,1,1,0,0,0},
|
0,NULL,1,1,1,0,0,0},
|
||||||
|
|
||||||
{"bzpopmin",bzpopminCommand,-2,
|
{"bzpopmin",bzpopminCommand,-3,
|
||||||
"write no-script fast @sortedset @blocking",
|
"write no-script fast @sortedset @blocking",
|
||||||
0,NULL,1,-2,1,0,0,0},
|
0,NULL,1,-2,1,0,0,0},
|
||||||
|
|
||||||
{"bzpopmax",bzpopmaxCommand,-2,
|
{"bzpopmax",bzpopmaxCommand,-3,
|
||||||
"write no-script fast @sortedset @blocking",
|
"write no-script fast @sortedset @blocking",
|
||||||
0,NULL,1,-2,1,0,0,0},
|
0,NULL,1,-2,1,0,0,0},
|
||||||
|
|
||||||
|
@ -1064,6 +1064,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
|
|||||||
sdsfree(ele);
|
sdsfree(ele);
|
||||||
}
|
}
|
||||||
setTypeReleaseIterator(si);
|
setTypeReleaseIterator(si);
|
||||||
|
server.lazyfree_lazy_server_del ? freeObjAsync(dstset) :
|
||||||
decrRefCount(dstset);
|
decrRefCount(dstset);
|
||||||
} else {
|
} else {
|
||||||
/* If we have a target key where to store the resulting set
|
/* If we have a target key where to store the resulting set
|
||||||
|
@ -1737,14 +1737,17 @@ NULL
|
|||||||
/* Everything but the "HELP" option requires a key and group name. */
|
/* Everything but the "HELP" option requires a key and group name. */
|
||||||
if (c->argc >= 4) {
|
if (c->argc >= 4) {
|
||||||
o = lookupKeyWrite(c->db,c->argv[2]);
|
o = lookupKeyWrite(c->db,c->argv[2]);
|
||||||
if (o) s = o->ptr;
|
if (o) {
|
||||||
|
if (checkType(c,o,OBJ_STREAM)) return;
|
||||||
|
s = o->ptr;
|
||||||
|
}
|
||||||
grpname = c->argv[3]->ptr;
|
grpname = c->argv[3]->ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check for missing key/group. */
|
/* Check for missing key/group. */
|
||||||
if (c->argc >= 4 && !mkstream) {
|
if (c->argc >= 4 && !mkstream) {
|
||||||
/* At this point key must exist, or there is an error. */
|
/* At this point key must exist, or there is an error. */
|
||||||
if (o == NULL) {
|
if (s == NULL) {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"The XGROUP subcommand requires the key to exist. "
|
"The XGROUP subcommand requires the key to exist. "
|
||||||
"Note that for CREATE you may want to use the MKSTREAM "
|
"Note that for CREATE you may want to use the MKSTREAM "
|
||||||
@ -1752,8 +1755,6 @@ NULL
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkType(c,o,OBJ_STREAM)) return;
|
|
||||||
|
|
||||||
/* Certain subcommands require the group to exist. */
|
/* Certain subcommands require the group to exist. */
|
||||||
if ((cg = streamLookupCG(s,grpname)) == NULL &&
|
if ((cg = streamLookupCG(s,grpname)) == NULL &&
|
||||||
(!strcasecmp(opt,"SETID") ||
|
(!strcasecmp(opt,"SETID") ||
|
||||||
@ -1781,7 +1782,8 @@ NULL
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Handle the MKSTREAM option now that the command can no longer fail. */
|
/* Handle the MKSTREAM option now that the command can no longer fail. */
|
||||||
if (s == NULL && mkstream) {
|
if (s == NULL) {
|
||||||
|
serverAssert(mkstream);
|
||||||
o = createStreamObject();
|
o = createStreamObject();
|
||||||
dbAdd(c->db,c->argv[2],o);
|
dbAdd(c->db,c->argv[2],o);
|
||||||
s = o->ptr;
|
s = o->ptr;
|
||||||
@ -2279,8 +2281,13 @@ void xclaimCommand(client *c) {
|
|||||||
/* Update the consumer and idle time. */
|
/* Update the consumer and idle time. */
|
||||||
nack->consumer = consumer;
|
nack->consumer = consumer;
|
||||||
nack->delivery_time = deliverytime;
|
nack->delivery_time = deliverytime;
|
||||||
/* Set the delivery attempts counter if given. */
|
/* Set the delivery attempts counter if given, otherwise
|
||||||
if (retrycount >= 0) nack->delivery_count = retrycount;
|
* autoincrement unless JUSTID option provided */
|
||||||
|
if (retrycount >= 0) {
|
||||||
|
nack->delivery_count = retrycount;
|
||||||
|
} else if (!justid) {
|
||||||
|
nack->delivery_count++;
|
||||||
|
}
|
||||||
/* Add the entry in the new consumer local PEL. */
|
/* Add the entry in the new consumer local PEL. */
|
||||||
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
|
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
|
||||||
/* Send the reply for this entry. */
|
/* Send the reply for this entry. */
|
||||||
|
@ -436,8 +436,11 @@ start_server {
|
|||||||
|
|
||||||
test "$pop: with non-integer timeout" {
|
test "$pop: with non-integer timeout" {
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
$rd $pop blist1 1.1
|
r del blist1
|
||||||
assert_error "ERR*not an integer*" {$rd read}
|
$rd $pop blist1 0.1
|
||||||
|
r rpush blist1 foo
|
||||||
|
assert_equal {blist1 foo} [$rd read]
|
||||||
|
assert_equal 0 [r exists blist1]
|
||||||
}
|
}
|
||||||
|
|
||||||
test "$pop: with zero timeout should block indefinitely" {
|
test "$pop: with zero timeout should block indefinitely" {
|
||||||
|
@ -195,6 +195,49 @@ start_server {
|
|||||||
assert_equal "" [lindex $reply 0]
|
assert_equal "" [lindex $reply 0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {XCLAIM without JUSTID increments delivery count} {
|
||||||
|
# Add 3 items into the stream, and create a consumer group
|
||||||
|
r del mystream
|
||||||
|
set id1 [r XADD mystream * a 1]
|
||||||
|
set id2 [r XADD mystream * b 2]
|
||||||
|
set id3 [r XADD mystream * c 3]
|
||||||
|
r XGROUP CREATE mystream mygroup 0
|
||||||
|
|
||||||
|
# Client 1 reads item 1 from the stream without acknowledgements.
|
||||||
|
# Client 2 then claims pending item 1 from the PEL of client 1
|
||||||
|
set reply [
|
||||||
|
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
|
||||||
|
]
|
||||||
|
assert {[llength [lindex $reply 0 1 0 1]] == 2}
|
||||||
|
assert {[lindex $reply 0 1 0 1] eq {a 1}}
|
||||||
|
r debug sleep 0.2
|
||||||
|
set reply [
|
||||||
|
r XCLAIM mystream mygroup client2 10 $id1
|
||||||
|
]
|
||||||
|
assert {[llength [lindex $reply 0 1]] == 2}
|
||||||
|
assert {[lindex $reply 0 1] eq {a 1}}
|
||||||
|
|
||||||
|
set reply [
|
||||||
|
r XPENDING mystream mygroup - + 10
|
||||||
|
]
|
||||||
|
assert {[llength [lindex $reply 0]] == 4}
|
||||||
|
assert {[lindex $reply 0 3] == 2}
|
||||||
|
|
||||||
|
# Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
|
||||||
|
r debug sleep 0.2
|
||||||
|
set reply [
|
||||||
|
r XCLAIM mystream mygroup client3 10 $id1 JUSTID
|
||||||
|
]
|
||||||
|
assert {[llength $reply] == 1}
|
||||||
|
assert {[lindex $reply 0] eq $id1}
|
||||||
|
|
||||||
|
set reply [
|
||||||
|
r XPENDING mystream mygroup - + 10
|
||||||
|
]
|
||||||
|
assert {[llength [lindex $reply 0]] == 4}
|
||||||
|
assert {[lindex $reply 0 3] == 2}
|
||||||
|
}
|
||||||
|
|
||||||
start_server {} {
|
start_server {} {
|
||||||
set master [srv -1 client]
|
set master [srv -1 client]
|
||||||
set master_host [srv -1 host]
|
set master_host [srv -1 host]
|
||||||
|
@ -43,6 +43,9 @@
|
|||||||
#
|
#
|
||||||
# /!\ This script should be run as root
|
# /!\ This script should be run as root
|
||||||
#
|
#
|
||||||
|
# NOTE: This script will not work on Mac OSX.
|
||||||
|
# It supports Debian and Ubuntu Linux.
|
||||||
|
#
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|
||||||
die () {
|
die () {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user