From 31707f25c78faa65a9d842e51e1fb8e647c3f84a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 15:59:29 +0100 Subject: [PATCH 01/37] Sentinel: implement auth-user directive for ACLs. --- src/sentinel.c | 45 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 10c003d03..83d6c00bb 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -205,7 +205,8 @@ typedef struct sentinelRedisInstance { dict *slaves; /* Slaves for this master instance. */ unsigned int quorum;/* Number of sentinels that need to agree on failure. */ int parallel_syncs; /* How many slaves to reconfigure at same time. */ - char *auth_pass; /* Password to use for AUTH against master & slaves. */ + char *auth_pass; /* Password to use for AUTH against master & replica. */ + char *auth_user; /* Username for ACLs AUTH against master & replica. */ /* Slave specific. */ mstime_t master_link_down_time; /* Slave replication link down time. */ @@ -1231,6 +1232,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * SENTINEL_DEFAULT_DOWN_AFTER; ri->master_link_down_time = 0; ri->auth_pass = NULL; + ri->auth_user = NULL; ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; ri->slave_reconf_sent_time = 0; ri->slave_master_host = NULL; @@ -1289,6 +1291,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { sdsfree(ri->slave_master_host); sdsfree(ri->leader); sdsfree(ri->auth_pass); + sdsfree(ri->auth_user); sdsfree(ri->info); releaseSentinelAddr(ri->addr); dictRelease(ri->renamed_commands); @@ -1654,19 +1657,19 @@ char *sentinelHandleConfiguration(char **argv, int argc) { ri->failover_timeout = atoi(argv[2]); if (ri->failover_timeout <= 0) return "negative or zero time parameter."; - } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { + } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { /* parallel-syncs */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->parallel_syncs = atoi(argv[2]); - } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { /* notification-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; if (access(argv[2],X_OK) == -1) return "Notification script seems non existing or non executable."; ri->notification_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { /* client-reconfig-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; @@ -1674,11 +1677,16 @@ char *sentinelHandleConfiguration(char **argv, int argc) { return "Client reconfiguration script seems non existing or " "non executable."; ri->client_reconfig_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { + } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { /* auth-pass */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->auth_pass = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) { + /* auth-user */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->auth_user = sdsnew(argv[2]); } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) { /* current-epoch */ unsigned long long current_epoch = strtoull(argv[1],NULL,10); @@ -1836,7 +1844,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } - /* sentinel auth-pass */ + /* sentinel auth-pass & auth-user */ if (master->auth_pass) { line = sdscatprintf(sdsempty(), "sentinel auth-pass %s %s", @@ -1844,6 +1852,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } + if (master->auth_user) { + line = sdscatprintf(sdsempty(), + "sentinel auth-user %s %s", + master->name, master->auth_user); + rewriteConfigRewriteLine(state,"sentinel",line,1); + } + /* sentinel config-epoch */ line = sdscatprintf(sdsempty(), "sentinel config-epoch %s %llu", @@ -1968,19 +1983,29 @@ werr: * will disconnect and reconnect the link and so forth. */ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { char *auth_pass = NULL; + char *auth_user = NULL; if (ri->flags & SRI_MASTER) { auth_pass = ri->auth_pass; + auth_user = ri->auth_user; } else if (ri->flags & SRI_SLAVE) { auth_pass = ri->master->auth_pass; + auth_user = ri->master->auth_user; } else if (ri->flags & SRI_SENTINEL) { auth_pass = ACLDefaultUserFirstPassword(); + auth_user = NULL; } - if (auth_pass) { + if (auth_pass && auth_user == NULL) { if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", sentinelInstanceMapCommand(ri,"AUTH"), auth_pass) == C_OK) ri->link->pending_commands++; + } else if (auth_pass && auth_user) { + /* If we also have an username, use the ACL-style AUTH command + * with two arguments, username and password. */ + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"AUTH"), + auth_user, auth_pass) == C_OK) ri->link->pending_commands++; } } @@ -3522,6 +3547,12 @@ void sentinelSetCommand(client *c) { sdsfree(ri->auth_pass); ri->auth_pass = strlen(value) ? sdsnew(value) : NULL; changes++; + } else if (!strcasecmp(option,"auth-user") && moreargs > 0) { + /* auth-user */ + char *value = c->argv[++j]->ptr; + sdsfree(ri->auth_user); + ri->auth_user = strlen(value) ? sdsnew(value) : NULL; + changes++; } else if (!strcasecmp(option,"quorum") && moreargs > 0) { /* quorum */ robj *o = c->argv[++j]; From d9f9a29afaf49c07a002b091d97cb6013477b951 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 16:56:50 +0100 Subject: [PATCH 02/37] ACL: Make Redis 6 more backward compatible with requirepass. Note that this as a side effect fixes Sentinel "requirepass" mode. --- src/acl.c | 11 +---------- src/config.c | 16 ++++++++++++---- src/sentinel.c | 2 +- src/server.h | 3 +++ 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/acl.c b/src/acl.c index efe6b96ad..27f4bdb84 100644 --- a/src/acl.c +++ b/src/acl.c @@ -899,16 +899,6 @@ char *ACLSetUserStringError(void) { return errmsg; } -/* Return the first password of the default user or NULL. - * This function is needed for backward compatibility with the old - * directive "requirepass" when Redis supported a single global - * password. */ -sds ACLDefaultUserFirstPassword(void) { - if (listLength(DefaultUser->passwords) == 0) return NULL; - listNode *first = listFirst(DefaultUser->passwords); - return listNodeValue(first); -} - /* Initialize the default user, that will always exist for all the process * lifetime. */ void ACLInitDefaultUser(void) { @@ -925,6 +915,7 @@ void ACLInit(void) { UsersToLoad = listCreate(); ACLLog = listCreate(); ACLInitDefaultUser(); + server.requirepass = NULL; /* Only used for backward compatibility. */ } /* Check the username and password pair and return C_OK if they are valid, diff --git a/src/config.c b/src/config.c index 211b6d003..7c87ebe6e 100644 --- a/src/config.c +++ b/src/config.c @@ -411,11 +411,15 @@ void loadServerConfigFromString(char *config) { goto loaderr; } /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",argv[1]); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(server.requirepass); + server.requirepass = sdsnew(argv[1]); } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ /* DEAD OPTION */ } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) { @@ -623,11 +627,15 @@ void configSetCommand(client *c) { config_set_special_field("requirepass") { if (sdslen(o->ptr) > CONFIG_AUTHPASS_MAX_LEN) goto badfmt; /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",(char*)o->ptr); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(server.requirepass); + server.requirepass = sdsnew(o->ptr); } config_set_special_field("save") { int vlen, j; sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); @@ -899,7 +907,7 @@ void configGetCommand(client *c) { } if (stringmatch(pattern,"requirepass",1)) { addReplyBulkCString(c,"requirepass"); - sds password = ACLDefaultUserFirstPassword(); + sds password = server.requirepass; if (password) { addReplyBulkCBuffer(c,password,sdslen(password)); } else { @@ -1341,7 +1349,7 @@ void rewriteConfigBindOption(struct rewriteConfigState *state) { void rewriteConfigRequirepassOption(struct rewriteConfigState *state, char *option) { int force = 1; sds line; - sds password = ACLDefaultUserFirstPassword(); + sds password = server.requirepass; /* If there is no password set, we don't want the requirepass option * to be present in the configuration at all. */ diff --git a/src/sentinel.c b/src/sentinel.c index 83d6c00bb..d091bf230 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -1992,7 +1992,7 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { auth_pass = ri->master->auth_pass; auth_user = ri->master->auth_user; } else if (ri->flags & SRI_SENTINEL) { - auth_pass = ACLDefaultUserFirstPassword(); + auth_pass = server.requirepass; auth_user = NULL; } diff --git a/src/server.h b/src/server.h index 3c19a17ea..fa6770dfa 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,6 +1395,9 @@ struct redisServer { /* ACLs */ char *acl_filename; /* ACL Users file. NULL if not configured. */ unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */ + sds requirepass; /* Remember the cleartext password set with the + old "requirepass" directive for backward + compatibility with Redis <= 5. */ /* Assert & bug reporting */ const char *assert_failed; const char *assert_file; From 63e4a4701069a28e97c8b93c0c1ea91ded99e90f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 17:11:43 +0100 Subject: [PATCH 03/37] Sentinel: document auth-user directive. --- sentinel.conf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sentinel.conf b/sentinel.conf index 796f45088..4ca5e5f8f 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -102,6 +102,18 @@ sentinel monitor mymaster 127.0.0.1 6379 2 # # sentinel auth-pass mymaster MySUPER--secret-0123passw0rd +# sentinel auth-user +# +# This is useful in order to authenticate to instances having ACL capabilities, +# that is, running Redis 6.0 or greater. When just auth-pass is provided the +# Sentinel instance will authenticate to Redis using the old "AUTH " +# method. When also an username is provided, it will use "AUTH ". +# In the Redis servers side, the ACL to provide just minimal access to +# Sentinel instances, should be configured along the following lines: +# +# user sentinel-user >somepassword +client +subscribe +publish \ +# +ping +info +multi +slaveof +config +client +exec on + # sentinel down-after-milliseconds # # Number of milliseconds the master (or any attached replica or sentinel) should From 21c22bcd9adf0844a798859b4d7a39311207a428 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 20 Mar 2020 12:45:48 +0100 Subject: [PATCH 04/37] ACL: default user off should not allow automatic authentication. This fixes issue #7011. --- src/networking.c | 3 ++- src/server.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0690bbdf6..69d59a59b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -124,7 +124,8 @@ client *createClient(connection *conn) { c->ctime = c->lastinteraction = server.unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ - c->authenticated = (c->user->flags & USER_FLAG_NOPASS) != 0; + c->authenticated = (c->user->flags & USER_FLAG_NOPASS) && + !(c->user->flags & USER_FLAG_DISABLED); c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; diff --git a/src/server.c b/src/server.c index f702da94a..612805ce5 100644 --- a/src/server.c +++ b/src/server.c @@ -3380,7 +3380,7 @@ int processCommand(client *c) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || - DefaultUser->flags & USER_FLAG_DISABLED) && + (DefaultUser->flags & USER_FLAG_DISABLED)) && !c->authenticated; if (auth_required) { /* AUTH and HELLO and no auth modules are valid even in From 91af3f73e1fe1c32870d676e7958463d443a7d43 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 20 Mar 2020 12:52:06 +0100 Subject: [PATCH 05/37] Regression test for #7011. --- tests/unit/acl.tcl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index fc1664a75..85c9b81a9 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -248,4 +248,11 @@ start_server {tags {"acl"}} { r AUTH default "" assert {[llength [r ACL LOG]] == 5} } + + test {When default user is off, new connections are not authenticated} { + r ACL setuser default off + catch {set rd1 [redis_deferring_client]} e + r ACL setuser default on + set e + } {*NOAUTH*} } From 23ccd476b7833ba0e7d9228d8448b43e00d3df51 Mon Sep 17 00:00:00 2001 From: WuYunlong Date: Wed, 18 Mar 2020 16:17:46 +0800 Subject: [PATCH 06/37] Add 14-consistency-check.tcl to prove there is a data consistency issue. --- tests/cluster/tests/14-consistency-check.tcl | 87 ++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 tests/cluster/tests/14-consistency-check.tcl diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl new file mode 100644 index 000000000..a43725ebc --- /dev/null +++ b/tests/cluster/tests/14-consistency-check.tcl @@ -0,0 +1,87 @@ +source "../tests/includes/init-tests.tcl" + +test "Create a 5 nodes cluster" { + create_cluster 5 5 +} + +test "Cluster should start ok" { + assert_cluster_state ok +} + +test "Cluster is writable" { + cluster_write_test 0 +} + +proc find_non_empty_master {} { + set master_id_no {} + foreach_redis_id id { + if {[RI $id role] eq {master} && [R $id dbsize] > 0} { + set master_id_no $id + } + } + return $master_id_no +} + +proc get_one_of_my_replica {id} { + set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1] + set replica_id_num [get_instance_id_by_port redis $replica_port] + return $replica_id_num +} + +proc cluster_write_keys_with_expire {id ttl} { + set prefix [randstring 20 20 alpha] + set port [get_instance_attrib redis $id port] + set cluster [redis_cluster 127.0.0.1:$port] + for {set j 100} {$j < 200} {incr j} { + $cluster setex key_expire.$j $ttl $prefix.$j + } + $cluster close +} + +proc test_slave_load_expired_keys {aof} { + test "Slave expired keys is loaded when restarted: appendonly=$aof" { + set master_id [find_non_empty_master] + set replica_id [get_one_of_my_replica $master_id] + + set master_dbsize [R $master_id dbsize] + set slave_dbsize [R $replica_id dbsize] + assert_equal $master_dbsize $slave_dbsize + + set data_ttl 5 + cluster_write_keys_with_expire $master_id $data_ttl + after 100 + set replica_dbsize_1 [R $replica_id dbsize] + assert {$replica_dbsize_1 > $slave_dbsize} + + R $replica_id config set appendonly $aof + R $replica_id config rewrite + + set start_time [clock seconds] + set end_time [expr $start_time+$data_ttl+2] + R $replica_id save + set replica_dbsize_2 [R $replica_id dbsize] + assert {$replica_dbsize_2 > $slave_dbsize} + kill_instance redis $replica_id + + set master_port [get_instance_attrib redis $master_id port] + exec ../../../src/redis-cli -h 127.0.0.1 -p $master_port debug sleep [expr $data_ttl+3] > /dev/null & + + while {[clock seconds] <= $end_time} { + #wait for $data_ttl seconds + } + restart_instance redis $replica_id + + wait_for_condition 200 50 { + [R $replica_id ping] eq {PONG} + } else { + fail "replica #$replica_id not started" + } + + set replica_dbsize_3 [R $replica_id dbsize] + assert {$replica_dbsize_3 > $slave_dbsize} + } +} + +test_slave_load_expired_keys no +after 5000 +test_slave_load_expired_keys yes From 37c6571a6ce4f3abcf79b77f701dd631c6c55563 Mon Sep 17 00:00:00 2001 From: WuYunlong Date: Wed, 18 Mar 2020 16:20:10 +0800 Subject: [PATCH 07/37] Fix master replica inconsistency for upgrading scenario. Before this commit, when upgrading a replica, expired keys will not be loaded, thus causing replica having less keys in db. To this point, master and replica's keys is logically consistent. However, before the keys in master and replica are physically consistent, that is, they have the same dbsize, if master got a problem and the replica got promoted and becomes new master of that partition, and master updates a key which does not exist on master, but physically exists on the old master(new replica), the old master would refuse to update the key, thus causing master and replica data inconsistent. How could this happen? That's all because of the wrong judgement of roles while starting up the server. We can not use server.masterhost to judge if the server is master or replica, since it fails in cluster mode. When we start the server, we load rdb and do want to load expired keys, and do not want to have the ability to active expire keys, if it is a replica. --- src/rdb.c | 2 +- src/server.c | 7 ++++++- src/server.h | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index cbcea96c6..5d34f5a32 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2231,7 +2231,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { + if (iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { diff --git a/src/server.c b/src/server.c index 612805ce5..4b010b870 100644 --- a/src/server.c +++ b/src/server.c @@ -1691,7 +1691,7 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { - if (server.masterhost == NULL) { + if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); @@ -4863,6 +4863,11 @@ int redisIsSupervised(int mode) { return 0; } +int iAmMaster(void) { + return ((!server.cluster_enabled && server.masterhost == NULL) || + (server.cluster_enabled && nodeIsMaster(server.cluster->myself))); +} + int main(int argc, char **argv) { struct timeval tv; diff --git a/src/server.h b/src/server.h index fa6770dfa..fdfe5b8ea 100644 --- a/src/server.h +++ b/src/server.h @@ -2393,4 +2393,6 @@ int tlsConfigure(redisTLSContextConfig *ctx_config); #define redisDebugMark() \ printf("-- MARK %s:%d --\n", __FILE__, __LINE__) +int iAmMaster(void); + #endif From 799a7dd8acbeef65ee0a81bf9052765fbcbc5172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=8A=B9=ED=98=84?= Date: Wed, 18 Mar 2020 14:40:50 +0900 Subject: [PATCH 08/37] Update redis.conf --- redis.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis.conf b/redis.conf index c9d256bef..7c55a3ab0 100644 --- a/redis.conf +++ b/redis.conf @@ -1628,7 +1628,7 @@ hz 10 # offers, and enables by default, the ability to use an adaptive HZ value # which will temporary raise when there are many connected clients. # -# When dynamic HZ is enabled, the actual configured HZ will be used as +# When dynamic HZ is enabled, the actual configured HZ will be used # as a baseline, but multiples of the configured HZ value will be actually # used as needed once more clients are connected. In this way an idle # instance will use very little CPU time while a busy instance will be From 5ccdb7a5be29a8f328bf530283d21b1abe4bd294 Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Feb 2020 10:46:10 +0100 Subject: [PATCH 09/37] Support Redis Cluster Proxy PROXY INFO command --- src/redis-cli.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7ad80c0a1..7e440d67c 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1292,7 +1292,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { (argc == 3 && !strcasecmp(command,"latency") && !strcasecmp(argv[1],"graph")) || (argc == 2 && !strcasecmp(command,"latency") && - !strcasecmp(argv[1],"doctor"))) + !strcasecmp(argv[1],"doctor")) || + /* Format PROXY INFO command for Redis Cluster Proxy: + * https://github.com/artix75/redis-cluster-proxy */ + (argc >= 2 && !strcasecmp(command,"proxy") && + !strcasecmp(argv[1],"info"))) { output_raw = 1; } From 04d838274f68edbe0a89d20d4bdebe3089c42d6e Mon Sep 17 00:00:00 2001 From: hwware Date: Fri, 20 Mar 2020 02:40:54 -0400 Subject: [PATCH 10/37] add missing commands in cluster help --- src/cluster.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 5f63d2b8f..72755823a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4261,7 +4261,7 @@ void clusterCommand(client *c) { "FORGET -- Remove a node from the cluster.", "GETKEYSINSLOT -- Return key names stored by current node in a slot.", "FLUSHSLOTS -- Delete current node own slots information.", -"INFO - Return onformation about the cluster.", +"INFO - Return information about the cluster.", "KEYSLOT -- Return the hash slot for .", "MEET [bus-port] -- Connect nodes into a working cluster.", "MYID -- Return the node id.", @@ -4272,6 +4272,7 @@ void clusterCommand(client *c) { "SET-config-epoch - Set config epoch of current node.", "SETSLOT (importing|migrating|stable|node ) -- Set slot state.", "REPLICAS -- Return replicas.", +"SAVECONFIG - Force saving cluster configuration on disk.", "SLOTS -- Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL From 9c9ef6fb9b8063291af9cd947d74dc89d32da3ce Mon Sep 17 00:00:00 2001 From: hwware Date: Mon, 23 Mar 2020 01:04:49 -0400 Subject: [PATCH 11/37] clean CLIENT_TRACKING_CACHING flag when disabled caching --- src/tracking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 45f83103a..5c1f48cba 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -94,7 +94,7 @@ void disableTracking(client *c) { server.tracking_clients--; c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| - CLIENT_TRACKING_OPTOUT); + CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING); } } From 4a0249c0c858864cd835e532e119b521bdbf8147 Mon Sep 17 00:00:00 2001 From: hwware Date: Mon, 23 Mar 2020 01:07:46 -0400 Subject: [PATCH 12/37] remove redundant Semicolon --- src/tracking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 5c1f48cba..6f7929430 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -271,7 +271,7 @@ void trackingInvalidateKey(robj *keyobj) { trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); - if (ids == raxNotFound) return;; + if (ids == raxNotFound) return; raxIterator ri; raxStart(&ri,ids); From 1a948d0c5bfeda39bacccff4dfc6057517a2b501 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:42:03 +0200 Subject: [PATCH 13/37] Conns: Fix connClose() / connAccept() behavior. We assume accept handlers may choose to reject a connection and close it, but connAccept() callers can't distinguish between this state and other error states requiring connClose(). This makes it safe (and mandatory!) to always call connClose() if connAccept() fails, and safe for accept handlers to close connections (which will defer). --- src/connection.c | 12 ++++++++--- src/connection.h | 15 ++++++++++---- src/connhelpers.h | 53 +++++++++++++++++++++++++---------------------- 3 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/connection.c b/src/connection.c index 58d86c31b..2015c9195 100644 --- a/src/connection.c +++ b/src/connection.c @@ -152,7 +152,7 @@ static void connSocketClose(connection *conn) { /* If called from within a handler, schedule the close but * keep the connection until the handler returns. */ - if (conn->flags & CONN_FLAG_IN_HANDLER) { + if (connHasRefs(conn)) { conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; return; } @@ -183,10 +183,16 @@ static int connSocketRead(connection *conn, void *buf, size_t buf_len) { } static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { + int ret = C_OK; + if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; conn->state = CONN_STATE_CONNECTED; - if (!callHandler(conn, accept_handler)) return C_ERR; - return C_OK; + + connIncrRefs(conn); + if (!callHandler(conn, accept_handler)) ret = C_ERR; + connDecrRefs(conn); + + return ret; } /* Register a write handler, to be called when the connection is writable. diff --git a/src/connection.h b/src/connection.h index 97622f8d6..db09dfd83 100644 --- a/src/connection.h +++ b/src/connection.h @@ -45,9 +45,8 @@ typedef enum { CONN_STATE_ERROR } ConnectionState; -#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */ -#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */ -#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */ +#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */ +#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -70,7 +69,8 @@ typedef struct ConnectionType { struct connection { ConnectionType *type; ConnectionState state; - int flags; + short int flags; + short int refs; int last_errno; void *private_data; ConnectionCallbackFunc conn_handler; @@ -88,6 +88,13 @@ struct connection { * connAccept() may directly call accept_handler(), or return and call it * at a later time. This behavior is a bit awkward but aims to reduce the need * to wait for the next event loop, if no additional handshake is required. + * + * IMPORTANT: accept_handler may decide to close the connection, calling connClose(). + * To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED + * in this case, and connAccept() returns with an error. + * + * connAccept() callers must always check the return value and on error (C_ERR) + * a connClose() must be called. */ static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) { diff --git a/src/connhelpers.h b/src/connhelpers.h index f237c9b1d..86250d09e 100644 --- a/src/connhelpers.h +++ b/src/connhelpers.h @@ -37,46 +37,49 @@ * implementations (currently sockets in connection.c and TLS in tls.c). * * Currently helpers implement the mechanisms for invoking connection - * handlers, tracking in-handler states and dealing with deferred - * destruction (if invoked by a handler). + * handlers and tracking connection references, to allow safe destruction + * of connections from within a handler. */ -/* Called whenever a handler is invoked on a connection and sets the - * CONN_FLAG_IN_HANDLER flag to indicate we're in a handler context. +/* Incremenet connection references. * - * An attempt to close a connection while CONN_FLAG_IN_HANDLER is - * set will result with deferred close, i.e. setting the CONN_FLAG_CLOSE_SCHEDULED - * instead of destructing it. + * Inside a connection handler, we guarantee refs >= 1 so it is always + * safe to connClose(). + * + * In other cases where we don't want to prematurely lose the connection, + * it can go beyond 1 as well; currently it is only done by connAccept(). */ -static inline void enterHandler(connection *conn) { - conn->flags |= CONN_FLAG_IN_HANDLER; +static inline void connIncrRefs(connection *conn) { + conn->refs++; } -/* Called whenever a handler returns. This unsets the CONN_FLAG_IN_HANDLER - * flag and performs actual close/destruction if a deferred close was - * scheduled by the handler. +/* Decrement connection references. + * + * Note that this is not intended to provide any automatic free logic! + * callHandler() takes care of that for the common flows, and anywhere an + * explicit connIncrRefs() is used, the caller is expected to take care of + * that. */ -static inline int exitHandler(connection *conn) { - conn->flags &= ~CONN_FLAG_IN_HANDLER; - if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); - return 0; - } - return 1; + +static inline void connDecrRefs(connection *conn) { + conn->refs--; +} + +static inline int connHasRefs(connection *conn) { + return conn->refs; } /* Helper for connection implementations to call handlers: - * 1. Mark the handler in use. + * 1. Increment refs to protect the connection. * 2. Execute the handler (if set). - * 3. Mark the handler as NOT in use and perform deferred close if was - * requested by the handler at any time. + * 3. Decrement refs and perform deferred close, if refs==0. */ static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) { - conn->flags |= CONN_FLAG_IN_HANDLER; + connIncrRefs(conn); if (handler) handler(conn); - conn->flags &= ~CONN_FLAG_IN_HANDLER; + connDecrRefs(conn); if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); + if (!connHasRefs(conn)) connClose(conn); return 0; } return 1; From 1f1d642e01c33e1171a186d49fb20d8b52ab2ae0 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:46:16 +0200 Subject: [PATCH 14/37] Cluster: fix misleading accept errors. --- src/cluster.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 72755823a..a2e9ff5b6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -681,9 +681,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * or schedule it for later depending on connection implementation. */ if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) { - serverLog(LL_VERBOSE, - "Error accepting cluster node connection: %s", - connGetLastError(conn)); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_VERBOSE, + "Error accepting cluster node connection: %s", + connGetLastError(conn)); connClose(conn); return; } From 700126e9cfe755c43eff0ee41a0fec0332a4b3a2 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:47:44 +0200 Subject: [PATCH 15/37] Fix crashes related to failed/rejected accepts. --- src/networking.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 69d59a59b..a550e4040 100644 --- a/src/networking.c +++ b/src/networking.c @@ -786,7 +786,7 @@ void clientAcceptHandler(connection *conn) { serverLog(LL_WARNING, "Error accepting a client connection: %s", connGetLastError(conn)); - freeClient(c); + freeClientAsync(c); return; } @@ -828,7 +828,7 @@ void clientAcceptHandler(connection *conn) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; - freeClient(c); + freeClientAsync(c); return; } } @@ -887,9 +887,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) { */ if (connAccept(conn, clientAcceptHandler) == C_ERR) { char conninfo[100]; - serverLog(LL_WARNING, - "Error accepting a client connection: %s (conn: %s)", - connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_WARNING, + "Error accepting a client connection: %s (conn: %s)", + connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); freeClient(connGetPrivateData(conn)); return; } From 2dcae610875846e2fdf781764e02fd404fc73f73 Mon Sep 17 00:00:00 2001 From: hwware Date: Wed, 18 Mar 2020 09:33:52 -0400 Subject: [PATCH 16/37] fix potentical memory leak in redis-cli --- src/redis-cli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7e440d67c..72480d08c 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1993,6 +1993,8 @@ static void repl(void) { if (config.eval) { config.eval_ldb = 1; config.output = OUTPUT_RAW; + sdsfreesplitres(argv,argc); + linenoiseFree(line); return; /* Return to evalMode to restart the session. */ } else { printf("Use 'restart' only in Lua debugging mode."); From 6680c067057d6e130e9ff4d388e1510a27ee5884 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 18 Mar 2020 18:34:27 +0530 Subject: [PATCH 17/37] Allow RM_GetContextFlags to work with ctx==NULL --- src/module.c | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/module.c b/src/module.c index 74da6c24d..d6b055f78 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,20 +1848,22 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { int flags = 0; /* Client specific flags */ - if (ctx->client) { - if (ctx->client->flags & CLIENT_LUA) - flags |= REDISMODULE_CTX_FLAGS_LUA; - if (ctx->client->flags & CLIENT_MULTI) - flags |= REDISMODULE_CTX_FLAGS_MULTI; - /* Module command recieved from MASTER, is replicated. */ - if (ctx->client->flags & CLIENT_MASTER) - flags |= REDISMODULE_CTX_FLAGS_REPLICATED; - } + if (ctx) { + if (ctx->client) { + if (ctx->client->flags & CLIENT_LUA) + flags |= REDISMODULE_CTX_FLAGS_LUA; + if (ctx->client->flags & CLIENT_MULTI) + flags |= REDISMODULE_CTX_FLAGS_MULTI; + /* Module command recieved from MASTER, is replicated. */ + if (ctx->client->flags & CLIENT_MASTER) + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; + } - /* For DIRTY flags, we need the blocked client if used */ - client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; - if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { - flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + /* For DIRTY flags, we need the blocked client if used */ + client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; + if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { + flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + } } if (server.cluster_enabled) From 5a13e0feb172eeef62a6d8575b8402d72ab63ea7 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:17:50 +0100 Subject: [PATCH 18/37] Modules: updated function doc after #7003. --- src/module.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index d6b055f78..6f61a5ca8 100644 --- a/src/module.c +++ b/src/module.c @@ -1795,7 +1795,12 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * current request context (whether the client is a Lua script or in a MULTI), * and about the Redis instance in general, i.e replication and persistence. * - * The available flags are: + * It is possible to call this function even with a NULL context, however + * in this case the following flags will not be reported: + * + * * LUA, MULTI, REPLICATED, DIRTY (see below for more info). + * + * Available flags and their meaning: * * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script * From 015d1cb2ff91a83d1a5f790cdfaf3e57d4970f5c Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Wed, 4 Mar 2020 20:51:45 +0800 Subject: [PATCH 19/37] Added BITFIELD_RO variants for read-only operations. --- src/bitops.c | 19 ++++++++++++++++++- src/server.c | 4 ++++ src/server.h | 1 + tests/unit/bitfield.tcl | 31 +++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/bitops.c b/src/bitops.c index ee1ce0460..ffb330013 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -902,6 +902,9 @@ void bitposCommand(client *c) { * OVERFLOW [WRAP|SAT|FAIL] */ +#define BITFIELD_COMMON (1<<0) +#define BITFIELD_READONLY (1<<1) + struct bitfieldOp { uint64_t offset; /* Bitfield offset. */ int64_t i64; /* Increment amount (INCRBY) or SET value */ @@ -911,7 +914,7 @@ struct bitfieldOp { int sign; /* True if signed, otherwise unsigned op. */ }; -void bitfieldCommand(client *c) { +void bitfieldGeneric(client *c, int flags) { robj *o; size_t bitoffset; int j, numops = 0, changes = 0; @@ -999,6 +1002,12 @@ void bitfieldCommand(client *c) { return; } } else { + if (flags & BITFIELD_READONLY) { + zfree(ops); + addReplyError(c, "bitfield_ro only support get subcommand"); + return; + } + /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, @@ -1129,3 +1138,11 @@ void bitfieldCommand(client *c) { } zfree(ops); } + +void bitfieldCommand(client *c) { + bitfieldGeneric(c, BITFIELD_COMMON); +} + +void bitfieldroCommand(client *c) { + bitfieldGeneric(c, BITFIELD_READONLY); +} diff --git a/src/server.c b/src/server.c index 4b010b870..f5fb339f9 100644 --- a/src/server.c +++ b/src/server.c @@ -238,6 +238,10 @@ struct redisCommand redisCommandTable[] = { "write use-memory @bitmap", 0,NULL,1,1,1,0,0,0}, + {"bitfield_ro",bitfieldroCommand,-2, + "read-only fast @bitmap", + 0,NULL,1,1,1,0,0,0}, + {"setrange",setrangeCommand,4, "write use-memory @string", 0,NULL,1,1,1,0,0,0}, diff --git a/src/server.h b/src/server.h index fdfe5b8ea..ed4707d66 100644 --- a/src/server.h +++ b/src/server.h @@ -2177,6 +2177,7 @@ void existsCommand(client *c); void setbitCommand(client *c); void getbitCommand(client *c); void bitfieldCommand(client *c); +void bitfieldroCommand(client *c); void setrangeCommand(client *c); void getrangeCommand(client *c); void incrCommand(client *c); diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl index d76452b1b..819d8f36d 100644 --- a/tests/unit/bitfield.tcl +++ b/tests/unit/bitfield.tcl @@ -199,3 +199,34 @@ start_server {tags {"bitops"}} { r del mystring } } + +start_server {tags {"repl"}} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + test {setup slave} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + } + + test {write on master, read on slave} { + $master del bits + assert_equal 0 [$master bitfield bits set u8 0 255] + assert_equal 255 [$master bitfield bits set u8 0 100] + wait_for_ofs_sync $master $slave + assert_equal 100 [$slave bitfield_ro bits get u8 0] + } + + test {bitfield_ro with write option} { + catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err + assert_match {*ERR bitfield_ro only support get subcommand*} $err + } + } +} From 243b26d97dbb607968e972dc01a23ac7eefc3722 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:28:09 +0100 Subject: [PATCH 20/37] Minor changes to BITFIELD_RO PR #6951. --- src/bitops.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index ffb330013..d4e82c937 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -902,8 +902,8 @@ void bitposCommand(client *c) { * OVERFLOW [WRAP|SAT|FAIL] */ -#define BITFIELD_COMMON (1<<0) -#define BITFIELD_READONLY (1<<1) +#define BITFIELD_FLAG_NONE 0 +#define BITFIELD_FLAG_READONLY (1<<0) struct bitfieldOp { uint64_t offset; /* Bitfield offset. */ @@ -914,6 +914,9 @@ struct bitfieldOp { int sign; /* True if signed, otherwise unsigned op. */ }; +/* This implements both the BITFIELD command and the BITFIELD_RO command + * when flags is set to BITFIELD_FLAG_READONLY: in this case only the + * GET subcommand is allowed, other subcommands will return an error. */ void bitfieldGeneric(client *c, int flags) { robj *o; size_t bitoffset; @@ -1002,9 +1005,9 @@ void bitfieldGeneric(client *c, int flags) { return; } } else { - if (flags & BITFIELD_READONLY) { + if (flags & BITFIELD_FLAG_READONLY) { zfree(ops); - addReplyError(c, "bitfield_ro only support get subcommand"); + addReplyError(c, "BITFIELD_RO only support the GET subcommand"); return; } @@ -1140,9 +1143,9 @@ void bitfieldGeneric(client *c, int flags) { } void bitfieldCommand(client *c) { - bitfieldGeneric(c, BITFIELD_COMMON); + bitfieldGeneric(c, BITFIELD_FLAG_NONE); } void bitfieldroCommand(client *c) { - bitfieldGeneric(c, BITFIELD_READONLY); + bitfieldGeneric(c, BITFIELD_FLAG_READONLY); } From 6a1a5cb2a163710b021989dbea998f450b2d506c Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:47:37 +0100 Subject: [PATCH 21/37] Abort transactions after -READONLY error. Fix #7014. --- src/server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.c b/src/server.c index f5fb339f9..ddc90b3dd 100644 --- a/src/server.c +++ b/src/server.c @@ -3509,6 +3509,7 @@ int processCommand(client *c) { !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { + flagTransaction(c); addReply(c, shared.roslaveerr); return C_OK; } From 4cfceac287c5987b0b6c6c853ac990d8f0630eb0 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 12:00:46 +0100 Subject: [PATCH 22/37] Fix BITFIELD_RO test. --- src/bitops.c | 2 +- tests/unit/bitfield.tcl | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index d4e82c937..f78e4fd34 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -1007,7 +1007,7 @@ void bitfieldGeneric(client *c, int flags) { } else { if (flags & BITFIELD_FLAG_READONLY) { zfree(ops); - addReplyError(c, "BITFIELD_RO only support the GET subcommand"); + addReplyError(c, "BITFIELD_RO only supports the GET subcommand"); return; } diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl index 819d8f36d..1f2f6395e 100644 --- a/tests/unit/bitfield.tcl +++ b/tests/unit/bitfield.tcl @@ -207,7 +207,7 @@ start_server {tags {"repl"}} { set master_port [srv -1 port] set slave [srv 0 client] - test {setup slave} { + test {BITFIELD: setup slave} { $slave slaveof $master_host $master_port wait_for_condition 50 100 { [s 0 master_link_status] eq {up} @@ -216,7 +216,7 @@ start_server {tags {"repl"}} { } } - test {write on master, read on slave} { + test {BITFIELD: write on master, read on slave} { $master del bits assert_equal 0 [$master bitfield bits set u8 0 255] assert_equal 255 [$master bitfield bits set u8 0 100] @@ -224,9 +224,9 @@ start_server {tags {"repl"}} { assert_equal 100 [$slave bitfield_ro bits get u8 0] } - test {bitfield_ro with write option} { + test {BITFIELD_RO fails when write option is used} { catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err - assert_match {*ERR bitfield_ro only support get subcommand*} $err + assert_match {*ERR BITFIELD_RO only supports the GET subcommand*} $err } } } From 484a14ebde9fce00d936f25e45f9adc82405179e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 16:17:35 +0100 Subject: [PATCH 23/37] Improve comments of replicationCacheMasterUsingMyself(). --- src/replication.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 31e14d7fe..49c38f73f 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2725,9 +2725,14 @@ void replicationCacheMaster(client *c) { * current offset if no data was lost during the failover. So we use our * current replication ID and offset in order to synthesize a cached master. */ void replicationCacheMasterUsingMyself(void) { + /* This will be used to populate the field server.master->reploff + * by replicationCreateMasterClient(). We'll later set the created + * master as server.cached_master, so the replica will use such + * offset for PSYNC. */ + server.master_initial_offset = server.master_repl_offset; + /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ - server.master_initial_offset = server.master_repl_offset; replicationCreateMasterClient(NULL,-1); /* Use our own ID / offset. */ From e1e2f91589d07b13721c5d8834b667745713a80d Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 23 Mar 2020 20:13:52 +0200 Subject: [PATCH 24/37] MULTI/EXEC during LUA script timeout are messed up Redis refusing to run MULTI or EXEC during script timeout may cause partial transactions to run. 1) if the client sends MULTI+commands+EXEC in pipeline without waiting for response, but these arrive to the shards partially while there's a busy script, and partially after it eventually finishes: we'll end up running only part of the transaction (since multi was ignored, and exec would fail). 2) similar to the above if EXEC arrives during busy script, it'll be ignored and the client state remains in a transaction. the 3rd test which i added for a case where MULTI and EXEC are ok, and only the body arrives during busy script was already handled correctly since processCommand calls flagTransaction --- src/server.c | 1 + tests/unit/multi.tcl | 72 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/src/server.c b/src/server.c index ddc90b3dd..5a54a3abb 100644 --- a/src/server.c +++ b/src/server.c @@ -3553,6 +3553,7 @@ int processCommand(client *c) { c->cmd->proc != authCommand && c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && + c->cmd->proc != multiCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 9fcef71d6..55f18bec8 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -320,4 +320,76 @@ start_server {tags {"multi"}} { $rd close r ping } {PONG} + + test {MULTI and script timeout} { + # check that if MULTI arrives during timeout, it is either refused, or + # allowed to pass, and we don't end up executing half of the transaction + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {EXEC and script timeout} { + # check that if EXEC arrives during timeout, we don't end up executing + # half of the transaction, and also that we exit the multi state + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {MULTI-EXEC body and script timeout} { + # check that we don't run an imcomplete transaction due to some commands + # arriving during busy script + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } } From 45ba72ad0142f5631e3b1dc551f2bb3283065482 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2020 12:46:59 +0100 Subject: [PATCH 25/37] Explain why we allow transactions in -BUSY state. Related to #7022. --- src/server.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/server.c b/src/server.c index 5a54a3abb..65a01db57 100644 --- a/src/server.c +++ b/src/server.c @@ -3548,12 +3548,19 @@ int processCommand(client *c) { return C_OK; } - /* Lua script too slow? Only allow a limited number of commands. */ + /* Lua script too slow? Only allow a limited number of commands. + * Note that we need to allow the transactions commands, otherwise clients + * sending a transaction with pipelining without error checking, may have + * the MULTI plus a few initial commands refused, then the timeout + * condition resolves, and the bottom-half of the transaction gets + * executed, see Github PR #7022. */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && - c->cmd->proc != multiCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && + c->cmd->proc != multiCommand && + c->cmd->proc != execCommand && + c->cmd->proc != discardCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && From ce73158a9caa37accc8d0713ad446afcfb5a1e28 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Mar 2020 11:02:40 +0100 Subject: [PATCH 26/37] PSYNC2: meaningful offset implemented. A very commonly signaled operational problem with Redis master-replicas sets is that, once the master becomes unavailable for some reason, especially because of network problems, many times it wont be able to perform a partial resynchronization with the new master, once it rejoins the partition, for the following reason: 1. The master becomes isolated, however it keeps sending PINGs to the replicas. Such PINGs will never be received since the link connection is actually already severed. 2. On the other side, one of the replicas will turn into the new master, setting its secondary replication ID offset to the one of the last command received from the old master: this offset will not include the PINGs sent by the master once the link was already disconnected. 3. When the master rejoins the partion and is turned into a replica, its offset will be too advanced because of the PINGs, so a PSYNC will fail, and a full synchronization will be required. Related to issue #7002 and other discussion we had in the past around this problem. --- src/replication.c | 36 +++++++++++++++++++++++++++++++++++- src/server.c | 4 ++++ src/server.h | 1 + 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 49c38f73f..e62afb2fb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -162,6 +162,7 @@ void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len; + server.master_repl_meaningful_offset = server.master_repl_offset; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ @@ -1768,6 +1769,7 @@ void readSyncBulkPayload(connection *conn) { * we are starting a new history. */ memcpy(server.replid,server.master->replid,sizeof(server.replid)); server.master_repl_offset = server.master->reploff; + server.master_repl_meaningful_offset = server.master->reploff; clearReplicationId2(); /* Let's create the replication backlog if needed. Slaves need to @@ -2725,12 +2727,37 @@ void replicationCacheMaster(client *c) { * current offset if no data was lost during the failover. So we use our * current replication ID and offset in order to synthesize a cached master. */ void replicationCacheMasterUsingMyself(void) { + serverLog(LL_NOTICE, + "Before turning into a replica, using my own master parameters " + "to synthesize a cached master: I may be able to synchronize with " + "the new master with just a partial transfer."); + /* This will be used to populate the field server.master->reploff * by replicationCreateMasterClient(). We'll later set the created * master as server.cached_master, so the replica will use such * offset for PSYNC. */ server.master_initial_offset = server.master_repl_offset; + /* However if the "meaningful" offset, that is the offset without + * the final PINGs in the stream, is different, use this instead: + * often when the master is no longer reachable, replicas will never + * receive the PINGs, however the master will end with an incremented + * offset because of the PINGs and will not be able to incrementally + * PSYNC with the new master. */ + if (server.master_repl_offset > server.master_repl_meaningful_offset) { + long long delta = server.master_repl_offset - + server.master_repl_meaningful_offset; + serverLog(LL_NOTICE, + "Using the meaningful offset %lld instead of %lld to exclude " + "the final PINGs (%lld bytes difference)", + server.master_repl_meaningful_offset, + server.master_repl_offset, + delta); + server.master_initial_offset = server.master_repl_meaningful_offset; + server.repl_backlog_histlen -= delta; + if (server.repl_backlog_histlen < 0) server.repl_backlog_histlen = 0; + } + /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ replicationCreateMasterClient(NULL,-1); @@ -2742,7 +2769,6 @@ void replicationCacheMasterUsingMyself(void) { unlinkClient(server.master); server.cached_master = server.master; server.master = NULL; - serverLog(LL_NOTICE,"Before turning into a replica, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer."); } /* Free a cached master, called when there are no longer the conditions for @@ -3122,10 +3148,18 @@ void replicationCron(void) { clientsArePaused(); if (!manual_failover_in_progress) { + long long before_ping = server.master_repl_meaningful_offset; ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); + /* The server.master_repl_meaningful_offset variable represents + * the offset of the replication stream without the pending PINGs. + * This is useful to set the right replication offset for PSYNC + * when the master is turned into a replica. Otherwise pending + * PINGs may not allow it to perform an incremental sync with the + * new master. */ + server.master_repl_meaningful_offset = before_ping; } } diff --git a/src/server.c b/src/server.c index 65a01db57..84439461e 100644 --- a/src/server.c +++ b/src/server.c @@ -2355,6 +2355,7 @@ void initServerConfig(void) { server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.master_repl_offset = 0; + server.master_repl_meaningful_offset = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -4398,6 +4399,7 @@ sds genRedisInfoString(const char *section) { "master_replid:%s\r\n" "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" + "master_repl_meaningful_offset:%lld\r\n" "second_repl_offset:%lld\r\n" "repl_backlog_active:%d\r\n" "repl_backlog_size:%lld\r\n" @@ -4406,6 +4408,7 @@ sds genRedisInfoString(const char *section) { server.replid, server.replid2, server.master_repl_offset, + server.master_repl_meaningful_offset, server.second_replid_offset, server.repl_backlog != NULL, server.repl_backlog_size, @@ -4783,6 +4786,7 @@ void loadDataFromDisk(void) { { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; + server.master_repl_meaningful_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ diff --git a/src/server.h b/src/server.h index ed4707d66..818fbc3b3 100644 --- a/src/server.h +++ b/src/server.h @@ -1241,6 +1241,7 @@ struct redisServer { char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ long long master_repl_offset; /* My current replication offset */ + long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */ long long second_replid_offset; /* Accept offsets up to this for replid2. */ int slaveseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ From 28d402d31bde0d4f149a7abfc408325e1221a8d0 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2020 15:43:34 +0100 Subject: [PATCH 27/37] PSYNC2: meaningful offset test. --- tests/integration/psync2-pingoff.tcl | 61 ++++++++++++++++++++++++++++ tests/test_helper.tcl | 1 + 2 files changed, 62 insertions(+) create mode 100644 tests/integration/psync2-pingoff.tcl diff --git a/tests/integration/psync2-pingoff.tcl b/tests/integration/psync2-pingoff.tcl new file mode 100644 index 000000000..1cea290e7 --- /dev/null +++ b/tests/integration/psync2-pingoff.tcl @@ -0,0 +1,61 @@ +# Test the meaningful offset implementation to make sure masters +# are able to PSYNC with replicas even if the replication stream +# has pending PINGs at the end. + +start_server {tags {"psync2"}} { +start_server {} { + # Config + set debug_msg 0 ; # Enable additional debug messages + + for {set j 0} {$j < 2} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + $R($j) CONFIG SET repl-ping-replica-period 1 + if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} + } + + # Setup replication + test "PSYNC2 meaningful offset: setup" { + $R(1) replicaof $R_host(0) $R_port(0) + $R(0) set foo bar + wait_for_condition 50 1000 { + [$R(0) dbsize] == 1 && [$R(1) dbsize] == 1 + } else { + fail "Replicas not replicating from master" + } + } + + test "PSYNC2 meaningful offset: write and wait replication" { + $R(0) INCR counter + $R(0) INCR counter + $R(0) INCR counter + wait_for_condition 50 1000 { + [$R(0) GET counter] eq [$R(1) GET counter] + } else { + fail "Master and replica don't agree about counter" + } + } + + # In this test we'll make sure the replica will get stuck, but with + # an active connection: this way the master will continue to send PINGs + # every second (we modified the PING period earlier) + test "PSYNC2 meaningful offset: pause replica and promote it" { + $R(1) MULTI + $R(1) DEBUG SLEEP 5 + $R(1) SLAVEOF NO ONE + $R(1) EXEC + $R(1) ping ; # Wait for it to return back available + } + + test "Make the old master a replica of the new one and check conditions" { + set sync_partial [status $R(1) sync_partial_ok] + assert {$sync_partial == 0} + $R(0) REPLICAOF $R_host(1) $R_port(1) + wait_for_condition 50 1000 { + [status $R(1) sync_partial_ok] == 1 + } else { + fail "The new master was not able to partial sync" + } + } +}} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 4dbead193..5cb43104b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -47,6 +47,7 @@ set ::all_tests { integration/logging integration/psync2 integration/psync2-reg + integration/psync2-pingoff unit/pubsub unit/slowlog unit/scripting From 2a13f59a18e0a2ba70013646f92e6d8239737eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=AF=E6=88=90?= Date: Thu, 1 Mar 2018 11:46:56 +0800 Subject: [PATCH 28/37] Boost up performance for redis PUB-SUB patterns matching If lots of clients PSUBSCRIBE to same patterns, multiple pattens matching will take place. This commit change it into just one single pattern matching by using a `dict *` to store the unique pattern and which clients subscribe to it. --- src/pubsub.c | 52 +++++++++++++++++++++++++++++++++++++++++----------- src/server.c | 1 + src/server.h | 1 + 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/pubsub.c b/src/pubsub.c index 5cb4298e0..6fa397704 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -206,6 +206,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { + dictEntry *de; + list *clients; int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -217,6 +219,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) { pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); + /* Add the client to the pattern -> list of clients hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + if (de == NULL) { + clients = listCreate(); + dictAdd(server.pubsub_patterns_dict,pattern,clients); + incrRefCount(pattern); + } else { + clients = dictGetVal(de); + } + listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); @@ -226,6 +238,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) { /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { + dictEntry *de; + list *clients; listNode *ln; pubsubPattern pat; int retval = 0; @@ -238,6 +252,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { pat.pattern = pattern; ln = listSearchKey(server.pubsub_patterns,&pat); listDelNode(server.pubsub_patterns,ln); + /* Remove the client from the pattern -> clients list hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + serverAssertWithInfo(c,NULL,de != NULL); + clients = dictGetVal(de); + ln = listSearchKey(clients,c); + serverAssertWithInfo(c,NULL,ln != NULL); + listDelNode(clients,ln); + if (listLength(clients) == 0) { + /* Free the list and associated hash entry at all if this was + * the latest client. */ + dictDelete(server.pubsub_patterns_dict,pattern); + } } /* Notify the client */ if (notify) addReplyPubsubPatUnsubscribed(c,pattern); @@ -284,6 +310,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; + dictIterator *di; listNode *ln; listIter li; @@ -302,23 +329,26 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - if (listLength(server.pubsub_patterns)) { - listRewind(server.pubsub_patterns,&li); + di = dictGetIterator(server.pubsub_patterns_dict); + if (di) { channel = getDecodedObject(channel); - while ((ln = listNext(&li)) != NULL) { - pubsubPattern *pat = ln->value; - - if (stringmatchlen((char*)pat->pattern->ptr, - sdslen(pat->pattern->ptr), + while((de = dictNext(di)) != NULL) { + robj *pattern = dictGetKey(de); + list *clients = dictGetVal(de); + if (!stringmatchlen((char*)pattern->ptr, + sdslen(pattern->ptr), (char*)channel->ptr, - sdslen(channel->ptr),0)) - { - addReplyPubsubPatMessage(pat->client, - pat->pattern,channel,message); + sdslen(channel->ptr),0)) continue; + + listRewind(clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); + dictReleaseIterator(di); } return receivers; } diff --git a/src/server.c b/src/server.c index 84439461e..861aea631 100644 --- a/src/server.c +++ b/src/server.c @@ -2787,6 +2787,7 @@ void initServer(void) { evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); + server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; diff --git a/src/server.h b/src/server.h index 818fbc3b3..cb2f864a1 100644 --- a/src/server.h +++ b/src/server.h @@ -1344,6 +1344,7 @@ struct redisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ + dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ From 577656841fe508b492f1c5a7d53e8002198912fc Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Mar 2020 16:19:56 +0100 Subject: [PATCH 29/37] PSYNC2: fix backlog_idx when adjusting for meaningful offset See #7002. --- src/replication.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/replication.c b/src/replication.c index e62afb2fb..20f9a2129 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2755,6 +2755,9 @@ void replicationCacheMasterUsingMyself(void) { delta); server.master_initial_offset = server.master_repl_meaningful_offset; server.repl_backlog_histlen -= delta; + server.repl_backlog_idx = + (server.repl_backlog_idx + (server.repl_backlog_size - delta)) % + server.repl_backlog_size; if (server.repl_backlog_histlen < 0) server.repl_backlog_histlen = 0; } From b636856df04e5594796317502a57fc5abbf06eae Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 11:33:18 +0100 Subject: [PATCH 30/37] Precise timeouts: refactor unblocking on timeout. --- src/server.c | 41 +++++++++++++++++++++++++++++------------ src/server.h | 3 +++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/server.c b/src/server.c index 861aea631..e657f1196 100644 --- a/src/server.c +++ b/src/server.c @@ -1502,6 +1502,23 @@ long long getInstantaneousMetric(int metric) { return sum / STATS_METRIC_SAMPLES; } +/* Check if this blocked client timedout (does nothing if the client is + * not blocked right now). If so send a reply, unblock it, and return 1. + * Otherwise 0 is returned and no operation is performed. */ +int checkBlockedClientTimeout(client *c, mstime_t now) { + if (c->flags & CLIENT_BLOCKED && + c->bpop.timeout != 0 + && c->bpop.timeout < now) + { + /* Handle blocking operation specific timeout. */ + replyToBlockedClientTimedOut(c); + unblockClient(c); + return 1; + } else { + return 0; + } +} + /* Check for timeouts. Returns non-zero if the client was terminated. * The function gets the current time in milliseconds as argument since * it gets called multiple times in a loop, so calling gettimeofday() for @@ -1510,10 +1527,11 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { time_t now = now_ms/1000; if (server.maxidletime && - !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ + /* This handles the idle clients connection timeout if set. */ + !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ + !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ + !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ (now - c->lastinteraction > server.maxidletime)) { serverLog(LL_VERBOSE,"Closing idle client"); @@ -1522,15 +1540,14 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { } else if (c->flags & CLIENT_BLOCKED) { /* Blocked OPS timeout is handled with milliseconds resolution. * However note that the actual resolution is limited by - * server.hz. */ + * server.hz. So for short timeouts (less than SERVER_SHORT_TIMEOUT + * milliseconds) we populate a Radix tree and handle such timeouts + * in clientsHandleShortTimeout(). */ + if (checkBlockedClientTimeout(c,now_ms)) return 0; - if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - } else if (server.cluster_enabled) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this server. */ + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (server.cluster_enabled) { if (clusterRedirectBlockedClientIfNeeded(c)) unblockClient(c); } diff --git a/src/server.h b/src/server.h index cb2f864a1..d697e79a2 100644 --- a/src/server.h +++ b/src/server.h @@ -277,6 +277,9 @@ typedef long long ustime_t; /* microsecond time type. */ buffer configuration. Just the first three: normal, slave, pubsub. */ +/* Other client related defines. */ +#define CLIENT_SHORT_TIMEOUT 2000 /* See clientsHandleShortTimeout(). */ + /* Slave replication state. Used in server.repl_state for slaves to remember * what to do next. */ #define REPL_STATE_NONE 0 /* No active replication */ From cedeec01f260b43508c07531f06739c639873e62 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 13:28:39 +0100 Subject: [PATCH 31/37] Precise timeouts: working initial implementation. --- src/blocked.c | 1 + src/server.c | 135 +++++++++++++++++++++++++++++++++++++++----------- src/server.h | 2 + 3 files changed, 110 insertions(+), 28 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 06aa5850e..c470cba00 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -619,6 +619,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); } blockClient(c,btype); + addClientToShortTimeoutTable(c); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.c b/src/server.c index e657f1196..26581d979 100644 --- a/src/server.c +++ b/src/server.c @@ -1473,34 +1473,7 @@ int allPersistenceDisabled(void) { return server.saveparamslen == 0 && server.aof_state == AOF_OFF; } -/* ======================= Cron: called every 100 ms ======================== */ - -/* Add a sample to the operations per second array of samples. */ -void trackInstantaneousMetric(int metric, long long current_reading) { - long long t = mstime() - server.inst_metric[metric].last_sample_time; - long long ops = current_reading - - server.inst_metric[metric].last_sample_count; - long long ops_sec; - - ops_sec = t > 0 ? (ops*1000/t) : 0; - - server.inst_metric[metric].samples[server.inst_metric[metric].idx] = - ops_sec; - server.inst_metric[metric].idx++; - server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; - server.inst_metric[metric].last_sample_time = mstime(); - server.inst_metric[metric].last_sample_count = current_reading; -} - -/* Return the mean of all the samples. */ -long long getInstantaneousMetric(int metric) { - int j; - long long sum = 0; - - for (j = 0; j < STATS_METRIC_SAMPLES; j++) - sum += server.inst_metric[metric].samples[j]; - return sum / STATS_METRIC_SAMPLES; -} +/* ========================== Clients timeouts ============================= */ /* Check if this blocked client timedout (does nothing if the client is * not blocked right now). If so send a reply, unblock it, and return 1. @@ -1555,6 +1528,107 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { return 0; } +/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we + * populate a radix tree of 128 bit keys composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a short timeout, we add the client in + * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run + * the tree and unblock the clients. + * + * Design hint: why we block only clients with short timeouts? For frugality: + * Clients blocking for 30 seconds usually don't need to be unblocked + * precisely, and anyway for the nature of Redis to *guarantee* unblock time + * precision is hard, so we can avoid putting a large number of clients in + * the radix tree without a good reason. This idea also has a role in memory + * usage as well given that we don't do cleanup, the shorter a client timeout, + * the less time it will stay in the radix tree. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle short timeouts. The client is not added to the list if its + * timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */ +void addClientToShortTimeoutTable(client *c) { + if (c->bpop.timeout == 0 || + c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) + { + return; + } + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); +} + +/* This function is called in beforeSleep() in order to unblock ASAP clients + * that are waiting in blocking operations with a short timeout set. */ +void clientsHandleShortTimeout(void) { + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,server.clients_timeout_table); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) checkBlockedClientTimeout(c,now); + raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* ======================= Cron: called every 100 ms ======================== */ + +/* Add a sample to the operations per second array of samples. */ +void trackInstantaneousMetric(int metric, long long current_reading) { + long long t = mstime() - server.inst_metric[metric].last_sample_time; + long long ops = current_reading - + server.inst_metric[metric].last_sample_count; + long long ops_sec; + + ops_sec = t > 0 ? (ops*1000/t) : 0; + + server.inst_metric[metric].samples[server.inst_metric[metric].idx] = + ops_sec; + server.inst_metric[metric].idx++; + server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; + server.inst_metric[metric].last_sample_time = mstime(); + server.inst_metric[metric].last_sample_count = current_reading; +} + +/* Return the mean of all the samples. */ +long long getInstantaneousMetric(int metric) { + int j; + long long sum = 0; + + for (j = 0; j < STATS_METRIC_SAMPLES; j++) + sum += server.inst_metric[metric].samples[j]; + return sum / STATS_METRIC_SAMPLES; +} + /* The client query buffer is an sds.c string that can end with a lot of * free space not used, this function reclaims space if needed. * @@ -2109,11 +2183,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle precise timeouts of blocked clients. */ + clientsHandleShortTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); @@ -2738,6 +2816,7 @@ void initServer(void) { server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); + server.clients_timeout_table = raxNew(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); diff --git a/src/server.h b/src/server.h index d697e79a2..c20f70136 100644 --- a/src/server.h +++ b/src/server.h @@ -1070,6 +1070,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ + rax *clients_timeout_table; /* Radix tree for clients with short timeout. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ @@ -2140,6 +2141,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); +void addClientToShortTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From e512707aed6a4a0f8969ee9e5ee952d5e8c9a579 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 14:37:00 +0100 Subject: [PATCH 32/37] Precise timeouts: fix bugs in initial implementation. --- src/blocked.c | 2 +- src/server.c | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index c470cba00..84a5287d4 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -111,6 +111,7 @@ void blockClient(client *c, int btype) { c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; + addClientToShortTimeoutTable(c); } /* This function is called in the beforeSleep() function of the event loop @@ -619,7 +620,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); } blockClient(c,btype); - addClientToShortTimeoutTable(c); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.c b/src/server.c index 26581d979..19f43e413 100644 --- a/src/server.c +++ b/src/server.c @@ -1588,6 +1588,7 @@ void clientsHandleShortTimeout(void) { uint64_t now = mstime(); raxIterator ri; raxStart(&ri,server.clients_timeout_table); + raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { uint64_t id, timeout; @@ -1745,6 +1746,9 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { */ #define CLIENTS_CRON_MIN_ITERATIONS 5 void clientsCron(void) { + /* Unblock short timeout clients ASAP. */ + clientsHandleShortTimeout(); + /* Try to process at least numclients/server.hz of clients * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we From c49ff924313be4635c5124557614a5342c73ba2a Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 15:52:16 +0100 Subject: [PATCH 33/37] Precise timeouts: fast exit for clientsHandleShortTimeout(). --- src/server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.c b/src/server.c index 19f43e413..9a86ae3de 100644 --- a/src/server.c +++ b/src/server.c @@ -1585,6 +1585,7 @@ void addClientToShortTimeoutTable(client *c) { /* This function is called in beforeSleep() in order to unblock ASAP clients * that are waiting in blocking operations with a short timeout set. */ void clientsHandleShortTimeout(void) { + if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); raxIterator ri; raxStart(&ri,server.clients_timeout_table); From 444089c55ea848b61bb36845d3be5104d041f5c2 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 16:02:26 +0100 Subject: [PATCH 34/37] Precise timeouts: use only radix tree for timeouts. --- src/blocked.c | 2 +- src/server.c | 46 +++++++++++++--------------------------------- src/server.h | 5 +---- 3 files changed, 15 insertions(+), 38 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 84a5287d4..443daec7f 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -111,7 +111,7 @@ void blockClient(client *c, int btype) { c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; - addClientToShortTimeoutTable(c); + addClientToTimeoutTable(c); } /* This function is called in the beforeSleep() function of the event loop diff --git a/src/server.c b/src/server.c index 9a86ae3de..f14ecc0cc 100644 --- a/src/server.c +++ b/src/server.c @@ -1511,13 +1511,6 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { freeClient(c); return 1; } else if (c->flags & CLIENT_BLOCKED) { - /* Blocked OPS timeout is handled with milliseconds resolution. - * However note that the actual resolution is limited by - * server.hz. So for short timeouts (less than SERVER_SHORT_TIMEOUT - * milliseconds) we populate a Radix tree and handle such timeouts - * in clientsHandleShortTimeout(). */ - if (checkBlockedClientTimeout(c,now_ms)) return 0; - /* Cluster: handle unblock & redirect of clients blocked * into keys no longer served by this server. */ if (server.cluster_enabled) { @@ -1528,8 +1521,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { return 0; } -/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we - * populate a radix tree of 128 bit keys composed as such: +/* For blocked clients timeouts we populate a radix tree of 128 bit keys + * composed as such: * * [8 byte big endian expire time]+[8 byte client ID] * @@ -1538,16 +1531,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * blocked with such timeout, we just go forward. * * Every time a client blocks with a short timeout, we add the client in - * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run - * the tree and unblock the clients. - * - * Design hint: why we block only clients with short timeouts? For frugality: - * Clients blocking for 30 seconds usually don't need to be unblocked - * precisely, and anyway for the nature of Redis to *guarantee* unblock time - * precision is hard, so we can avoid putting a large number of clients in - * the radix tree without a good reason. This idea also has a role in memory - * usage as well given that we don't do cleanup, the shorter a client timeout, - * the less time it will stay in the radix tree. */ + * the tree. In beforeSleep() we call clientsHandleTimeout() to run + * the tree and unblock the clients. */ #define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ @@ -1568,13 +1553,9 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { /* Add the specified client id / timeout as a key in the radix tree we use * to handle short timeouts. The client is not added to the list if its - * timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */ -void addClientToShortTimeoutTable(client *c) { - if (c->bpop.timeout == 0 || - c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) - { - return; - } + * timeout is zero (block forever). */ +void addClientToTimeoutTable(client *c) { + if (c->bpop.timeout == 0) return; uint64_t timeout = c->bpop.timeout; uint64_t id = c->id; unsigned char buf[CLIENT_ST_KEYLEN]; @@ -1584,7 +1565,7 @@ void addClientToShortTimeoutTable(client *c) { /* This function is called in beforeSleep() in order to unblock ASAP clients * that are waiting in blocking operations with a short timeout set. */ -void clientsHandleShortTimeout(void) { +void clientsHandleTimeout(void) { if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); raxIterator ri; @@ -1747,9 +1728,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { */ #define CLIENTS_CRON_MIN_ITERATIONS 5 void clientsCron(void) { - /* Unblock short timeout clients ASAP. */ - clientsHandleShortTimeout(); - /* Try to process at least numclients/server.hz of clients * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we @@ -2189,7 +2167,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Handle precise timeouts of blocked clients. */ - clientsHandleShortTimeout(); + clientsHandleTimeout(); /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -4102,11 +4080,13 @@ sds genRedisInfoString(const char *section) { "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" - "tracking_clients:%d\r\n", + "tracking_clients:%d\r\n" + "clients_in_timeout_table:%lld\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, server.blocked_clients, - server.tracking_clients); + server.tracking_clients, + raxSize(server.clients_timeout_table)); } /* Memory */ diff --git a/src/server.h b/src/server.h index c20f70136..4801a5aed 100644 --- a/src/server.h +++ b/src/server.h @@ -277,9 +277,6 @@ typedef long long ustime_t; /* microsecond time type. */ buffer configuration. Just the first three: normal, slave, pubsub. */ -/* Other client related defines. */ -#define CLIENT_SHORT_TIMEOUT 2000 /* See clientsHandleShortTimeout(). */ - /* Slave replication state. Used in server.repl_state for slaves to remember * what to do next. */ #define REPL_STATE_NONE 0 /* No active replication */ @@ -2141,7 +2138,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); -void addClientToShortTimeoutTable(client *c); +void addClientToTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From 0e5723ff18ff3fbce7fb52c6714b986621e2faab Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 16:05:20 +0100 Subject: [PATCH 35/37] Precise timeouts: fix comments after functional change. --- src/server.c | 10 +++++----- src/server.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server.c b/src/server.c index f14ecc0cc..cb3c143f8 100644 --- a/src/server.c +++ b/src/server.c @@ -1530,7 +1530,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * reached the timeout already, if they are no longer existing or no longer * blocked with such timeout, we just go forward. * - * Every time a client blocks with a short timeout, we add the client in + * Every time a client blocks with a timeout, we add the client in * the tree. In beforeSleep() we call clientsHandleTimeout() to run * the tree and unblock the clients. */ @@ -1552,8 +1552,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { } /* Add the specified client id / timeout as a key in the radix tree we use - * to handle short timeouts. The client is not added to the list if its - * timeout is zero (block forever). */ + * to handle blocked clients timeouts. The client is not added to the list + * if its timeout is zero (block forever). */ void addClientToTimeoutTable(client *c) { if (c->bpop.timeout == 0) return; uint64_t timeout = c->bpop.timeout; @@ -1563,8 +1563,8 @@ void addClientToTimeoutTable(client *c) { raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); } -/* This function is called in beforeSleep() in order to unblock ASAP clients - * that are waiting in blocking operations with a short timeout set. */ +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ void clientsHandleTimeout(void) { if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); diff --git a/src/server.h b/src/server.h index 4801a5aed..4a8426bab 100644 --- a/src/server.h +++ b/src/server.h @@ -1067,7 +1067,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ - rax *clients_timeout_table; /* Radix tree for clients with short timeout. */ + rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ From c881ba1680298ad75bdb5580c0c69f89917edce8 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Mar 2020 11:13:38 +0100 Subject: [PATCH 36/37] Precise timeouts: cleaup the table on unblock. Now that this mechanism is the sole one used for blocked clients timeouts, it is more wise to cleanup the table when the client unblocks for any reason. We use a flag: CLIENT_IN_TO_TABLE, in order to avoid a radix tree lookup when the client was already removed from the table because we processed it by scanning the radix tree. --- src/blocked.c | 1 + src/server.c | 20 ++++++++++++++++++-- src/server.h | 2 ++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 443daec7f..795985ea1 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -186,6 +186,7 @@ void unblockClient(client *c) { server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } diff --git a/src/server.c b/src/server.c index cb3c143f8..9a89290ca 100644 --- a/src/server.c +++ b/src/server.c @@ -1560,7 +1560,20 @@ void addClientToTimeoutTable(client *c) { uint64_t id = c->id; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,id); - raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); + if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); } /* This function is called in beforeSleep() in order to unblock clients @@ -1577,7 +1590,10 @@ void clientsHandleTimeout(void) { decodeTimeoutKey(ri.key,&timeout,&id); if (timeout >= now) break; /* All the timeouts are in the future. */ client *c = lookupClientByID(id); - if (c) checkBlockedClientTimeout(c,now); + if (c) { + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); raxSeek(&ri,"^",NULL,0); } diff --git a/src/server.h b/src/server.h index 4a8426bab..03a153c7b 100644 --- a/src/server.h +++ b/src/server.h @@ -252,6 +252,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, depending on optin/optout mode. */ +#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -2139,6 +2140,7 @@ void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); void addClientToTimeoutTable(client *c); +void removeClientFromTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From 454e12cb8961f21c9dd8502dc82ae6ffd7e22fe0 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 4 Dec 2019 17:29:29 +0200 Subject: [PATCH 37/37] AOFRW on an empty stream created with MKSTREAM loads badkly the AOF will be loaded successfully, but the stream will be missing, i.e inconsistencies with the original db. this was because XADD with id of 0-0 would error. add a test to reproduce. --- src/aof.c | 3 ++- tests/unit/type/stream-cgroups.tcl | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/aof.c b/src/aof.c index 8ab9349f0..6bb239252 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1212,12 +1212,13 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Use the XADD MAXLEN 0 trick to generate an empty stream if * the key we are serializing is an empty string, which is possible * for the Stream type. */ + id.ms = 0; id.seq = 1; if (rioWriteBulkCount(r,'*',7) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; if (rioWriteBulkString(r,"x",1) == 0) return 0; if (rioWriteBulkString(r,"y",1) == 0) return 0; } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 072ed14d6..6b9a4a9cd 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -311,4 +311,17 @@ start_server { } } } + + start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} { + test {Empty stream with no lastid can be rewrite into AOF correctly} { + r XGROUP CREATE mystream group-name $ MKSTREAM + assert {[dict get [r xinfo stream mystream] length] == 0} + set grpinfo [r xinfo groups mystream] + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 0} + assert {[r xinfo groups mystream] == $grpinfo} + } + } }