Handle underflow condition of network out slot stats metric (#840)

Fixes test failure
(https://github.com/valkey-io/valkey/actions/runs/10146979329/job/28056316421?pr=837)
on 32 bit system for slot stats metric underflow on the following
condition:

```
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
```
* Here listLength accesses `len` which is of `unsigned long` type and
multiplied with `len` (which could be negative). This is a risky
operation and behaves differently based on the architecture.

```
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));
```
* `sdslen` method returns `size_t`. applying `-` operation to decrement
network bytes out is also incorrect.

This change adds assertion on `len` being negative and handles the
wrapping of overall value.

---------

Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
This commit is contained in:
Harkrishn Patro 2024-07-29 21:50:46 -07:00 committed by GitHub
parent b4d96caa78
commit aaa7834362
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 19 additions and 4 deletions

View File

@ -138,14 +138,28 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
}
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
if (len < 0) serverAssert(server.cluster->slot_stats[c->slot].network_bytes_out >= (uint64_t)llabs(len));
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}
/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica
* count. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(len);
}
/* Decrement network bytes out for replication stream.
* This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command.
* This will decrement `len` value times the active replica count. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(-len);
}
/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).

View File

@ -18,5 +18,6 @@ void clusterSlotStatsResetClusterMsgLength(void);
/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);

View File

@ -416,7 +416,7 @@ void feedReplicationBuffer(char *s, size_t len) {
if (server.repl_backlog == NULL) return;
clusterSlotStatsAddNetworkBytesOutForReplication(len);
clusterSlotStatsIncrNetworkBytesOutForReplication(len);
while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
@ -574,7 +574,7 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {
/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));
clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr));
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);