From aaa783436244956fbf0d198439c91709f25765e6 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Mon, 29 Jul 2024 21:50:46 -0700 Subject: [PATCH] Handle underflow condition of network out slot stats metric (#840) Fixes test failure (https://github.com/valkey-io/valkey/actions/runs/10146979329/job/28056316421?pr=837) on 32 bit system for slot stats metric underflow on the following condition: ``` server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas)); ``` * Here listLength accesses `len` which is of `unsigned long` type and multiplied with `len` (which could be negative). This is a risky operation and behaves differently based on the architecture. ``` clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr)); ``` * `sdslen` method returns `size_t`. applying `-` operation to decrement network bytes out is also incorrect. This change adds assertion on `len` being negative and handles the wrapping of overall value. --------- Signed-off-by: Harkrishn Patro --- src/cluster_slot_stats.c | 16 +++++++++++++++- src/cluster_slot_stats.h | 3 ++- src/replication.c | 4 ++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 708999ed5..56da5225e 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -138,14 +138,28 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { } /* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ -void clusterSlotStatsAddNetworkBytesOutForReplication(int len) { +static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { client *c = server.current_client; if (c == NULL || !canAddNetworkBytesOut(c)) return; serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + if (len < 0) serverAssert(server.cluster->slot_stats[c->slot].network_bytes_out >= (uint64_t)llabs(len)); server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas)); } +/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica + * count. */ +void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(len); +} + +/* Decrement network bytes out for replication stream. + * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command. + * This will decrement `len` value times the active replica count. */ +void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(-len); +} + /* Upon SPUBLISH, two egress events are triggered. * 1) Internal propagation, for clients that are subscribed to the current node. * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index d1a8c6b15..2e9da70aa 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -18,5 +18,6 @@ void clusterSlotStatsResetClusterMsgLength(void); /* network-bytes-out metric. */ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c); -void clusterSlotStatsAddNetworkBytesOutForReplication(int len); +void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len); +void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len); void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot); diff --git a/src/replication.c b/src/replication.c index 5d7374f0e..ce57e5f89 100644 --- a/src/replication.c +++ b/src/replication.c @@ -416,7 +416,7 @@ void feedReplicationBuffer(char *s, size_t len) { if (server.repl_backlog == NULL) return; - clusterSlotStatsAddNetworkBytesOutForReplication(len); + clusterSlotStatsIncrNetworkBytesOutForReplication(len); while (len > 0) { size_t start_pos = 0; /* The position of referenced block to start sending. */ @@ -574,7 +574,7 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { /* Although the SELECT command is not associated with any slot, * its per-slot network-bytes-out accumulation is made by the above function call. * To cancel-out this accumulation, below adjustment is made. */ - clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr)); + clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr)); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);