From 70602db1662de9ba3fceb4fa9002f2fe243ca048 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Tue, 1 Jun 2021 20:54:48 +0000 Subject: [PATCH] working diag tool prototype Former-commit-id: fefbe96816f6a18ca6f8c8d3794502e6e610650f --- src/keydb-diagnostic-tool.cpp | 1073 +++------------------------------ 1 file changed, 98 insertions(+), 975 deletions(-) diff --git a/src/keydb-diagnostic-tool.cpp b/src/keydb-diagnostic-tool.cpp index 8dea6cdbf..c41995284 100644 --- a/src/keydb-diagnostic-tool.cpp +++ b/src/keydb-diagnostic-tool.cpp @@ -100,7 +100,7 @@ static struct config { char *auth; const char *user; int precision; - int num_threads; + int max_threads; struct benchmarkThread **threads; int cluster_mode; int cluster_node_count; @@ -140,6 +140,7 @@ typedef struct _client { int thread_id; struct clusterNode *cluster_node; int slots_last_update; + redisReply *lastReply; } *client; /* Threads. */ @@ -181,26 +182,11 @@ int g_fInCrash = false; /* Prototypes */ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); -static void createMissingClients(client c); static benchmarkThread *createBenchmarkThread(int index); static void freeBenchmarkThread(benchmarkThread *thread); static void freeBenchmarkThreads(); -static void *execBenchmarkThread(void *ptr); -static clusterNode *createClusterNode(char *ip, int port); -static redisConfig *getRedisConfig(const char *ip, int port, - const char *hostsocket); static redisContext *getRedisContext(const char *ip, int port, const char *hostsocket); -static void freeRedisConfig(redisConfig *cfg); -static int fetchClusterSlotsConfiguration(client c); -static void updateClusterSlotsConfiguration(); -int showThroughput(struct aeEventLoop *eventLoop, long long id, - void *clientData); - -/* Dict callbacks */ -static uint64_t dictSdsHash(const void *key); -static int dictSdsKeyCompare(void *privdata, const void *key1, - const void *key2); /* Implementation */ static long long ustime(void) { @@ -213,32 +199,6 @@ static long long ustime(void) { return ust; } -static long long mstime(void) { - struct timeval tv; - long long mst; - - gettimeofday(&tv, NULL); - mst = ((long long)tv.tv_sec)*1000; - mst += tv.tv_usec/1000; - return mst; -} - -static uint64_t dictSdsHash(const void *key) { - return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); -} - -static int dictSdsKeyCompare(void *privdata, const void *key1, - const void *key2) -{ - int l1,l2; - DICT_NOTUSED(privdata); - - l1 = sdslen((sds)key1); - l2 = sdslen((sds)key2); - if (l1 != l2) return 0; - return memcmp(key1, key2, l1) == 0; -} - /* _serverAssert is needed by dict */ extern "C" void _serverAssert(const char *estr, const char *file, int line) { fprintf(stderr, "=== ASSERTION FAILED ==="); @@ -292,58 +252,6 @@ cleanup: return NULL; } -static redisConfig *getRedisConfig(const char *ip, int port, - const char *hostsocket) -{ - redisConfig *cfg = (redisConfig*)zcalloc(sizeof(*cfg)); - if (!cfg) return NULL; - redisContext *c = NULL; - redisReply *reply = NULL, *sub_reply = NULL; - c = getRedisContext(ip, port, hostsocket); - if (c == NULL) { - freeRedisConfig(cfg); - return NULL; - } - redisAppendCommand(c, "CONFIG GET %s", "save"); - redisAppendCommand(c, "CONFIG GET %s", "appendonly"); - - void *r; - for (int i=0; i < 2; i++) { - int res = redisGetReply(c, &r); - if (reply) freeReplyObject(reply); - reply = res == REDIS_OK ? ((redisReply *) r) : NULL; - if (res != REDIS_OK || !r) goto fail; - if (reply->type == REDIS_REPLY_ERROR) { - fprintf(stderr, "ERROR: %s\n", reply->str); - goto fail; - } - if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail; - sub_reply = reply->element[1]; - const char *value = sub_reply->str; - if (!value) value = ""; - switch (i) { - case 0: cfg->save = sdsnew(value); break; - case 1: cfg->appendonly = sdsnew(value); break; - } - } - freeReplyObject(reply); - redisFree(c); - return cfg; -fail: - fprintf(stderr, "ERROR: failed to fetch CONFIG from "); - if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port); - else fprintf(stderr, "%s\n", hostsocket); - freeReplyObject(reply); - redisFree(c); - freeRedisConfig(cfg); - return NULL; -} -static void freeRedisConfig(redisConfig *cfg) { - if (cfg->save) sdsfree(cfg->save); - if (cfg->appendonly) sdsfree(cfg->appendonly); - zfree(cfg); -} - static void freeClient(client c) { aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); listNode *ln; @@ -361,12 +269,12 @@ static void freeClient(client c) { zfree(c->randptr); zfree(c->stagptr); zfree(c); - if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); + if (config.max_threads) pthread_mutex_lock(&(config.liveclients_mutex)); config.liveclients--; ln = listSearchKey(config.clients,c); assert(ln != NULL); listDelNode(config.clients,ln); - if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex)); + if (config.max_threads) pthread_mutex_unlock(&(config.liveclients_mutex)); } static void freeAllClients(void) { @@ -406,53 +314,6 @@ static void randomizeClientKey(client c) { } } -static void setClusterKeyHashTag(client c) { - assert(c->thread_id >= 0); - clusterNode *node = c->cluster_node; - assert(node); - assert(node->current_slot_index < node->slots_count); - int is_updating_slots = 0; - atomicGet(config.is_updating_slots, is_updating_slots); - /* If updateClusterSlotsConfiguration is updating the slots array, - * call updateClusterSlotsConfiguration is order to block the thread - * since the mutex is locked. When the slots will be updated by the - * thread that's actually performing the update, the execution of - * updateClusterSlotsConfiguration won't actually do anything, since - * the updated_slots_count array will be already NULL. */ - if (is_updating_slots) updateClusterSlotsConfiguration(); - int slot = node->slots[node->current_slot_index]; - const char *tag = crc16_slot_table[slot]; - int taglen = strlen(tag); - size_t i; - for (i = 0; i < c->staglen; i++) { - char *p = c->stagptr[i] + 1; - p[0] = tag[0]; - p[1] = (taglen >= 2 ? tag[1] : '}'); - p[2] = (taglen == 3 ? tag[2] : '}'); - } -} - -static void clientDone(client c) { - int requests_finished = 0; - atomicGet(config.requests_finished, requests_finished); - if (requests_finished >= config.requests) { - freeClient(c); - if (!config.num_threads && config.el) aeStop(config.el); - return; - } - if (config.keepalive) { - resetClient(c); - } else { - if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex)); - config.liveclients--; - createMissingClients(c); - config.liveclients++; - if (config.num_threads) - pthread_mutex_unlock(&(config.liveclients_mutex)); - freeClient(c); - } -} - static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { client c = (client)privdata; void *reply = NULL; @@ -497,29 +358,6 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } - /* Try to update slots configuration if reply error is - * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command - * contain(s) the slot hash tag. */ - if (is_err && c->cluster_node && c->staglen) { - int fetch_slots = 0, do_wait = 0; - if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3)) - fetch_slots = 1; - else if (!strncmp(r->str,"CLUSTERDOWN",11)) { - /* Usually the cluster is able to recover itself after - * a CLUSTERDOWN error, so try to sleep one second - * before requesting the new configuration. */ - fetch_slots = 1; - do_wait = 1; - printf("Error from server %s:%d: %s\n", - c->cluster_node->ip, - c->cluster_node->port, - r->str); - } - if (do_wait) sleep(1); - if (fetch_slots && !fetchClusterSlotsConfiguration(c)) - exit(1); - } - freeReplyObject(reply); /* This is an OK for prefix commands such as auth and select.*/ if (c->prefix_pending > 0) { @@ -543,7 +381,7 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { config.latency[requests_finished] = c->latency; c->pending--; if (c->pending == 0) { - clientDone(c); + resetClient(c); break; } } else { @@ -571,7 +409,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* Really initialize: randomize keys and set start time. */ if (config.randomkeys) randomizeClientKey(c); - if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c); atomicGet(config.slots_last_update, c->slots_last_update); c->start = ustime(); c->latency = -1; @@ -628,7 +465,7 @@ static client createClient(const char *cmd, size_t len, client from, int thread_ port = config.hostport; } else { int node_idx = 0; - if (config.num_threads < config.cluster_node_count) + if (config.max_threads < config.cluster_node_count) node_idx = config.liveclients % config.cluster_node_count; else node_idx = thread_id % config.cluster_node_count; @@ -783,149 +620,16 @@ static client createClient(const char *cmd, size_t len, client from, int thread_ return c; } -static void createMissingClients(client c) { - int n = 0; - while(config.liveclients < config.numclients) { - int thread_id = -1; - if (config.num_threads) - thread_id = config.liveclients % config.num_threads; - createClient(NULL,0,c,thread_id); - - /* Listen backlog is quite limited on most systems */ - if (++n > 64) { - usleep(50000); - n = 0; - } - } -} - -static int compareLatency(const void *a, const void *b) { - return (*(long long*)a)-(*(long long*)b); -} - -static int ipow(int base, int exp) { - int result = 1; - while (exp) { - if (exp & 1) result *= base; - exp /= 2; - base *= base; - } - return result; -} - -static void showLatencyReport(void) { - int i, curlat = 0; - int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision); - float perc, reqpersec; - - reqpersec = (float)config.requests_finished/((float)config.totlatency/1000); - if (!config.quiet && !config.csv) { - printf("====== %s ======\n", config.title); - printf(" %d requests completed in %.2f seconds\n", config.requests_finished, - (float)config.totlatency/1000); - printf(" %d parallel clients\n", config.numclients); - printf(" %d bytes payload\n", config.datasize); - printf(" keep alive: %d\n", config.keepalive); - if (config.cluster_mode) { - printf(" cluster mode: yes (%d masters)\n", - config.cluster_node_count); - int m ; - for (m = 0; m < config.cluster_node_count; m++) { - clusterNode *node = config.cluster_nodes[m]; - redisConfig *cfg = node->redis_config; - if (cfg == NULL) continue; - printf(" node [%d] configuration:\n",m ); - printf(" save: %s\n", - sdslen(cfg->save) ? cfg->save : "NONE"); - printf(" appendonly: %s\n", cfg->appendonly); - } - } else { - if (config.redis_config) { - printf(" host configuration \"save\": %s\n", - config.redis_config->save); - printf(" host configuration \"appendonly\": %s\n", - config.redis_config->appendonly); - } - } - printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no")); - if (config.num_threads) - printf(" threads: %d\n", config.num_threads); - - printf("\n"); - - qsort(config.latency,config.requests,sizeof(long long),compareLatency); - for (i = 0; i < config.requests; i++) { - if (config.latency[i]/usbetweenlat != curlat || - i == (config.requests-1)) - { - /* After the 2 milliseconds latency to have percentages split - * by decimals will just add a lot of noise to the output. */ - if (config.latency[i] >= 2000) { - config.precision = 0; - usbetweenlat = ipow(10, - MAX_LATENCY_PRECISION-config.precision); - } - - curlat = config.latency[i]/usbetweenlat; - perc = ((float)(i+1)*100)/config.requests; - printf("%.2f%% <= %.*f milliseconds\n", perc, config.precision, - curlat/pow(10.0, config.precision)); - } - } - printf("%.2f requests per second\n\n", reqpersec); - } else if (config.csv) { - printf("\"%s\",\"%.2f\"\n", config.title, reqpersec); - } else { - printf("%s: %.2f requests per second\n", config.title, reqpersec); - } -} - static void initBenchmarkThreads() { int i; if (config.threads) freeBenchmarkThreads(); - config.threads = (benchmarkThread**)zmalloc(config.num_threads * sizeof(benchmarkThread*), MALLOC_LOCAL); - for (i = 0; i < config.num_threads; i++) { + config.threads = (benchmarkThread**)zmalloc(config.max_threads * sizeof(benchmarkThread*), MALLOC_LOCAL); + for (i = 0; i < config.max_threads; i++) { benchmarkThread *thread = createBenchmarkThread(i); config.threads[i] = thread; } } -static void startBenchmarkThreads() { - int i; - for (i = 0; i < config.num_threads; i++) { - benchmarkThread *t = config.threads[i]; - if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ - fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); - exit(1); - } - } - for (i = 0; i < config.num_threads; i++) - pthread_join(config.threads[i]->thread, NULL); -} - -static void benchmark(const char *title, const char *cmd, int len) { - client c; - - config.title = title; - config.requests_issued = 0; - config.requests_finished = 0; - - if (config.num_threads) initBenchmarkThreads(); - - int thread_id = config.num_threads > 0 ? 0 : -1; - c = createClient(cmd,len,NULL,thread_id); - createMissingClients(c); - - config.start = mstime(); - if (!config.num_threads) aeMain(config.el); - else startBenchmarkThreads(); - config.totlatency = mstime()-config.start; - - showLatencyReport(); - freeAllClients(); - if (config.threads) freeBenchmarkThreads(); -} - /* Thread functions. */ static benchmarkThread *createBenchmarkThread(int index) { @@ -933,7 +637,6 @@ static benchmarkThread *createBenchmarkThread(int index) { if (thread == NULL) return NULL; thread->index = index; thread->el = aeCreateEventLoop(1024*10); - aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL); return thread; } @@ -944,7 +647,7 @@ static void freeBenchmarkThread(benchmarkThread *thread) { static void freeBenchmarkThreads() { int i = 0; - for (; i < config.num_threads; i++) { + for (; i < config.max_threads; i++) { benchmarkThread *thread = config.threads[i]; if (thread) freeBenchmarkThread(thread); } @@ -958,360 +661,40 @@ static void *execBenchmarkThread(void *ptr) { return NULL; } -/* Cluster helper functions. */ - -static clusterNode *createClusterNode(char *ip, int port) { - clusterNode *node = (clusterNode*)zmalloc(sizeof(*node), MALLOC_LOCAL); - if (!node) return NULL; - node->ip = ip; - node->port = port; - node->name = NULL; - node->flags = 0; - node->replicate = NULL; - node->replicas_count = 0; - node->slots = (int*)zmalloc(CLUSTER_SLOTS * sizeof(int), MALLOC_LOCAL); - node->slots_count = 0; - node->current_slot_index = 0; - node->updated_slots = NULL; - node->updated_slots_count = 0; - node->migrating = NULL; - node->importing = NULL; - node->migrating_count = 0; - node->importing_count = 0; - node->redis_config = NULL; - return node; -} - -static void freeClusterNode(clusterNode *node) { - int i; - if (node->name) sdsfree(node->name); - if (node->replicate) sdsfree(node->replicate); - if (node->migrating != NULL) { - for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]); - zfree(node->migrating); - } - if (node->importing != NULL) { - for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]); - zfree(node->importing); - } - /* If the node is not the reference node, that uses the address from - * config.hostip and config.hostport, then the node ip has been - * allocated by fetchClusterConfiguration, so it must be freed. */ - if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip); - if (node->redis_config != NULL) freeRedisConfig(node->redis_config); - zfree(node->slots); - zfree(node); -} - -static void freeClusterNodes() { - int i = 0; - for (; i < config.cluster_node_count; i++) { - clusterNode *n = config.cluster_nodes[i]; - if (n) freeClusterNode(n); - } - zfree(config.cluster_nodes); +void initConfigDefaults() { + config.numclients = 50; + config.requests = 100000; + config.liveclients = 0; + config.el = aeCreateEventLoop(1024*10); + config.keepalive = 1; + config.datasize = 3; + config.pipeline = 1; + config.showerrors = 0; + config.randomkeys = 0; + config.randomkeys_keyspacelen = 0; + config.quiet = 0; + config.csv = 0; + config.loop = 0; + config.idlemode = 0; + config.latency = NULL; + config.clients = listCreate(); + config.hostip = "127.0.0.1"; + config.hostport = 6379; + config.hostsocket = NULL; + config.tests = NULL; + config.dbnum = 0; + config.auth = NULL; + config.precision = 1; + config.max_threads = MAX_THREADS; + config.threads = NULL; + config.cluster_mode = 0; + config.cluster_node_count = 0; config.cluster_nodes = NULL; -} - -static clusterNode **addClusterNode(clusterNode *node) { - int count = config.cluster_node_count + 1; - config.cluster_nodes = (clusterNode**)zrealloc(config.cluster_nodes, - count * sizeof(*node), MALLOC_LOCAL); - if (!config.cluster_nodes) return NULL; - config.cluster_nodes[config.cluster_node_count++] = node; - return config.cluster_nodes; -} - -static int fetchClusterConfiguration() { - int success = 1; - redisContext *ctx = NULL; - redisReply *reply = NULL; - char *lines = NULL; - char *line = NULL; - char *p = NULL; - ctx = getRedisContext(config.hostip, config.hostport, config.hostsocket); - if (ctx == NULL) { - exit(1); - } - clusterNode *firstNode = createClusterNode((char *) config.hostip, - config.hostport); - if (!firstNode) {success = 0; goto cleanup;} - reply = (redisReply*)redisCommand(ctx, "CLUSTER NODES"); - success = (reply != NULL); - if (!success) goto cleanup; - success = (reply->type != REDIS_REPLY_ERROR); - if (!success) { - if (config.hostsocket == NULL) { - fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n", - config.hostip, config.hostport, reply->str); - } else { - fprintf(stderr, "Cluster node %s replied with error:\n%s\n", - config.hostsocket, reply->str); - } - goto cleanup; - } - lines = reply->str; - while ((p = strstr(lines, "\n")) != NULL) { - *p = '\0'; - line = lines; - lines = p + 1; - char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL; - int i = 0; - while ((p = strchr(line, ' ')) != NULL) { - *p = '\0'; - char *token = line; - line = p + 1; - switch(i++){ - case 0: name = token; break; - case 1: addr = token; break; - case 2: flags = token; break; - case 3: master_id = token; break; - } - if (i == 8) break; // Slots - } - if (!flags) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n"); - success = 0; - goto cleanup; - } - int myself = (strstr(flags, "myself") != NULL); - int is_replica = (strstr(flags, "slave") != NULL || - (master_id != NULL && master_id[0] != '-')); - if (is_replica) continue; - if (addr == NULL) { - fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n"); - success = 0; - goto cleanup; - } - clusterNode *node = NULL; - char *ip = NULL; - int port = 0; - char *paddr = strchr(addr, ':'); - if (paddr != NULL) { - *paddr = '\0'; - ip = addr; - addr = paddr + 1; - /* If internal bus is specified, then just drop it. */ - if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0'; - port = atoi(addr); - } - if (myself) { - node = firstNode; - if (node->ip == NULL && ip != NULL) { - node->ip = ip; - node->port = port; - } - } else { - node = createClusterNode(sdsnew(ip), port); - } - if (node == NULL) { - success = 0; - goto cleanup; - } - if (name != NULL) node->name = sdsnew(name); - if (i == 8) { - int remaining = strlen(line); - while (remaining > 0) { - p = strchr(line, ' '); - if (p == NULL) p = line + remaining; - remaining -= (p - line); - - char *slotsdef = line; - *p = '\0'; - if (remaining) { - line = p + 1; - remaining--; - } else line = p; - char *dash = NULL; - if (slotsdef[0] == '[') { - slotsdef++; - if ((p = strstr(slotsdef, "->-"))) { // Migrating - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds dst = sdsnew(p); - node->migrating_count += 2; - node->migrating = - (char**)zrealloc(node->migrating, - (node->migrating_count * sizeof(sds)), MALLOC_LOCAL); - node->migrating[node->migrating_count - 2] = - slot; - node->migrating[node->migrating_count - 1] = - dst; - } else if ((p = strstr(slotsdef, "-<-"))) {//Importing - *p = '\0'; - p += 3; - char *closing_bracket = strchr(p, ']'); - if (closing_bracket) *closing_bracket = '\0'; - sds slot = sdsnew(slotsdef); - sds src = sdsnew(p); - node->importing_count += 2; - node->importing = (char**)zrealloc(node->importing, - (node->importing_count * sizeof(sds)), MALLOC_LOCAL); - node->importing[node->importing_count - 2] = - slot; - node->importing[node->importing_count - 1] = - src; - } - } else if ((dash = strchr(slotsdef, '-')) != NULL) { - p = dash; - int start, stop; - *p = '\0'; - start = atoi(slotsdef); - stop = atoi(p + 1); - while (start <= stop) { - int slot = start++; - node->slots[node->slots_count++] = slot; - } - } else if (p > slotsdef) { - int slot = atoi(slotsdef); - node->slots[node->slots_count++] = slot; - } - } - } - if (node->slots_count == 0) { - printf("WARNING: master node %s:%d has no slots, skipping...\n", - node->ip, node->port); - continue; - } - if (!addClusterNode(node)) { - success = 0; - goto cleanup; - } - } -cleanup: - if (ctx) redisFree(ctx); - if (!success) { - if (config.cluster_nodes) freeClusterNodes(); - } - if (reply) freeReplyObject(reply); - return success; -} - -/* Request the current cluster slots configuration by calling CLUSTER SLOTS - * and atomically update the slots after a successful reply. */ -static int fetchClusterSlotsConfiguration(client c) { - UNUSED(c); - int success = 1, is_fetching_slots = 0, last_update = 0; - size_t i; - atomicGet(config.slots_last_update, last_update); - if (c->slots_last_update < last_update) { - c->slots_last_update = last_update; - return -1; - } - redisReply *reply = NULL; - atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); - if (is_fetching_slots) return -1; //TODO: use other codes || errno ? - atomicSet(config.is_fetching_slots, 1); - if (config.showerrors) - printf("Cluster slots configuration changed, fetching new one...\n"); - const char *errmsg = "Failed to update cluster slots configuration"; - static dictType dtype = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - NULL, /* key destructor */ - NULL /* val destructor */ - }; - /* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */ - dict *masters = dictCreate(&dtype, NULL); - redisContext *ctx = NULL; - for (i = 0; i < (size_t) config.cluster_node_count; i++) { - clusterNode *node = config.cluster_nodes[i]; - assert(node->ip != NULL); - assert(node->name != NULL); - assert(node->port); - /* Use first node as entry point to connect to. */ - if (ctx == NULL) { - ctx = getRedisContext(node->ip, node->port, NULL); - if (!ctx) { - success = 0; - goto cleanup; - } - } - if (node->updated_slots != NULL) - zfree(node->updated_slots); - node->updated_slots = NULL; - node->updated_slots_count = 0; - dictReplace(masters, node->name, node) ; - } - reply = (redisReply*)redisCommand(ctx, "CLUSTER SLOTS"); - if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { - success = 0; - if (reply) - fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str); - goto cleanup; - } - assert(reply->type == REDIS_REPLY_ARRAY); - for (i = 0; i < reply->elements; i++) { - redisReply *r = reply->element[i]; - assert(r->type == REDIS_REPLY_ARRAY); - assert(r->elements >= 3); - int from, to, slot; - from = r->element[0]->integer; - to = r->element[1]->integer; - redisReply *nr = r->element[2]; - assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3); - assert(nr->element[2]->str != NULL); - sds name = sdsnew(nr->element[2]->str); - dictEntry *entry = dictFind(masters, name); - if (entry == NULL) { - success = 0; - fprintf(stderr, "%s: could not find node with ID %s in current " - "configuration.\n", errmsg, name); - if (name) sdsfree(name); - goto cleanup; - } - sdsfree(name); - clusterNode *node = (clusterNode*)dictGetVal(entry); - if (node->updated_slots == NULL) - node->updated_slots = (int*)zcalloc(CLUSTER_SLOTS * sizeof(int), MALLOC_LOCAL); - for (slot = from; slot <= to; slot++) - node->updated_slots[node->updated_slots_count++] = slot; - } - updateClusterSlotsConfiguration(); -cleanup: - freeReplyObject(reply); - redisFree(ctx); - dictRelease(masters); - atomicSet(config.is_fetching_slots, 0); - return success; -} - -/* Atomically update the new slots configuration. */ -static void updateClusterSlotsConfiguration() { - pthread_mutex_lock(&config.is_updating_slots_mutex); - atomicSet(config.is_updating_slots, 1); - int i; - for (i = 0; i < config.cluster_node_count; i++) { - clusterNode *node = config.cluster_nodes[i]; - if (node->updated_slots != NULL) { - int *oldslots = node->slots; - node->slots = node->updated_slots; - node->slots_count = node->updated_slots_count; - node->current_slot_index = 0; - node->updated_slots = NULL; - node->updated_slots_count = 0; - zfree(oldslots); - } - } - atomicSet(config.is_updating_slots, 0); - atomicIncr(config.slots_last_update, 1); - pthread_mutex_unlock(&config.is_updating_slots_mutex); -} - -/* Generate random data for redis benchmark. See #7196. */ -static void genBenchmarkRandomData(char *data, int count) { - static uint32_t state = 1234; - int i = 0; - - while (count--) { - state = (state*1103515245+12345); - data[i++] = '0'+((state>>16)&63); - } + config.redis_config = NULL; + config.is_fetching_slots = 0; + config.is_updating_slots = 0; + config.slots_last_update = 0; + config.enable_tracking = 0; } /* Returns number of consumed options. */ @@ -1399,12 +782,12 @@ int parseOptions(int argc, const char **argv) { if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION; } else if (!strcmp(argv[i],"--threads")) { if (lastarg) goto invalid; - config.num_threads = atoi(argv[++i]); - if (config.num_threads > MAX_THREADS) { + config.max_threads = atoi(argv[++i]); + if (config.max_threads > MAX_THREADS) { printf("WARNING: too many threads, limiting threads to %d.\n", MAX_THREADS); - config.num_threads = MAX_THREADS; - } else if (config.num_threads < 0) config.num_threads = 0; + config.max_threads = MAX_THREADS; + } else if (config.max_threads < 0) config.max_threads = 0; } else if (!strcmp(argv[i],"--cluster")) { config.cluster_mode = 1; } else if (!strcmp(argv[i],"--enable-tracking")) { @@ -1478,98 +861,26 @@ usage: exit(exit_status); } -int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) { - UNUSED(eventLoop); - UNUSED(id); - UNUSED(clientData); - int liveclients = 0; - int requests_finished = 0; - atomicGet(config.liveclients, liveclients); - atomicGet(config.requests_finished, requests_finished); - - if (liveclients == 0 && requests_finished != config.requests) { - fprintf(stderr,"All clients disconnected... aborting.\n"); - exit(1); - } - if (config.num_threads && requests_finished >= config.requests) { - aeStop(eventLoop); - return AE_NOMORE; - } - if (config.csv) return 250; - if (config.idlemode == 1) { - printf("clients: %d\r", config.liveclients); - fflush(stdout); - return 250; - } - float dt = (float)(mstime()-config.start)/1000.0; - float rps = (float)requests_finished/dt; - printf("%s: %.2f\r", config.title, rps); - fflush(stdout); - return 250; /* every 250ms */ -} - -/* Return true if the named test was selected using the -t command line - * switch, or if all the tests are selected (no -t passed by user). */ -int test_is_selected(const char *name) { - char buf[256]; - int l = strlen(name); - - if (config.tests == NULL) return 1; - buf[0] = ','; - memcpy(buf+1,name,l); - buf[l+1] = ','; - buf[l+2] = '\0'; - return strstr(config.tests,buf) != NULL; +int extractPropertyFromInfo(const char *info, const char *key, double &val) { + char *line = strstr((char*)info, key); + if (line == nullptr) return 1; + line += strlen(key) + 1; // Skip past key name and following colon + char *newline = strchr(line, '\n'); + *newline = 0; // Terminate string after relevant line + val = strtod(line, nullptr); + return 0; } int main(int argc, const char **argv) { int i; - char *data, *cmd; - const char *tag; - int len; - - client c; - + storage_init(NULL, 0); srandom(time(NULL)); signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); - config.numclients = 50; - config.requests = 100000; - config.liveclients = 0; - config.el = aeCreateEventLoop(1024*10); - aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL); - config.keepalive = 1; - config.datasize = 3; - config.pipeline = 1; - config.showerrors = 0; - config.randomkeys = 0; - config.randomkeys_keyspacelen = 0; - config.quiet = 0; - config.csv = 0; - config.loop = 0; - config.idlemode = 0; - config.latency = NULL; - config.clients = listCreate(); - config.hostip = "127.0.0.1"; - config.hostport = 6379; - config.hostsocket = NULL; - config.tests = NULL; - config.dbnum = 0; - config.auth = NULL; - config.precision = 1; - config.num_threads = 0; - config.threads = NULL; - config.cluster_mode = 0; - config.cluster_node_count = 0; - config.cluster_nodes = NULL; - config.redis_config = NULL; - config.is_fetching_slots = 0; - config.is_updating_slots = 0; - config.slots_last_update = 0; - config.enable_tracking = 0; + initConfigDefaults(); i = parseOptions(argc,argv); argc -= i; @@ -1577,58 +888,7 @@ int main(int argc, const char **argv) { config.latency = (long long*)zmalloc(sizeof(long long)*config.requests, MALLOC_LOCAL); - tag = ""; - - if (config.cluster_mode) { - // We only include the slot placeholder {tag} if cluster mode is enabled - tag = ":{tag}"; - - /* Fetch cluster configuration. */ - if (!fetchClusterConfiguration() || !config.cluster_nodes) { - if (!config.hostsocket) { - fprintf(stderr, "Failed to fetch cluster configuration from " - "%s:%d\n", config.hostip, config.hostport); - } else { - fprintf(stderr, "Failed to fetch cluster configuration from " - "%s\n", config.hostsocket); - } - exit(1); - } - if (config.cluster_node_count <= 1) { - fprintf(stderr, "Invalid cluster: %d node(s).\n", - config.cluster_node_count); - exit(1); - } - printf("Cluster has %d master nodes:\n\n", config.cluster_node_count); - int i = 0; - for (; i < config.cluster_node_count; i++) { - clusterNode *node = config.cluster_nodes[i]; - if (!node) { - fprintf(stderr, "Invalid cluster node #%d\n", i); - exit(1); - } - printf("Master %d: ", i); - if (node->name) printf("%s ", node->name); - printf("%s:%d\n", node->ip, node->port); - node->redis_config = getRedisConfig(node->ip, node->port, NULL); - if (node->redis_config == NULL) { - fprintf(stderr, "WARN: could not fetch node CONFIG %s:%d\n", - node->ip, node->port); - } - } - printf("\n"); - /* Automatically set thread number to node count if not specified - * by the user. */ - if (config.num_threads == 0) - config.num_threads = config.cluster_node_count; - } else { - config.redis_config = - getRedisConfig(config.hostip, config.hostport, config.hostsocket); - if (config.redis_config == NULL) - fprintf(stderr, "WARN: could not fetch server CONFIG\n"); - } - - if (config.num_threads > 0) { + if (config.max_threads > 0) { int err = 0; err |= pthread_mutex_init(&(config.requests_issued_mutex), NULL); err |= pthread_mutex_init(&(config.requests_finished_mutex), NULL); @@ -1644,187 +904,50 @@ int main(int argc, const char **argv) { } } - if (config.keepalive == 0) { - printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n"); + const char *set_value = "abcdefghijklmnopqrstuvwxyz"; + int threads_used = 0; + char command[63]; + + initBenchmarkThreads(); + redisContext *ctx = getRedisContext(config.hostip, config.hostport, config.hostsocket); + double cpu_usage; + + while (threads_used < config.max_threads) { + printf("Creating clients for thread %d...\n", threads_used); + for (int i = 0; i < config.numclients; i++) { + sprintf(command, "SET %d %s\r\n", threads_used * config.numclients + i, set_value); + createClient(command, strlen(command), NULL,threads_used); + } + + printf("Starting thread %d\n", threads_used); + + benchmarkThread *t = config.threads[threads_used]; + if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ + fprintf(stderr, "FATAL: Failed to start thread %d.\n", threads_used); + exit(1); + } + threads_used++; + + sleep(1); + + redisReply *reply = (redisReply*)redisCommand(ctx, "INFO"); + if (reply->type != REDIS_REPLY_STRING) { + freeReplyObject(reply); + printf("Error executing INFO command. Exiting.\r\n"); + break; + } + if (extractPropertyFromInfo(reply->str, "used_cpu_sys", cpu_usage)) { + printf("Error reading CPU usage from INFO command. Exiting.\r\n"); + break; + } + printf("CPU Usage: %f\r\n", cpu_usage); + freeReplyObject(reply); } - if (config.idlemode) { - printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - int thread_id = -1, use_threads = (config.num_threads > 0); - if (use_threads) { - thread_id = 0; - initBenchmarkThreads(); - } - c = createClient("",0,NULL,thread_id); /* will never receive a reply */ - createMissingClients(c); - if (use_threads) startBenchmarkThreads(); - else aeMain(config.el); - /* and will wait for every */ - } + printf("Done.\n"); - /* Run benchmark with command in the remainder of the arguments. */ - if (argc) { - sds title = sdsnew(argv[0]); - for (i = 1; i < argc; i++) { - title = sdscatlen(title, " ", 1); - title = sdscatlen(title, (char*)argv[i], strlen(argv[i])); - } - - do { - len = redisFormatCommandArgv(&cmd,argc,argv,NULL); - benchmark(title,cmd,len); - free(cmd); - } while(config.loop); - - if (config.redis_config != NULL) freeRedisConfig(config.redis_config); - return 0; - } - - /* Run default benchmark suite. */ - data = (char*)zmalloc(config.datasize+1, MALLOC_LOCAL); - do { - genBenchmarkRandomData(data, config.datasize); - data[config.datasize] = '\0'; - - if (test_is_selected("ping_inline") || test_is_selected("ping")) - benchmark("PING_INLINE","PING\r\n",6); - - if (test_is_selected("ping_mbulk") || test_is_selected("ping")) { - len = redisFormatCommand(&cmd,"PING"); - benchmark("PING_BULK",cmd,len); - free(cmd); - } - - if (test_is_selected("set")) { - len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data); - benchmark("SET",cmd,len); - free(cmd); - } - - if (test_is_selected("get")) { - len = redisFormatCommand(&cmd,"GET key%s:__rand_int__",tag); - benchmark("GET",cmd,len); - free(cmd); - } - - if (test_is_selected("incr")) { - len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag); - benchmark("INCR",cmd,len); - free(cmd); - } - - if (test_is_selected("lpush")) { - len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data); - benchmark("LPUSH",cmd,len); - free(cmd); - } - - if (test_is_selected("rpush")) { - len = redisFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data); - benchmark("RPUSH",cmd,len); - free(cmd); - } - - if (test_is_selected("lpop")) { - len = redisFormatCommand(&cmd,"LPOP mylist%s",tag); - benchmark("LPOP",cmd,len); - free(cmd); - } - - if (test_is_selected("rpop")) { - len = redisFormatCommand(&cmd,"RPOP mylist%s",tag); - benchmark("RPOP",cmd,len); - free(cmd); - } - - if (test_is_selected("sadd")) { - len = redisFormatCommand(&cmd, - "SADD myset%s element:__rand_int__",tag); - benchmark("SADD",cmd,len); - free(cmd); - } - - if (test_is_selected("hset")) { - len = redisFormatCommand(&cmd, - "HSET myhash%s element:__rand_int__ %s",tag,data); - benchmark("HSET",cmd,len); - free(cmd); - } - - if (test_is_selected("spop")) { - len = redisFormatCommand(&cmd,"SPOP myset%s",tag); - benchmark("SPOP",cmd,len); - free(cmd); - } - - if (test_is_selected("zadd")) { - const char *score = "0"; - if (config.randomkeys) score = "__rand_int__"; - len = redisFormatCommand(&cmd, - "ZADD myzset%s %s element:__rand_int__",tag,score); - benchmark("ZADD",cmd,len); - free(cmd); - } - - if (test_is_selected("zpopmin")) { - len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s",tag); - benchmark("ZPOPMIN",cmd,len); - free(cmd); - } - - if (test_is_selected("lrange") || - test_is_selected("lrange_100") || - test_is_selected("lrange_300") || - test_is_selected("lrange_500") || - test_is_selected("lrange_600")) - { - len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data); - benchmark("LPUSH (needed to benchmark LRANGE)",cmd,len); - free(cmd); - } - - if (test_is_selected("lrange") || test_is_selected("lrange_100")) { - len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag); - benchmark("LRANGE_100 (first 100 elements)",cmd,len); - free(cmd); - } - - if (test_is_selected("lrange") || test_is_selected("lrange_300")) { - len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag); - benchmark("LRANGE_300 (first 300 elements)",cmd,len); - free(cmd); - } - - if (test_is_selected("lrange") || test_is_selected("lrange_500")) { - len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag); - benchmark("LRANGE_500 (first 450 elements)",cmd,len); - free(cmd); - } - - if (test_is_selected("lrange") || test_is_selected("lrange_600")) { - len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag); - benchmark("LRANGE_600 (first 600 elements)",cmd,len); - free(cmd); - } - - if (test_is_selected("mset")) { - const char *cmd_argv[21]; - cmd_argv[0] = "MSET"; - sds key_placeholder = sdscatprintf(sdsnew(""),"key%s:__rand_int__",tag); - for (i = 1; i < 21; i += 2) { - cmd_argv[i] = key_placeholder; - cmd_argv[i+1] = data; - } - len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL); - benchmark("MSET (10 keys)",cmd,len); - free(cmd); - sdsfree(key_placeholder); - } - - if (!config.csv) printf("\n"); - } while(config.loop); - - if (config.redis_config != NULL) freeRedisConfig(config.redis_config); + freeAllClients(); + freeBenchmarkThreads(); return 0; }