From d387f67dcbb1e8c57aa8775ab9b53bbbeceda7c3 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 15:59:29 +0100 Subject: [PATCH 01/37] Sentinel: implement auth-user directive for ACLs. --- src/sentinel.c | 45 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 10c003d03..83d6c00bb 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -205,7 +205,8 @@ typedef struct sentinelRedisInstance { dict *slaves; /* Slaves for this master instance. */ unsigned int quorum;/* Number of sentinels that need to agree on failure. */ int parallel_syncs; /* How many slaves to reconfigure at same time. */ - char *auth_pass; /* Password to use for AUTH against master & slaves. */ + char *auth_pass; /* Password to use for AUTH against master & replica. */ + char *auth_user; /* Username for ACLs AUTH against master & replica. */ /* Slave specific. */ mstime_t master_link_down_time; /* Slave replication link down time. */ @@ -1231,6 +1232,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * SENTINEL_DEFAULT_DOWN_AFTER; ri->master_link_down_time = 0; ri->auth_pass = NULL; + ri->auth_user = NULL; ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; ri->slave_reconf_sent_time = 0; ri->slave_master_host = NULL; @@ -1289,6 +1291,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { sdsfree(ri->slave_master_host); sdsfree(ri->leader); sdsfree(ri->auth_pass); + sdsfree(ri->auth_user); sdsfree(ri->info); releaseSentinelAddr(ri->addr); dictRelease(ri->renamed_commands); @@ -1654,19 +1657,19 @@ char *sentinelHandleConfiguration(char **argv, int argc) { ri->failover_timeout = atoi(argv[2]); if (ri->failover_timeout <= 0) return "negative or zero time parameter."; - } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { + } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { /* parallel-syncs */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->parallel_syncs = atoi(argv[2]); - } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { /* notification-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; if (access(argv[2],X_OK) == -1) return "Notification script seems non existing or non executable."; ri->notification_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { /* client-reconfig-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; @@ -1674,11 +1677,16 @@ char *sentinelHandleConfiguration(char **argv, int argc) { return "Client reconfiguration script seems non existing or " "non executable."; ri->client_reconfig_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { + } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { /* auth-pass */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->auth_pass = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) { + /* auth-user */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->auth_user = sdsnew(argv[2]); } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) { /* current-epoch */ unsigned long long current_epoch = strtoull(argv[1],NULL,10); @@ -1836,7 +1844,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } - /* sentinel auth-pass */ + /* sentinel auth-pass & auth-user */ if (master->auth_pass) { line = sdscatprintf(sdsempty(), "sentinel auth-pass %s %s", @@ -1844,6 +1852,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } + if (master->auth_user) { + line = sdscatprintf(sdsempty(), + "sentinel auth-user %s %s", + master->name, master->auth_user); + rewriteConfigRewriteLine(state,"sentinel",line,1); + } + /* sentinel config-epoch */ line = sdscatprintf(sdsempty(), "sentinel config-epoch %s %llu", @@ -1968,19 +1983,29 @@ werr: * will disconnect and reconnect the link and so forth. */ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { char *auth_pass = NULL; + char *auth_user = NULL; if (ri->flags & SRI_MASTER) { auth_pass = ri->auth_pass; + auth_user = ri->auth_user; } else if (ri->flags & SRI_SLAVE) { auth_pass = ri->master->auth_pass; + auth_user = ri->master->auth_user; } else if (ri->flags & SRI_SENTINEL) { auth_pass = ACLDefaultUserFirstPassword(); + auth_user = NULL; } - if (auth_pass) { + if (auth_pass && auth_user == NULL) { if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", sentinelInstanceMapCommand(ri,"AUTH"), auth_pass) == C_OK) ri->link->pending_commands++; + } else if (auth_pass && auth_user) { + /* If we also have an username, use the ACL-style AUTH command + * with two arguments, username and password. */ + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"AUTH"), + auth_user, auth_pass) == C_OK) ri->link->pending_commands++; } } @@ -3522,6 +3547,12 @@ void sentinelSetCommand(client *c) { sdsfree(ri->auth_pass); ri->auth_pass = strlen(value) ? sdsnew(value) : NULL; changes++; + } else if (!strcasecmp(option,"auth-user") && moreargs > 0) { + /* auth-user */ + char *value = c->argv[++j]->ptr; + sdsfree(ri->auth_user); + ri->auth_user = strlen(value) ? sdsnew(value) : NULL; + changes++; } else if (!strcasecmp(option,"quorum") && moreargs > 0) { /* quorum */ robj *o = c->argv[++j]; From 9c2e42ddfc55505390d81b8110158ef9aba7b127 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 16:56:50 +0100 Subject: [PATCH 02/37] ACL: Make Redis 6 more backward compatible with requirepass. Note that this as a side effect fixes Sentinel "requirepass" mode. --- src/acl.c | 11 +---------- src/config.c | 16 ++++++++++++---- src/sentinel.c | 2 +- src/server.h | 3 +++ 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/acl.c b/src/acl.c index efe6b96ad..27f4bdb84 100644 --- a/src/acl.c +++ b/src/acl.c @@ -899,16 +899,6 @@ char *ACLSetUserStringError(void) { return errmsg; } -/* Return the first password of the default user or NULL. - * This function is needed for backward compatibility with the old - * directive "requirepass" when Redis supported a single global - * password. */ -sds ACLDefaultUserFirstPassword(void) { - if (listLength(DefaultUser->passwords) == 0) return NULL; - listNode *first = listFirst(DefaultUser->passwords); - return listNodeValue(first); -} - /* Initialize the default user, that will always exist for all the process * lifetime. */ void ACLInitDefaultUser(void) { @@ -925,6 +915,7 @@ void ACLInit(void) { UsersToLoad = listCreate(); ACLLog = listCreate(); ACLInitDefaultUser(); + server.requirepass = NULL; /* Only used for backward compatibility. */ } /* Check the username and password pair and return C_OK if they are valid, diff --git a/src/config.c b/src/config.c index 211b6d003..7c87ebe6e 100644 --- a/src/config.c +++ b/src/config.c @@ -411,11 +411,15 @@ void loadServerConfigFromString(char *config) { goto loaderr; } /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",argv[1]); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(server.requirepass); + server.requirepass = sdsnew(argv[1]); } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ /* DEAD OPTION */ } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) { @@ -623,11 +627,15 @@ void configSetCommand(client *c) { config_set_special_field("requirepass") { if (sdslen(o->ptr) > CONFIG_AUTHPASS_MAX_LEN) goto badfmt; /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",(char*)o->ptr); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(server.requirepass); + server.requirepass = sdsnew(o->ptr); } config_set_special_field("save") { int vlen, j; sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); @@ -899,7 +907,7 @@ void configGetCommand(client *c) { } if (stringmatch(pattern,"requirepass",1)) { addReplyBulkCString(c,"requirepass"); - sds password = ACLDefaultUserFirstPassword(); + sds password = server.requirepass; if (password) { addReplyBulkCBuffer(c,password,sdslen(password)); } else { @@ -1341,7 +1349,7 @@ void rewriteConfigBindOption(struct rewriteConfigState *state) { void rewriteConfigRequirepassOption(struct rewriteConfigState *state, char *option) { int force = 1; sds line; - sds password = ACLDefaultUserFirstPassword(); + sds password = server.requirepass; /* If there is no password set, we don't want the requirepass option * to be present in the configuration at all. */ diff --git a/src/sentinel.c b/src/sentinel.c index 83d6c00bb..d091bf230 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -1992,7 +1992,7 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { auth_pass = ri->master->auth_pass; auth_user = ri->master->auth_user; } else if (ri->flags & SRI_SENTINEL) { - auth_pass = ACLDefaultUserFirstPassword(); + auth_pass = server.requirepass; auth_user = NULL; } diff --git a/src/server.h b/src/server.h index 3c19a17ea..fa6770dfa 100644 --- a/src/server.h +++ b/src/server.h @@ -1395,6 +1395,9 @@ struct redisServer { /* ACLs */ char *acl_filename; /* ACL Users file. NULL if not configured. */ unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */ + sds requirepass; /* Remember the cleartext password set with the + old "requirepass" directive for backward + compatibility with Redis <= 5. */ /* Assert & bug reporting */ const char *assert_failed; const char *assert_file; From cbbf9b3931c517aa59fc9ede9e79184f0c0cc69e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 17:11:43 +0100 Subject: [PATCH 03/37] Sentinel: document auth-user directive. --- sentinel.conf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sentinel.conf b/sentinel.conf index 796f45088..4ca5e5f8f 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -102,6 +102,18 @@ sentinel monitor mymaster 127.0.0.1 6379 2 # # sentinel auth-pass mymaster MySUPER--secret-0123passw0rd +# sentinel auth-user +# +# This is useful in order to authenticate to instances having ACL capabilities, +# that is, running Redis 6.0 or greater. When just auth-pass is provided the +# Sentinel instance will authenticate to Redis using the old "AUTH " +# method. When also an username is provided, it will use "AUTH ". +# In the Redis servers side, the ACL to provide just minimal access to +# Sentinel instances, should be configured along the following lines: +# +# user sentinel-user >somepassword +client +subscribe +publish \ +# +ping +info +multi +slaveof +config +client +exec on + # sentinel down-after-milliseconds # # Number of milliseconds the master (or any attached replica or sentinel) should From 34ea2f4e1a7df2f1ae1404e7194c55b253323c87 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 20 Mar 2020 12:45:48 +0100 Subject: [PATCH 04/37] ACL: default user off should not allow automatic authentication. This fixes issue #7011. --- src/networking.c | 3 ++- src/server.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0690bbdf6..69d59a59b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -124,7 +124,8 @@ client *createClient(connection *conn) { c->ctime = c->lastinteraction = server.unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ - c->authenticated = (c->user->flags & USER_FLAG_NOPASS) != 0; + c->authenticated = (c->user->flags & USER_FLAG_NOPASS) && + !(c->user->flags & USER_FLAG_DISABLED); c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; diff --git a/src/server.c b/src/server.c index f702da94a..612805ce5 100644 --- a/src/server.c +++ b/src/server.c @@ -3380,7 +3380,7 @@ int processCommand(client *c) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || - DefaultUser->flags & USER_FLAG_DISABLED) && + (DefaultUser->flags & USER_FLAG_DISABLED)) && !c->authenticated; if (auth_required) { /* AUTH and HELLO and no auth modules are valid even in From 61b98f32a298ed1b971dd7ae5d32db9b9a8e5cc9 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 20 Mar 2020 12:52:06 +0100 Subject: [PATCH 05/37] Regression test for #7011. --- tests/unit/acl.tcl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index fc1664a75..85c9b81a9 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -248,4 +248,11 @@ start_server {tags {"acl"}} { r AUTH default "" assert {[llength [r ACL LOG]] == 5} } + + test {When default user is off, new connections are not authenticated} { + r ACL setuser default off + catch {set rd1 [redis_deferring_client]} e + r ACL setuser default on + set e + } {*NOAUTH*} } From 299f1d0258e0d489b9c7438f840e14b8789f37ce Mon Sep 17 00:00:00 2001 From: WuYunlong Date: Wed, 18 Mar 2020 16:17:46 +0800 Subject: [PATCH 06/37] Add 14-consistency-check.tcl to prove there is a data consistency issue. --- tests/cluster/tests/14-consistency-check.tcl | 87 ++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 tests/cluster/tests/14-consistency-check.tcl diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl new file mode 100644 index 000000000..a43725ebc --- /dev/null +++ b/tests/cluster/tests/14-consistency-check.tcl @@ -0,0 +1,87 @@ +source "../tests/includes/init-tests.tcl" + +test "Create a 5 nodes cluster" { + create_cluster 5 5 +} + +test "Cluster should start ok" { + assert_cluster_state ok +} + +test "Cluster is writable" { + cluster_write_test 0 +} + +proc find_non_empty_master {} { + set master_id_no {} + foreach_redis_id id { + if {[RI $id role] eq {master} && [R $id dbsize] > 0} { + set master_id_no $id + } + } + return $master_id_no +} + +proc get_one_of_my_replica {id} { + set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1] + set replica_id_num [get_instance_id_by_port redis $replica_port] + return $replica_id_num +} + +proc cluster_write_keys_with_expire {id ttl} { + set prefix [randstring 20 20 alpha] + set port [get_instance_attrib redis $id port] + set cluster [redis_cluster 127.0.0.1:$port] + for {set j 100} {$j < 200} {incr j} { + $cluster setex key_expire.$j $ttl $prefix.$j + } + $cluster close +} + +proc test_slave_load_expired_keys {aof} { + test "Slave expired keys is loaded when restarted: appendonly=$aof" { + set master_id [find_non_empty_master] + set replica_id [get_one_of_my_replica $master_id] + + set master_dbsize [R $master_id dbsize] + set slave_dbsize [R $replica_id dbsize] + assert_equal $master_dbsize $slave_dbsize + + set data_ttl 5 + cluster_write_keys_with_expire $master_id $data_ttl + after 100 + set replica_dbsize_1 [R $replica_id dbsize] + assert {$replica_dbsize_1 > $slave_dbsize} + + R $replica_id config set appendonly $aof + R $replica_id config rewrite + + set start_time [clock seconds] + set end_time [expr $start_time+$data_ttl+2] + R $replica_id save + set replica_dbsize_2 [R $replica_id dbsize] + assert {$replica_dbsize_2 > $slave_dbsize} + kill_instance redis $replica_id + + set master_port [get_instance_attrib redis $master_id port] + exec ../../../src/redis-cli -h 127.0.0.1 -p $master_port debug sleep [expr $data_ttl+3] > /dev/null & + + while {[clock seconds] <= $end_time} { + #wait for $data_ttl seconds + } + restart_instance redis $replica_id + + wait_for_condition 200 50 { + [R $replica_id ping] eq {PONG} + } else { + fail "replica #$replica_id not started" + } + + set replica_dbsize_3 [R $replica_id dbsize] + assert {$replica_dbsize_3 > $slave_dbsize} + } +} + +test_slave_load_expired_keys no +after 5000 +test_slave_load_expired_keys yes From 0578157d569e673d5c7728ea7941a4e0f24a4d48 Mon Sep 17 00:00:00 2001 From: WuYunlong Date: Wed, 18 Mar 2020 16:20:10 +0800 Subject: [PATCH 07/37] Fix master replica inconsistency for upgrading scenario. Before this commit, when upgrading a replica, expired keys will not be loaded, thus causing replica having less keys in db. To this point, master and replica's keys is logically consistent. However, before the keys in master and replica are physically consistent, that is, they have the same dbsize, if master got a problem and the replica got promoted and becomes new master of that partition, and master updates a key which does not exist on master, but physically exists on the old master(new replica), the old master would refuse to update the key, thus causing master and replica data inconsistent. How could this happen? That's all because of the wrong judgement of roles while starting up the server. We can not use server.masterhost to judge if the server is master or replica, since it fails in cluster mode. When we start the server, we load rdb and do want to load expired keys, and do not want to have the ability to active expire keys, if it is a replica. --- src/rdb.c | 2 +- src/server.c | 7 ++++++- src/server.h | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index cbcea96c6..5d34f5a32 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2231,7 +2231,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { + if (iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { diff --git a/src/server.c b/src/server.c index 612805ce5..4b010b870 100644 --- a/src/server.c +++ b/src/server.c @@ -1691,7 +1691,7 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { - if (server.masterhost == NULL) { + if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); @@ -4863,6 +4863,11 @@ int redisIsSupervised(int mode) { return 0; } +int iAmMaster(void) { + return ((!server.cluster_enabled && server.masterhost == NULL) || + (server.cluster_enabled && nodeIsMaster(server.cluster->myself))); +} + int main(int argc, char **argv) { struct timeval tv; diff --git a/src/server.h b/src/server.h index fa6770dfa..fdfe5b8ea 100644 --- a/src/server.h +++ b/src/server.h @@ -2393,4 +2393,6 @@ int tlsConfigure(redisTLSContextConfig *ctx_config); #define redisDebugMark() \ printf("-- MARK %s:%d --\n", __FILE__, __LINE__) +int iAmMaster(void); + #endif From 04c53fa145273d531229d0324fef22494e17868a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=8A=B9=ED=98=84?= Date: Wed, 18 Mar 2020 14:40:50 +0900 Subject: [PATCH 08/37] Update redis.conf --- redis.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis.conf b/redis.conf index c9d256bef..7c55a3ab0 100644 --- a/redis.conf +++ b/redis.conf @@ -1628,7 +1628,7 @@ hz 10 # offers, and enables by default, the ability to use an adaptive HZ value # which will temporary raise when there are many connected clients. # -# When dynamic HZ is enabled, the actual configured HZ will be used as +# When dynamic HZ is enabled, the actual configured HZ will be used # as a baseline, but multiples of the configured HZ value will be actually # used as needed once more clients are connected. In this way an idle # instance will use very little CPU time while a busy instance will be From 95324b8190e5338c287dff32ce65d50a29c981eb Mon Sep 17 00:00:00 2001 From: artix Date: Tue, 18 Feb 2020 10:46:10 +0100 Subject: [PATCH 09/37] Support Redis Cluster Proxy PROXY INFO command --- src/redis-cli.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7ad80c0a1..7e440d67c 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1292,7 +1292,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { (argc == 3 && !strcasecmp(command,"latency") && !strcasecmp(argv[1],"graph")) || (argc == 2 && !strcasecmp(command,"latency") && - !strcasecmp(argv[1],"doctor"))) + !strcasecmp(argv[1],"doctor")) || + /* Format PROXY INFO command for Redis Cluster Proxy: + * https://github.com/artix75/redis-cluster-proxy */ + (argc >= 2 && !strcasecmp(command,"proxy") && + !strcasecmp(argv[1],"info"))) { output_raw = 1; } From 2dd1ca6af0d0d6514e8c18eeba86b5f1b7a55979 Mon Sep 17 00:00:00 2001 From: hwware Date: Fri, 20 Mar 2020 02:40:54 -0400 Subject: [PATCH 10/37] add missing commands in cluster help --- src/cluster.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 5f63d2b8f..72755823a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4261,7 +4261,7 @@ void clusterCommand(client *c) { "FORGET -- Remove a node from the cluster.", "GETKEYSINSLOT -- Return key names stored by current node in a slot.", "FLUSHSLOTS -- Delete current node own slots information.", -"INFO - Return onformation about the cluster.", +"INFO - Return information about the cluster.", "KEYSLOT -- Return the hash slot for .", "MEET [bus-port] -- Connect nodes into a working cluster.", "MYID -- Return the node id.", @@ -4272,6 +4272,7 @@ void clusterCommand(client *c) { "SET-config-epoch - Set config epoch of current node.", "SETSLOT (importing|migrating|stable|node ) -- Set slot state.", "REPLICAS -- Return replicas.", +"SAVECONFIG - Force saving cluster configuration on disk.", "SLOTS -- Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL From c7524a7e447d92775c1fb61d8f0640955bce952b Mon Sep 17 00:00:00 2001 From: hwware Date: Mon, 23 Mar 2020 01:04:49 -0400 Subject: [PATCH 11/37] clean CLIENT_TRACKING_CACHING flag when disabled caching --- src/tracking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 45f83103a..5c1f48cba 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -94,7 +94,7 @@ void disableTracking(client *c) { server.tracking_clients--; c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| - CLIENT_TRACKING_OPTOUT); + CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING); } } From 81e8686cc7e1ed9f2c5d306b93cfa7462a05c7d2 Mon Sep 17 00:00:00 2001 From: hwware Date: Mon, 23 Mar 2020 01:07:46 -0400 Subject: [PATCH 12/37] remove redundant Semicolon --- src/tracking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tracking.c b/src/tracking.c index 5c1f48cba..6f7929430 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -271,7 +271,7 @@ void trackingInvalidateKey(robj *keyobj) { trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); - if (ids == raxNotFound) return;; + if (ids == raxNotFound) return; raxIterator ri; raxStart(&ri,ids); From 87dbd8f54cdf5f0fb97851cf51dffe1bb9a16a1c Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:42:03 +0200 Subject: [PATCH 13/37] Conns: Fix connClose() / connAccept() behavior. We assume accept handlers may choose to reject a connection and close it, but connAccept() callers can't distinguish between this state and other error states requiring connClose(). This makes it safe (and mandatory!) to always call connClose() if connAccept() fails, and safe for accept handlers to close connections (which will defer). --- src/connection.c | 12 ++++++++--- src/connection.h | 15 ++++++++++---- src/connhelpers.h | 53 +++++++++++++++++++++++++---------------------- 3 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/connection.c b/src/connection.c index 58d86c31b..2015c9195 100644 --- a/src/connection.c +++ b/src/connection.c @@ -152,7 +152,7 @@ static void connSocketClose(connection *conn) { /* If called from within a handler, schedule the close but * keep the connection until the handler returns. */ - if (conn->flags & CONN_FLAG_IN_HANDLER) { + if (connHasRefs(conn)) { conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; return; } @@ -183,10 +183,16 @@ static int connSocketRead(connection *conn, void *buf, size_t buf_len) { } static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { + int ret = C_OK; + if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; conn->state = CONN_STATE_CONNECTED; - if (!callHandler(conn, accept_handler)) return C_ERR; - return C_OK; + + connIncrRefs(conn); + if (!callHandler(conn, accept_handler)) ret = C_ERR; + connDecrRefs(conn); + + return ret; } /* Register a write handler, to be called when the connection is writable. diff --git a/src/connection.h b/src/connection.h index 97622f8d6..db09dfd83 100644 --- a/src/connection.h +++ b/src/connection.h @@ -45,9 +45,8 @@ typedef enum { CONN_STATE_ERROR } ConnectionState; -#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */ -#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */ -#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */ +#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */ +#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -70,7 +69,8 @@ typedef struct ConnectionType { struct connection { ConnectionType *type; ConnectionState state; - int flags; + short int flags; + short int refs; int last_errno; void *private_data; ConnectionCallbackFunc conn_handler; @@ -88,6 +88,13 @@ struct connection { * connAccept() may directly call accept_handler(), or return and call it * at a later time. This behavior is a bit awkward but aims to reduce the need * to wait for the next event loop, if no additional handshake is required. + * + * IMPORTANT: accept_handler may decide to close the connection, calling connClose(). + * To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED + * in this case, and connAccept() returns with an error. + * + * connAccept() callers must always check the return value and on error (C_ERR) + * a connClose() must be called. */ static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) { diff --git a/src/connhelpers.h b/src/connhelpers.h index f237c9b1d..86250d09e 100644 --- a/src/connhelpers.h +++ b/src/connhelpers.h @@ -37,46 +37,49 @@ * implementations (currently sockets in connection.c and TLS in tls.c). * * Currently helpers implement the mechanisms for invoking connection - * handlers, tracking in-handler states and dealing with deferred - * destruction (if invoked by a handler). + * handlers and tracking connection references, to allow safe destruction + * of connections from within a handler. */ -/* Called whenever a handler is invoked on a connection and sets the - * CONN_FLAG_IN_HANDLER flag to indicate we're in a handler context. +/* Incremenet connection references. * - * An attempt to close a connection while CONN_FLAG_IN_HANDLER is - * set will result with deferred close, i.e. setting the CONN_FLAG_CLOSE_SCHEDULED - * instead of destructing it. + * Inside a connection handler, we guarantee refs >= 1 so it is always + * safe to connClose(). + * + * In other cases where we don't want to prematurely lose the connection, + * it can go beyond 1 as well; currently it is only done by connAccept(). */ -static inline void enterHandler(connection *conn) { - conn->flags |= CONN_FLAG_IN_HANDLER; +static inline void connIncrRefs(connection *conn) { + conn->refs++; } -/* Called whenever a handler returns. This unsets the CONN_FLAG_IN_HANDLER - * flag and performs actual close/destruction if a deferred close was - * scheduled by the handler. +/* Decrement connection references. + * + * Note that this is not intended to provide any automatic free logic! + * callHandler() takes care of that for the common flows, and anywhere an + * explicit connIncrRefs() is used, the caller is expected to take care of + * that. */ -static inline int exitHandler(connection *conn) { - conn->flags &= ~CONN_FLAG_IN_HANDLER; - if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); - return 0; - } - return 1; + +static inline void connDecrRefs(connection *conn) { + conn->refs--; +} + +static inline int connHasRefs(connection *conn) { + return conn->refs; } /* Helper for connection implementations to call handlers: - * 1. Mark the handler in use. + * 1. Increment refs to protect the connection. * 2. Execute the handler (if set). - * 3. Mark the handler as NOT in use and perform deferred close if was - * requested by the handler at any time. + * 3. Decrement refs and perform deferred close, if refs==0. */ static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) { - conn->flags |= CONN_FLAG_IN_HANDLER; + connIncrRefs(conn); if (handler) handler(conn); - conn->flags &= ~CONN_FLAG_IN_HANDLER; + connDecrRefs(conn); if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); + if (!connHasRefs(conn)) connClose(conn); return 0; } return 1; From 50dcd9f96d133b740aa04a3877d67fb890630bc8 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:46:16 +0200 Subject: [PATCH 14/37] Cluster: fix misleading accept errors. --- src/cluster.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 72755823a..a2e9ff5b6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -681,9 +681,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * or schedule it for later depending on connection implementation. */ if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) { - serverLog(LL_VERBOSE, - "Error accepting cluster node connection: %s", - connGetLastError(conn)); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_VERBOSE, + "Error accepting cluster node connection: %s", + connGetLastError(conn)); connClose(conn); return; } From cdcab0e820710c17eb5cdaf0fa532908e3417df1 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 22 Mar 2020 14:47:44 +0200 Subject: [PATCH 15/37] Fix crashes related to failed/rejected accepts. --- src/networking.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 69d59a59b..a550e4040 100644 --- a/src/networking.c +++ b/src/networking.c @@ -786,7 +786,7 @@ void clientAcceptHandler(connection *conn) { serverLog(LL_WARNING, "Error accepting a client connection: %s", connGetLastError(conn)); - freeClient(c); + freeClientAsync(c); return; } @@ -828,7 +828,7 @@ void clientAcceptHandler(connection *conn) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; - freeClient(c); + freeClientAsync(c); return; } } @@ -887,9 +887,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) { */ if (connAccept(conn, clientAcceptHandler) == C_ERR) { char conninfo[100]; - serverLog(LL_WARNING, - "Error accepting a client connection: %s (conn: %s)", - connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_WARNING, + "Error accepting a client connection: %s (conn: %s)", + connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); freeClient(connGetPrivateData(conn)); return; } From eb80887936cd57223bda4776129bab157b09c5a7 Mon Sep 17 00:00:00 2001 From: hwware Date: Wed, 18 Mar 2020 09:33:52 -0400 Subject: [PATCH 16/37] fix potentical memory leak in redis-cli --- src/redis-cli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/redis-cli.c b/src/redis-cli.c index 7e440d67c..72480d08c 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1993,6 +1993,8 @@ static void repl(void) { if (config.eval) { config.eval_ldb = 1; config.output = OUTPUT_RAW; + sdsfreesplitres(argv,argc); + linenoiseFree(line); return; /* Return to evalMode to restart the session. */ } else { printf("Use 'restart' only in Lua debugging mode."); From f2f3dc5e73f93ec8c5b9efe360adbf06da0ea0bd Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 18 Mar 2020 18:34:27 +0530 Subject: [PATCH 17/37] Allow RM_GetContextFlags to work with ctx==NULL --- src/module.c | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/module.c b/src/module.c index 74da6c24d..d6b055f78 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,20 +1848,22 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { int flags = 0; /* Client specific flags */ - if (ctx->client) { - if (ctx->client->flags & CLIENT_LUA) - flags |= REDISMODULE_CTX_FLAGS_LUA; - if (ctx->client->flags & CLIENT_MULTI) - flags |= REDISMODULE_CTX_FLAGS_MULTI; - /* Module command recieved from MASTER, is replicated. */ - if (ctx->client->flags & CLIENT_MASTER) - flags |= REDISMODULE_CTX_FLAGS_REPLICATED; - } + if (ctx) { + if (ctx->client) { + if (ctx->client->flags & CLIENT_LUA) + flags |= REDISMODULE_CTX_FLAGS_LUA; + if (ctx->client->flags & CLIENT_MULTI) + flags |= REDISMODULE_CTX_FLAGS_MULTI; + /* Module command recieved from MASTER, is replicated. */ + if (ctx->client->flags & CLIENT_MASTER) + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; + } - /* For DIRTY flags, we need the blocked client if used */ - client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; - if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { - flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + /* For DIRTY flags, we need the blocked client if used */ + client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; + if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { + flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + } } if (server.cluster_enabled) From 50f8f9504b2ba24edc646cfb32dac2a496ba34cd Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:17:50 +0100 Subject: [PATCH 18/37] Modules: updated function doc after #7003. --- src/module.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index d6b055f78..6f61a5ca8 100644 --- a/src/module.c +++ b/src/module.c @@ -1795,7 +1795,12 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * current request context (whether the client is a Lua script or in a MULTI), * and about the Redis instance in general, i.e replication and persistence. * - * The available flags are: + * It is possible to call this function even with a NULL context, however + * in this case the following flags will not be reported: + * + * * LUA, MULTI, REPLICATED, DIRTY (see below for more info). + * + * Available flags and their meaning: * * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script * From b3e4abf06ea01506a809898552845a191fb52cbf Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Wed, 4 Mar 2020 20:51:45 +0800 Subject: [PATCH 19/37] Added BITFIELD_RO variants for read-only operations. --- src/bitops.c | 19 ++++++++++++++++++- src/server.c | 4 ++++ src/server.h | 1 + tests/unit/bitfield.tcl | 31 +++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/bitops.c b/src/bitops.c index ee1ce0460..ffb330013 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -902,6 +902,9 @@ void bitposCommand(client *c) { * OVERFLOW [WRAP|SAT|FAIL] */ +#define BITFIELD_COMMON (1<<0) +#define BITFIELD_READONLY (1<<1) + struct bitfieldOp { uint64_t offset; /* Bitfield offset. */ int64_t i64; /* Increment amount (INCRBY) or SET value */ @@ -911,7 +914,7 @@ struct bitfieldOp { int sign; /* True if signed, otherwise unsigned op. */ }; -void bitfieldCommand(client *c) { +void bitfieldGeneric(client *c, int flags) { robj *o; size_t bitoffset; int j, numops = 0, changes = 0; @@ -999,6 +1002,12 @@ void bitfieldCommand(client *c) { return; } } else { + if (flags & BITFIELD_READONLY) { + zfree(ops); + addReplyError(c, "bitfield_ro only support get subcommand"); + return; + } + /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, @@ -1129,3 +1138,11 @@ void bitfieldCommand(client *c) { } zfree(ops); } + +void bitfieldCommand(client *c) { + bitfieldGeneric(c, BITFIELD_COMMON); +} + +void bitfieldroCommand(client *c) { + bitfieldGeneric(c, BITFIELD_READONLY); +} diff --git a/src/server.c b/src/server.c index 4b010b870..f5fb339f9 100644 --- a/src/server.c +++ b/src/server.c @@ -238,6 +238,10 @@ struct redisCommand redisCommandTable[] = { "write use-memory @bitmap", 0,NULL,1,1,1,0,0,0}, + {"bitfield_ro",bitfieldroCommand,-2, + "read-only fast @bitmap", + 0,NULL,1,1,1,0,0,0}, + {"setrange",setrangeCommand,4, "write use-memory @string", 0,NULL,1,1,1,0,0,0}, diff --git a/src/server.h b/src/server.h index fdfe5b8ea..ed4707d66 100644 --- a/src/server.h +++ b/src/server.h @@ -2177,6 +2177,7 @@ void existsCommand(client *c); void setbitCommand(client *c); void getbitCommand(client *c); void bitfieldCommand(client *c); +void bitfieldroCommand(client *c); void setrangeCommand(client *c); void getrangeCommand(client *c); void incrCommand(client *c); diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl index d76452b1b..819d8f36d 100644 --- a/tests/unit/bitfield.tcl +++ b/tests/unit/bitfield.tcl @@ -199,3 +199,34 @@ start_server {tags {"bitops"}} { r del mystring } } + +start_server {tags {"repl"}} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + test {setup slave} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + } + + test {write on master, read on slave} { + $master del bits + assert_equal 0 [$master bitfield bits set u8 0 255] + assert_equal 255 [$master bitfield bits set u8 0 100] + wait_for_ofs_sync $master $slave + assert_equal 100 [$slave bitfield_ro bits get u8 0] + } + + test {bitfield_ro with write option} { + catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err + assert_match {*ERR bitfield_ro only support get subcommand*} $err + } + } +} From ec9cf002d537d2d565b3d91c116d2bab0a9a5fd4 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:28:09 +0100 Subject: [PATCH 20/37] Minor changes to BITFIELD_RO PR #6951. --- src/bitops.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index ffb330013..d4e82c937 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -902,8 +902,8 @@ void bitposCommand(client *c) { * OVERFLOW [WRAP|SAT|FAIL] */ -#define BITFIELD_COMMON (1<<0) -#define BITFIELD_READONLY (1<<1) +#define BITFIELD_FLAG_NONE 0 +#define BITFIELD_FLAG_READONLY (1<<0) struct bitfieldOp { uint64_t offset; /* Bitfield offset. */ @@ -914,6 +914,9 @@ struct bitfieldOp { int sign; /* True if signed, otherwise unsigned op. */ }; +/* This implements both the BITFIELD command and the BITFIELD_RO command + * when flags is set to BITFIELD_FLAG_READONLY: in this case only the + * GET subcommand is allowed, other subcommands will return an error. */ void bitfieldGeneric(client *c, int flags) { robj *o; size_t bitoffset; @@ -1002,9 +1005,9 @@ void bitfieldGeneric(client *c, int flags) { return; } } else { - if (flags & BITFIELD_READONLY) { + if (flags & BITFIELD_FLAG_READONLY) { zfree(ops); - addReplyError(c, "bitfield_ro only support get subcommand"); + addReplyError(c, "BITFIELD_RO only support the GET subcommand"); return; } @@ -1140,9 +1143,9 @@ void bitfieldGeneric(client *c, int flags) { } void bitfieldCommand(client *c) { - bitfieldGeneric(c, BITFIELD_COMMON); + bitfieldGeneric(c, BITFIELD_FLAG_NONE); } void bitfieldroCommand(client *c) { - bitfieldGeneric(c, BITFIELD_READONLY); + bitfieldGeneric(c, BITFIELD_FLAG_READONLY); } From 8783304a2da57561e423da280cc08c9a8e4c44ad Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 11:47:37 +0100 Subject: [PATCH 21/37] Abort transactions after -READONLY error. Fix #7014. --- src/server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.c b/src/server.c index f5fb339f9..ddc90b3dd 100644 --- a/src/server.c +++ b/src/server.c @@ -3509,6 +3509,7 @@ int processCommand(client *c) { !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { + flagTransaction(c); addReply(c, shared.roslaveerr); return C_OK; } From 70a98a43ea5e4544f83bdea5257a3733c8afe188 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 12:00:46 +0100 Subject: [PATCH 22/37] Fix BITFIELD_RO test. --- src/bitops.c | 2 +- tests/unit/bitfield.tcl | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index d4e82c937..f78e4fd34 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -1007,7 +1007,7 @@ void bitfieldGeneric(client *c, int flags) { } else { if (flags & BITFIELD_FLAG_READONLY) { zfree(ops); - addReplyError(c, "BITFIELD_RO only support the GET subcommand"); + addReplyError(c, "BITFIELD_RO only supports the GET subcommand"); return; } diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl index 819d8f36d..1f2f6395e 100644 --- a/tests/unit/bitfield.tcl +++ b/tests/unit/bitfield.tcl @@ -207,7 +207,7 @@ start_server {tags {"repl"}} { set master_port [srv -1 port] set slave [srv 0 client] - test {setup slave} { + test {BITFIELD: setup slave} { $slave slaveof $master_host $master_port wait_for_condition 50 100 { [s 0 master_link_status] eq {up} @@ -216,7 +216,7 @@ start_server {tags {"repl"}} { } } - test {write on master, read on slave} { + test {BITFIELD: write on master, read on slave} { $master del bits assert_equal 0 [$master bitfield bits set u8 0 255] assert_equal 255 [$master bitfield bits set u8 0 100] @@ -224,9 +224,9 @@ start_server {tags {"repl"}} { assert_equal 100 [$slave bitfield_ro bits get u8 0] } - test {bitfield_ro with write option} { + test {BITFIELD_RO fails when write option is used} { catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err - assert_match {*ERR bitfield_ro only support get subcommand*} $err + assert_match {*ERR BITFIELD_RO only supports the GET subcommand*} $err } } } From 34b89832200fa8bce5d739cb732d1bddfc25a83e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Mar 2020 16:17:35 +0100 Subject: [PATCH 23/37] Improve comments of replicationCacheMasterUsingMyself(). --- src/replication.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 31e14d7fe..49c38f73f 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2725,9 +2725,14 @@ void replicationCacheMaster(client *c) { * current offset if no data was lost during the failover. So we use our * current replication ID and offset in order to synthesize a cached master. */ void replicationCacheMasterUsingMyself(void) { + /* This will be used to populate the field server.master->reploff + * by replicationCreateMasterClient(). We'll later set the created + * master as server.cached_master, so the replica will use such + * offset for PSYNC. */ + server.master_initial_offset = server.master_repl_offset; + /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ - server.master_initial_offset = server.master_repl_offset; replicationCreateMasterClient(NULL,-1); /* Use our own ID / offset. */ From e43cd8316f16525371f06e29512e66daa24c8f74 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 23 Mar 2020 20:13:52 +0200 Subject: [PATCH 24/37] MULTI/EXEC during LUA script timeout are messed up Redis refusing to run MULTI or EXEC during script timeout may cause partial transactions to run. 1) if the client sends MULTI+commands+EXEC in pipeline without waiting for response, but these arrive to the shards partially while there's a busy script, and partially after it eventually finishes: we'll end up running only part of the transaction (since multi was ignored, and exec would fail). 2) similar to the above if EXEC arrives during busy script, it'll be ignored and the client state remains in a transaction. the 3rd test which i added for a case where MULTI and EXEC are ok, and only the body arrives during busy script was already handled correctly since processCommand calls flagTransaction --- src/server.c | 1 + tests/unit/multi.tcl | 72 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/src/server.c b/src/server.c index ddc90b3dd..5a54a3abb 100644 --- a/src/server.c +++ b/src/server.c @@ -3553,6 +3553,7 @@ int processCommand(client *c) { c->cmd->proc != authCommand && c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && + c->cmd->proc != multiCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 9fcef71d6..55f18bec8 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -320,4 +320,76 @@ start_server {tags {"multi"}} { $rd close r ping } {PONG} + + test {MULTI and script timeout} { + # check that if MULTI arrives during timeout, it is either refused, or + # allowed to pass, and we don't end up executing half of the transaction + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {EXEC and script timeout} { + # check that if EXEC arrives during timeout, we don't end up executing + # half of the transaction, and also that we exit the multi state + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {MULTI-EXEC body and script timeout} { + # check that we don't run an imcomplete transaction due to some commands + # arriving during busy script + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } } From 8caa2714768947082fe876891d93431a3f69efdf Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2020 12:46:59 +0100 Subject: [PATCH 25/37] Explain why we allow transactions in -BUSY state. Related to #7022. --- src/server.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/server.c b/src/server.c index 5a54a3abb..65a01db57 100644 --- a/src/server.c +++ b/src/server.c @@ -3548,12 +3548,19 @@ int processCommand(client *c) { return C_OK; } - /* Lua script too slow? Only allow a limited number of commands. */ + /* Lua script too slow? Only allow a limited number of commands. + * Note that we need to allow the transactions commands, otherwise clients + * sending a transaction with pipelining without error checking, may have + * the MULTI plus a few initial commands refused, then the timeout + * condition resolves, and the bottom-half of the transaction gets + * executed, see Github PR #7022. */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && - c->cmd->proc != multiCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && + c->cmd->proc != multiCommand && + c->cmd->proc != execCommand && + c->cmd->proc != discardCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && From 5f72f696884c6007704e4439f850aa6ce3662013 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Mar 2020 11:02:40 +0100 Subject: [PATCH 26/37] PSYNC2: meaningful offset implemented. A very commonly signaled operational problem with Redis master-replicas sets is that, once the master becomes unavailable for some reason, especially because of network problems, many times it wont be able to perform a partial resynchronization with the new master, once it rejoins the partition, for the following reason: 1. The master becomes isolated, however it keeps sending PINGs to the replicas. Such PINGs will never be received since the link connection is actually already severed. 2. On the other side, one of the replicas will turn into the new master, setting its secondary replication ID offset to the one of the last command received from the old master: this offset will not include the PINGs sent by the master once the link was already disconnected. 3. When the master rejoins the partion and is turned into a replica, its offset will be too advanced because of the PINGs, so a PSYNC will fail, and a full synchronization will be required. Related to issue #7002 and other discussion we had in the past around this problem. --- src/replication.c | 36 +++++++++++++++++++++++++++++++++++- src/server.c | 4 ++++ src/server.h | 1 + 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 49c38f73f..e62afb2fb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -162,6 +162,7 @@ void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len; + server.master_repl_meaningful_offset = server.master_repl_offset; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ @@ -1768,6 +1769,7 @@ void readSyncBulkPayload(connection *conn) { * we are starting a new history. */ memcpy(server.replid,server.master->replid,sizeof(server.replid)); server.master_repl_offset = server.master->reploff; + server.master_repl_meaningful_offset = server.master->reploff; clearReplicationId2(); /* Let's create the replication backlog if needed. Slaves need to @@ -2725,12 +2727,37 @@ void replicationCacheMaster(client *c) { * current offset if no data was lost during the failover. So we use our * current replication ID and offset in order to synthesize a cached master. */ void replicationCacheMasterUsingMyself(void) { + serverLog(LL_NOTICE, + "Before turning into a replica, using my own master parameters " + "to synthesize a cached master: I may be able to synchronize with " + "the new master with just a partial transfer."); + /* This will be used to populate the field server.master->reploff * by replicationCreateMasterClient(). We'll later set the created * master as server.cached_master, so the replica will use such * offset for PSYNC. */ server.master_initial_offset = server.master_repl_offset; + /* However if the "meaningful" offset, that is the offset without + * the final PINGs in the stream, is different, use this instead: + * often when the master is no longer reachable, replicas will never + * receive the PINGs, however the master will end with an incremented + * offset because of the PINGs and will not be able to incrementally + * PSYNC with the new master. */ + if (server.master_repl_offset > server.master_repl_meaningful_offset) { + long long delta = server.master_repl_offset - + server.master_repl_meaningful_offset; + serverLog(LL_NOTICE, + "Using the meaningful offset %lld instead of %lld to exclude " + "the final PINGs (%lld bytes difference)", + server.master_repl_meaningful_offset, + server.master_repl_offset, + delta); + server.master_initial_offset = server.master_repl_meaningful_offset; + server.repl_backlog_histlen -= delta; + if (server.repl_backlog_histlen < 0) server.repl_backlog_histlen = 0; + } + /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ replicationCreateMasterClient(NULL,-1); @@ -2742,7 +2769,6 @@ void replicationCacheMasterUsingMyself(void) { unlinkClient(server.master); server.cached_master = server.master; server.master = NULL; - serverLog(LL_NOTICE,"Before turning into a replica, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer."); } /* Free a cached master, called when there are no longer the conditions for @@ -3122,10 +3148,18 @@ void replicationCron(void) { clientsArePaused(); if (!manual_failover_in_progress) { + long long before_ping = server.master_repl_meaningful_offset; ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); + /* The server.master_repl_meaningful_offset variable represents + * the offset of the replication stream without the pending PINGs. + * This is useful to set the right replication offset for PSYNC + * when the master is turned into a replica. Otherwise pending + * PINGs may not allow it to perform an incremental sync with the + * new master. */ + server.master_repl_meaningful_offset = before_ping; } } diff --git a/src/server.c b/src/server.c index 65a01db57..84439461e 100644 --- a/src/server.c +++ b/src/server.c @@ -2355,6 +2355,7 @@ void initServerConfig(void) { server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.master_repl_offset = 0; + server.master_repl_meaningful_offset = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -4398,6 +4399,7 @@ sds genRedisInfoString(const char *section) { "master_replid:%s\r\n" "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" + "master_repl_meaningful_offset:%lld\r\n" "second_repl_offset:%lld\r\n" "repl_backlog_active:%d\r\n" "repl_backlog_size:%lld\r\n" @@ -4406,6 +4408,7 @@ sds genRedisInfoString(const char *section) { server.replid, server.replid2, server.master_repl_offset, + server.master_repl_meaningful_offset, server.second_replid_offset, server.repl_backlog != NULL, server.repl_backlog_size, @@ -4783,6 +4786,7 @@ void loadDataFromDisk(void) { { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; + server.master_repl_meaningful_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ diff --git a/src/server.h b/src/server.h index ed4707d66..818fbc3b3 100644 --- a/src/server.h +++ b/src/server.h @@ -1241,6 +1241,7 @@ struct redisServer { char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ long long master_repl_offset; /* My current replication offset */ + long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */ long long second_replid_offset; /* Accept offsets up to this for replid2. */ int slaveseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ From e257f121e19f3a4ec1d489bd07d75a21731de778 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2020 15:43:34 +0100 Subject: [PATCH 27/37] PSYNC2: meaningful offset test. --- tests/integration/psync2-pingoff.tcl | 61 ++++++++++++++++++++++++++++ tests/test_helper.tcl | 1 + 2 files changed, 62 insertions(+) create mode 100644 tests/integration/psync2-pingoff.tcl diff --git a/tests/integration/psync2-pingoff.tcl b/tests/integration/psync2-pingoff.tcl new file mode 100644 index 000000000..1cea290e7 --- /dev/null +++ b/tests/integration/psync2-pingoff.tcl @@ -0,0 +1,61 @@ +# Test the meaningful offset implementation to make sure masters +# are able to PSYNC with replicas even if the replication stream +# has pending PINGs at the end. + +start_server {tags {"psync2"}} { +start_server {} { + # Config + set debug_msg 0 ; # Enable additional debug messages + + for {set j 0} {$j < 2} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + $R($j) CONFIG SET repl-ping-replica-period 1 + if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} + } + + # Setup replication + test "PSYNC2 meaningful offset: setup" { + $R(1) replicaof $R_host(0) $R_port(0) + $R(0) set foo bar + wait_for_condition 50 1000 { + [$R(0) dbsize] == 1 && [$R(1) dbsize] == 1 + } else { + fail "Replicas not replicating from master" + } + } + + test "PSYNC2 meaningful offset: write and wait replication" { + $R(0) INCR counter + $R(0) INCR counter + $R(0) INCR counter + wait_for_condition 50 1000 { + [$R(0) GET counter] eq [$R(1) GET counter] + } else { + fail "Master and replica don't agree about counter" + } + } + + # In this test we'll make sure the replica will get stuck, but with + # an active connection: this way the master will continue to send PINGs + # every second (we modified the PING period earlier) + test "PSYNC2 meaningful offset: pause replica and promote it" { + $R(1) MULTI + $R(1) DEBUG SLEEP 5 + $R(1) SLAVEOF NO ONE + $R(1) EXEC + $R(1) ping ; # Wait for it to return back available + } + + test "Make the old master a replica of the new one and check conditions" { + set sync_partial [status $R(1) sync_partial_ok] + assert {$sync_partial == 0} + $R(0) REPLICAOF $R_host(1) $R_port(1) + wait_for_condition 50 1000 { + [status $R(1) sync_partial_ok] == 1 + } else { + fail "The new master was not able to partial sync" + } + } +}} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 4dbead193..5cb43104b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -47,6 +47,7 @@ set ::all_tests { integration/logging integration/psync2 integration/psync2-reg + integration/psync2-pingoff unit/pubsub unit/slowlog unit/scripting From 11db53f875ad136483c7ba55f6049c72c9c6137b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BC=AF=E6=88=90?= Date: Thu, 1 Mar 2018 11:46:56 +0800 Subject: [PATCH 28/37] Boost up performance for redis PUB-SUB patterns matching If lots of clients PSUBSCRIBE to same patterns, multiple pattens matching will take place. This commit change it into just one single pattern matching by using a `dict *` to store the unique pattern and which clients subscribe to it. --- src/pubsub.c | 52 +++++++++++++++++++++++++++++++++++++++++----------- src/server.c | 1 + src/server.h | 1 + 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/pubsub.c b/src/pubsub.c index 5cb4298e0..6fa397704 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -206,6 +206,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { + dictEntry *de; + list *clients; int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -217,6 +219,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) { pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); + /* Add the client to the pattern -> list of clients hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + if (de == NULL) { + clients = listCreate(); + dictAdd(server.pubsub_patterns_dict,pattern,clients); + incrRefCount(pattern); + } else { + clients = dictGetVal(de); + } + listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); @@ -226,6 +238,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) { /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { + dictEntry *de; + list *clients; listNode *ln; pubsubPattern pat; int retval = 0; @@ -238,6 +252,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { pat.pattern = pattern; ln = listSearchKey(server.pubsub_patterns,&pat); listDelNode(server.pubsub_patterns,ln); + /* Remove the client from the pattern -> clients list hash table */ + de = dictFind(server.pubsub_patterns_dict,pattern); + serverAssertWithInfo(c,NULL,de != NULL); + clients = dictGetVal(de); + ln = listSearchKey(clients,c); + serverAssertWithInfo(c,NULL,ln != NULL); + listDelNode(clients,ln); + if (listLength(clients) == 0) { + /* Free the list and associated hash entry at all if this was + * the latest client. */ + dictDelete(server.pubsub_patterns_dict,pattern); + } } /* Notify the client */ if (notify) addReplyPubsubPatUnsubscribed(c,pattern); @@ -284,6 +310,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; + dictIterator *di; listNode *ln; listIter li; @@ -302,23 +329,26 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - if (listLength(server.pubsub_patterns)) { - listRewind(server.pubsub_patterns,&li); + di = dictGetIterator(server.pubsub_patterns_dict); + if (di) { channel = getDecodedObject(channel); - while ((ln = listNext(&li)) != NULL) { - pubsubPattern *pat = ln->value; - - if (stringmatchlen((char*)pat->pattern->ptr, - sdslen(pat->pattern->ptr), + while((de = dictNext(di)) != NULL) { + robj *pattern = dictGetKey(de); + list *clients = dictGetVal(de); + if (!stringmatchlen((char*)pattern->ptr, + sdslen(pattern->ptr), (char*)channel->ptr, - sdslen(channel->ptr),0)) - { - addReplyPubsubPatMessage(pat->client, - pat->pattern,channel,message); + sdslen(channel->ptr),0)) continue; + + listRewind(clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); + dictReleaseIterator(di); } return receivers; } diff --git a/src/server.c b/src/server.c index 84439461e..861aea631 100644 --- a/src/server.c +++ b/src/server.c @@ -2787,6 +2787,7 @@ void initServer(void) { evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); + server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; diff --git a/src/server.h b/src/server.h index 818fbc3b3..cb2f864a1 100644 --- a/src/server.h +++ b/src/server.h @@ -1344,6 +1344,7 @@ struct redisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ + dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ From 316a8f154530cb49c5beaa26757805a911a2e667 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Mar 2020 16:19:56 +0100 Subject: [PATCH 29/37] PSYNC2: fix backlog_idx when adjusting for meaningful offset See #7002. --- src/replication.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/replication.c b/src/replication.c index e62afb2fb..20f9a2129 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2755,6 +2755,9 @@ void replicationCacheMasterUsingMyself(void) { delta); server.master_initial_offset = server.master_repl_meaningful_offset; server.repl_backlog_histlen -= delta; + server.repl_backlog_idx = + (server.repl_backlog_idx + (server.repl_backlog_size - delta)) % + server.repl_backlog_size; if (server.repl_backlog_histlen < 0) server.repl_backlog_histlen = 0; } From 9d6d1779944c6d3847cc0fe9d4066a6cc1e95f35 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 11:33:18 +0100 Subject: [PATCH 30/37] Precise timeouts: refactor unblocking on timeout. --- src/server.c | 41 +++++++++++++++++++++++++++++------------ src/server.h | 3 +++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/server.c b/src/server.c index 861aea631..e657f1196 100644 --- a/src/server.c +++ b/src/server.c @@ -1502,6 +1502,23 @@ long long getInstantaneousMetric(int metric) { return sum / STATS_METRIC_SAMPLES; } +/* Check if this blocked client timedout (does nothing if the client is + * not blocked right now). If so send a reply, unblock it, and return 1. + * Otherwise 0 is returned and no operation is performed. */ +int checkBlockedClientTimeout(client *c, mstime_t now) { + if (c->flags & CLIENT_BLOCKED && + c->bpop.timeout != 0 + && c->bpop.timeout < now) + { + /* Handle blocking operation specific timeout. */ + replyToBlockedClientTimedOut(c); + unblockClient(c); + return 1; + } else { + return 0; + } +} + /* Check for timeouts. Returns non-zero if the client was terminated. * The function gets the current time in milliseconds as argument since * it gets called multiple times in a loop, so calling gettimeofday() for @@ -1510,10 +1527,11 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { time_t now = now_ms/1000; if (server.maxidletime && - !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ + /* This handles the idle clients connection timeout if set. */ + !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ + !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ + !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ (now - c->lastinteraction > server.maxidletime)) { serverLog(LL_VERBOSE,"Closing idle client"); @@ -1522,15 +1540,14 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { } else if (c->flags & CLIENT_BLOCKED) { /* Blocked OPS timeout is handled with milliseconds resolution. * However note that the actual resolution is limited by - * server.hz. */ + * server.hz. So for short timeouts (less than SERVER_SHORT_TIMEOUT + * milliseconds) we populate a Radix tree and handle such timeouts + * in clientsHandleShortTimeout(). */ + if (checkBlockedClientTimeout(c,now_ms)) return 0; - if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - } else if (server.cluster_enabled) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this server. */ + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (server.cluster_enabled) { if (clusterRedirectBlockedClientIfNeeded(c)) unblockClient(c); } diff --git a/src/server.h b/src/server.h index cb2f864a1..d697e79a2 100644 --- a/src/server.h +++ b/src/server.h @@ -277,6 +277,9 @@ typedef long long ustime_t; /* microsecond time type. */ buffer configuration. Just the first three: normal, slave, pubsub. */ +/* Other client related defines. */ +#define CLIENT_SHORT_TIMEOUT 2000 /* See clientsHandleShortTimeout(). */ + /* Slave replication state. Used in server.repl_state for slaves to remember * what to do next. */ #define REPL_STATE_NONE 0 /* No active replication */ From 7add0f249078aa11cdeb11ec7535519315f0369e Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 13:28:39 +0100 Subject: [PATCH 31/37] Precise timeouts: working initial implementation. --- src/blocked.c | 1 + src/server.c | 135 +++++++++++++++++++++++++++++++++++++++----------- src/server.h | 2 + 3 files changed, 110 insertions(+), 28 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 06aa5850e..c470cba00 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -619,6 +619,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); } blockClient(c,btype); + addClientToShortTimeoutTable(c); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.c b/src/server.c index e657f1196..26581d979 100644 --- a/src/server.c +++ b/src/server.c @@ -1473,34 +1473,7 @@ int allPersistenceDisabled(void) { return server.saveparamslen == 0 && server.aof_state == AOF_OFF; } -/* ======================= Cron: called every 100 ms ======================== */ - -/* Add a sample to the operations per second array of samples. */ -void trackInstantaneousMetric(int metric, long long current_reading) { - long long t = mstime() - server.inst_metric[metric].last_sample_time; - long long ops = current_reading - - server.inst_metric[metric].last_sample_count; - long long ops_sec; - - ops_sec = t > 0 ? (ops*1000/t) : 0; - - server.inst_metric[metric].samples[server.inst_metric[metric].idx] = - ops_sec; - server.inst_metric[metric].idx++; - server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; - server.inst_metric[metric].last_sample_time = mstime(); - server.inst_metric[metric].last_sample_count = current_reading; -} - -/* Return the mean of all the samples. */ -long long getInstantaneousMetric(int metric) { - int j; - long long sum = 0; - - for (j = 0; j < STATS_METRIC_SAMPLES; j++) - sum += server.inst_metric[metric].samples[j]; - return sum / STATS_METRIC_SAMPLES; -} +/* ========================== Clients timeouts ============================= */ /* Check if this blocked client timedout (does nothing if the client is * not blocked right now). If so send a reply, unblock it, and return 1. @@ -1555,6 +1528,107 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { return 0; } +/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we + * populate a radix tree of 128 bit keys composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a short timeout, we add the client in + * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run + * the tree and unblock the clients. + * + * Design hint: why we block only clients with short timeouts? For frugality: + * Clients blocking for 30 seconds usually don't need to be unblocked + * precisely, and anyway for the nature of Redis to *guarantee* unblock time + * precision is hard, so we can avoid putting a large number of clients in + * the radix tree without a good reason. This idea also has a role in memory + * usage as well given that we don't do cleanup, the shorter a client timeout, + * the less time it will stay in the radix tree. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle short timeouts. The client is not added to the list if its + * timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */ +void addClientToShortTimeoutTable(client *c) { + if (c->bpop.timeout == 0 || + c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) + { + return; + } + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); +} + +/* This function is called in beforeSleep() in order to unblock ASAP clients + * that are waiting in blocking operations with a short timeout set. */ +void clientsHandleShortTimeout(void) { + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,server.clients_timeout_table); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) checkBlockedClientTimeout(c,now); + raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* ======================= Cron: called every 100 ms ======================== */ + +/* Add a sample to the operations per second array of samples. */ +void trackInstantaneousMetric(int metric, long long current_reading) { + long long t = mstime() - server.inst_metric[metric].last_sample_time; + long long ops = current_reading - + server.inst_metric[metric].last_sample_count; + long long ops_sec; + + ops_sec = t > 0 ? (ops*1000/t) : 0; + + server.inst_metric[metric].samples[server.inst_metric[metric].idx] = + ops_sec; + server.inst_metric[metric].idx++; + server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; + server.inst_metric[metric].last_sample_time = mstime(); + server.inst_metric[metric].last_sample_count = current_reading; +} + +/* Return the mean of all the samples. */ +long long getInstantaneousMetric(int metric) { + int j; + long long sum = 0; + + for (j = 0; j < STATS_METRIC_SAMPLES; j++) + sum += server.inst_metric[metric].samples[j]; + return sum / STATS_METRIC_SAMPLES; +} + /* The client query buffer is an sds.c string that can end with a lot of * free space not used, this function reclaims space if needed. * @@ -2109,11 +2183,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle precise timeouts of blocked clients. */ + clientsHandleShortTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); @@ -2738,6 +2816,7 @@ void initServer(void) { server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); + server.clients_timeout_table = raxNew(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); diff --git a/src/server.h b/src/server.h index d697e79a2..c20f70136 100644 --- a/src/server.h +++ b/src/server.h @@ -1070,6 +1070,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ + rax *clients_timeout_table; /* Radix tree for clients with short timeout. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ @@ -2140,6 +2141,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); +void addClientToShortTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From 30f1df8c48406b21b3ec896f652b25a484f4dd46 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 14:37:00 +0100 Subject: [PATCH 32/37] Precise timeouts: fix bugs in initial implementation. --- src/blocked.c | 2 +- src/server.c | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index c470cba00..84a5287d4 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -111,6 +111,7 @@ void blockClient(client *c, int btype) { c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; + addClientToShortTimeoutTable(c); } /* This function is called in the beforeSleep() function of the event loop @@ -619,7 +620,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); } blockClient(c,btype); - addClientToShortTimeoutTable(c); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.c b/src/server.c index 26581d979..19f43e413 100644 --- a/src/server.c +++ b/src/server.c @@ -1588,6 +1588,7 @@ void clientsHandleShortTimeout(void) { uint64_t now = mstime(); raxIterator ri; raxStart(&ri,server.clients_timeout_table); + raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { uint64_t id, timeout; @@ -1745,6 +1746,9 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { */ #define CLIENTS_CRON_MIN_ITERATIONS 5 void clientsCron(void) { + /* Unblock short timeout clients ASAP. */ + clientsHandleShortTimeout(); + /* Try to process at least numclients/server.hz of clients * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we From 6862fd70da38b0b09d08a307924f2e7b6bcf297a Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 15:52:16 +0100 Subject: [PATCH 33/37] Precise timeouts: fast exit for clientsHandleShortTimeout(). --- src/server.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.c b/src/server.c index 19f43e413..9a86ae3de 100644 --- a/src/server.c +++ b/src/server.c @@ -1585,6 +1585,7 @@ void addClientToShortTimeoutTable(client *c) { /* This function is called in beforeSleep() in order to unblock ASAP clients * that are waiting in blocking operations with a short timeout set. */ void clientsHandleShortTimeout(void) { + if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); raxIterator ri; raxStart(&ri,server.clients_timeout_table); From a443ec2e9a7a34e822ce41febb7b84ee4ee3c45c Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 16:02:26 +0100 Subject: [PATCH 34/37] Precise timeouts: use only radix tree for timeouts. --- src/blocked.c | 2 +- src/server.c | 46 +++++++++++++--------------------------------- src/server.h | 5 +---- 3 files changed, 15 insertions(+), 38 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 84a5287d4..443daec7f 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -111,7 +111,7 @@ void blockClient(client *c, int btype) { c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; - addClientToShortTimeoutTable(c); + addClientToTimeoutTable(c); } /* This function is called in the beforeSleep() function of the event loop diff --git a/src/server.c b/src/server.c index 9a86ae3de..f14ecc0cc 100644 --- a/src/server.c +++ b/src/server.c @@ -1511,13 +1511,6 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { freeClient(c); return 1; } else if (c->flags & CLIENT_BLOCKED) { - /* Blocked OPS timeout is handled with milliseconds resolution. - * However note that the actual resolution is limited by - * server.hz. So for short timeouts (less than SERVER_SHORT_TIMEOUT - * milliseconds) we populate a Radix tree and handle such timeouts - * in clientsHandleShortTimeout(). */ - if (checkBlockedClientTimeout(c,now_ms)) return 0; - /* Cluster: handle unblock & redirect of clients blocked * into keys no longer served by this server. */ if (server.cluster_enabled) { @@ -1528,8 +1521,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { return 0; } -/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we - * populate a radix tree of 128 bit keys composed as such: +/* For blocked clients timeouts we populate a radix tree of 128 bit keys + * composed as such: * * [8 byte big endian expire time]+[8 byte client ID] * @@ -1538,16 +1531,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * blocked with such timeout, we just go forward. * * Every time a client blocks with a short timeout, we add the client in - * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run - * the tree and unblock the clients. - * - * Design hint: why we block only clients with short timeouts? For frugality: - * Clients blocking for 30 seconds usually don't need to be unblocked - * precisely, and anyway for the nature of Redis to *guarantee* unblock time - * precision is hard, so we can avoid putting a large number of clients in - * the radix tree without a good reason. This idea also has a role in memory - * usage as well given that we don't do cleanup, the shorter a client timeout, - * the less time it will stay in the radix tree. */ + * the tree. In beforeSleep() we call clientsHandleTimeout() to run + * the tree and unblock the clients. */ #define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ @@ -1568,13 +1553,9 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { /* Add the specified client id / timeout as a key in the radix tree we use * to handle short timeouts. The client is not added to the list if its - * timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */ -void addClientToShortTimeoutTable(client *c) { - if (c->bpop.timeout == 0 || - c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) - { - return; - } + * timeout is zero (block forever). */ +void addClientToTimeoutTable(client *c) { + if (c->bpop.timeout == 0) return; uint64_t timeout = c->bpop.timeout; uint64_t id = c->id; unsigned char buf[CLIENT_ST_KEYLEN]; @@ -1584,7 +1565,7 @@ void addClientToShortTimeoutTable(client *c) { /* This function is called in beforeSleep() in order to unblock ASAP clients * that are waiting in blocking operations with a short timeout set. */ -void clientsHandleShortTimeout(void) { +void clientsHandleTimeout(void) { if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); raxIterator ri; @@ -1747,9 +1728,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { */ #define CLIENTS_CRON_MIN_ITERATIONS 5 void clientsCron(void) { - /* Unblock short timeout clients ASAP. */ - clientsHandleShortTimeout(); - /* Try to process at least numclients/server.hz of clients * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we @@ -2189,7 +2167,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Handle precise timeouts of blocked clients. */ - clientsHandleShortTimeout(); + clientsHandleTimeout(); /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -4102,11 +4080,13 @@ sds genRedisInfoString(const char *section) { "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" - "tracking_clients:%d\r\n", + "tracking_clients:%d\r\n" + "clients_in_timeout_table:%lld\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, server.blocked_clients, - server.tracking_clients); + server.tracking_clients, + raxSize(server.clients_timeout_table)); } /* Memory */ diff --git a/src/server.h b/src/server.h index c20f70136..4801a5aed 100644 --- a/src/server.h +++ b/src/server.h @@ -277,9 +277,6 @@ typedef long long ustime_t; /* microsecond time type. */ buffer configuration. Just the first three: normal, slave, pubsub. */ -/* Other client related defines. */ -#define CLIENT_SHORT_TIMEOUT 2000 /* See clientsHandleShortTimeout(). */ - /* Slave replication state. Used in server.repl_state for slaves to remember * what to do next. */ #define REPL_STATE_NONE 0 /* No active replication */ @@ -2141,7 +2138,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); -void addClientToShortTimeoutTable(client *c); +void addClientToTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From ad94066ec8b2c665aa4d5f49d79d9de3f95428b0 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Mar 2020 16:05:20 +0100 Subject: [PATCH 35/37] Precise timeouts: fix comments after functional change. --- src/server.c | 10 +++++----- src/server.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server.c b/src/server.c index f14ecc0cc..cb3c143f8 100644 --- a/src/server.c +++ b/src/server.c @@ -1530,7 +1530,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * reached the timeout already, if they are no longer existing or no longer * blocked with such timeout, we just go forward. * - * Every time a client blocks with a short timeout, we add the client in + * Every time a client blocks with a timeout, we add the client in * the tree. In beforeSleep() we call clientsHandleTimeout() to run * the tree and unblock the clients. */ @@ -1552,8 +1552,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { } /* Add the specified client id / timeout as a key in the radix tree we use - * to handle short timeouts. The client is not added to the list if its - * timeout is zero (block forever). */ + * to handle blocked clients timeouts. The client is not added to the list + * if its timeout is zero (block forever). */ void addClientToTimeoutTable(client *c) { if (c->bpop.timeout == 0) return; uint64_t timeout = c->bpop.timeout; @@ -1563,8 +1563,8 @@ void addClientToTimeoutTable(client *c) { raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); } -/* This function is called in beforeSleep() in order to unblock ASAP clients - * that are waiting in blocking operations with a short timeout set. */ +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ void clientsHandleTimeout(void) { if (raxSize(server.clients_timeout_table) == 0) return; uint64_t now = mstime(); diff --git a/src/server.h b/src/server.h index 4801a5aed..4a8426bab 100644 --- a/src/server.h +++ b/src/server.h @@ -1067,7 +1067,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ - rax *clients_timeout_table; /* Radix tree for clients with short timeout. */ + rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ From 67643ead69ffd0e3d6a4d6fbd7835ec90beb7bd3 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Mar 2020 11:13:38 +0100 Subject: [PATCH 36/37] Precise timeouts: cleaup the table on unblock. Now that this mechanism is the sole one used for blocked clients timeouts, it is more wise to cleanup the table when the client unblocks for any reason. We use a flag: CLIENT_IN_TO_TABLE, in order to avoid a radix tree lookup when the client was already removed from the table because we processed it by scanning the radix tree. --- src/blocked.c | 1 + src/server.c | 20 ++++++++++++++++++-- src/server.h | 2 ++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 443daec7f..795985ea1 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -186,6 +186,7 @@ void unblockClient(client *c) { server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } diff --git a/src/server.c b/src/server.c index cb3c143f8..9a89290ca 100644 --- a/src/server.c +++ b/src/server.c @@ -1560,7 +1560,20 @@ void addClientToTimeoutTable(client *c) { uint64_t id = c->id; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,id); - raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); + if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); } /* This function is called in beforeSleep() in order to unblock clients @@ -1577,7 +1590,10 @@ void clientsHandleTimeout(void) { decodeTimeoutKey(ri.key,&timeout,&id); if (timeout >= now) break; /* All the timeouts are in the future. */ client *c = lookupClientByID(id); - if (c) checkBlockedClientTimeout(c,now); + if (c) { + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); raxSeek(&ri,"^",NULL,0); } diff --git a/src/server.h b/src/server.h index 4a8426bab..03a153c7b 100644 --- a/src/server.h +++ b/src/server.h @@ -252,6 +252,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, depending on optin/optout mode. */ +#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -2139,6 +2140,7 @@ void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); void addClientToTimeoutTable(client *c); +void removeClientFromTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); From 0f7dfc378ce5ab0524ac28b6142545bc41435386 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 4 Dec 2019 17:29:29 +0200 Subject: [PATCH 37/37] AOFRW on an empty stream created with MKSTREAM loads badkly the AOF will be loaded successfully, but the stream will be missing, i.e inconsistencies with the original db. this was because XADD with id of 0-0 would error. add a test to reproduce. --- src/aof.c | 3 ++- tests/unit/type/stream-cgroups.tcl | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/aof.c b/src/aof.c index 8ab9349f0..6bb239252 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1212,12 +1212,13 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Use the XADD MAXLEN 0 trick to generate an empty stream if * the key we are serializing is an empty string, which is possible * for the Stream type. */ + id.ms = 0; id.seq = 1; if (rioWriteBulkCount(r,'*',7) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; if (rioWriteBulkString(r,"x",1) == 0) return 0; if (rioWriteBulkString(r,"y",1) == 0) return 0; } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 072ed14d6..6b9a4a9cd 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -311,4 +311,17 @@ start_server { } } } + + start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} { + test {Empty stream with no lastid can be rewrite into AOF correctly} { + r XGROUP CREATE mystream group-name $ MKSTREAM + assert {[dict get [r xinfo stream mystream] length] == 0} + set grpinfo [r xinfo groups mystream] + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 0} + assert {[r xinfo groups mystream] == $grpinfo} + } + } }