Support for reading from replicas in valkey-benchmark (#1392)

**Background**
When conducting performance tests using `valkey-benchmark`, reading from
replicas was not supported. Consequently, even in cluster mode, all
reads were directed to the primary nodes. This limitation made it
challenging to obtain accurate metrics during workload stress testing
for performance measurement or before a version upgrade.

Related issue : https://github.com/valkey-io/valkey/issues/900

**Changes**
1. Replaced the use of `CLUSTER NODES` with `CLUSTER SLOTS` when
fetching cluster configuration. This allows for easier identification of
replica slots.
2. Support for reading from replicas by executing the client in
`READONLY` mode.
3. Support reading from replicas even during slot migrations.
4. Introduced two CLI options `--rfr` to enable reading from replicas
only or all cluster nodes. A warning added to indicate that write
requests might not be handled correctly when using this option.

---------

Signed-off-by: bluayer <ijacsong98@gmail.com>
Signed-off-by: bluayer <bluayer@gmail.com>
Signed-off-by: Jungwoo Song <37579681+bluayer@users.noreply.github.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
This commit is contained in:
Jungwoo Song 2024-12-20 01:32:31 +09:00 committed by GitHub
parent 97029953a0
commit e9a1fe0b32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -77,6 +77,13 @@ struct benchmarkThread;
struct clusterNode;
struct serverConfig;
/* Read from replica options */
typedef enum readFromReplica {
FROM_PRIMARY_ONLY = 0, /* default option */
FROM_REPLICA_ONLY,
FROM_ALL
} readFromReplica;
static struct config {
aeEventLoop *el;
cliConnInfo conn_info;
@ -112,6 +119,7 @@ static struct config {
int num_threads;
struct benchmarkThread **threads;
int cluster_mode;
readFromReplica read_from_replica;
int cluster_node_count;
struct clusterNode **cluster_nodes;
struct serverConfig *redis_config;
@ -168,12 +176,6 @@ typedef struct clusterNode {
int *updated_slots; /* Used by updateClusterSlotsConfiguration */
int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
sds *importing; /* An array of sds where even strings are slots and odd
* strings are the source node IDs. */
int migrating_count; /* Length of the migrating array (migrating slots*2) */
int importing_count; /* Length of the importing array (importing slots*2) */
struct serverConfig *redis_config;
} clusterNode;
@ -228,6 +230,15 @@ static int dictSdsKeyCompare(const void *key1, const void *key2) {
return memcmp(key1, key2, l1) == 0;
}
static dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};
static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket) {
redisContext *ctx = NULL;
redisReply *reply = NULL;
@ -710,6 +721,15 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
c->prefix_pending++;
}
if (config.cluster_mode && (config.read_from_replica == FROM_REPLICA_ONLY || config.read_from_replica == FROM_ALL)) {
char *buf = NULL;
int len;
len = redisFormatCommand(&buf, "READONLY");
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
c->prefix_pending++;
}
c->prefixlen = sdslen(c->obuf);
/* Append the request itself. */
if (from) {
@ -835,7 +855,15 @@ static void showLatencyReport(void) {
printf(" %d bytes payload\n", config.datasize);
printf(" keep alive: %d\n", config.keepalive);
if (config.cluster_mode) {
printf(" cluster mode: yes (%d primaries)\n", config.cluster_node_count);
const char *node_roles = NULL;
if (config.read_from_replica == FROM_ALL) {
node_roles = "cluster";
} else if (config.read_from_replica == FROM_REPLICA_ONLY) {
node_roles = "replica";
} else {
node_roles = "primary";
}
printf(" cluster mode: yes (%d %s)\n", config.cluster_node_count, node_roles);
int m;
for (m = 0; m < config.cluster_node_count; m++) {
clusterNode *node = config.cluster_nodes[m];
@ -1009,26 +1037,13 @@ static clusterNode *createClusterNode(char *ip, int port) {
node->slots_count = 0;
node->updated_slots = NULL;
node->updated_slots_count = 0;
node->migrating = NULL;
node->importing = NULL;
node->migrating_count = 0;
node->importing_count = 0;
node->redis_config = NULL;
return node;
}
static void freeClusterNode(clusterNode *node) {
int i;
if (node->name) sdsfree(node->name);
if (node->replicate) sdsfree(node->replicate);
if (node->migrating != NULL) {
for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
zfree(node->migrating);
}
if (node->importing != NULL) {
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
zfree(node->importing);
}
/* If the node is not the reference node, that uses the address from
* config.conn_info.hostip and config.conn_info.hostport, then the node ip has been
* allocated by fetchClusterConfiguration, so it must be freed. */
@ -1056,157 +1071,85 @@ static clusterNode **addClusterNode(clusterNode *node) {
return config.cluster_nodes;
}
/* TODO: This should be refactored to use CLUSTER SLOTS, the migrating/importing
* information is anyway not used.
*/
static int fetchClusterConfiguration(void) {
int success = 1;
redisContext *ctx = NULL;
redisReply *reply = NULL;
dict *nodes = NULL;
const char *errmsg = "Failed to fetch cluster configuration";
size_t i, j;
ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
if (ctx == NULL) {
exit(1);
}
clusterNode *firstNode = createClusterNode((char *)config.conn_info.hostip, config.conn_info.hostport);
if (!firstNode) {
success = 0;
goto cleanup;
}
reply = redisCommand(ctx, "CLUSTER NODES");
success = (reply != NULL);
if (!success) goto cleanup;
success = (reply->type != REDIS_REPLY_ERROR);
if (!success) {
if (config.hostsocket == NULL) {
fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n", config.conn_info.hostip,
config.conn_info.hostport, reply->str);
} else {
fprintf(stderr, "Cluster node %s replied with error:\n%s\n", config.hostsocket, reply->str);
}
goto cleanup;
}
char *lines = reply->str, *p, *line;
while ((p = strstr(lines, "\n")) != NULL) {
*p = '\0';
line = lines;
lines = p + 1;
char *name = NULL, *addr = NULL, *flags = NULL, *primary_id = NULL;
int i = 0;
while ((p = strchr(line, ' ')) != NULL) {
*p = '\0';
char *token = line;
line = p + 1;
switch (i++) {
case 0: name = token; break;
case 1: addr = token; break;
case 2: flags = token; break;
case 3: primary_id = token; break;
}
if (i == 8) break; // Slots
}
if (!flags) {
fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n");
success = 0;
goto cleanup;
}
int myself = (strstr(flags, "myself") != NULL);
int is_replica = (strstr(flags, "slave") != NULL || (primary_id != NULL && primary_id[0] != '-'));
if (is_replica) continue;
if (addr == NULL) {
fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
success = 0;
goto cleanup;
}
clusterNode *node = NULL;
char *ip = NULL;
int port = 0;
char *paddr = strrchr(addr, ':');
if (paddr != NULL) {
*paddr = '\0';
ip = addr;
addr = paddr + 1;
/* If internal bus is specified, then just drop it. */
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
port = atoi(addr);
}
if (myself) {
node = firstNode;
if (ip != NULL && strcmp(node->ip, ip) != 0) {
node->ip = sdsnew(ip);
node->port = port;
}
} else {
node = createClusterNode(sdsnew(ip), port);
}
if (node == NULL) {
success = 0;
goto cleanup;
}
if (name != NULL) node->name = sdsnew(name);
if (i == 8) {
int remaining = strlen(line);
while (remaining > 0) {
p = strchr(line, ' ');
if (p == NULL) p = line + remaining;
remaining -= (p - line);
char *slotsdef = line;
*p = '\0';
if (remaining) {
line = p + 1;
remaining--;
} else
line = p;
char *dash = NULL;
if (slotsdef[0] == '[') {
slotsdef++;
if ((p = strstr(slotsdef, "->-"))) { // Migrating
*p = '\0';
p += 3;
char *closing_bracket = strchr(p, ']');
if (closing_bracket) *closing_bracket = '\0';
sds slot = sdsnew(slotsdef);
sds dst = sdsnew(p);
node->migrating_count += 2;
node->migrating = zrealloc(node->migrating, (node->migrating_count * sizeof(sds)));
node->migrating[node->migrating_count - 2] = slot;
node->migrating[node->migrating_count - 1] = dst;
} else if ((p = strstr(slotsdef, "-<-"))) { // Importing
*p = '\0';
p += 3;
char *closing_bracket = strchr(p, ']');
if (closing_bracket) *closing_bracket = '\0';
sds slot = sdsnew(slotsdef);
sds src = sdsnew(p);
node->importing_count += 2;
node->importing = zrealloc(node->importing, (node->importing_count * sizeof(sds)));
node->importing[node->importing_count - 2] = slot;
node->importing[node->importing_count - 1] = src;
}
} else if ((dash = strchr(slotsdef, '-')) != NULL) {
p = dash;
int start, stop;
*p = '\0';
start = atoi(slotsdef);
stop = atoi(p + 1);
while (start <= stop) {
int slot = start++;
node->slots[node->slots_count++] = slot;
}
} else if (p > slotsdef) {
int slot = atoi(slotsdef);
reply = redisCommand(ctx, "CLUSTER SLOTS");
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
success = 0;
if (reply) fprintf(stderr, "%s\nCLUSTER SLOTS ERROR: %s\n", errmsg, reply->str);
goto cleanup;
}
assert(reply->type == REDIS_REPLY_ARRAY);
nodes = dictCreate(&dtype);
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
assert(r->type == REDIS_REPLY_ARRAY);
assert(r->elements >= 3);
int from = r->element[0]->integer;
int to = r->element[1]->integer;
sds primary = NULL;
for (j = 2; j < r->elements; j++) {
redisReply *nr = r->element[j];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
assert(nr->element[0]->str != NULL);
assert(nr->element[2]->str != NULL);
int is_primary = (j == 2);
if (is_primary) primary = sdsnew(nr->element[2]->str);
int is_cluster_option_only = (config.read_from_replica == FROM_PRIMARY_ONLY);
if ((config.read_from_replica == FROM_REPLICA_ONLY && is_primary) || (is_cluster_option_only && !is_primary)) continue;
sds ip = sdsnew(nr->element[0]->str);
sds name = sdsnew(nr->element[2]->str);
int port = nr->element[1]->integer;
int slot_start = from;
int slot_end = to;
clusterNode *node = NULL;
dictEntry *entry = dictFind(nodes, name);
if (entry == NULL) {
node = createClusterNode(sdsnew(ip), port);
if (node == NULL) {
success = 0;
goto cleanup;
} else {
node->name = name;
if (!is_primary) node->replicate = sdsdup(primary);
}
} else {
node = dictGetVal(entry);
}
if (slot_start == slot_end) {
node->slots[node->slots_count++] = slot_start;
} else {
while (slot_start <= slot_end) {
int slot = slot_start++;
node->slots[node->slots_count++] = slot;
}
}
if (node->slots_count == 0) {
fprintf(stderr, "WARNING: Node %s:%d has no slots, skipping...\n", node->ip, node->port);
continue;
}
if (entry == NULL) {
dictReplace(nodes, node->name, node);
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
}
}
}
if (node->slots_count == 0) {
fprintf(stderr, "WARNING: Primary node %s:%d has no slots, skipping...\n", node->ip, node->port);
continue;
}
if (!addClusterNode(node)) {
success = 0;
goto cleanup;
}
sdsfree(primary);
}
cleanup:
if (ctx) redisFree(ctx);
@ -1214,6 +1157,7 @@ cleanup:
if (config.cluster_nodes) freeClusterNodes();
}
if (reply) freeReplyObject(reply);
if (nodes) dictRelease(nodes);
return success;
}
@ -1222,7 +1166,7 @@ cleanup:
static int fetchClusterSlotsConfiguration(client c) {
UNUSED(c);
int success = 1, is_fetching_slots = 0, last_update = 0;
size_t i;
size_t i, j;
last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed);
if (c->slots_last_update < last_update) {
@ -1236,16 +1180,9 @@ static int fetchClusterSlotsConfiguration(client c) {
atomic_store_explicit(&config.is_fetching_slots, 1, memory_order_relaxed);
fprintf(stderr, "WARNING: Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};
/* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
dict *primaries = dictCreate(&dtype);
dict *nodes = dictCreate(&dtype);
redisContext *ctx = NULL;
for (i = 0; i < (size_t)config.cluster_node_count; i++) {
clusterNode *node = config.cluster_nodes[i];
@ -1263,7 +1200,7 @@ static int fetchClusterSlotsConfiguration(client c) {
if (node->updated_slots != NULL) zfree(node->updated_slots);
node->updated_slots = NULL;
node->updated_slots_count = 0;
dictReplace(primaries, node->name, node);
dictReplace(nodes, node->name, node);
}
reply = redisCommand(ctx, "CLUSTER SLOTS");
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
@ -1279,30 +1216,44 @@ static int fetchClusterSlotsConfiguration(client c) {
int from, to, slot;
from = r->element[0]->integer;
to = r->element[1]->integer;
redisReply *nr = r->element[2];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
assert(nr->element[2]->str != NULL);
sds name = sdsnew(nr->element[2]->str);
dictEntry *entry = dictFind(primaries, name);
if (entry == NULL) {
success = 0;
fprintf(stderr,
"%s: could not find node with ID %s in current "
"configuration.\n",
errmsg, name);
if (name) sdsfree(name);
goto cleanup;
size_t start, end;
if (config.read_from_replica == FROM_ALL) {
start = 2;
end = r->elements;
} else if (config.read_from_replica == FROM_REPLICA_ONLY) {
start = 3;
end = r->elements;
} else {
start = 2;
end = 3;
}
for (j = start; j < end; j++) {
redisReply *nr = r->element[j];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
assert(nr->element[2]->str != NULL);
sds name = sdsnew(nr->element[2]->str);
dictEntry *entry = dictFind(nodes, name);
if (entry == NULL) {
success = 0;
fprintf(stderr,
"%s: could not find node with ID %s in current "
"configuration.\n",
errmsg, name);
if (name) sdsfree(name);
goto cleanup;
}
sdsfree(name);
clusterNode *node = dictGetVal(entry);
if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot;
}
sdsfree(name);
clusterNode *node = dictGetVal(entry);
if (node->updated_slots == NULL) node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
for (slot = from; slot <= to; slot++) node->updated_slots[node->updated_slots_count++] = slot;
}
updateClusterSlotsConfiguration();
cleanup:
freeReplyObject(reply);
redisFree(ctx);
dictRelease(primaries);
dictRelease(nodes);
atomic_store_explicit(&config.is_fetching_slots, 0, memory_order_relaxed);
return success;
}
@ -1460,6 +1411,19 @@ int parseOptions(int argc, char **argv) {
config.num_threads = 0;
} else if (!strcmp(argv[i], "--cluster")) {
config.cluster_mode = 1;
} else if (!strcmp(argv[i], "--rfr")) {
if (argv[++i]) {
if (!strcmp(argv[i], "all")) {
config.read_from_replica = FROM_ALL;
} else if (!strcmp(argv[i], "yes")) {
config.read_from_replica = FROM_REPLICA_ONLY;
} else if (!strcmp(argv[i], "no")) {
config.read_from_replica = FROM_PRIMARY_ONLY;
} else {
goto invalid;
}
} else
goto invalid;
} else if (!strcmp(argv[i], "--enable-tracking")) {
config.enable_tracking = 1;
} else if (!strcmp(argv[i], "--help")) {
@ -1557,6 +1521,14 @@ usage:
" If the command is supplied on the command line in cluster\n"
" mode, the key must contain \"{tag}\". Otherwise, the\n"
" command will not be sent to the right cluster node.\n"
" --rfr <mode> Enable read from replicas in cluster mode.\n"
" This command must be used with the --cluster option.\n"
" There are three modes for reading from replicas:\n"
" 'no' - sends read requests to primaries only (default) \n"
" 'yes' - sends read requests to replicas only.\n"
" 'all' - sends read requests to all nodes.\n"
" Since write commands will not be accepted by replicas,\n"
" it is recommended to enable read from replicas only for read command tests.\n"
" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD,\n"
@ -1698,6 +1670,7 @@ int main(int argc, char **argv) {
config.num_threads = 0;
config.threads = NULL;
config.cluster_mode = 0;
config.read_from_replica = FROM_PRIMARY_ONLY;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
config.redis_config = NULL;
@ -1742,7 +1715,15 @@ int main(int argc, char **argv) {
fprintf(stderr, "Invalid cluster: %d node(s).\n", config.cluster_node_count);
exit(1);
}
printf("Cluster has %d primary nodes:\n\n", config.cluster_node_count);
const char *node_roles = NULL;
if (config.read_from_replica == FROM_ALL) {
node_roles = "cluster";
} else if (config.read_from_replica == FROM_REPLICA_ONLY) {
node_roles = "replica";
} else {
node_roles = "primary";
}
printf("Cluster has %d %s nodes:\n\n", config.cluster_node_count, node_roles);
int i = 0;
for (; i < config.cluster_node_count; i++) {
clusterNode *node = config.cluster_nodes[i];
@ -1750,7 +1731,8 @@ int main(int argc, char **argv) {
fprintf(stderr, "Invalid cluster node #%d\n", i);
exit(1);
}
printf("Primary %d: ", i);
const char *node_type = (node->replicate == NULL ? "Primary" : "Replica");
printf("Node %d(%s): ", i, node_type);
if (node->name) printf("%s ", node->name);
printf("%s:%d\n", node->ip, node->port);
node->redis_config = getServerConfig(node->ip, node->port, NULL);