diff --git a/.gitignore b/.gitignore index 6c2d3a16e..1ec8b6100 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ redis-check-rdb keydb-check-rdb redis-check-dump keydb-check-dump +keydb-diagnostic-tool redis-cli redis-sentinel redis-server diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 8a1405e41..4f6cb9978 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,40 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 6.2.3 Released Mon May 3 19:00:00 IST 2021 +================================================================================ + +Upgrade urgency: SECURITY, Contains fixes to security issues that affect +authenticated client connections. LOW otherwise. + +Integer overflow in STRALGO LCS command (CVE-2021-29477): +An integer overflow bug in Redis version 6.0 or newer could be exploited using +the STRALGO LCS command to corrupt the heap and potentially result in remote +code execution. The integer overflow bug exists in all versions of Redis +starting with 6.0. + +Integer overflow in COPY command for large intsets (CVE-2021-29478): +An integer overflow bug in Redis 6.2 could be exploited to corrupt the heap and +potentially result with remote code execution. The vulnerability involves +changing the default set-max-intset-entries configuration value, creating a +large set key that consists of integer values and using the COPY command to +duplicate it. The integer overflow bug exists in all versions of Redis starting +with 2.6, where it could result with a corrupted RDB or DUMP payload, but not +exploited through COPY (which did not exist before 6.2). 
+ +Bug fixes that are only applicable to previous releases of Redis 6.2: +* Fix memory leak in moduleDefragGlobals (#8853) +* Fix memory leak when doing lazy freeing client tracking table (#8822) +* Block abusive replicas from sending command that could assert and crash redis (#8868) + +Other bug fixes: +* Use a monotonic clock to check for Lua script timeout (#8812) +* redis-cli: Do not use unix socket when we got redirected in cluster mode (#8870) + +Modules: +* Fix RM_GetClusterNodeInfo() to correctly populate master id (#8846) + ================================================================================ Redis 6.2.2 Released Mon April 19 19:00:00 IST 2021 ================================================================================ diff --git a/TLS.md b/TLS.md index 6373ff871..9d083bb3a 100644 --- a/TLS.md +++ b/TLS.md @@ -28,8 +28,8 @@ To manually run a Redis server with TLS mode (assuming `gen-test-certs.sh` was invoked so sample certificates/keys are available): ./src/keydb-server --tls-port 6379 --port 0 \ - --tls-cert-file ./tests/tls/keydb.crt \ - --tls-key-file ./tests/tls/keydb.key \ + --tls-cert-file ./tests/tls/client.crt \ + --tls-key-file ./tests/tls/client.key \ --tls-ca-cert-file ./tests/tls/ca.crt To connect to this Redis server with `keydb-cli`: diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index b6e2f8c6d..343198244 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -241,7 +241,7 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr) { int free_in_slab = extent_nfree_get(slab); if (free_in_slab) { const bin_info_t *bin_info = &bin_infos[binind]; - int curslabs = binshard->stats.curslabs; + ssize_t curslabs = binshard->stats.curslabs; size_t curregs = binshard->stats.curregs; if (binshard->slabcur) { /* remove slabcur from the 
overall utilization */ diff --git a/runtest b/runtest index c6349d118..dec3ee0cb 100755 --- a/runtest +++ b/runtest @@ -10,7 +10,7 @@ done if [ -z $TCLSH ] then - echo "You need tcl 8.5 or newer in order to run the Redis test" + echo "You need tcl 8.5 or newer in order to run the KeyDB test" exit 1 fi $TCLSH tests/test_helper.tcl "${@}" diff --git a/runtest-sentinel b/runtest-sentinel index 3fb1ef615..57a6c75bb 100755 --- a/runtest-sentinel +++ b/runtest-sentinel @@ -8,7 +8,7 @@ done if [ -z $TCLSH ] then - echo "You need tcl 8.5 or newer in order to run the Redis Sentinel test" + echo "You need tcl 8.5 or newer in order to run the KeyDB Sentinel test" exit 1 fi $TCLSH tests/sentinel/run.tcl $* diff --git a/src/Makefile b/src/Makefile index 9a0d282c6..179bb46e7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -125,6 +125,10 @@ DEBUG=-g -ggdb ifneq ($(uname_S),Darwin) FINAL_LIBS+=-latomic endif +# Linux ARM32 needs -latomic at linking time +ifneq (,$(findstring armv,$(uname_M))) + FINAL_LIBS+=-latomic +endif ifeq ($(uname_S),SunOS) @@ -340,8 +344,10 @@ REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX) REDIS_CHECK_AOF_NAME=keydb-check-aof$(PROG_SUFFIX) +KEYDB_DIAGNOSTIC_NAME=keydb-diagnostic-tool$(PROG_SUFFIX) +KEYDB_DIAGNOSTIC_OBJ=ae.o anet.o keydb-diagnostic-tool.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o keydb-diagnostic-tool.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) -all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) +all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) 
$(REDIS_CHECK_AOF_NAME) $(KEYDB_DIAGNOSTIC_NAME) @echo "" @echo "Hint: It's a good idea to run 'make test' ;)" @echo "" @@ -413,6 +419,11 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS) +# keydb-diagnostic-tool +$(KEYDB_DIAGNOSTIC_NAME): $(KEYDB_DIAGNOSTIC_OBJ) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a $(FINAL_LIBS) + +# Dependency (.d) files of every binary, keydb-diagnostic-tool included: they are included below and removed by 'clean'. -DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) +DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) $(KEYDB_DIAGNOSTIC_OBJ:%.o=%.d) -include $(DEP) @@ -429,7 +440,7 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ $(KEYDB_AS) $< -o $@ clean: - rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov KeyDB.info lcov-html Makefile.dep + rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) $(KEYDB_DIAGNOSTIC_NAME) *.o *.gcda *.gcno *.gcov KeyDB.info lcov-html Makefile.dep rm -f $(DEP) .PHONY: clean @@ -492,4 +503,5 @@ install: all @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME) +# Paths are built with $(addprefix) rather than {a,b} brace expansion: recipes run under /bin/sh unless SHELL is overridden, and plain sh does not expand braces. uninstall: - rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)} + rm -f $(addprefix $(INSTALL_BIN)/,$(REDIS_SERVER_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CLI_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) $(REDIS_SENTINEL_NAME) $(KEYDB_DIAGNOSTIC_NAME)) diff --git a/src/blocked.cpp b/src/blocked.cpp index c6d157a9c..f3d4ec9e5 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -173,12 +173,11 @@ void queueClientForReprocessing(client *c) { /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. 
*/ serverAssert(GlobalLocksAcquired()); - fastlock_lock(&c->lock); + std::unique_lock ul(c->lock); if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; listAddNodeTail(g_pserver->rgthreadvar[c->iel].unblocked_clients,c); } - fastlock_unlock(&c->lock); } /* Unblock a client calling the right function depending on the kind @@ -792,4 +791,4 @@ void signalKeyAsReady(redisDb *db, sds key, int type) { redisObjectStack o; initStaticStringObject(o, key); signalKeyAsReady(db, &o, type); -} \ No newline at end of file +} diff --git a/src/cluster.cpp b/src/cluster.cpp index 98ae754d0..1acd9fdbf 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -561,7 +561,7 @@ void clusterInit(void) { serverAssert(serverTL == &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]); if (createSocketAcceptHandler(&g_pserver->cfd, clusterAcceptHandler) != C_OK) { - serverPanic("Unrecoverable error creating Redis Cluster socket accept handler."); + serverPanic("Unrecoverable error creating KeyDB Cluster socket accept handler."); } /* The slots -> keys map is a radix tree. Initialize it here. */ @@ -5567,9 +5567,10 @@ try_again: if (ttl < 1) ttl = 1; } - /* Relocate valid (non expired) keys into the array in successive + /* Relocate valid (non expired) keys and values into the array in successive * positions to remove holes created by the keys that were present * in the first lookup but are now expired after the second lookup. */ + ov[non_expired] = ov[j]; kv[non_expired++] = kv[j]; serverAssertWithInfo(c,NULL, diff --git a/src/db.cpp b/src/db.cpp index 0eccc5d0d..c69e19506 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1403,9 +1403,8 @@ void copyCommand(client *c) { } dbAdd(dst,newkey,newobj); - if (expire != nullptr) { - if (expire != nullptr) setExpire(c, dst, newkey, expire->duplicate()); - } + if (expire != nullptr) + setExpire(c, dst, newkey, expire->duplicate()); /* OK! 
key copied */ signalModifiedKey(c,dst,c->argv[2]); @@ -1771,7 +1770,7 @@ int keyIsExpired(redisDb *db, robj *key) { * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ if (g_pserver->lua_caller) { - now = g_pserver->lua_time_start; + now = g_pserver->lua_time_snapshot; } /* If we are in the middle of a command execution, we still want to use * a reference time that does not change: in that case we just use the @@ -1832,14 +1831,17 @@ int expireIfNeeded(redisDb *db, robj *key) { if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; /* Delete the key */ + if (g_pserver->lazyfree_lazy_expire) { + dbAsyncDelete(db,key); + } else { + dbSyncDelete(db,key); + } g_pserver->stat_expiredkeys++; propagateExpire(db,key,g_pserver->lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); - if (retval) signalModifiedKey(NULL,db,key); - return retval; + signalModifiedKey(NULL,db,key); + return 1; } /* ----------------------------------------------------------------------------- diff --git a/src/debug.cpp b/src/debug.cpp index 021cf9c4a..7b7fe29e1 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -1038,7 +1038,6 @@ void _serverPanic(const char *file, int line, const char *msg, ...) { vsnprintf(fmtmsg,sizeof(fmtmsg),msg,ap); va_end(ap); - g_fInCrash = true; bugReportStart(); serverLog(LL_WARNING,"------------------------------------------------"); serverLog(LL_WARNING,"!!! Software Failure. 
Press left mouse button to continue"); diff --git a/src/expire.cpp b/src/expire.cpp index 06b34616a..540c6eac7 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -821,8 +821,8 @@ void expireEntryFat::expireSubKey(const char *szSubkey, long long when) fFound = true; } if (fFound) { - m_vecexpireEntries.erase(itr); dictDelete(m_dictIndex, szSubkey); + m_vecexpireEntries.erase(itr); break; } ++itr; diff --git a/src/intset.c b/src/intset.c index 93963209e..fd634ed9d 100644 --- a/src/intset.c +++ b/src/intset.c @@ -281,7 +281,7 @@ uint32_t intsetLen(const intset *is) { /* Return intset blob size in bytes. */ size_t intsetBlobLen(intset *is) { - return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding); + return sizeof(intset)+(size_t)intrev32ifbe(is->length)*intrev32ifbe(is->encoding); } /* Validate the integrity of the data structure. diff --git a/src/keydb-diagnostic-tool.cpp b/src/keydb-diagnostic-tool.cpp new file mode 100644 index 000000000..06e6d938a --- /dev/null +++ b/src/keydb-diagnostic-tool.cpp @@ -0,0 +1,967 @@ +/* KeyDB diagnostic utility. + * + * Copyright (c) 2009-2021, Salvatore Sanfilippo + * Copyright (c) 2021, EQ Alpha Technology Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "fmacros.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include /* Use hiredis sds. */ +#include +#include "hiredis.h" +} +#include "ae.h" +#include "adlist.h" +#include "dict.h" +#include "zmalloc.h" +#include "storage.h" +#include "atomicvar.h" +#include "crc16_slottable.h" + +#define UNUSED(V) ((void) V) +#define RANDPTR_INITIAL_SIZE 8 +#define MAX_LATENCY_PRECISION 3 +#define MAX_THREADS 500 +#define CLUSTER_SLOTS 16384 + +#define CLIENT_GET_EVENTLOOP(c) \ + (c->thread_id >= 0 ? 
config.threads[c->thread_id]->el : config.el) + +struct benchmarkThread; +struct clusterNode; +struct redisConfig; + +int g_fTestMode = false; + +static struct config { + aeEventLoop *el; + const char *hostip; + int hostport; + const char *hostsocket; + int numclients; + int liveclients; + int period_ms; + int requests; + int requests_issued; + int requests_finished; + int keysize; + int datasize; + int randomkeys; + int randomkeys_keyspacelen; + int keepalive; + int pipeline; + int showerrors; + long long start; + long long totlatency; + long long *latency; + const char *title; + list *clients; + int quiet; + int csv; + int loop; + int idlemode; + int dbnum; + sds dbnumstr; + char *tests; + char *auth; + const char *user; + int precision; + int max_threads; + struct benchmarkThread **threads; + int cluster_mode; + int cluster_node_count; + struct clusterNode **cluster_nodes; + struct redisConfig *redis_config; + int is_fetching_slots; + int is_updating_slots; + int slots_last_update; + int enable_tracking; + /* Thread mutexes to be used as fallbacks by atomicvar.h */ + pthread_mutex_t requests_issued_mutex; + pthread_mutex_t requests_finished_mutex; + pthread_mutex_t liveclients_mutex; + pthread_mutex_t is_fetching_slots_mutex; + pthread_mutex_t is_updating_slots_mutex; + pthread_mutex_t updating_slots_mutex; + pthread_mutex_t slots_last_update_mutex; +} config; + +typedef struct _client { + redisContext *context; + sds obuf; + char **randptr; /* Pointers to :rand: strings inside the command buf */ + size_t randlen; /* Number of pointers in client->randptr */ + size_t randfree; /* Number of unused pointers in client->randptr */ + char **stagptr; /* Pointers to slot hashtags (cluster mode only) */ + size_t staglen; /* Number of pointers in client->stagptr */ + size_t stagfree; /* Number of unused pointers in client->stagptr */ + size_t written; /* Bytes of 'obuf' already written */ + long long start; /* Start time of a request */ + long long latency; /* Request 
latency */ + int pending; /* Number of pending requests (replies to consume) */ + int prefix_pending; /* If non-zero, number of pending prefix commands. Commands + such as auth and select are prefixed to the pipeline of + benchmark commands and discarded after the first send. */ + int prefixlen; /* Size in bytes of the pending prefix commands */ + int thread_id; + struct clusterNode *cluster_node; + int slots_last_update; + redisReply *lastReply; +} *client; + +/* Threads. */ + +typedef struct benchmarkThread { + int index; + pthread_t thread; + aeEventLoop *el; +} benchmarkThread; + +/* Cluster. */ +typedef struct clusterNode { + char *ip; + int port; + sds name; + int flags; + sds replicate; /* Master ID if node is a replica */ + int *slots; + int slots_count; + int current_slot_index; + int *updated_slots; /* Used by updateClusterSlotsConfiguration */ + int updated_slots_count; /* Used by updateClusterSlotsConfiguration */ + int replicas_count; + sds *migrating; /* An array of sds where even strings are slots and odd + * strings are the destination node IDs. */ + sds *importing; /* An array of sds where even strings are slots and odd + * strings are the source node IDs. 
*/ + int migrating_count; /* Length of the migrating array (migrating slots*2) */ + int importing_count; /* Length of the importing array (importing slots*2) */ + struct redisConfig *redis_config; +} clusterNode; + +typedef struct redisConfig { + sds save; + sds appendonly; +} redisConfig; + +int g_fInCrash = false; + +/* Prototypes */ +static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); +static benchmarkThread *createBenchmarkThread(int index); +static void freeBenchmarkThread(benchmarkThread *thread); +static void freeBenchmarkThreads(); +static redisContext *getRedisContext(const char *ip, int port, + const char *hostsocket); + +/* Implementation */ +static long long ustime(void) { /* Current time from gettimeofday(), expressed in microseconds. */ + struct timeval tv; + long long ust; + + gettimeofday(&tv, NULL); + ust = ((long long)tv.tv_sec)*1000000; /* Widen before multiplying: a 32-bit long would overflow here. */ + ust += tv.tv_usec; + return ust; +} + +/* _serverAssert is needed by dict */ +extern "C" void _serverAssert(const char *estr, const char *file, int line) { + fprintf(stderr, "=== ASSERTION FAILED ===\n"); + fprintf(stderr, "==> %s:%d '%s' is not true\n",file,line,estr); + *((char*)-1) = 'x'; /* Deliberate write through an invalid pointer; presumably meant to crash the process at the failed assertion. */ +} + +static redisContext *getRedisContext(const char *ip, int port, + const char *hostsocket) +{ + redisContext *ctx = NULL; + redisReply *reply = NULL; + if (hostsocket == NULL) + ctx = redisConnect(ip, port); + else + ctx = redisConnectUnix(hostsocket); + if (ctx == NULL || ctx->err) { + fprintf(stderr,"Could not connect to Redis at "); + const char *err = (ctx != NULL ? 
ctx->errstr : ""); + if (hostsocket == NULL) + fprintf(stderr,"%s:%d: %s\n",ip,port,err); + else + fprintf(stderr,"%s: %s\n",hostsocket,err); + goto cleanup; + } + if (config.auth == NULL) + return ctx; + if (config.user == NULL) + reply = (redisReply*)redisCommand(ctx,"AUTH %s", config.auth); + else + reply = (redisReply*)redisCommand(ctx,"AUTH %s %s", config.user, config.auth); + if (reply != NULL) { + if (reply->type == REDIS_REPLY_ERROR) { + if (hostsocket == NULL) + fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str); + else + fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str); + goto cleanup; + } + freeReplyObject(reply); + return ctx; + } + fprintf(stderr, "ERROR: failed to fetch reply from "); + if (hostsocket == NULL) + fprintf(stderr, "%s:%d\n", ip, port); + else + fprintf(stderr, "%s\n", hostsocket); +cleanup: + freeReplyObject(reply); + redisFree(ctx); + return NULL; +} + +static void freeClient(client c) { + aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); + listNode *ln; + aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeDeleteFileEvent(el,c->context->fd,AE_READABLE); + if (c->thread_id >= 0) { + int requests_finished = 0; + atomicGet(config.requests_finished, requests_finished); + if (requests_finished >= config.requests) { + aeStop(el); + } + } + redisFree(c->context); + sdsfree(c->obuf); + zfree(c->randptr); + zfree(c->stagptr); + zfree(c); + if (config.max_threads) pthread_mutex_lock(&(config.liveclients_mutex)); + config.liveclients--; + ln = listSearchKey(config.clients,c); + assert(ln != NULL); + listDelNode(config.clients,ln); + if (config.max_threads) pthread_mutex_unlock(&(config.liveclients_mutex)); +} + +static void freeAllClients(void) { + listNode *ln = config.clients->head, *next; + + while(ln) { + next = ln->next; + freeClient((client)ln->value); + ln = next; + } +} + +static void resetClient(client c) { + aeEventLoop *el = CLIENT_GET_EVENTLOOP(c); + 
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeDeleteFileEvent(el,c->context->fd,AE_READABLE); + aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); + c->written = 0; + c->pending = config.pipeline; +} + +static void randomizeClientKey(client c) { + size_t i; + + for (i = 0; i < c->randlen; i++) { + char *p = c->randptr[i]+11; + size_t r = 0; + if (config.randomkeys_keyspacelen != 0) + r = random() % config.randomkeys_keyspacelen; + size_t j; + + for (j = 0; j < 12; j++) { + *p = '0'+r%10; + r/=10; + p--; + } + } +} + +static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + client c = (client)privdata; + void *reply = NULL; + UNUSED(el); + UNUSED(fd); + UNUSED(mask); + + /* Calculate latency only for the first read event. This means that the + * server already sent the reply and we need to parse it. Parsing overhead + * is not part of the latency, so calculate it only once, here. */ + if (c->latency < 0) c->latency = ustime()-(c->start); + + if (redisBufferRead(c->context) != REDIS_OK) { + fprintf(stderr,"Error: %s\n",c->context->errstr); + exit(1); + } else { + while(c->pending) { + if (redisGetReply(c->context,&reply) != REDIS_OK) { + fprintf(stderr,"Error: %s\n",c->context->errstr); + exit(1); + } + if (reply != NULL) { + if (reply == (void*)REDIS_REPLY_ERROR) { + fprintf(stderr,"Unexpected error reply, exiting...\n"); + exit(1); + } + redisReply *r = (redisReply*)reply; + int is_err = (r->type == REDIS_REPLY_ERROR); + + if (is_err && config.showerrors) { + /* TODO: static lasterr_time not thread-safe */ + static time_t lasterr_time = 0; + time_t now = time(NULL); + if (lasterr_time != now) { + lasterr_time = now; + if (c->cluster_node) { + printf("Error from server %s:%d: %s\n", + c->cluster_node->ip, + c->cluster_node->port, + r->str); + } else printf("Error from server: %s\n", r->str); + } + } + + freeReplyObject(reply); + /* This is an OK for prefix commands such as auth and select.*/ + if (c->prefix_pending > 0) { 
+ c->prefix_pending--; + c->pending--; + /* Discard prefix commands on first response.*/ + if (c->prefixlen > 0) { + size_t j; + sdsrange(c->obuf, c->prefixlen, -1); + /* We also need to fix the pointers to the strings + * we need to randomize. */ + for (j = 0; j < c->randlen; j++) + c->randptr[j] -= c->prefixlen; + c->prefixlen = 0; + } + continue; + } + int requests_finished = 0; + atomicGetIncr(config.requests_finished, requests_finished, 1); + if (requests_finished < config.requests) + config.latency[requests_finished] = c->latency; + c->pending--; + if (c->pending == 0) { + resetClient(c); + break; + } + } else { + break; + } + } + } +} + +static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + client c = (client)privdata; + UNUSED(el); + UNUSED(fd); + UNUSED(mask); + + /* Initialize request when nothing was written. */ + if (c->written == 0) { + /* Really initialize: randomize keys and set start time. */ + if (config.randomkeys) randomizeClientKey(c); + atomicGet(config.slots_last_update, c->slots_last_update); + c->start = ustime(); + c->latency = -1; + } + if (sdslen(c->obuf) > c->written) { + void *ptr = c->obuf+c->written; + ssize_t nwritten = write(c->context->fd,ptr,sdslen(c->obuf)-c->written); + if (nwritten == -1) { + if (errno != EPIPE) + fprintf(stderr, "Writing to socket: %s\n", strerror(errno)); + freeClient(c); + return; + } + c->written += nwritten; + if (sdslen(c->obuf) == c->written) { + aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE); + aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c); + } + } +} + +/* Create a benchmark client, configured to send the command passed as 'cmd' of + * 'len' bytes. + * + * The command is copied N times in the client output buffer (that is reused + * again and again to send the request to the server) accordingly to the configured + * pipeline size. + * + * Also an initial SELECT command is prepended in order to make sure the right + * database is selected, if needed. 
The initial SELECT will be discarded as soon + * as the first reply is received. + * + * To create a client from scratch, the 'from' pointer is set to NULL. If instead + * we want to create a client using another client as reference, the 'from' pointer + * points to the client to use as reference. In such a case the following + * information is take from the 'from' client: + * + * 1) The command line to use. + * 2) The offsets of the __rand_int__ elements inside the command line, used + * for arguments randomization. + * + * Even when cloning another client, prefix commands are applied if needed.*/ +static client createClient(const char *cmd, size_t len, client from, int thread_id) { + int j; + int is_cluster_client = (config.cluster_mode && thread_id >= 0); + client c = (client)zmalloc(sizeof(struct _client), MALLOC_LOCAL); + + const char *ip = NULL; + int port = 0; + c->cluster_node = NULL; + if (config.hostsocket == NULL || is_cluster_client) { + if (!is_cluster_client) { + ip = config.hostip; + port = config.hostport; + } else { + int node_idx = 0; + if (config.max_threads < config.cluster_node_count) + node_idx = config.liveclients % config.cluster_node_count; + else + node_idx = thread_id % config.cluster_node_count; + clusterNode *node = config.cluster_nodes[node_idx]; + assert(node != NULL); + ip = (const char *) node->ip; + port = node->port; + c->cluster_node = node; + } + c->context = redisConnectNonBlock(ip,port); + } else { + c->context = redisConnectUnixNonBlock(config.hostsocket); + } + if (c->context->err) { + fprintf(stderr,"Could not connect to Redis at "); + if (config.hostsocket == NULL || is_cluster_client) + fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr); + else + fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr); + exit(1); + } + c->thread_id = thread_id; + /* Suppress hiredis cleanup of unused buffers for max speed. 
*/ + c->context->reader->maxbuf = 0; + + /* Build the request buffer: + * Queue N requests accordingly to the pipeline size, or simply clone + * the example client buffer. */ + c->obuf = sdsempty(); + /* Prefix the request buffer with AUTH and/or SELECT commands, if applicable. + * These commands are discarded after the first response, so if the client is + * reused the commands will not be used again. */ + c->prefix_pending = 0; + if (config.auth) { + char *buf = NULL; + int len; + if (config.user == NULL) + len = redisFormatCommand(&buf, "AUTH %s", config.auth); + else + len = redisFormatCommand(&buf, "AUTH %s %s", + config.user, config.auth); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + + if (config.enable_tracking) { + char *buf = NULL; + int len = redisFormatCommand(&buf, "CLIENT TRACKING on"); + c->obuf = sdscatlen(c->obuf, buf, len); + free(buf); + c->prefix_pending++; + } + + /* If a DB number different than zero is selected, prefix our request + * buffer with the SELECT command, that will be discarded the first + * time the replies are received, so if the client is reused the + * SELECT command will not be used again. */ + if (config.dbnum != 0 && !is_cluster_client) { + c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + (int)sdslen(config.dbnumstr),config.dbnumstr); + c->prefix_pending++; + } + c->prefixlen = sdslen(c->obuf); + /* Append the request itself. */ + if (from) { + c->obuf = sdscatlen(c->obuf, + from->obuf+from->prefixlen, + sdslen(from->obuf)-from->prefixlen); + } else { + for (j = 0; j < config.pipeline; j++) + c->obuf = sdscatlen(c->obuf,cmd,len); + } + + c->written = 0; + c->pending = config.pipeline+c->prefix_pending; + c->randptr = NULL; + c->randlen = 0; + c->stagptr = NULL; + c->staglen = 0; + + /* Find substrings in the output buffer that need to be randomized. 
*/ + if (config.randomkeys) { + if (from) { + c->randlen = from->randlen; + c->randfree = 0; + c->randptr = (char**)zmalloc(sizeof(char*)*c->randlen, MALLOC_LOCAL); + /* copy the offsets. */ + for (j = 0; j < (int)c->randlen; j++) { + c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf); + /* Adjust for the different select prefix length. */ + c->randptr[j] += c->prefixlen - from->prefixlen; + } + } else { + char *p = c->obuf; + + c->randlen = 0; + c->randfree = RANDPTR_INITIAL_SIZE; + c->randptr = (char**)zmalloc(sizeof(char*)*c->randfree, MALLOC_LOCAL); + while ((p = strstr(p,"__rand_int__")) != NULL) { + if (c->randfree == 0) { + c->randptr = (char**)zrealloc(c->randptr,sizeof(char*)*c->randlen*2, MALLOC_LOCAL); + c->randfree += c->randlen; + } + c->randptr[c->randlen++] = p; + c->randfree--; + p += 12; /* 12 is strlen("__rand_int__). */ + } + } + } + /* If cluster mode is enabled, set slot hashtags pointers. */ + if (config.cluster_mode) { + if (from) { + c->staglen = from->staglen; + c->stagfree = 0; + c->stagptr = (char**)zmalloc(sizeof(char*)*c->staglen, MALLOC_LOCAL); + /* copy the offsets. */ + for (j = 0; j < (int)c->staglen; j++) { + c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf); + /* Adjust for the different select prefix length. */ + c->stagptr[j] += c->prefixlen - from->prefixlen; + } + } else { + char *p = c->obuf; + + c->staglen = 0; + c->stagfree = RANDPTR_INITIAL_SIZE; + c->stagptr = (char**)zmalloc(sizeof(char*)*c->stagfree, MALLOC_LOCAL); + while ((p = strstr(p,"{tag}")) != NULL) { + if (c->stagfree == 0) { + c->stagptr = (char**)zrealloc(c->stagptr, + sizeof(char*) * c->staglen*2, MALLOC_LOCAL); + c->stagfree += c->staglen; + } + c->stagptr[c->staglen++] = p; + c->stagfree--; + p += 5; /* 5 is strlen("{tag}"). 
*/ + } + } + } + aeEventLoop *el = NULL; + if (thread_id < 0) el = config.el; + else { + benchmarkThread *thread = config.threads[thread_id]; + el = thread->el; + } + if (config.idlemode == 0) + aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c); + listAddNodeTail(config.clients,c); + atomicIncr(config.liveclients, 1); + atomicGet(config.slots_last_update, c->slots_last_update); + return c; +} + +static void initBenchmarkThreads() { + int i; + if (config.threads) freeBenchmarkThreads(); + config.threads = (benchmarkThread**)zmalloc(config.max_threads * sizeof(benchmarkThread*), MALLOC_LOCAL); + for (i = 0; i < config.max_threads; i++) { + benchmarkThread *thread = createBenchmarkThread(i); + config.threads[i] = thread; + } +} + +/* Thread functions. */ + +static benchmarkThread *createBenchmarkThread(int index) { + benchmarkThread *thread = (benchmarkThread*)zmalloc(sizeof(*thread), MALLOC_LOCAL); + if (thread == NULL) return NULL; + thread->index = index; + thread->el = aeCreateEventLoop(1024*10); + return thread; +} + +static void freeBenchmarkThread(benchmarkThread *thread) { + if (thread->el) aeDeleteEventLoop(thread->el); + zfree(thread); +} + +static void freeBenchmarkThreads() { + int i = 0; + for (; i < config.max_threads; i++) { + benchmarkThread *thread = config.threads[i]; + if (thread) freeBenchmarkThread(thread); + } + zfree(config.threads); + config.threads = NULL; +} + +static void *execBenchmarkThread(void *ptr) { + benchmarkThread *thread = (benchmarkThread *) ptr; + aeMain(thread->el); + return NULL; +} + +void initConfigDefaults() { + config.numclients = 50; + config.requests = 100000; + config.liveclients = 0; + config.el = aeCreateEventLoop(1024*10); + config.keepalive = 1; + config.datasize = 3; + config.pipeline = 1; + config.period_ms = 5000; + config.showerrors = 0; + config.randomkeys = 0; + config.randomkeys_keyspacelen = 0; + config.quiet = 0; + config.csv = 0; + config.loop = 0; + config.idlemode = 0; + config.latency = 
NULL; + config.clients = listCreate(); + config.hostip = "127.0.0.1"; + config.hostport = 6379; + config.hostsocket = NULL; + config.tests = NULL; + config.dbnum = 0; + config.auth = NULL; + config.precision = 1; + config.max_threads = MAX_THREADS; + config.threads = NULL; + config.cluster_mode = 0; + config.cluster_node_count = 0; + config.cluster_nodes = NULL; + config.redis_config = NULL; + config.is_fetching_slots = 0; + config.is_updating_slots = 0; + config.slots_last_update = 0; + config.enable_tracking = 0; +} + +/* Returns number of consumed options. */ +int parseOptions(int argc, const char **argv) { + int i; + int lastarg; + int exit_status = 1; + + for (i = 1; i < argc; i++) { + lastarg = (i == (argc-1)); + + if (!strcmp(argv[i],"-c") || !strcmp(argv[i],"--clients")) { + if (lastarg) goto invalid; + config.numclients = atoi(argv[++i]); + } else if (!strcmp(argv[i],"--time")) { + if (lastarg) goto invalid; + config.period_ms = atoi(argv[++i]); + if (config.period_ms <= 0) { + printf("Warning: Invalid value for thread time. 
Defaulting to 5000ms.\n"); + config.period_ms = 5000; + } + } else if (!strcmp(argv[i],"-h") || !strcmp(argv[i],"--host")) { + if (lastarg) goto invalid; + config.hostip = strdup(argv[++i]); + } else if (!strcmp(argv[i],"-p") || !strcmp(argv[i],"--port")) { + if (lastarg) goto invalid; + config.hostport = atoi(argv[++i]); + } else if (!strcmp(argv[i],"-s")) { + if (lastarg) goto invalid; + config.hostsocket = strdup(argv[++i]); + } else if (!strcmp(argv[i],"--password") ) { + if (lastarg) goto invalid; + config.auth = strdup(argv[++i]); + } else if (!strcmp(argv[i],"--user")) { + if (lastarg) goto invalid; + config.user = argv[++i]; + } else if (!strcmp(argv[i],"--dbnum")) { + if (lastarg) goto invalid; + config.dbnum = atoi(argv[++i]); + config.dbnumstr = sdsfromlonglong(config.dbnum); + } else if (!strcmp(argv[i],"-t") || !strcmp(argv[i],"--threads")) { + if (lastarg) goto invalid; + config.max_threads = atoi(argv[++i]); + if (config.max_threads > MAX_THREADS) { + printf("Warning: Too many threads, limiting threads to %d.\n", MAX_THREADS); + config.max_threads = MAX_THREADS; + } else if (config.max_threads <= 0) { + printf("Warning: Invalid value for max threads. Defaulting to %d.\n", MAX_THREADS); + config.max_threads = MAX_THREADS; + } + } else if (!strcmp(argv[i],"--help")) { + exit_status = 0; + goto usage; + } else { + /* Assume the user meant to provide an option when the arg starts + * with a dash. We're done otherwise and should use the remainder + * as the command and arguments for running the benchmark. 
*/ + if (argv[i][0] == '-') goto invalid; + return i; + } + } + + return i; + +invalid: + printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]); + +usage: + printf( +"Usage: keydb-benchmark [-h ] [-p ] [-c ] [-n ] [-k ]\n\n" +" -h, --host Server hostname (default 127.0.0.1)\n" +" -p, --port Server port (default 6379)\n" +" -c Number of parallel connections (default 50)\n" +" -t, --threads Maximum number of threads to start before ending\n" +" --time