From 4fbe31ab87ba2a5fd1360328f5e75993986fa0d0 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 28 Jun 2024 14:56:13 +0800 Subject: [PATCH] Fix the TLS and REPS issues about CLUSTER SLOTS cache (#581) PR #53 introduced the cache of CLUSTER SLOTS response, but the cache has some problems for different types of clients: 1. the RESP version is wrongly ignored: ``` $./valkey-cli 127.0.0.1:6379> cluster slots 1) 1) (integer) 0 2) (integer) 16383 3) 1) "" 2) (integer) 6379 3) "f1aeceb352401ce57acd432c68c60b359c00ef85" 4) (empty array) 127.0.0.1:6379> hello 3 1# "server" => "valkey" 2# "version" => "255.255.255" 3# "proto" => (integer) 3 4# "id" => (integer) 3 5# "mode" => "cluster" 6# "role" => "master" 7# "modules" => (empty array) 127.0.0.1:6379> cluster slots 1) 1) (integer) 0 2) (integer) 16383 3) 1) "" 2) (integer) 6379 3) "f1aeceb352401ce57acd432c68c60b359c00ef85" 4) (empty array) ``` RESP3 should get "empty hash" but get RESP2's "empty array" 3. we should use the original client's connect type, or lua/function and module would get wrong port: ``` $./valkey-cli --tls --insecure -p 6789 127.0.0.1:6789> config get port tls-port 1) "tls-port" 2) "6789" 3) "port" 4) "6379" 127.0.0.1:6789> cluster slots 1) 1) (integer) 0 2) (integer) 16383 3) 1) "" 2) (integer) 6789 3) "f1aeceb352401ce57acd432c68c60b359c00ef85" 4) (empty array) 127.0.0.1:6789> eval "return redis.call('cluster','slots')" 0 1) 1) (integer) 0 2) (integer) 16383 3) 1) "" 2) (integer) 6379 3) "f1aeceb352401ce57acd432c68c60b359c00ef85" 4) (empty array) ``` --------- Signed-off-by: zhaozhao.zz --- src/cluster.c | 26 ++++++++++++++------------ src/cluster.h | 2 +- src/cluster_legacy.c | 4 +++- src/networking.c | 3 ++- src/server.h | 4 ++-- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d9da706c7..c4949c08e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1354,15 +1354,17 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in void clearCachedClusterSlotsResponse(void) { for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { - if (server.cached_cluster_slot_info[conn_type]) { - sdsfree(server.cached_cluster_slot_info[conn_type]); - server.cached_cluster_slot_info[conn_type] = NULL; + for (int resp = 0; resp <= 3; resp++) { + if (server.cached_cluster_slot_info[conn_type][resp]) { + sdsfree(server.cached_cluster_slot_info[conn_type][resp]); + server.cached_cluster_slot_info[conn_type][resp] = NULL; + } } } } -sds generateClusterSlotResponse(void) { - client *recording_client = createCachedResponseClient(); +sds generateClusterSlotResponse(int resp) { + client *recording_client = createCachedResponseClient(resp); clusterNode *n = NULL; int num_primaries = 0, start = -1; void *slot_replylen = addReplyDeferredLen(recording_client); @@ -1392,8 +1394,8 @@ sds generateClusterSlotResponse(void) { return cluster_slot_response; } -int verifyCachedClusterSlotsResponse(sds cached_response) { - sds generated_response = generateClusterSlotResponse(); +int verifyCachedClusterSlotsResponse(sds cached_response, int resp) { + sds generated_response = generateClusterSlotResponse(resp); int is_equal = !sdscmp(generated_response, cached_response); /* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */ if (!is_equal) @@ -1413,16 +1415,16 @@ void clusterCommandSlots(client *c) { * 3) node ID * ... continued until done */ - connTypeForCaching conn_type = connIsTLS(c->conn); + connTypeForCaching conn_type = shouldReturnTlsInfo(); if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse(); - sds cached_reply = server.cached_cluster_slot_info[conn_type]; + sds cached_reply = server.cached_cluster_slot_info[conn_type][c->resp]; if (!cached_reply) { - cached_reply = generateClusterSlotResponse(); - server.cached_cluster_slot_info[conn_type] = cached_reply; + cached_reply = generateClusterSlotResponse(c->resp); + server.cached_cluster_slot_info[conn_type][c->resp] = cached_reply; } else { - debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1); + debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply, c->resp) == 1); } addReplyProto(c, cached_reply, sdslen(cached_reply)); diff --git a/src/cluster.h b/src/cluster.h index ec22d1f7f..a83b4ac28 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -104,7 +104,7 @@ const char *clusterNodePreferredEndpoint(clusterNode *n); long long clusterNodeReplOffset(clusterNode *node); clusterNode *clusterLookupNode(const char *name, int length); int detectAndUpdateCachedNodeHealth(void); -client *createCachedResponseClient(void); +client *createCachedResponseClient(int resp); void deleteCachedResponseClient(client *recording_client); void clearCachedClusterSlotsResponse(void); unsigned int countKeysInSlot(unsigned int hashslot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b913cd567..17e7d235d 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1033,7 +1033,9 @@ void clusterInit(void) { server.cluster->mf_end = 0; server.cluster->mf_replica = NULL; for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { - server.cached_cluster_slot_info[conn_type] = NULL; + for (int resp = 0; resp <= 3; resp++) { + server.cached_cluster_slot_info[conn_type][resp] = NULL; + } } resetManualFailover(); clusterUpdateMyselfFlags(); diff --git a/src/networking.c b/src/networking.c index ba40db6c6..629e3aac7 100644 --- a/src/networking.c +++ b/src/networking.c @@ -339,8 +339,9 @@ sds aggregateClientOutputBuffer(client *c) { * to initiate caching of any command response. * * It needs be paired with `deleteCachedResponseClient` function to stop caching. */ -client *createCachedResponseClient(void) { +client *createCachedResponseClient(int resp) { struct client *recording_client = createClient(NULL); + recording_client->resp = resp; /* Allocating the `conn` allows to prepare the caching client before adding * data to the clients output buffer by `prepareClientToWrite`. */ recording_client->conn = zcalloc(sizeof(connection)); diff --git a/src/server.h b/src/server.h index f6be39acc..57ccd557e 100644 --- a/src/server.h +++ b/src/server.h @@ -2075,7 +2075,7 @@ struct valkeyServer { * dropping packets of a specific type */ /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */ uint32_t debug_cluster_close_link_on_packet_drop : 1; - sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; + sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX][4]; /* Align to RESP3 */ /* Scripting */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ int pre_command_oom_state; /* OOM before command (script?) was started */ @@ -2738,7 +2738,7 @@ void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); -client *createCachedResponseClient(void); +client *createCachedResponseClient(int resp); void deleteCachedResponseClient(client *recording_client); /* logreqres.c - logging of requests and responses */