Merge 4db74afdb26a74eb3a0621ab695e99a0b790727c into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
wuranxx 2025-02-02 18:01:31 +02:00 committed by GitHub
commit 0b6424a273
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 144 additions and 6 deletions

View File

@ -94,7 +94,7 @@ void moduleCallClusterReceivers(const char *sender_id,
const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot, int lazy);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
@ -123,6 +123,7 @@ int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);
/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
@ -2767,7 +2768,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
for (int j = 0; j < dirty_slots_count; j++) {
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j],
myself->name, myself->human_nodename, myself->shard_id);
delKeysInSlot(dirty_slots[j]);
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del);
}
}
}
@ -5753,7 +5754,7 @@ int verifyClusterConfigWithData(void) {
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name,
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id);
}
delKeysInSlot(j);
delKeysInSlot(j, server.lazyfree_lazy_server_del);
}
}
if (update_config) clusterSaveConfigOrDie(1);
@ -6340,13 +6341,14 @@ void removeChannelsInSlot(unsigned int slot) {
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int delKeysInSlot(unsigned int hashslot, int lazy) {
if (!countKeysInSlot(hashslot)) return 0;
/* We may lose a slot during the pause. We need to track this
* state so that we don't assert in propagateNow(). */
server.server_del_keys_in_slot = 1;
unsigned int j = 0;
int before_execution_nesting = server.execution_nesting;
kvstoreHashtableIterator *kvs_di = NULL;
void *next;
@ -6356,8 +6358,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
if (lazy) {
dbAsyncDelete(&server.db[0], key);
} else {
dbSyncDelete(&server.db[0], key);
}
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
propagateDeletion(&server.db[0], key, lazy);
signalModifiedKey(NULL, &server.db[0], key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
@ -6372,7 +6379,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
kvstoreReleaseHashtableIterator(kvs_di);
server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
serverAssert(server.execution_nesting == before_execution_nesting);
return j;
}
@ -7118,6 +7125,9 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
clusterCommandFlushslot(c);
} else {
return 0;
}
@ -7317,3 +7327,21 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {
return C_OK;
}
void clusterCommandFlushslot(client *c) {
int slot;
int lazy = server.lazyfree_lazy_user_flush;
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
if (c->argc == 4) {
if (!strcasecmp(c->argv[3]->ptr, "async")) {
lazy = 1;
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
lazy = 0;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
delKeysInSlot(slot, lazy);
addReply(c, shared.ok);
}

View File

@ -518,6 +518,35 @@ struct COMMAND_ARG CLUSTER_FAILOVER_Args[] = {
{MAKE_ARG("options",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FAILOVER_options_Subargs},
};
/********** CLUSTER FLUSHSLOT ********************/
#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER FLUSHSLOT history */
#define CLUSTER_FLUSHSLOT_History NULL
#endif
#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER FLUSHSLOT tips */
#define CLUSTER_FLUSHSLOT_Tips NULL
#endif
#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER FLUSHSLOT key specs */
#define CLUSTER_FLUSHSLOT_Keyspecs NULL
#endif
/* CLUSTER FLUSHSLOT flush_type argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_flush_type_Subargs[] = {
{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
};
/* CLUSTER FLUSHSLOT argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FLUSHSLOT_flush_type_Subargs},
};
/********** CLUSTER FLUSHSLOTS ********************/
#ifndef SKIP_CMD_HISTORY_TABLE
@ -1012,6 +1041,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args},
{MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args},
{MAKE_CMD("failover","Forces a replica to perform a manual failover of its primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args},
{MAKE_CMD("flushslot","Remove all keys from the target slot.","O(N) where N is the number of keys in the target slot","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOT_History,0,CLUSTER_FLUSHSLOT_Tips,0,clusterCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,CLUSTER_FLUSHSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_FLUSHSLOT_Args},
{MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)},
{MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args},
{MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args},

View File

@ -0,0 +1,46 @@
{
"FLUSHSLOT": {
"summary": "Remove all keys from the target slot.",
"complexity": "O(N) where N is the number of keys in the target slot",
"group": "cluster",
"since": "8.1.0",
"arity": -3,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"WRITE"
],
"acl_categories": [
"KEYSPACE",
"DANGEROUS"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "slot",
"type": "integer"
},
{
"name": "flush-type",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "async",
"type": "pure-token",
"token": "ASYNC",
"since": "8.1.0"
},
{
"name": "sync",
"type": "pure-token",
"token": "SYNC",
"since": "8.1.0"
}
]
}
]
}
}

View File

@ -0,0 +1,34 @@
start_cluster 2 2 {tags {external:skip cluster}} {
test "SYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]
# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num
# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot SYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}
test "ASYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]
# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num
# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot ASYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}
}