Fix the TLS and REPS issues about CLUSTER SLOTS cache (#581)

PR #53 introduced the cache of CLUSTER SLOTS response, but the cache has
some problems for different types of clients:

1. the RESP version is wrongly ignored:

    ```
    $./valkey-cli
    127.0.0.1:6379> cluster slots
    1) 1) (integer) 0
       2) (integer) 16383
       3) 1) ""
          2) (integer) 6379
          3) "f1aeceb352401ce57acd432c68c60b359c00ef85"
          4) (empty array)
    127.0.0.1:6379> hello 3
    1# "server" => "valkey"
    2# "version" => "255.255.255"
    3# "proto" => (integer) 3
    4# "id" => (integer) 3
    5# "mode" => "cluster"
    6# "role" => "master"
    7# "modules" => (empty array)
    127.0.0.1:6379> cluster slots
    1) 1) (integer) 0
       2) (integer) 16383
       3) 1) ""
          2) (integer) 6379
          3) "f1aeceb352401ce57acd432c68c60b359c00ef85"
          4) (empty array)
    ```

    RESP3 should get "empty hash" but get RESP2's "empty array"

3. we should use the original client's connect type, or lua/function and
module would get wrong port:

    ```
    $./valkey-cli --tls --insecure -p 6789
    127.0.0.1:6789> config get port tls-port
    1) "tls-port"
    2) "6789"
    3) "port"
    4) "6379"
    127.0.0.1:6789> cluster slots
    1) 1) (integer) 0
       2) (integer) 16383
       3) 1) ""
          2) (integer) 6789
          3) "f1aeceb352401ce57acd432c68c60b359c00ef85"
          4) (empty array)
    127.0.0.1:6789> eval "return redis.call('cluster','slots')" 0
    1) 1) (integer) 0
       2) (integer) 16383
       3) 1) ""
          2) (integer) 6379
          3) "f1aeceb352401ce57acd432c68c60b359c00ef85"
          4) (empty array)
        ```

---------

Signed-off-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
This commit is contained in:
zhaozhao.zz 2024-06-28 14:56:13 +08:00 committed by GitHub
parent 1269532fbd
commit 4fbe31ab87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 22 additions and 17 deletions

View File

@ -1354,15 +1354,17 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
void clearCachedClusterSlotsResponse(void) { void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) { for (int resp = 0; resp <= 3; resp++) {
sdsfree(server.cached_cluster_slot_info[conn_type]); if (server.cached_cluster_slot_info[conn_type][resp]) {
server.cached_cluster_slot_info[conn_type] = NULL; sdsfree(server.cached_cluster_slot_info[conn_type][resp]);
server.cached_cluster_slot_info[conn_type][resp] = NULL;
}
} }
} }
} }
sds generateClusterSlotResponse(void) { sds generateClusterSlotResponse(int resp) {
client *recording_client = createCachedResponseClient(); client *recording_client = createCachedResponseClient(resp);
clusterNode *n = NULL; clusterNode *n = NULL;
int num_primaries = 0, start = -1; int num_primaries = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(recording_client); void *slot_replylen = addReplyDeferredLen(recording_client);
@ -1392,8 +1394,8 @@ sds generateClusterSlotResponse(void) {
return cluster_slot_response; return cluster_slot_response;
} }
int verifyCachedClusterSlotsResponse(sds cached_response) { int verifyCachedClusterSlotsResponse(sds cached_response, int resp) {
sds generated_response = generateClusterSlotResponse(); sds generated_response = generateClusterSlotResponse(resp);
int is_equal = !sdscmp(generated_response, cached_response); int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */ /* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal) if (!is_equal)
@ -1413,16 +1415,16 @@ void clusterCommandSlots(client *c) {
* 3) node ID * 3) node ID
* ... continued until done * ... continued until done
*/ */
connTypeForCaching conn_type = connIsTLS(c->conn); connTypeForCaching conn_type = shouldReturnTlsInfo();
if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse(); if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();
sds cached_reply = server.cached_cluster_slot_info[conn_type]; sds cached_reply = server.cached_cluster_slot_info[conn_type][c->resp];
if (!cached_reply) { if (!cached_reply) {
cached_reply = generateClusterSlotResponse(); cached_reply = generateClusterSlotResponse(c->resp);
server.cached_cluster_slot_info[conn_type] = cached_reply; server.cached_cluster_slot_info[conn_type][c->resp] = cached_reply;
} else { } else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1); debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply, c->resp) == 1);
} }
addReplyProto(c, cached_reply, sdslen(cached_reply)); addReplyProto(c, cached_reply, sdslen(cached_reply));

View File

@ -104,7 +104,7 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node); long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length); clusterNode *clusterLookupNode(const char *name, int length);
int detectAndUpdateCachedNodeHealth(void); int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void); client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client); void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void); void clearCachedClusterSlotsResponse(void);
unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countKeysInSlot(unsigned int hashslot);

View File

@ -1033,7 +1033,9 @@ void clusterInit(void) {
server.cluster->mf_end = 0; server.cluster->mf_end = 0;
server.cluster->mf_replica = NULL; server.cluster->mf_replica = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cached_cluster_slot_info[conn_type] = NULL; for (int resp = 0; resp <= 3; resp++) {
server.cached_cluster_slot_info[conn_type][resp] = NULL;
}
} }
resetManualFailover(); resetManualFailover();
clusterUpdateMyselfFlags(); clusterUpdateMyselfFlags();

View File

@ -339,8 +339,9 @@ sds aggregateClientOutputBuffer(client *c) {
* to initiate caching of any command response. * to initiate caching of any command response.
* *
* It needs be paired with `deleteCachedResponseClient` function to stop caching. */ * It needs be paired with `deleteCachedResponseClient` function to stop caching. */
client *createCachedResponseClient(void) { client *createCachedResponseClient(int resp) {
struct client *recording_client = createClient(NULL); struct client *recording_client = createClient(NULL);
recording_client->resp = resp;
/* Allocating the `conn` allows to prepare the caching client before adding /* Allocating the `conn` allows to prepare the caching client before adding
* data to the clients output buffer by `prepareClientToWrite`. */ * data to the clients output buffer by `prepareClientToWrite`. */
recording_client->conn = zcalloc(sizeof(connection)); recording_client->conn = zcalloc(sizeof(connection));

View File

@ -2075,7 +2075,7 @@ struct valkeyServer {
* dropping packets of a specific type */ * dropping packets of a specific type */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */ /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1; uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX][4]; /* Align to RESP3 */
/* Scripting */ /* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */ int pre_command_oom_state; /* OOM before command (script?) was started */
@ -2738,7 +2738,7 @@ void initSharedQueryBuf(void);
client *lookupClientByID(uint64_t id); client *lookupClientByID(uint64_t id);
int authRequired(client *c); int authRequired(client *c);
void putClientInPendingWriteQueue(client *c); void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void); client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client); void deleteCachedResponseClient(client *recording_client);
/* logreqres.c - logging of requests and responses */ /* logreqres.c - logging of requests and responses */