From 35e8ae3eb5f80ebb5cad5b509d1fde56176bca0d Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 11 Jul 2022 16:23:31 +0800 Subject: [PATCH] Add cluster-port support to redis-cli --cluster (#10344) In #9389, we add a new `cluster-port` config and make cluster bus port configurable, and currently redis-cli --cluster create/add-node doesn't support with a configurable `cluster-port` instance. Because redis-cli uses the old way (port + 10000) to send the `CLUSTER MEET` command. Now we add this support on redis-cli `--cluster`, note we don't need to explicitly pass in the `cluster-port` parameter, we can get the real `cluster-port` of the node in `clusterManagerNodeLoadInfo`, so the `--cluster create` and `--cluster add-node` interfaces have not changed. We will use the `cluster-port` when we are doing `CLUSTER MEET`, also note that `CLUSTER MEET` bus-port parameter was added in 4.0, so if the bus_port (the one in redis-cli) is 0, or equal (port + 10000), we just call `CLUSTER MEET` with 2 arguments, using the old form. Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com> --- src/redis-cli.c | 111 ++++++++++++++++++++++++----------------- tests/support/util.tcl | 4 +- tests/unit/cluster.tcl | 94 +++++++++++++++++++++++++++++++--- 3 files changed, 153 insertions(+), 56 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 444d9af0a..0de78c67e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -81,6 +81,7 @@ #define REDIS_CLI_CLUSTER_YES_ENV "REDISCLI_CLUSTER_YES" #define CLUSTER_MANAGER_SLOTS 16384 +#define CLUSTER_MANAGER_PORT_INCR 10000 /* same as CLUSTER_PORT_INCR */ #define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000 #define CLUSTER_MANAGER_MIGRATE_PIPELINE 10 #define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2 @@ -2867,6 +2868,7 @@ typedef struct clusterManagerNode { sds name; char *ip; int port; + int bus_port; /* cluster-port */ uint64_t current_epoch; time_t ping_sent; time_t ping_recv; @@ -2937,7 +2939,7 @@ typedef int (*clusterManagerOnReplyError)(redisReply *reply, /* Cluster Manager helper functions */ -static clusterManagerNode *clusterManagerNewNode(char *ip, int port); +static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port); static clusterManagerNode *clusterManagerNodeByName(const char *name); static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n); static void clusterManagerNodeResetSlots(clusterManagerNode *node); @@ -2997,15 +2999,15 @@ typedef struct clusterManagerCommandDef { clusterManagerCommandDef clusterManagerCommands[] = { {"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN", "replicas "}, - {"check", clusterManagerCommandCheck, -1, "host:port", + {"check", clusterManagerCommandCheck, -1, " or - separated by either colon or space", "search-multiple-owners"}, - {"info", clusterManagerCommandInfo, -1, "host:port", NULL}, - {"fix", clusterManagerCommandFix, -1, "host:port", + {"info", clusterManagerCommandInfo, -1, " or - separated by either colon or space", NULL}, + {"fix", clusterManagerCommandFix, -1, " or - separated by either colon or space", "search-multiple-owners,fix-with-unreachable-masters"}, - {"reshard", clusterManagerCommandReshard, -1, "host:port", + {"reshard", clusterManagerCommandReshard, -1, " or - separated by either colon or space", "from ,to ,slots ,yes,timeout ,pipeline ," "replace"}, - {"rebalance", clusterManagerCommandRebalance, -1, "host:port", + {"rebalance", clusterManagerCommandRebalance, -1, " or - separated by either colon or space", "weight ,use-empty-masters," "timeout ,simulate,pipeline ,threshold ,replace"}, {"add-node", clusterManagerCommandAddNode, 2, @@ -3094,6 +3096,7 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) { static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr, int *bus_port_ptr) { + /* ip:port[@bus_port] */ char *c = strrchr(addr, '@'); if (c != NULL) { *c = '\0'; @@ -3203,12 +3206,15 @@ static void freeClusterManager(void) { dictRelease(clusterManagerUncoveredSlots); } -static clusterManagerNode *clusterManagerNewNode(char *ip, int port) { +static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port) { clusterManagerNode *node = zmalloc(sizeof(*node)); node->context = NULL; node->name = NULL; node->ip = ip; node->port = port; + /* We don't need to know the bus_port, at this point this value may be wrong. + * If it is used, it will be corrected in clusterManagerLoadInfoFromNode. */ + node->bus_port = bus_port ? bus_port : port + CLUSTER_MANAGER_PORT_INCR; node->current_epoch = 0; node->ping_sent = 0; node->ping_recv = 0; @@ -4611,9 +4617,20 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, success = 0; goto cleanup; } + + char *ip = NULL; + int port = 0, bus_port = 0; + if (addr == NULL || !parseClusterNodeAddress(addr, &ip, &port, &bus_port)) { + fprintf(stderr, "Error: invalid CLUSTER NODES reply\n"); + success = 0; + goto cleanup; + } + int myself = (strstr(flags, "myself") != NULL); clusterManagerNode *currentNode = NULL; if (myself) { + /* bus-port could be wrong, correct it here, see clusterManagerNewNode. */ + node->bus_port = bus_port; node->flags |= CLUSTER_MANAGER_FLAG_MYSELF; currentNode = node; clusterManagerNodeResetSlots(node); @@ -4681,22 +4698,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue; else break; } else { - if (addr == NULL) { - fprintf(stderr, "Error: invalid CLUSTER NODES reply\n"); - success = 0; - goto cleanup; - } - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c == NULL) { - fprintf(stderr, "Error: invalid CLUSTER NODES reply\n"); - success = 0; - goto cleanup; - } - *c = '\0'; - int port = atoi(++c); - currentNode = clusterManagerNewNode(sdsnew(addr), port); + currentNode = clusterManagerNewNode(sdsnew(ip), port, bus_port); currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND; if (node->friends == NULL) node->friends = listCreate(); listAddNodeTail(node->friends, currentNode); @@ -6110,17 +6112,14 @@ static int clusterManagerCommandCreate(int argc, char **argv) { cluster_manager.nodes = listCreate(); for (i = 0; i < argc; i++) { char *addr = argv[i]; - char *c = strrchr(addr, '@'); - if (c != NULL) *c = '\0'; - c = strrchr(addr, ':'); - if (c == NULL) { + char *ip = NULL; + int port = 0; + if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) { fprintf(stderr, "Invalid address format: %s\n", addr); return 0; } - *c = '\0'; - char *ip = addr; - int port = atoi(++c); - clusterManagerNode *node = clusterManagerNewNode(ip, port); + + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerNodeConnect(node)) { freeClusterManagerNode(node); return 0; @@ -6327,8 +6326,16 @@ assign_replicas: continue; } redisReply *reply = NULL; - reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d", - first_ip, first->port); + if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) { + /* CLUSTER MEET bus-port parameter was added in 4.0. + * So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR), + * we just call CLUSTER MEET with 2 arguments, using the old form. */ + reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d", + first->ip, first->port); + } else { + reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d %d", + first->ip, first->port, first->bus_port); + } int is_err = 0; if (reply != NULL) { if ((is_err = reply->type == REDIS_REPLY_ERROR)) @@ -6362,6 +6369,8 @@ assign_replicas: } success = 0; goto cleanup; + } else if (err != NULL) { + zfree(err); } } // Reset Nodes @@ -6405,7 +6414,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port, ref_ip, ref_port); // Check the existing cluster - clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port); + clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port, 0); if (!clusterManagerLoadInfoFromNode(refnode)) return 0; if (!clusterManagerCheckCluster(0)) return 0; @@ -6429,7 +6438,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { } // Add the new node - clusterManagerNode *new_node = clusterManagerNewNode(ip, port); + clusterManagerNode *new_node = clusterManagerNewNode(ip, port, 0); int added = 0; if (!clusterManagerNodeConnect(new_node)) { clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n", @@ -6507,8 +6516,18 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { success = 0; goto cleanup; } - reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d", - first_ip, first->port); + + if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) { + /* CLUSTER MEET bus-port parameter was added in 4.0. + * So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR), + * we just call CLUSTER MEET with 2 arguments, using the old form. */ + reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d", + first_ip, first->port); + } else { + reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d %d", + first->ip, first->port, first->bus_port); + } + if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL))) goto cleanup; @@ -6545,7 +6564,7 @@ static int clusterManagerCommandDeleteNode(int argc, char **argv) { char *node_id = argv[1]; clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n", node_id, ip, port); - clusterManagerNode *ref_node = clusterManagerNewNode(ip, port); + clusterManagerNode *ref_node = clusterManagerNewNode(ip, port, 0); clusterManagerNode *node = NULL; // Load cluster information @@ -6607,7 +6626,7 @@ static int clusterManagerCommandInfo(int argc, char **argv) { int port = 0; char *ip = NULL; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *node = clusterManagerNewNode(ip, port); + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(node)) return 0; clusterManagerShowClusterInfo(); return 1; @@ -6620,7 +6639,7 @@ static int clusterManagerCommandCheck(int argc, char **argv) { int port = 0; char *ip = NULL; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *node = clusterManagerNewNode(ip, port); + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(node)) return 0; clusterManagerShowClusterInfo(); return clusterManagerCheckCluster(0); @@ -6638,7 +6657,7 @@ static int clusterManagerCommandReshard(int argc, char **argv) { int port = 0; char *ip = NULL; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *node = clusterManagerNewNode(ip, port); + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(node)) return 0; clusterManagerCheckCluster(0); if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { @@ -6827,7 +6846,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) { clusterManagerNode **weightedNodes = NULL; list *involved = NULL; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *node = clusterManagerNewNode(ip, port); + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(node)) return 0; int result = 1, i; if (config.cluster_manager_command.weight != NULL) { @@ -7028,7 +7047,7 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) { return 0; } // Load cluster information - clusterManagerNode *node = clusterManagerNewNode(ip, port); + clusterManagerNode *node = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(node)) return 0; int ok_count = 0, err_count = 0; @@ -7098,7 +7117,7 @@ static int clusterManagerCommandImport(int argc, char **argv) { clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n", src_ip, src_port, ip, port); - clusterManagerNode *refnode = clusterManagerNewNode(ip, port); + clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(refnode)) return 0; if (!clusterManagerCheckCluster(0)) return 0; char *reply_err = NULL; @@ -7233,7 +7252,7 @@ static int clusterManagerCommandCall(int argc, char **argv) { int port = 0, i; char *ip = NULL; if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *refnode = clusterManagerNewNode(ip, port); + clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(refnode)) return 0; argc--; argv++; @@ -7278,7 +7297,7 @@ static int clusterManagerCommandBackup(int argc, char **argv) { int success = 1, port = 0; char *ip = NULL; if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args; - clusterManagerNode *refnode = clusterManagerNewNode(ip, port); + clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0); if (!clusterManagerLoadInfoFromNode(refnode)) return 0; int no_issues = clusterManagerCheckCluster(0); int cluster_errors_count = (no_issues ? 0 : diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 6741b719a..fd72dcf75 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -72,8 +72,8 @@ proc sanitizer_errors_from_file {filename} { } proc getInfoProperty {infostr property} { - if {[regexp "\r\n$property:(.*?)\r\n" $infostr _ value]} { - set _ $value + if {[regexp -lineanchor "^$property:(.*?)\r\n" $infostr _ value]} { + return $value } } diff --git a/tests/unit/cluster.tcl b/tests/unit/cluster.tcl index 9d49a2dee..180a19257 100644 --- a/tests/unit/cluster.tcl +++ b/tests/unit/cluster.tcl @@ -3,9 +3,7 @@ source tests/support/cli.tcl proc cluster_info {r field} { - if {[regexp "^$field:(.*?)\r\n" [$r cluster info] _ value]} { - set _ $value - } + set _ [getInfoProperty [$r cluster info] $field] } # Provide easy access to CLUSTER INFO properties. Same semantic as "proc s". @@ -110,7 +108,7 @@ start_multiple_servers 3 [list overrides $base_conf] { } $node3_rd close - + test "Run blocking command again on cluster node1" { $node1 del key9184688 # key9184688 is mapped to slot 10923 which has been moved to node1 @@ -123,9 +121,9 @@ start_multiple_servers 3 [list overrides $base_conf] { fail "Client not blocked" } } - + test "Kill a cluster node and wait for fail state" { - # kill node3 in cluster + # kill node3 in cluster exec kill -SIGSTOP $node3_pid wait_for_condition 1000 50 { @@ -135,7 +133,7 @@ start_multiple_servers 3 [list overrides $base_conf] { fail "Cluster doesn't fail" } } - + test "Verify command got unblocked after cluster failure" { assert_error {*CLUSTERDOWN*} {$node1_rd read} @@ -208,7 +206,7 @@ start_multiple_servers 5 [list overrides $base_conf] { 127.0.0.1:[srv -4 port] \ 127.0.0.1:[srv 0 port] } e - assert_match {*node already contains functions*} $e + assert_match {*node already contains functions*} $e } } ;# stop servers @@ -315,6 +313,86 @@ test {Migrate the last slot away from a node using redis-cli} { } } +# Test redis-cli --cluster create, add-node with cluster-port. +# Create five nodes, three with custom cluster_port and two with default values. +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { + + # The first three are used to test --cluster create. + # The last two are used to test --cluster add-node + set node1_rd [redis_client 0] + set node2_rd [redis_client -1] + set node3_rd [redis_client -2] + set node4_rd [redis_client -3] + set node5_rd [redis_client -4] + + test {redis-cli --cluster create with cluster-port} { + exec src/redis-cli --cluster-yes --cluster create \ + 127.0.0.1:[srv 0 port] \ + 127.0.0.1:[srv -1 port] \ + 127.0.0.1:[srv -2 port] + + wait_for_condition 1000 50 { + [csi 0 cluster_state] eq {ok} && + [csi -1 cluster_state] eq {ok} && + [csi -2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Make sure each node can meet other nodes + assert_equal 3 [csi 0 cluster_known_nodes] + assert_equal 3 [csi -1 cluster_known_nodes] + assert_equal 3 [csi -2 cluster_known_nodes] + } + + test {redis-cli --cluster add-node with cluster-port} { + # Adding node to the cluster (without cluster-port) + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -3 port] \ + 127.0.0.1:[srv 0 port] + + wait_for_condition 1000 50 { + [csi 0 cluster_state] eq {ok} && + [csi -1 cluster_state] eq {ok} && + [csi -2 cluster_state] eq {ok} && + [csi -3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Adding node to the cluster (with cluster-port) + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -4 port] \ + 127.0.0.1:[srv 0 port] + + wait_for_condition 1000 50 { + [csi 0 cluster_state] eq {ok} && + [csi -1 cluster_state] eq {ok} && + [csi -2 cluster_state] eq {ok} && + [csi -3 cluster_state] eq {ok} && + [csi -4 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Make sure each node can meet other nodes + assert_equal 5 [csi 0 cluster_known_nodes] + assert_equal 5 [csi -1 cluster_known_nodes] + assert_equal 5 [csi -2 cluster_known_nodes] + assert_equal 5 [csi -3 cluster_known_nodes] + assert_equal 5 [csi -4 cluster_known_nodes] + } +# stop 5 servers +} +} +} +} +} + } ;# tags set ::singledb $old_singledb