Gossip forgotten nodes on CLUSTER FORGET (#10869)

Gossip the cluster node blacklist in ping and pong messages.
This means that CLUSTER FORGET doesn't need to be sent to all nodes in a cluster.
It can be sent to one or more nodes and then be propagated to the rest of them.

For each blacklisted node, its node id and its remaining blacklist TTL is gossiped in a
cluster bus ping extension (introduced in #9530).
This commit is contained in:
Viktor Söderqvist 2022-07-26 09:28:13 +02:00 committed by GitHub
parent 33bd8fb981
commit 5032de50f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 6 deletions

View File

@ -2039,6 +2039,21 @@ int writeHostnamePingExt(clusterMsgPingExt **cursor) {
return extension_size; return extension_size;
} }
/* Write the forgotten node ping extension at the start of the cursor, update
* the cursor to point to the end of the written extension and return the number
* of bytes written. */
int writeForgottenNodePingExt(clusterMsgPingExt **cursor, sds name, uint64_t ttl) {
serverAssert(sdslen(name) == CLUSTER_NAMELEN);
clusterMsgPingExtForgottenNode *ext = &(*cursor)->ext[0].forgotten_node;
memcpy(ext->name, name, CLUSTER_NAMELEN);
ext->ttl = htonu64(ttl);
uint32_t extension_size = sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode);
(*cursor)->type = htons(CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE);
(*cursor)->length = htonl(extension_size);
*cursor = (clusterMsgPingExt *) (ext->name + sizeof(clusterMsgPingExtForgottenNode));
return extension_size;
}
/* We previously validated the extensions, so this function just needs to /* We previously validated the extensions, so this function just needs to
* handle the extensions. */ * handle the extensions. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) { void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
@ -2052,6 +2067,19 @@ void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) { if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname); clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
ext_hostname = hostname_ext->hostname; ext_hostname = hostname_ext->hostname;
} else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) {
sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
dictEntry *de = dictAddRaw(server.cluster->nodes_black_list, id, NULL);
serverAssert(de != NULL);
uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
dictSetUnsignedIntegerVal(de, expire);
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
}
} else { } else {
/* Unknown type, we will ignore it but log what happened. */ /* Unknown type, we will ignore it but log what happened. */
serverLog(LL_WARNING, "Received unknown extension type %d", type); serverLog(LL_WARNING, "Received unknown extension type %d", type);
@ -2951,6 +2979,8 @@ void clusterSendPing(clusterLink *link, int type) {
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData); estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted)); estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
estlen += getHostnamePingExtSize(); estlen += getHostnamePingExtSize();
estlen += dictSize(server.cluster->nodes_black_list) *
(sizeof(clusterMsgPingExt) + sizeof(clusterMsgPingExtForgottenNode));
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */ * sizeof(clusterMsg) or more. */
@ -3031,6 +3061,22 @@ void clusterSendPing(clusterLink *link, int type) {
extensions++; extensions++;
} }
/* Gossip forgotten nodes */
if (dictSize(server.cluster->nodes_black_list) > 0) {
dictIterator *di = dictGetIterator(server.cluster->nodes_black_list);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
sds name = dictGetKey(de);
uint64_t expire = dictGetUnsignedIntegerVal(de);
if ((time_t)expire < server.unixtime) continue; /* already expired */
uint64_t ttl = expire - server.unixtime;
hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
totlen += writeForgottenNodePingExt(&cursor, name, ttl);
extensions++;
}
dictReleaseIterator(di);
}
/* Compute the actual total length and send! */ /* Compute the actual total length and send! */
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount); totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
@ -5639,7 +5685,12 @@ NULL
/* CLUSTER FORGET <NODE ID> */ /* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) { if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); if (clusterBlacklistExists((char*)c->argv[2]->ptr))
/* Already forgotten. The deletion may have been gossipped by
* another node, so we pretend it succeeded. */
addReply(c,shared.ok);
else
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return; return;
} else if (n == myself) { } else if (n == myself) {
addReplyError(c,"I tried hard but I can't forget myself..."); addReplyError(c,"I tried hard but I can't forget myself...");

View File

@ -253,6 +253,7 @@ typedef struct {
* consistent manner. */ * consistent manner. */
typedef enum { typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME, CLUSTERMSG_EXT_TYPE_HOSTNAME,
CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE,
} clusterMsgPingtypes; } clusterMsgPingtypes;
/* Helper function for making sure extensions are eight byte aligned. */ /* Helper function for making sure extensions are eight byte aligned. */
@ -262,12 +263,20 @@ typedef struct {
char hostname[1]; /* The announced hostname, ends with \0. */ char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname; } clusterMsgPingExtHostname;
typedef struct {
char name[CLUSTER_NAMELEN]; /* Node name. */
uint64_t ttl; /* Remaining time to blacklist the node, in seconds. */
} clusterMsgPingExtForgottenNode;
static_assert(sizeof(clusterMsgPingExtForgottenNode) % 8 == 0, "");
typedef struct { typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */ uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */ uint16_t type; /* Type of this extension message (see clusterMsgPingExtTypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */ uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union { union {
clusterMsgPingExtHostname hostname; clusterMsgPingExtHostname hostname;
clusterMsgPingExtForgottenNode forgotten_node;
} ext[]; /* Actual extension information, formatted so that the data is 8 } ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */ * byte aligned, regardless of its content. */
} clusterMsgPingExt; } clusterMsgPingExt;

View File

@ -1,12 +1,25 @@
# Returns 1 if no node knows node_id, 0 if any node knows it.
proc node_is_forgotten {node_id} {
for {set j 0} {$j < [llength $::servers]} {incr j} {
set cluster_nodes [R $j CLUSTER NODES]
if { [string match "*$node_id*" $cluster_nodes] } {
return 0
}
}
return 1
}
# Isolate a node from the cluster and give it a new nodeid # Isolate a node from the cluster and give it a new nodeid
proc isolate_node {id} { proc isolate_node {id} {
set node_id [R $id CLUSTER MYID] set node_id [R $id CLUSTER MYID]
R $id CLUSTER RESET HARD R $id CLUSTER RESET HARD
for {set j 0} {$j < [llength $::servers]} {incr j} { # Here we additionally test that CLUSTER FORGET propagates to all nodes.
if { $j eq $id } { set other_id [expr $id == 0 ? 1 : 0]
continue R $other_id CLUSTER FORGET $node_id
} wait_for_condition 50 100 {
R $j CLUSTER FORGET $node_id [node_is_forgotten $node_id]
} else {
fail "CLUSTER FORGET was not propagated to all nodes"
} }
} }