futriix/src/cluster_slot_stats.c
Binbin b803f7aeff
Cleaned up: getSlotOrReply returns -1 instead of C_ERR (#1211)
Minor cleanup, since getSlotOrReply returns -1 on error, not C_ERR.

Signed-off-by: Binbin <binloveplay1314@qq.com>
2024-10-23 17:11:42 +08:00

346 lines
14 KiB
C

/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/
#include "cluster_slot_stats.h"
/* Value of an entry in a SLOTSRANGE assignment array for slots not owned by this shard. */
#define UNASSIGNED_SLOT 0

/* Per-slot statistics that CLUSTER SLOT-STATS can report and sort by.
 * SLOT_STAT_COUNT is the number of reportable metrics (it is also the size of the
 * per-slot reply map when every metric is enabled). INVALID marks an unparsed metric. */
typedef enum {
    KEY_COUNT = 0,
    CPU_USEC = 1,
    NETWORK_BYTES_IN = 2,
    NETWORK_BYTES_OUT = 3,
    SLOT_STAT_COUNT = 4,
    INVALID = 5
} slotStatType;
/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */
/* A (slot, statistic) pair. An array of these is filled and qsort()ed to
 * serve the ORDERBY variant of CLUSTER SLOT-STATS. */
typedef struct slotStatForSort {
    int slot;      /* Slot number the statistic belongs to. */
    uint64_t stat; /* Value of the metric being sorted on. */
} slotStatForSort;
/* Returns non-zero when `slot` is covered by the primary of this node's shard.
 * The check goes through clusterNodeGetPrimary(), so a replica answers for the
 * slots of its primary. NOTE(review): presumably clusterNodeGetPrimary() returns
 * the node itself for a primary; confirm. */
static int doesSlotBelongToMyShard(int slot) {
    return clusterNodeCoversSlot(clusterNodeGetPrimary(getMyClusterNode()), slot);
}
/* Walks the inclusive range [start_slot, end_slot] and increments
 * assigned_slots[slot] for every slot owned by this shard.
 * Returns how many slots in the range were marked. */
static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
    int marked = 0;
    int slot = start_slot;
    while (slot <= end_slot) {
        if (doesSlotBelongToMyShard(slot)) {
            assigned_slots[slot]++;
            marked++;
        }
        slot++;
    }
    return marked;
}
/* Returns the value of metric `stat_type` for `slot`.
 * key-count is computed on demand via countKeysInSlot(); the other metrics are read
 * from server.cluster->slot_stats. SLOT_STAT_COUNT and INVALID are not metrics and panic. */
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
    switch (stat_type) {
    case KEY_COUNT: return countKeysInSlot(slot);
    case CPU_USEC: return server.cluster->slot_stats[slot].cpu_usec;
    case NETWORK_BYTES_IN: return server.cluster->slot_stats[slot].network_bytes_in;
    case NETWORK_BYTES_OUT: return server.cluster->slot_stats[slot].network_bytes_out;
    case SLOT_STAT_COUNT:
    case INVALID: serverPanic("Invalid slot stat type %d was found.", stat_type);
    }
    /* Only reached for values outside the enum (or if serverPanic returns). */
    return 0;
}
/* Three-way comparison of two uint64_t values: returns -1, 0 or 1.
 * Do not compute this as `a - b` narrowed to int. The unsigned difference wraps
 * modulo 2^64, and narrowing to int discards the high bits. Large counters
 * (e.g. cpu-usec or network bytes at or above 2^31) could then compare as equal
 * or in the wrong order, breaking qsort()'s consistent-ordering requirement. */
static int slotStatCompareUint64(uint64_t a, uint64_t b) {
    return (a > b) - (a < b);
}

/* qsort() comparator: by stat in ascending order; if stat is the same, by slot in ascending order.
 * Slots are in [0, CLUSTER_SLOTS), so their int difference cannot overflow. */
static int slotStatForSortAscCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_a->stat != entry_b->stat) {
        return slotStatCompareUint64(entry_a->stat, entry_b->stat);
    }
    return entry_a->slot - entry_b->slot;
}

/* qsort() comparator: by stat in descending order; if stat is the same, by slot in ascending order. */
static int slotStatForSortDescCmp(const void *a, const void *b) {
    const slotStatForSort *entry_a = a;
    const slotStatForSort *entry_b = b;
    if (entry_a->stat != entry_b->stat) {
        /* Arguments swapped to reverse the stat ordering. */
        return slotStatCompareUint64(entry_b->stat, entry_a->stat);
    }
    return entry_a->slot - entry_b->slot;
}
/* Fills slot_stats with one (slot, stat) entry per slot owned by this shard,
 * using metric `order_by`, then sorts those entries in place.
 * `desc` selects descending order; ties are always broken by ascending slot.
 * slot_stats must have room for every owned slot (at most CLUSTER_SLOTS). */
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
    int (*compare)(const void *, const void *) = desc ? slotStatForSortDescCmp : slotStatForSortAscCmp;
    size_t filled = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (!doesSlotBelongToMyShard(slot)) continue;
        slot_stats[filled].slot = slot;
        slot_stats[filled].stat = getSlotStat(slot, order_by);
        filled++;
    }
    qsort(slot_stats, filled, sizeof(slotStatForSort), compare);
}
/* Emits a two-element array for `slot`: the slot number, then a map of its statistics.
 * key-count is always included. cpu-usec, network-bytes-in and network-bytes-out have a
 * collection cost and are only included when server.cluster_slot_stats_enabled is set. */
static void addReplySlotStat(client *c, int slot) {
    int extended = server.cluster_slot_stats_enabled;

    addReplyArrayLen(c, 2);
    addReplyLongLong(c, slot);

    addReplyMapLen(c, extended ? SLOT_STAT_COUNT : 1);
    addReplyBulkCString(c, "key-count");
    addReplyLongLong(c, countKeysInSlot(slot));
    if (!extended) return;

    addReplyBulkCString(c, "cpu-usec");
    addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
    addReplyBulkCString(c, "network-bytes-in");
    addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
    addReplyBulkCString(c, "network-bytes-out");
    addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
/* Reply for the SLOTSRANGE variant: a top-level array of `len` per-slot entries,
 * one for every slot in [startslot, endslot] whose assigned_slots entry is non-zero,
 * emitted in ascending slot order. `len` must equal the number of such slots. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int startslot, int endslot, int len) {
    addReplyArrayLen(c, len);
    int slot = startslot;
    while (slot <= endslot) {
        if (assigned_slots[slot] != 0) addReplySlotStat(c, slot);
        slot++;
    }
}
/* Replies with the first entries of an already-sorted slot_stats array: at most
 * `limit` entries, and never more than getMyShardSlotCount(). */
static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
    int owned = getMyShardSlotCount();
    int count = (limit < owned) ? (int)limit : owned;
    addReplyArrayLen(c, count);
    for (const slotStatForSort *entry = slot_stats; entry < slot_stats + count; entry++) {
        addReplySlotStat(c, entry->slot);
    }
}
/* Egress is attributed only when slot-stats collection and cluster mode are both
 * enabled and the client is bound to a slot (c->slot != -1). Returns 1 or 0. */
static int canAddNetworkBytesOut(client *c) {
    if (!server.cluster_slot_stats_enabled) return 0;
    if (!server.cluster_enabled) return 0;
    return c->slot != -1;
}
/* Adds the egress bytes of the client's current command (c->net_output_bytes_curr_cmd)
 * to its slot's network-bytes-out counter, when attribution is allowed. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
    }
}
/* Applies a signed change of `len` bytes of replication stream to the network-bytes-out
 * counter of server.current_client's slot. The change is multiplied by
 * listLength(server.replicas), since the stream is sent to every replica.
 * Only applies on primary nodes (asserted). */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
    client *c = server.current_client;
    if (c == NULL || !canAddNetworkBytesOut(c)) return;

    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    serverAssert(nodeIsPrimary(server.cluster->myself));

    /* Magnitude of the change, in unsigned arithmetic, scaled by the replica count. */
    uint64_t bytes = (uint64_t)llabs(len) * listLength(server.replicas);
    if (len < 0) {
        /* The decrement is scaled by the replica count too, so the underflow guard
         * must compare against the full scaled amount, not just |len|. */
        serverAssert(server.cluster->slot_stats[c->slot].network_bytes_out >= bytes);
        server.cluster->slot_stats[c->slot].network_bytes_out -= bytes;
    } else {
        server.cluster->slot_stats[c->slot].network_bytes_out += bytes;
    }
}
/* Records `len` bytes of replication stream against the current client's slot.
 * The amount added is `len` times the number of replicas. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
    long long delta = len;
    clusterSlotStatsUpdateNetworkBytesOutForReplication(delta);
}
/* Removes `len` bytes of replication stream from the current client's slot.
 * It is used for stream content that belongs to no slot, such as SELECT.
 * The amount removed is `len` times the number of replicas. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
    long long delta = -len;
    clusterSlotStatsUpdateNetworkBytesOutForReplication(delta);
}
/* SPUBLISH produces egress in two ways:
 * 1) internal propagation, to clients subscribed on this node, and
 * 2) external propagation, to the other nodes of the shard (primary or replica).
 * The two are not aggregated together, to stay consistent with how
 * server.stat_net_output_bytes is aggregated. This function accounts for (1).
 *
 * The client's bytes are charged to `slot`. c->slot is temporarily overridden for
 * the check and is always restored before returning. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
    /* A blocked client may already carry a slot of its own, so keep it for restoration. */
    int saved_slot = c->slot;
    c->slot = slot;
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
        /* resetClient() does not run until the subscription ends, so the
         * per-command byte counter has to be cleared here. */
        c->net_output_bytes_curr_cmd = 0;
    }
    c->slot = saved_slot;
}
/* Reply for the ORDERBY variant: the owned slots sorted by `order_by`
 * (descending when `desc` is set), truncated to `limit` entries. */
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
    /* Sized for every slot; only the entries for slots owned by this shard are filled. */
    slotStatForSort sorted[CLUSTER_SLOTS];
    collectAndSortSlotStats(sorted, order_by, desc);
    addReplySortedSlotStats(c, sorted, limit);
}
/* Zeroes the stored statistics of a single slot.
 * key-count needs no reset: it is not stored here but computed through countKeysInSlot(). */
void clusterSlotStatReset(int slot) {
    memset(server.cluster->slot_stats + slot, 0, sizeof(slotStat));
}
/* Zeroes the stored statistics of every slot at once. */
void clusterSlotStatResetAll(void) {
    size_t table_bytes = sizeof(server.cluster->slot_stats);
    memset(server.cluster->slot_stats, 0, table_bytes);
}
/* Decides whether a command's c->duration may be added to its slot's cpu-usec.
 * Commands nested inside EXEC, EVAL or FCALL are skipped, because the duration of
 * the outer command already includes them. Counting them again would double the time.
 * Returns 1 or 0. */
static int canAddCpuDuration(client *c) {
    if (!server.cluster_slot_stats_enabled) return 0; /* Config should be enabled. */
    if (!server.cluster_enabled) return 0;             /* Cluster mode should be enabled. */
    if (c->slot == -1) return 0;                       /* Command should be slot specific. */
    if (!server.execution_nesting) return 1;           /* Top-level command: always counted. */
    /* A nested command only counts when nesting is due to unblocking a blocking command. */
    return (c->realcmd->flags & CMD_BLOCKING) != 0;
}
/* Adds `duration` microseconds to the cpu-usec counter of the client's slot,
 * when canAddCpuDuration() allows it. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
    if (canAddCpuDuration(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].cpu_usec += duration;
    }
}
/* A script allowed to run cross-slot (SCRIPT_ALLOW_CROSS_SLOT) cannot be attributed
 * to one slot. Its calling client's slot is therefore set to -1, which makes the
 * slot-stats accumulators skip it. */
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
    if (ctx->flags & SCRIPT_ALLOW_CROSS_SLOT) ctx->original_client->slot = -1;
}
/* Decides whether the current command's ingress bytes may be added to its slot.
 * Returns 1 or 0. */
static int canAddNetworkBytesIn(client *c) {
    /* Cluster mode and slot-stats collection must both be enabled. */
    if (!server.cluster_enabled || !server.cluster_slot_stats_enabled) return 0;
    /* The command must target a specific slot. */
    if (c->slot == -1) return 0;
    /* Blocked clients are skipped to avoid counting them again when they unblock. */
    if (c->flag.blocked) return 0;
    /* Inside MULTI/EXEC, the nested call()'s afterCommand() would re-add EXEC's
     * 14-byte RESP, so nothing is counted while in_exec is set. */
    return !server.in_exec;
}
/* Adds the ingress bytes of the current command (c->net_input_bytes_curr_cmd, computed
 * earlier in the networking layer) to the network-bytes-in counter of its slot.
 *
 * Note: call this only after c->slot has been parsed. Otherwise canAddNetworkBytesIn()
 * rejects the client and nothing is recorded.
 *
 * For EXEC, the bytes of the MULTI that opened the transaction are added to
 * c->net_input_bytes_curr_cmd as well (this mutates the client's counter). */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
    if (!canAddNetworkBytesIn(c)) return;
    /* Same bounds check as every other slot-stats accumulator before indexing slot_stats. */
    serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
    if (c->cmd->proc == execCommand) {
        /* Account for the matching MULTI request: *1\r\n$5\r\nmulti\r\n (15 bytes).
         * NOTE(review): assumes MULTI arrived as a RESP multibulk, not an inline command. */
        c->net_input_bytes_curr_cmd += sizeof("*1\r\n$5\r\nmulti\r\n") - 1;
    }
    server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot
 * Replies with the statistics of every slot in [start-slot, end-slot] owned by this shard. */
static void clusterSlotStatsSlotsRangeSubcommand(client *c) {
    /* getSlotOrReply() returns -1 after replying with its own error. */
    int startslot = getSlotOrReply(c, c->argv[3]);
    if (startslot == -1) return;
    int endslot = getSlotOrReply(c, c->argv[4]);
    if (endslot == -1) return;

    if (endslot < startslot) {
        addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", startslot, endslot);
        return;
    }

    unsigned char assigned_slots[CLUSTER_SLOTS] = {UNASSIGNED_SLOT};
    int owned_in_range = markSlotsAssignedToMyShard(assigned_slots, startslot, endslot);
    addReplySlotsRange(c, assigned_slots, startslot, endslot, owned_in_range);
}

/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC]
 * Replies with up to `limit` owned slots sorted by `metric`, descending unless ASC is given.
 * LIMIT and ASC/DESC may each appear at most once. */
static void clusterSlotStatsOrderBySubcommand(client *c) {
    const char *metric = c->argv[3]->ptr;
    slotStatType order_by = INVALID;
    if (!strcasecmp(metric, "key-count")) {
        order_by = KEY_COUNT;
    } else if (server.cluster_slot_stats_enabled) {
        /* These metrics can only be sorted on while slot-stats collection is enabled. */
        if (!strcasecmp(metric, "cpu-usec")) {
            order_by = CPU_USEC;
        } else if (!strcasecmp(metric, "network-bytes-in")) {
            order_by = NETWORK_BYTES_IN;
        } else if (!strcasecmp(metric, "network-bytes-out")) {
            order_by = NETWORK_BYTES_OUT;
        }
    }
    if (order_by == INVALID) {
        addReplyError(c, "Unrecognized sort metric for ORDERBY.");
        return;
    }

    int desc = 1;
    long limit = CLUSTER_SLOTS;
    int limit_seen = 0, order_seen = 0;
    /* Optional arguments start right after the metric. */
    for (int j = 4; j < c->argc; j++) {
        const char *opt = c->argv[j]->ptr;
        int has_value = j + 1 < c->argc;
        if (has_value && !strcasecmp(opt, "limit")) {
            if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 1, CLUSTER_SLOTS, &limit,
                                              "Limit has to lie in between 1 and 16384 (maximum number of slots).") !=
                C_OK) {
                return;
            }
            j++; /* Skip over the LIMIT value. */
            limit_seen++;
        } else if (!strcasecmp(opt, "asc")) {
            desc = 0;
            order_seen++;
        } else if (!strcasecmp(opt, "desc")) {
            desc = 1;
            order_seen++;
        } else {
            addReplyErrorObject(c, shared.syntaxerr);
            return;
        }
        if (limit_seen > 1 || order_seen > 1) {
            addReplyError(c, "Multiple filters of the same type are disallowed.");
            return;
        }
    }
    addReplyOrderBy(c, order_by, limit, desc);
}

/* Entry point for CLUSTER SLOT-STATS. Requires cluster mode and dispatches to the
 * SLOTSRANGE (exactly 5 arguments) or ORDERBY (at least 4 arguments) variant. */
void clusterSlotStatsCommand(client *c) {
    if (!server.cluster_enabled) {
        addReplyError(c, "This instance has cluster support disabled");
        return;
    }
    if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
        clusterSlotStatsSlotsRangeSubcommand(c);
    } else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
        clusterSlotStatsOrderBySubcommand(c);
    } else {
        addReplySubcommandSyntaxError(c);
    }
}