Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) (#720)
Adds two new metrics for per-slot statistics, network-bytes-in and network-bytes-out. The network bytes are inclusive of replication bytes but exclude other types of network traffic such as clusterbus traffic. #### network-bytes-in The metric tracks network ingress bytes under per-slot context, by reverse calculation of `c->argv_len_sum` and `c->argc`, stored under a newly introduced field `c->net_input_bytes_curr_cmd`. #### network-bytes-out The metric tracks network egress bytes under per-slot context, by hooking onto COB buffer mutations. #### sample response Both metrics are reported under the `CLUSTER SLOT-STATS` command. ``` 127.0.0.1:6379> cluster slot-stats slotsrange 0 0 1) 1) (integer) 0 2) 1) "key-count" 2) (integer) 0 3) "cpu-usec" 4) (integer) 0 5) "network-bytes-in" 6) (integer) 0 7) "network-bytes-out" 8) (integer) 0 ``` --------- Signed-off-by: Kyle Kim <kimkyle@amazon.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
parent
e745e9c240
commit
e1d936b339
@ -361,6 +361,8 @@ struct _clusterNode {
|
||||
/* Struct used for storing slot statistics. */
|
||||
typedef struct slotStat {
|
||||
uint64_t cpu_usec;
|
||||
uint64_t network_bytes_in;
|
||||
uint64_t network_bytes_out;
|
||||
} slotStat;
|
||||
|
||||
struct clusterState {
|
||||
@ -414,5 +416,4 @@ struct clusterState {
|
||||
slotStat slot_stats[CLUSTER_SLOTS];
|
||||
};
|
||||
|
||||
|
||||
#endif // CLUSTER_LEGACY_H
|
||||
|
@ -8,7 +8,7 @@
|
||||
|
||||
#define UNASSIGNED_SLOT 0
|
||||
|
||||
typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;
|
||||
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatType;
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* CLUSTER SLOT-STATS command
|
||||
@ -38,13 +38,15 @@ static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_s
|
||||
return assigned_slots_count;
|
||||
}
|
||||
|
||||
static uint64_t getSlotStat(int slot, int stat_type) {
|
||||
serverAssert(stat_type != INVALID);
|
||||
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
|
||||
uint64_t slot_stat = 0;
|
||||
if (stat_type == KEY_COUNT) {
|
||||
slot_stat = countKeysInSlot(slot);
|
||||
} else if (stat_type == CPU_USEC) {
|
||||
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
|
||||
switch (stat_type) {
|
||||
case KEY_COUNT: slot_stat = countKeysInSlot(slot); break;
|
||||
case CPU_USEC: slot_stat = server.cluster->slot_stats[slot].cpu_usec; break;
|
||||
case NETWORK_BYTES_IN: slot_stat = server.cluster->slot_stats[slot].network_bytes_in; break;
|
||||
case NETWORK_BYTES_OUT: slot_stat = server.cluster->slot_stats[slot].network_bytes_out; break;
|
||||
case SLOT_STAT_COUNT:
|
||||
case INVALID: serverPanic("Invalid slot stat type %d was found.", stat_type);
|
||||
}
|
||||
return slot_stat;
|
||||
}
|
||||
@ -69,7 +71,7 @@ static int slotStatForSortDescCmp(const void *a, const void *b) {
|
||||
return entry_b.stat - entry_a.stat;
|
||||
}
|
||||
|
||||
static void collectAndSortSlotStats(slotStatForSort slot_stats[], int order_by, int desc) {
|
||||
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
|
||||
int i = 0;
|
||||
|
||||
for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
|
||||
@ -96,6 +98,10 @@ static void addReplySlotStat(client *c, int slot) {
|
||||
if (server.cluster_slot_stats_enabled) {
|
||||
addReplyBulkCString(c, "cpu-usec");
|
||||
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
|
||||
addReplyBulkCString(c, "network-bytes-in");
|
||||
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
|
||||
addReplyBulkCString(c, "network-bytes-out");
|
||||
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,9 +125,56 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
|
||||
}
|
||||
}
|
||||
|
||||
static int canAddNetworkBytesOut(client *c) {
|
||||
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
|
||||
}
|
||||
|
||||
/* Accumulates egress bytes upon sending RESP responses back to user clients. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
|
||||
if (!canAddNetworkBytesOut(c)) return;
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
|
||||
}
|
||||
|
||||
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
|
||||
client *c = server.current_client;
|
||||
if (c == NULL || !canAddNetworkBytesOut(c)) return;
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
|
||||
}
|
||||
|
||||
/* Upon SPUBLISH, two egress events are triggered.
|
||||
* 1) Internal propagation, for clients that are subscribed to the current node.
|
||||
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
|
||||
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
|
||||
* This function covers the internal propagation component. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
|
||||
/* For a blocked client, c->slot could be pre-filled.
|
||||
* Thus c->slot is backed-up for restoration after aggregation is completed. */
|
||||
int _slot = c->slot;
|
||||
c->slot = slot;
|
||||
if (!canAddNetworkBytesOut(c)) {
|
||||
/* c->slot should not change as a side effect of this function,
|
||||
* regardless of the function's early return condition. */
|
||||
c->slot = _slot;
|
||||
return;
|
||||
}
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
|
||||
|
||||
/* For sharded pubsub, the client's network bytes metrics must be reset here,
|
||||
* as resetClient() is not called until subscription ends. */
|
||||
c->net_output_bytes_curr_cmd = 0;
|
||||
c->slot = _slot;
|
||||
}
|
||||
|
||||
/* Adds reply for the ORDERBY variant.
|
||||
* Response is ordered based on the sort result. */
|
||||
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
|
||||
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
|
||||
slotStatForSort slot_stats[CLUSTER_SLOTS];
|
||||
collectAndSortSlotStats(slot_stats, order_by, desc);
|
||||
addReplySortedSlotStats(c, slot_stats, limit);
|
||||
@ -167,8 +220,35 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
|
||||
ctx->original_client->slot = -1;
|
||||
}
|
||||
|
||||
static int canAddNetworkBytesIn(client *c) {
|
||||
/* First, cluster mode must be enabled.
|
||||
* Second, command should target a specific slot.
|
||||
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
|
||||
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
|
||||
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
|
||||
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
|
||||
!server.in_exec;
|
||||
}
|
||||
|
||||
/* Adds network ingress bytes of the current command in execution,
|
||||
* calculated earlier within networking.c layer.
|
||||
*
|
||||
* Note: Below function should only be called once c->slot is parsed.
|
||||
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
|
||||
* */
|
||||
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
|
||||
if (!canAddNetworkBytesIn(c)) return;
|
||||
|
||||
if (c->cmd->proc == execCommand) {
|
||||
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
|
||||
c->net_input_bytes_curr_cmd += 15;
|
||||
}
|
||||
|
||||
server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
|
||||
}
|
||||
|
||||
void clusterSlotStatsCommand(client *c) {
|
||||
if (server.cluster_enabled == 0) {
|
||||
if (!server.cluster_enabled) {
|
||||
addReplyError(c, "This instance has cluster support disabled");
|
||||
return;
|
||||
}
|
||||
@ -192,11 +272,16 @@ void clusterSlotStatsCommand(client *c) {
|
||||
|
||||
} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
|
||||
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
|
||||
int desc = 1, order_by = INVALID;
|
||||
int desc = 1;
|
||||
slotStatType order_by = INVALID;
|
||||
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
|
||||
order_by = KEY_COUNT;
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
|
||||
order_by = CPU_USEC;
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
|
||||
order_by = NETWORK_BYTES_IN;
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
|
||||
order_by = NETWORK_BYTES_OUT;
|
||||
} else {
|
||||
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
|
||||
return;
|
||||
|
@ -3,7 +3,20 @@
|
||||
#include "script.h"
|
||||
#include "cluster_legacy.h"
|
||||
|
||||
/* General use-cases. */
|
||||
void clusterSlotStatReset(int slot);
|
||||
void clusterSlotStatResetAll(void);
|
||||
|
||||
/* cpu-usec metric. */
|
||||
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
|
||||
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);
|
||||
|
||||
/* network-bytes-in metric. */
|
||||
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
|
||||
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
|
||||
void clusterSlotStatsResetClusterMsgLength(void);
|
||||
|
||||
/* network-bytes-out metric. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
|
||||
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
|
||||
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
|
||||
|
@ -38,6 +38,12 @@
|
||||
},
|
||||
"cpu-usec": {
|
||||
"type": "integer"
|
||||
},
|
||||
"network-bytes-in": {
|
||||
"type": "integer"
|
||||
},
|
||||
"network-bytes-out": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
16
src/db.c
16
src/db.c
@ -231,15 +231,27 @@ int calculateKeySlot(sds key) {
|
||||
int getKeySlot(sds key) {
|
||||
/* This is performance optimization that uses pre-set slot id from the current command,
|
||||
* in order to avoid calculation of the key hash.
|
||||
*
|
||||
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
|
||||
* It only gets set during the execution of command under `call` method. Other flows requesting
|
||||
* the key slot would fallback to calculateKeySlot.
|
||||
*
|
||||
* Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots,
|
||||
* so we must always recompute the slot for commands coming from the primary.
|
||||
*/
|
||||
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
|
||||
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
|
||||
!server.current_client->flag.primary) {
|
||||
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
|
||||
return server.current_client->slot;
|
||||
}
|
||||
return calculateKeySlot(key);
|
||||
int slot = calculateKeySlot(key);
|
||||
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
|
||||
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
|
||||
* we are able to backfill c->slot here, where the key's hash calculation is made. */
|
||||
if (server.current_client && server.current_client->flag.primary) {
|
||||
server.current_client->slot = slot;
|
||||
}
|
||||
return slot;
|
||||
}
|
||||
|
||||
/* This is a special version of dbAdd() that is used only when loading
|
||||
|
@ -29,6 +29,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
#include "script.h"
|
||||
#include "fpconv_dtoa.h"
|
||||
#include "fmtargs.h"
|
||||
@ -231,7 +232,9 @@ client *createClient(connection *conn) {
|
||||
if (conn) linkClient(c);
|
||||
initClientMultiState(c);
|
||||
c->net_input_bytes = 0;
|
||||
c->net_input_bytes_curr_cmd = 0;
|
||||
c->net_output_bytes = 0;
|
||||
c->net_output_bytes_curr_cmd = 0;
|
||||
c->commands_processed = 0;
|
||||
return c;
|
||||
}
|
||||
@ -449,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
|
||||
return;
|
||||
}
|
||||
|
||||
c->net_output_bytes_curr_cmd += len;
|
||||
|
||||
/* We call it here because this function may affect the reply
|
||||
* buffer offset (see function comment) */
|
||||
reqresSaveClientReplyOffset(c);
|
||||
@ -2477,10 +2482,12 @@ void resetClient(client *c) {
|
||||
c->cur_script = NULL;
|
||||
c->reqtype = 0;
|
||||
c->multibulklen = 0;
|
||||
c->net_input_bytes_curr_cmd = 0;
|
||||
c->bulklen = -1;
|
||||
c->slot = -1;
|
||||
c->flag.executing_command = 0;
|
||||
c->flag.replication_done = 0;
|
||||
c->net_output_bytes_curr_cmd = 0;
|
||||
|
||||
/* Make sure the duration has been recorded to some command. */
|
||||
serverAssert(c->duration == 0);
|
||||
@ -2623,6 +2630,21 @@ void processInlineBuffer(client *c) {
|
||||
c->argv_len_sum += sdslen(argv[j]);
|
||||
}
|
||||
zfree(argv);
|
||||
|
||||
/* Per-slot network bytes-in calculation.
|
||||
*
|
||||
* We calculate and store the current command's ingress bytes under
|
||||
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
|
||||
* until c->slot is parsed later within processCommand().
|
||||
*
|
||||
* Calculation: For inline buffer, every whitespace is of length 1,
|
||||
* with the exception of the trailing '\r\n' being length 2.
|
||||
*
|
||||
* For example;
|
||||
* Command) SET key value
|
||||
* Inline) SET key value\r\n
|
||||
* */
|
||||
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
|
||||
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
|
||||
}
|
||||
|
||||
@ -2694,7 +2716,8 @@ void processMultibulkBuffer(client *c) {
|
||||
/* We know for sure there is a whole line since newline != NULL,
|
||||
* so go ahead and find out the multi bulk length. */
|
||||
serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*');
|
||||
ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll);
|
||||
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
|
||||
ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll);
|
||||
if (!ok || ll > INT_MAX) {
|
||||
c->read_flags |= READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN;
|
||||
return;
|
||||
@ -2717,6 +2740,39 @@ void processMultibulkBuffer(client *c) {
|
||||
c->argv_len = min(c->multibulklen, 1024);
|
||||
c->argv = zmalloc(sizeof(robj *) * c->argv_len);
|
||||
c->argv_len_sum = 0;
|
||||
|
||||
/* Per-slot network bytes-in calculation.
|
||||
*
|
||||
* We calculate and store the current command's ingress bytes under
|
||||
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
|
||||
* until c->slot is parsed later within processCommand().
|
||||
*
|
||||
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
|
||||
*
|
||||
* 1) multibulklen_slen + 1
|
||||
* Cumulative string length (and not the value of) of multibulklen,
|
||||
* including +1 from RESP first byte.
|
||||
* 2) bulklen_slen + c->argc
|
||||
* Cumulative string length (and not the value of) of bulklen,
|
||||
* including +1 from RESP first byte per argument count.
|
||||
* 3) c->argv_len_sum
|
||||
* Cumulative string length of all argument vectors.
|
||||
* 4) c->argc * 4 + 2
|
||||
* Cumulative string length of all white-spaces, for which there exists a total of
|
||||
* 4 bytes per argument, plus 2 bytes from the leading '\r\n' from multibulklen.
|
||||
*
|
||||
* For example;
|
||||
* Command) SET key value
|
||||
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
|
||||
*
|
||||
* 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1).
|
||||
* 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc).
|
||||
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
|
||||
* 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2).
|
||||
*
|
||||
* The 1st component is calculated within the below line.
|
||||
* */
|
||||
c->net_input_bytes_curr_cmd += (multibulklen_slen + 1);
|
||||
}
|
||||
|
||||
serverAssertWithInfo(c, NULL, c->multibulklen > 0);
|
||||
@ -2740,7 +2796,8 @@ void processMultibulkBuffer(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
|
||||
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
|
||||
ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll);
|
||||
if (!ok || ll < 0 || (!(is_primary) && ll > server.proto_max_bulk_len)) {
|
||||
c->read_flags |= READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN;
|
||||
return;
|
||||
@ -2780,6 +2837,9 @@ void processMultibulkBuffer(client *c) {
|
||||
}
|
||||
}
|
||||
c->bulklen = ll;
|
||||
/* Per-slot network bytes-in calculation, 2nd component.
|
||||
* c->argc portion is deferred, as it may not have been fully populated at this point. */
|
||||
c->net_input_bytes_curr_cmd += bulklen_slen;
|
||||
}
|
||||
|
||||
/* Read bulk argument */
|
||||
@ -2816,7 +2876,12 @@ void processMultibulkBuffer(client *c) {
|
||||
}
|
||||
|
||||
/* We're done when c->multibulk == 0 */
|
||||
if (c->multibulklen == 0) c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
|
||||
if (c->multibulklen == 0) {
|
||||
/* Per-slot network bytes-in calculation, 3rd and 4th components.
|
||||
* Here, the deferred c->argc from 2nd component is added, resulting in c->argc * 5 instead of * 4. */
|
||||
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 5 + 2));
|
||||
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
|
||||
}
|
||||
}
|
||||
|
||||
/* Perform necessary tasks after a command was executed:
|
||||
@ -2835,6 +2900,7 @@ void commandProcessed(client *c) {
|
||||
if (c->flag.blocked) return;
|
||||
|
||||
reqresAppendResponse(c);
|
||||
clusterSlotStatsAddNetworkBytesInForUserClient(c);
|
||||
resetClient(c);
|
||||
|
||||
long long prev_offset = c->reploff;
|
||||
|
@ -29,6 +29,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
|
||||
/* Structure to hold the pubsub related metadata. Currently used
|
||||
* for pubsub and pubsubshard feature. */
|
||||
@ -475,13 +476,13 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
|
||||
int receivers = 0;
|
||||
dictEntry *de;
|
||||
dictIterator *di;
|
||||
unsigned int slot = 0;
|
||||
int slot = -1;
|
||||
|
||||
/* Send to clients listening for that channel */
|
||||
if (server.cluster_enabled && type.shard) {
|
||||
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
|
||||
}
|
||||
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
|
||||
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
|
||||
if (de) {
|
||||
dict *clients = dictGetVal(de);
|
||||
dictEntry *entry;
|
||||
@ -489,6 +490,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
|
||||
while ((entry = dictNext(iter)) != NULL) {
|
||||
client *c = dictGetKey(entry);
|
||||
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
|
||||
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
|
||||
updateClientMemUsageAndBucket(c);
|
||||
receivers++;
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
#include "cluster_slot_stats.h"
|
||||
#include "bio.h"
|
||||
#include "functions.h"
|
||||
#include "connection.h"
|
||||
@ -415,6 +416,8 @@ void feedReplicationBuffer(char *s, size_t len) {
|
||||
|
||||
if (server.repl_backlog == NULL) return;
|
||||
|
||||
clusterSlotStatsAddNetworkBytesOutForReplication(len);
|
||||
|
||||
while (len > 0) {
|
||||
size_t start_pos = 0; /* The position of referenced block to start sending. */
|
||||
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
|
||||
@ -568,6 +571,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {
|
||||
|
||||
feedReplicationBufferWithObject(selectcmd);
|
||||
|
||||
/* Although the SELECT command is not associated with any slot,
|
||||
* its per-slot network-bytes-out accumulation is made by the above function call.
|
||||
* To cancel-out this accumulation, below adjustment is made. */
|
||||
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));
|
||||
|
||||
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);
|
||||
|
||||
server.replicas_eldb = dictid;
|
||||
@ -3972,6 +3980,10 @@ void replicationSendAck(void) {
|
||||
addReplyBulkLongLong(c, server.fsynced_reploff);
|
||||
}
|
||||
c->flag.primary_force_reply = 0;
|
||||
|
||||
/* Accumulation from above replies must be reset back to 0 manually,
|
||||
* as this subroutine does not invoke resetClient(). */
|
||||
c->net_output_bytes_curr_cmd = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3694,6 +3694,8 @@ void afterCommand(client *c) {
|
||||
/* Flush pending tracking invalidations. */
|
||||
trackingHandlePendingKeyInvalidations();
|
||||
|
||||
clusterSlotStatsAddNetworkBytesOutForUserClient(c);
|
||||
|
||||
/* Flush other pending push messages. only when we are not in nested call.
|
||||
* So the messages are not interleaved with transaction response. */
|
||||
if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages);
|
||||
|
10
src/server.h
10
src/server.h
@ -1372,9 +1372,13 @@ typedef struct client {
|
||||
#ifdef LOG_REQ_RES
|
||||
clientReqResInfo reqres;
|
||||
#endif
|
||||
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
|
||||
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
|
||||
unsigned long long commands_processed; /* Total count of commands this client executed. */
|
||||
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
|
||||
unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the
|
||||
* execution of this client's current command. */
|
||||
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
|
||||
unsigned long long commands_processed; /* Total count of commands this client executed. */
|
||||
unsigned long long
|
||||
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
|
||||
} client;
|
||||
|
||||
/* ACL information */
|
||||
|
@ -68,7 +68,7 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_
|
||||
}
|
||||
}
|
||||
|
||||
proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} {
|
||||
proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} {
|
||||
set slot_stats_1 [convert_array_into_dict $slot_stats_1]
|
||||
set slot_stats_2 [convert_array_into_dict $slot_stats_2]
|
||||
assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}
|
||||
@ -76,8 +76,15 @@ proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} {
|
||||
dict for {slot stats_1} $slot_stats_1 {
|
||||
assert {[dict exists $slot_stats_2 $slot]}
|
||||
set stats_2 [dict get $slot_stats_2 $slot]
|
||||
foreach metric_name $metrics_to_assert {
|
||||
assert {[dict get $stats_1 $metric_name] == [dict get $stats_2 $metric_name]}
|
||||
|
||||
# For deterministic metrics, we assert their equality.
|
||||
foreach metric $deterministic_metrics {
|
||||
assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]}
|
||||
}
|
||||
# For non-deterministic metrics, we assert their non-zeroness as a best-effort.
|
||||
foreach metric $non_deterministic_metrics {
|
||||
assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \
|
||||
([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -98,17 +105,6 @@ proc assert_slot_visibility {slot_stats expected_slots} {
|
||||
assert_all_slots_have_been_seen $expected_slots
|
||||
}
|
||||
|
||||
proc assert_slot_stats_key_count {slot_stats expected_slots_key_count} {
|
||||
set slot_stats [convert_array_into_dict $slot_stats]
|
||||
dict for {slot stats} $slot_stats {
|
||||
if {[dict exists $expected_slots_key_count $slot]} {
|
||||
set key_count [dict get $stats key-count]
|
||||
set key_count_expected [dict get $expected_slots_key_count $slot]
|
||||
assert {$key_count == $key_count_expected}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proc assert_slot_stats_monotonic_order {slot_stats orderby is_desc} {
|
||||
# For Tcl dict, the order of iteration is the order in which the keys were inserted into the dictionary
|
||||
# Thus, the response ordering is preserved upon calling 'convert_array_into_dict()'.
|
||||
@ -347,10 +343,349 @@ start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-en
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
|
||||
# Test cases for CLUSTER SLOT-STATS network-bytes-in.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "key"
|
||||
set key_slot [R 0 cluster keyslot $key]
|
||||
set metrics_to_assert [list network-bytes-in]
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." {
|
||||
# *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
|
||||
R 0 SET $key value
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 33
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." {
|
||||
set rd [valkey_deferring_client]
|
||||
# SET key value\r\n --> 15 bytes.
|
||||
$rd write "SET $key value\r\n"
|
||||
$rd flush
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 15
|
||||
]
|
||||
]
|
||||
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, blocking command." {
|
||||
set rd [valkey_deferring_client]
|
||||
# *3\r\n$5\r\nblpop\r\n$3\r\nkey\r\n$1\r\n0\r\n --> 31 bytes.
|
||||
$rd BLPOP $key 0
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
# Slot-stats must be empty here, as the client is yet to be unblocked.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
|
||||
# *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes.
|
||||
R 0 LPUSH $key value
|
||||
wait_for_blocked_clients_count 0
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 66 ;# 31 + 35 bytes.
|
||||
]
|
||||
]
|
||||
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, multi-exec transaction." {
|
||||
set r [valkey_client]
|
||||
# *1\r\n$5\r\nmulti\r\n --> 15 bytes.
|
||||
$r MULTI
|
||||
# *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
|
||||
assert {[$r SET $key value] eq {QUEUED}}
|
||||
# *1\r\n$4\r\nexec\r\n --> 14 bytes.
|
||||
assert {[$r EXEC] eq {OK}}
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 62 ;# 15 + 33 + 14 bytes.
|
||||
]
|
||||
]
|
||||
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, non slot specific command." {
|
||||
R 0 INFO
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, pub/sub." {
|
||||
# PUB/SUB does not get accumulated at per-slot basis,
|
||||
# as it is cluster-wide and is not slot specific.
|
||||
set rd [valkey_deferring_client]
|
||||
$rd subscribe channel
|
||||
R 0 publish channel message
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
}
|
||||
|
||||
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
set channel "channel"
|
||||
set key_slot [R 0 cluster keyslot $channel]
|
||||
set metrics_to_assert [list network-bytes-in]
|
||||
|
||||
# Setup replication.
|
||||
assert {[s -1 role] eq {slave}}
|
||||
wait_for_condition 1000 50 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Instance #1 master link status is not up"
|
||||
}
|
||||
R 1 readonly
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-in, sharded pub/sub." {
|
||||
set slot [R 0 cluster keyslot $channel]
|
||||
set primary [Rn 0]
|
||||
set replica [Rn 1]
|
||||
set replica_subcriber [valkey_deferring_client -1]
|
||||
$replica_subcriber SSUBSCRIBE $channel
|
||||
# *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes.
|
||||
$primary SPUBLISH $channel hello
|
||||
# *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
|
||||
|
||||
set slot_stats [$primary CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 42
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
|
||||
set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-in 34
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key_slot [R 0 cluster keyslot $key]
|
||||
set expected_slots_to_key_count [dict create $key_slot 1]
|
||||
set metrics_to_assert [list network-bytes-out]
|
||||
R 0 CONFIG SET cluster-slot-stats-enabled yes
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." {
|
||||
R 0 INFO
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." {
|
||||
R 0 SET $key value
|
||||
# +OK\r\n --> 5 bytes
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 5
|
||||
]
|
||||
]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." {
|
||||
set rd [valkey_deferring_client]
|
||||
$rd BLPOP $key 0
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
# Assert empty slot stats here, since COB is yet to be flushed due to the block.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats $slot_stats $metrics_to_assert
|
||||
|
||||
# Unblock the command.
|
||||
# LPUSH client) :1\r\n --> 4 bytes.
|
||||
# BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking.
|
||||
R 0 LPUSH $key value
|
||||
wait_for_blocked_clients_count 0
|
||||
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 28 ;# 4 + 24 bytes.
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 0 FLUSHALL
|
||||
}
|
||||
|
||||
start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key_slot [R 0 CLUSTER KEYSLOT $key]
|
||||
set metrics_to_assert [list network-bytes-out]
|
||||
R 0 CONFIG SET cluster-slot-stats-enabled yes
|
||||
|
||||
# Setup replication.
|
||||
assert {[s -1 role] eq {slave}}
|
||||
wait_for_condition 1000 50 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Instance #1 master link status is not up"
|
||||
}
|
||||
R 1 readonly
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." {
|
||||
assert_equal [R 0 SET $key VALUE] {OK}
|
||||
# Local client) +OK\r\n --> 5 bytes.
|
||||
# Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 38 ;# 5 + 33 bytes.
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
}
|
||||
|
||||
start_cluster 1 1 {tags {external:skip cluster}} {
|
||||
|
||||
# Define shared variables.
|
||||
set channel "channel"
|
||||
set key_slot [R 0 cluster keyslot $channel]
|
||||
set channel_secondary "channel2"
|
||||
set key_slot_secondary [R 0 cluster keyslot $channel_secondary]
|
||||
set metrics_to_assert [list network-bytes-out]
|
||||
R 0 CONFIG SET cluster-slot-stats-enabled yes
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." {
|
||||
set slot [R 0 cluster keyslot $channel]
|
||||
set publisher [Rn 0]
|
||||
set subscriber [valkey_client]
|
||||
set replica [valkey_deferring_client -1]
|
||||
|
||||
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
|
||||
$subscriber SSUBSCRIBE $channel
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 38
|
||||
]
|
||||
]
|
||||
R 0 CONFIG RESETSTAT
|
||||
|
||||
# Publisher client) :1\r\n --> 4 bytes.
|
||||
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
|
||||
assert_equal 1 [$publisher SPUBLISH $channel hello]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 46 ;# 4 + 42 bytes.
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
$subscriber QUIT
|
||||
R 0 FLUSHALL
|
||||
R 0 CONFIG RESETSTAT
|
||||
|
||||
test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." {
|
||||
set slot [R 0 cluster keyslot $channel]
|
||||
set publisher [Rn 0]
|
||||
set subscriber [valkey_client]
|
||||
set replica [valkey_deferring_client -1]
|
||||
|
||||
# Stack multi-slot subscriptions against a single client.
|
||||
# For primary channel;
|
||||
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
|
||||
# For secondary channel;
|
||||
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes
|
||||
$subscriber SSUBSCRIBE $channel
|
||||
$subscriber SSUBSCRIBE $channel_secondary
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create \
|
||||
$key_slot [ \
|
||||
dict create network-bytes-out 38
|
||||
] \
|
||||
$key_slot_secondary [ \
|
||||
dict create network-bytes-out 39
|
||||
]
|
||||
]
|
||||
R 0 CONFIG RESETSTAT
|
||||
|
||||
# For primary channel;
|
||||
# Publisher client) :1\r\n --> 4 bytes.
|
||||
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
|
||||
# For secondary channel;
|
||||
# Publisher client) :1\r\n --> 4 bytes.
|
||||
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes.
|
||||
assert_equal 1 [$publisher SPUBLISH $channel hello]
|
||||
assert_equal 1 [$publisher SPUBLISH $channel_secondary hello]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
set expected_slot_stats [
|
||||
dict create \
|
||||
$key_slot [ \
|
||||
dict create network-bytes-out 46 ;# 4 + 42 bytes.
|
||||
] \
|
||||
$key_slot_secondary [ \
|
||||
dict create network-bytes-out 47 ;# 4 + 43 bytes.
|
||||
]
|
||||
]
|
||||
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
|
||||
}
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
@ -435,7 +770,9 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
# Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
set metrics [list "key-count" "cpu-usec" "network-bytes-in" "network-bytes-out"]
|
||||
|
||||
# SET keys for target hashslots, to encourage ordering.
|
||||
set hash_tags [list 0 1 2 3 4]
|
||||
@ -456,32 +793,36 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" {
|
||||
set orderby "key-count"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC LIMIT -1}
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
|
||||
assert_slot_stats_monotonic_descent $slot_stats $orderby
|
||||
foreach orderby $metrics {
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
|
||||
assert_slot_stats_monotonic_descent $slot_stats $orderby
|
||||
}
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" {
|
||||
set orderby "key-count"
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
|
||||
assert_slot_stats_monotonic_ascent $slot_stats $orderby
|
||||
foreach orderby $metrics {
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
|
||||
assert_slot_stats_monotonic_ascent $slot_stats $orderby
|
||||
}
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" {
|
||||
R 0 FLUSHALL SYNC
|
||||
R 0 CONFIG RESETSTAT
|
||||
|
||||
set limit 5
|
||||
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC]
|
||||
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC]
|
||||
set slot_stats_desc_length [llength $slot_stats_desc]
|
||||
set slot_stats_asc_length [llength $slot_stats_asc]
|
||||
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}
|
||||
foreach orderby $metrics {
|
||||
set limit 5
|
||||
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
|
||||
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
|
||||
set slot_stats_desc_length [llength $slot_stats_desc]
|
||||
set slot_stats_asc_length [llength $slot_stats_asc]
|
||||
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}
|
||||
|
||||
# The key count of all slots is 0, so we will order by slot in ascending order.
|
||||
set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
|
||||
assert_slot_visibility $slot_stats_desc $expected_slots
|
||||
assert_slot_visibility $slot_stats_asc $expected_slots
|
||||
# All slot statistics have been reset to 0, so we will order by slot in ascending order.
|
||||
set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
|
||||
assert_slot_visibility $slot_stats_desc $expected_slots
|
||||
assert_slot_visibility $slot_stats_asc $expected_slots
|
||||
}
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" {
|
||||
@ -490,28 +831,39 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
R 0 CLUSTER FLUSHSLOTS
|
||||
R 0 CLUSTER ADDSLOTS 100 101
|
||||
|
||||
set num_assigned_slots 2
|
||||
set limit 5
|
||||
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit DESC]
|
||||
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY key-count LIMIT $limit ASC]
|
||||
set slot_stats_desc_length [llength $slot_stats_desc]
|
||||
set slot_stats_asc_length [llength $slot_stats_asc]
|
||||
set expected_response_length [expr min($num_assigned_slots, $limit)]
|
||||
assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}
|
||||
foreach orderby $metrics {
|
||||
set num_assigned_slots 2
|
||||
set limit 5
|
||||
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
|
||||
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
|
||||
set slot_stats_desc_length [llength $slot_stats_desc]
|
||||
set slot_stats_asc_length [llength $slot_stats_asc]
|
||||
set expected_response_length [expr min($num_assigned_slots, $limit)]
|
||||
assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}
|
||||
|
||||
set expected_slots [dict create 100 0 101 0]
|
||||
assert_slot_visibility $slot_stats_desc $expected_slots
|
||||
assert_slot_visibility $slot_stats_asc $expected_slots
|
||||
set expected_slots [dict create 100 0 101 0]
|
||||
assert_slot_visibility $slot_stats_desc $expected_slots
|
||||
assert_slot_visibility $slot_stats_asc $expected_slots
|
||||
}
|
||||
}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY unsupported sort metric." {
|
||||
set orderby "non-existent-metric"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
|
||||
test "CLUSTER SLOT-STATS ORDERBY arg sanity check." {
|
||||
# Non-existent argument.
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg}
|
||||
# Negative LIMIT.
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1}
|
||||
# Non-existent ORDERBY metric.
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric}
|
||||
# When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics.
|
||||
R 0 CONFIG SET cluster-slot-stats-enabled no
|
||||
set orderby "cpu-usec"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
set orderby "network-bytes-in"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
set orderby "network-bytes-out"
|
||||
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@ -521,14 +873,23 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
|
||||
|
||||
# Define shared variables.
|
||||
set key "FOO"
|
||||
set key "key"
|
||||
set key_slot [R 0 CLUSTER KEYSLOT $key]
|
||||
set primary [Rn 0]
|
||||
set replica [Rn 1]
|
||||
|
||||
# For replication, only those metrics that are deterministic upon replication are asserted.
|
||||
# * key-count is asserted, as both the primary and its replica must hold the same number of keys.
|
||||
# * cpu-usec is not asserted, as its micro-seconds command duration is not guaranteed to be exact
|
||||
# between the primary and its replica.
|
||||
set metrics_to_assert [list key-count]
|
||||
# For replication, assertions are split between deterministic and non-deterministic metrics.
|
||||
# * For deterministic metrics, strict equality assertions are made.
|
||||
# * For non-deterministic metrics, non-zeroness assertions are made.
|
||||
# Non-zeroness as in, both primary and replica should either have some value, or no value at all.
|
||||
#
|
||||
# * key-count is deterministic between primary and its replica.
|
||||
# * cpu-usec is non-deterministic between primary and its replica.
|
||||
# * network-bytes-in is deterministic between primary and its replica.
|
||||
# * network-bytes-out will remain empty in the replica, since primary client do not receive replies, unless for replicationSendAck().
|
||||
set deterministic_metrics [list key-count network-bytes-in]
|
||||
set non_deterministic_metrics [list cpu-usec]
|
||||
set empty_metrics [list network-bytes-out]
|
||||
|
||||
# Setup replication.
|
||||
assert {[s -1 role] eq {slave}}
|
||||
@ -539,39 +900,75 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
|
||||
}
|
||||
R 1 readonly
|
||||
|
||||
test "CLUSTER SLOT-STATS key-count replication for new keys" {
|
||||
test "CLUSTER SLOT-STATS metrics replication for new keys" {
|
||||
# *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
|
||||
R 0 SET $key VALUE
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create key-count 1 network-bytes-in 33
|
||||
]
|
||||
]
|
||||
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
|
||||
|
||||
set expected_slots_key_count [dict create $key_slot 1]
|
||||
assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count
|
||||
wait_for_replica_key_exists $key 1
|
||||
|
||||
wait_for_condition 500 10 {
|
||||
[string match {*calls=1,*} [cmdrstat set $replica]]
|
||||
} else {
|
||||
fail "Replica did not receive the command."
|
||||
}
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
|
||||
assert_empty_slot_stats $slot_stats_replica $empty_metrics
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 1 CONFIG RESETSTAT
|
||||
|
||||
test "CLUSTER SLOT-STATS key-count replication for existing keys" {
|
||||
test "CLUSTER SLOT-STATS metrics replication for existing keys" {
|
||||
# *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$13\r\nvalue_updated\r\n --> 42 bytes.
|
||||
R 0 SET $key VALUE_UPDATED
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create key-count 1 network-bytes-in 42
|
||||
]
|
||||
]
|
||||
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
|
||||
|
||||
set expected_slots_key_count [dict create $key_slot 1]
|
||||
assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count
|
||||
wait_for_replica_key_exists $key 1
|
||||
|
||||
wait_for_condition 500 10 {
|
||||
[string match {*calls=1,*} [cmdrstat set $replica]]
|
||||
} else {
|
||||
fail "Replica did not receive the command."
|
||||
}
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
|
||||
assert_empty_slot_stats $slot_stats_replica $empty_metrics
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 1 CONFIG RESETSTAT
|
||||
|
||||
test "CLUSTER SLOT-STATS key-count replication for deleting keys" {
|
||||
test "CLUSTER SLOT-STATS metrics replication for deleting keys" {
|
||||
# *2\r\n$3\r\ndel\r\n$3\r\nkey\r\n --> 22 bytes.
|
||||
R 0 DEL $key
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create key-count 0 network-bytes-in 22
|
||||
]
|
||||
]
|
||||
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
|
||||
|
||||
set expected_slots_key_count [dict create $key_slot 0]
|
||||
assert_slot_stats_key_count $slot_stats_master $expected_slots_key_count
|
||||
wait_for_replica_key_exists $key 0
|
||||
|
||||
wait_for_condition 500 10 {
|
||||
[string match {*calls=1,*} [cmdrstat del $replica]]
|
||||
} else {
|
||||
fail "Replica did not receive the command."
|
||||
}
|
||||
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
|
||||
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
|
||||
assert_empty_slot_stats $slot_stats_replica $empty_metrics
|
||||
}
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 1 CONFIG RESETSTAT
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user