diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 708999ed5..56da5225e 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -138,14 +138,28 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { } /* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ -void clusterSlotStatsAddNetworkBytesOutForReplication(int len) { +static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { client *c = server.current_client; if (c == NULL || !canAddNetworkBytesOut(c)) return; serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + if (len < 0) serverAssert(server.cluster->slot_stats[c->slot].network_bytes_out >= (uint64_t)llabs(len)); server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas)); } +/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica + * count. */ +void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(len); +} + +/* Decrement network bytes out for replication stream. + * This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command. + * This will decrement `len` value times the active replica count. */ +void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { + clusterSlotStatsUpdateNetworkBytesOutForReplication(-len); +} + /* Upon SPUBLISH, two egress events are triggered. * 1) Internal propagation, for clients that are subscribed to the current node. * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index d1a8c6b15..2e9da70aa 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -18,5 +18,6 @@ void clusterSlotStatsResetClusterMsgLength(void); /* network-bytes-out metric. */ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c); -void clusterSlotStatsAddNetworkBytesOutForReplication(int len); +void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len); +void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len); void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot); diff --git a/src/replication.c b/src/replication.c index 5d7374f0e..ce57e5f89 100644 --- a/src/replication.c +++ b/src/replication.c @@ -416,7 +416,7 @@ void feedReplicationBuffer(char *s, size_t len) { if (server.repl_backlog == NULL) return; - clusterSlotStatsAddNetworkBytesOutForReplication(len); + clusterSlotStatsIncrNetworkBytesOutForReplication(len); while (len > 0) { size_t start_pos = 0; /* The position of referenced block to start sending. */ @@ -574,7 +574,7 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { /* Although the SELECT command is not associated with any slot, * its per-slot network-bytes-out accumulation is made by the above function call. * To cancel-out this accumulation, below adjustment is made. */ - clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr)); + clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr)); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);