Add cluster-port support to redis-cli --cluster (#10344)

In #9389 we added a new `cluster-port` config that makes the cluster bus port configurable.
Currently, redis-cli `--cluster create`/`add-node` does not support instances with a custom `cluster-port`,
because redis-cli still uses the old convention (port + 10000) when sending the `CLUSTER MEET` command.

This commit adds that support to redis-cli `--cluster`. Note that the `cluster-port` parameter does not
need to be passed explicitly: the real `cluster-port` of each node is obtained in `clusterManagerNodeLoadInfo`,
so the `--cluster create` and `--cluster add-node` interfaces are unchanged.

The `cluster-port` is used when issuing `CLUSTER MEET`. Note that the `CLUSTER MEET` bus-port
parameter was only added in 4.0, so if the bus_port (the one tracked in redis-cli) is 0 or equal to (port + 10000),
we call `CLUSTER MEET` with just 2 arguments, using the old form.

Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com>
This commit is contained in:
Binbin 2022-07-11 16:23:31 +08:00 committed by GitHub
parent 1e85b89aef
commit 35e8ae3eb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 153 additions and 56 deletions

View File

@ -81,6 +81,7 @@
#define REDIS_CLI_CLUSTER_YES_ENV "REDISCLI_CLUSTER_YES"
#define CLUSTER_MANAGER_SLOTS 16384
#define CLUSTER_MANAGER_PORT_INCR 10000 /* same as CLUSTER_PORT_INCR */
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10
#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
@ -2867,6 +2868,7 @@ typedef struct clusterManagerNode {
sds name;
char *ip;
int port;
int bus_port; /* cluster-port */
uint64_t current_epoch;
time_t ping_sent;
time_t ping_recv;
@ -2937,7 +2939,7 @@ typedef int (*clusterManagerOnReplyError)(redisReply *reply,
/* Cluster Manager helper functions */
static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
@ -2997,15 +2999,15 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port",
{"check", clusterManagerCommandCheck, -1, "<host:port> or <host> <port> - separated by either colon or space",
"search-multiple-owners"},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
{"fix", clusterManagerCommandFix, -1, "host:port",
{"info", clusterManagerCommandInfo, -1, "<host:port> or <host> <port> - separated by either colon or space", NULL},
{"fix", clusterManagerCommandFix, -1, "<host:port> or <host> <port> - separated by either colon or space",
"search-multiple-owners,fix-with-unreachable-masters"},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
{"reshard", clusterManagerCommandReshard, -1, "<host:port> or <host> <port> - separated by either colon or space",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
"replace"},
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
{"rebalance", clusterManagerCommandRebalance, -1, "<host:port> or <host> <port> - separated by either colon or space",
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
{"add-node", clusterManagerCommandAddNode, 2,
@ -3094,6 +3096,7 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) {
static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
int *bus_port_ptr)
{
/* ip:port[@bus_port] */
char *c = strrchr(addr, '@');
if (c != NULL) {
*c = '\0';
@ -3203,12 +3206,15 @@ static void freeClusterManager(void) {
dictRelease(clusterManagerUncoveredSlots);
}
static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port) {
clusterManagerNode *node = zmalloc(sizeof(*node));
node->context = NULL;
node->name = NULL;
node->ip = ip;
node->port = port;
/* We don't need to know the bus_port, at this point this value may be wrong.
* If it is used, it will be corrected in clusterManagerLoadInfoFromNode. */
node->bus_port = bus_port ? bus_port : port + CLUSTER_MANAGER_PORT_INCR;
node->current_epoch = 0;
node->ping_sent = 0;
node->ping_recv = 0;
@ -4611,9 +4617,20 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
success = 0;
goto cleanup;
}
char *ip = NULL;
int port = 0, bus_port = 0;
if (addr == NULL || !parseClusterNodeAddress(addr, &ip, &port, &bus_port)) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}
int myself = (strstr(flags, "myself") != NULL);
clusterManagerNode *currentNode = NULL;
if (myself) {
/* bus-port could be wrong, correct it here, see clusterManagerNewNode. */
node->bus_port = bus_port;
node->flags |= CLUSTER_MANAGER_FLAG_MYSELF;
currentNode = node;
clusterManagerNodeResetSlots(node);
@ -4681,22 +4698,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue;
else break;
} else {
if (addr == NULL) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c == NULL) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}
*c = '\0';
int port = atoi(++c);
currentNode = clusterManagerNewNode(sdsnew(addr), port);
currentNode = clusterManagerNewNode(sdsnew(ip), port, bus_port);
currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND;
if (node->friends == NULL) node->friends = listCreate();
listAddNodeTail(node->friends, currentNode);
@ -6110,17 +6112,14 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
cluster_manager.nodes = listCreate();
for (i = 0; i < argc; i++) {
char *addr = argv[i];
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c == NULL) {
char *ip = NULL;
int port = 0;
if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) {
fprintf(stderr, "Invalid address format: %s\n", addr);
return 0;
}
*c = '\0';
char *ip = addr;
int port = atoi(++c);
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerNodeConnect(node)) {
freeClusterManagerNode(node);
return 0;
@ -6327,8 +6326,16 @@ assign_replicas:
continue;
}
redisReply *reply = NULL;
reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
first_ip, first->port);
if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) {
/* CLUSTER MEET bus-port parameter was added in 4.0.
* So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR),
* we just call CLUSTER MEET with 2 arguments, using the old form. */
reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
first->ip, first->port);
} else {
reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d %d",
first->ip, first->port, first->bus_port);
}
int is_err = 0;
if (reply != NULL) {
if ((is_err = reply->type == REDIS_REPLY_ERROR))
@ -6362,6 +6369,8 @@ assign_replicas:
}
success = 0;
goto cleanup;
} else if (err != NULL) {
zfree(err);
}
}
// Reset Nodes
@ -6405,7 +6414,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port,
ref_ip, ref_port);
// Check the existing cluster
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port);
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
@ -6429,7 +6438,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
}
// Add the new node
clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
clusterManagerNode *new_node = clusterManagerNewNode(ip, port, 0);
int added = 0;
if (!clusterManagerNodeConnect(new_node)) {
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
@ -6507,8 +6516,18 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
success = 0;
goto cleanup;
}
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
first_ip, first->port);
if (first->bus_port == 0 || (first->bus_port == first->port + CLUSTER_MANAGER_PORT_INCR)) {
/* CLUSTER MEET bus-port parameter was added in 4.0.
* So if (bus_port == 0) or (bus_port == port + CLUSTER_MANAGER_PORT_INCR),
* we just call CLUSTER MEET with 2 arguments, using the old form. */
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
first_ip, first->port);
} else {
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d %d",
first->ip, first->port, first->bus_port);
}
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
goto cleanup;
@ -6545,7 +6564,7 @@ static int clusterManagerCommandDeleteNode(int argc, char **argv) {
char *node_id = argv[1];
clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n",
node_id, ip, port);
clusterManagerNode *ref_node = clusterManagerNewNode(ip, port);
clusterManagerNode *ref_node = clusterManagerNewNode(ip, port, 0);
clusterManagerNode *node = NULL;
// Load cluster information
@ -6607,7 +6626,7 @@ static int clusterManagerCommandInfo(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return 1;
@ -6620,7 +6639,7 @@ static int clusterManagerCommandCheck(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return clusterManagerCheckCluster(0);
@ -6638,7 +6657,7 @@ static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
@ -6827,7 +6846,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
clusterManagerNode **weightedNodes = NULL;
list *involved = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int result = 1, i;
if (config.cluster_manager_command.weight != NULL) {
@ -7028,7 +7047,7 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) {
return 0;
}
// Load cluster information
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int ok_count = 0, err_count = 0;
@ -7098,7 +7117,7 @@ static int clusterManagerCommandImport(int argc, char **argv) {
clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n",
src_ip, src_port, ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
char *reply_err = NULL;
@ -7233,7 +7252,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
int port = 0, i;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
argc--;
argv++;
@ -7278,7 +7297,7 @@ static int clusterManagerCommandBackup(int argc, char **argv) {
int success = 1, port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
int no_issues = clusterManagerCheckCluster(0);
int cluster_errors_count = (no_issues ? 0 :

View File

@ -72,8 +72,8 @@ proc sanitizer_errors_from_file {filename} {
}
proc getInfoProperty {infostr property} {
if {[regexp "\r\n$property:(.*?)\r\n" $infostr _ value]} {
set _ $value
if {[regexp -lineanchor "^$property:(.*?)\r\n" $infostr _ value]} {
return $value
}
}

View File

@ -3,9 +3,7 @@
source tests/support/cli.tcl
proc cluster_info {r field} {
if {[regexp "^$field:(.*?)\r\n" [$r cluster info] _ value]} {
set _ $value
}
set _ [getInfoProperty [$r cluster info] $field]
}
# Provide easy access to CLUSTER INFO properties. Same semantic as "proc s".
@ -110,7 +108,7 @@ start_multiple_servers 3 [list overrides $base_conf] {
}
$node3_rd close
test "Run blocking command again on cluster node1" {
$node1 del key9184688
# key9184688 is mapped to slot 10923 which has been moved to node1
@ -123,9 +121,9 @@ start_multiple_servers 3 [list overrides $base_conf] {
fail "Client not blocked"
}
}
test "Kill a cluster node and wait for fail state" {
# kill node3 in cluster
# kill node3 in cluster
exec kill -SIGSTOP $node3_pid
wait_for_condition 1000 50 {
@ -135,7 +133,7 @@ start_multiple_servers 3 [list overrides $base_conf] {
fail "Cluster doesn't fail"
}
}
test "Verify command got unblocked after cluster failure" {
assert_error {*CLUSTERDOWN*} {$node1_rd read}
@ -208,7 +206,7 @@ start_multiple_servers 5 [list overrides $base_conf] {
127.0.0.1:[srv -4 port] \
127.0.0.1:[srv 0 port]
} e
assert_match {*node already contains functions*} $e
assert_match {*node already contains functions*} $e
}
} ;# stop servers
@ -315,6 +313,86 @@ test {Migrate the last slot away from a node using redis-cli} {
}
}
# Test redis-cli --cluster create, add-node with cluster-port.
# Create five nodes, three with custom cluster_port and two with default values.
# The custom cluster-port values are picked with find_available_port; the
# servers started without that override keep the default bus port.
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
# The first three are used to test --cluster create.
# The last two are used to test --cluster add-node
# NOTE(review): the node*_rd clients below are not referenced by the two
# tests visible in this block - confirm whether they are still needed.
set node1_rd [redis_client 0]
set node2_rd [redis_client -1]
set node3_rd [redis_client -2]
set node4_rd [redis_client -3]
set node5_rd [redis_client -4]
test {redis-cli --cluster create with cluster-port} {
# Only host:port pairs are passed to redis-cli; no cluster-port argument
# is given on the command line.
exec src/redis-cli --cluster-yes --cluster create \
127.0.0.1:[srv 0 port] \
127.0.0.1:[srv -1 port] \
127.0.0.1:[srv -2 port]
# Poll until servers 0, -1 and -2 all report cluster_state ok
# (csi is presumably the CLUSTER INFO accessor defined outside this block).
wait_for_condition 1000 50 {
[csi 0 cluster_state] eq {ok} &&
[csi -1 cluster_state] eq {ok} &&
[csi -2 cluster_state] eq {ok}
} else {
fail "Cluster doesn't stabilize"
}
# Make sure each node can meet other nodes
assert_equal 3 [csi 0 cluster_known_nodes]
assert_equal 3 [csi -1 cluster_known_nodes]
assert_equal 3 [csi -2 cluster_known_nodes]
}
test {redis-cli --cluster add-node with cluster-port} {
# Adding node to the cluster (without cluster-port)
# srv -3 is listed first and srv 0 second; assumed to be
# <new node> <existing cluster node> per add-node usage - confirm.
exec src/redis-cli --cluster-yes --cluster add-node \
127.0.0.1:[srv -3 port] \
127.0.0.1:[srv 0 port]
# Wait for the four servers 0 through -3 to report cluster_state ok
# before adding the next node.
wait_for_condition 1000 50 {
[csi 0 cluster_state] eq {ok} &&
[csi -1 cluster_state] eq {ok} &&
[csi -2 cluster_state] eq {ok} &&
[csi -3 cluster_state] eq {ok}
} else {
fail "Cluster doesn't stabilize"
}
# Adding node to the cluster (with cluster-port)
exec src/redis-cli --cluster-yes --cluster add-node \
127.0.0.1:[srv -4 port] \
127.0.0.1:[srv 0 port]
# Wait for all five servers to report cluster_state ok.
wait_for_condition 1000 50 {
[csi 0 cluster_state] eq {ok} &&
[csi -1 cluster_state] eq {ok} &&
[csi -2 cluster_state] eq {ok} &&
[csi -3 cluster_state] eq {ok} &&
[csi -4 cluster_state] eq {ok}
} else {
fail "Cluster doesn't stabilize"
}
# Make sure each node can meet other nodes
assert_equal 5 [csi 0 cluster_known_nodes]
assert_equal 5 [csi -1 cluster_known_nodes]
assert_equal 5 [csi -2 cluster_known_nodes]
assert_equal 5 [csi -3 cluster_known_nodes]
assert_equal 5 [csi -4 cluster_known_nodes]
}
# stop 5 servers
}
}
}
}
}
} ;# tags
set ::singledb $old_singledb