Merge remote-tracking branch 'origin/unstable' into ci-flags-fix

Former-commit-id: da1f09e9b551cacdfd24dc839ee659a5e5e1e1de
This commit is contained in:
christianEQ 2021-07-14 22:56:15 +00:00
commit c502e6f2a1
37 changed files with 1210 additions and 154 deletions

1
.gitignore vendored
View File

@ -30,6 +30,7 @@ redis-check-rdb
keydb-check-rdb
redis-check-dump
keydb-check-dump
keydb-diagnostic-tool
redis-cli
redis-sentinel
redis-server

View File

@ -11,6 +11,40 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
================================================================================
Redis 6.2.3 Released Mon May 3 19:00:00 IST 2021
================================================================================
Upgrade urgency: SECURITY, Contains fixes to security issues that affect
authenticated client connections. LOW otherwise.
Integer overflow in STRALGO LCS command (CVE-2021-29477):
An integer overflow bug in Redis version 6.0 or newer could be exploited using
the STRALGO LCS command to corrupt the heap and potentially result in remote
code execution. The integer overflow bug exists in all versions of Redis
starting with 6.0.
Integer overflow in COPY command for large intsets (CVE-2021-29478):
An integer overflow bug in Redis 6.2 could be exploited to corrupt the heap and
potentially result with remote code execution. The vulnerability involves
changing the default set-max-intset-entries configuration value, creating a
large set key that consists of integer values and using the COPY command to
duplicate it. The integer overflow bug exists in all versions of Redis starting
with 2.6, where it could result with a corrupted RDB or DUMP payload, but not
exploited through COPY (which did not exist before 6.2).
Bug fixes that are only applicable to previous releases of Redis 6.2:
* Fix memory leak in moduleDefragGlobals (#8853)
* Fix memory leak when doing lazy freeing client tracking table (#8822)
* Block abusive replicas from sending command that could assert and crash redis (#8868)
Other bug fixes:
* Use a monotonic clock to check for Lua script timeout (#8812)
* redis-cli: Do not use unix socket when we got redirected in cluster mode (#8870)
Modules:
* Fix RM_GetClusterNodeInfo() to correctly populate master id (#8846)
================================================================================
Redis 6.2.2 Released Mon April 19 19:00:00 IST 2021
================================================================================

4
TLS.md
View File

@ -28,8 +28,8 @@ To manually run a Redis server with TLS mode (assuming `gen-test-certs.sh` was
invoked so sample certificates/keys are available):
./src/keydb-server --tls-port 6379 --port 0 \
--tls-cert-file ./tests/tls/keydb.crt \
--tls-key-file ./tests/tls/keydb.key \
--tls-cert-file ./tests/tls/client.crt \
--tls-key-file ./tests/tls/client.key \
--tls-ca-cert-file ./tests/tls/ca.crt
To connect to this Redis server with `keydb-cli`:

View File

@ -241,7 +241,7 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr) {
int free_in_slab = extent_nfree_get(slab);
if (free_in_slab) {
const bin_info_t *bin_info = &bin_infos[binind];
int curslabs = binshard->stats.curslabs;
ssize_t curslabs = binshard->stats.curslabs;
size_t curregs = binshard->stats.curregs;
if (binshard->slabcur) {
/* remove slabcur from the overall utilization */

View File

@ -10,7 +10,7 @@ done
if [ -z $TCLSH ]
then
echo "You need tcl 8.5 or newer in order to run the Redis test"
echo "You need tcl 8.5 or newer in order to run the KeyDB test"
exit 1
fi
$TCLSH tests/test_helper.tcl "${@}"

View File

@ -8,7 +8,7 @@ done
if [ -z $TCLSH ]
then
echo "You need tcl 8.5 or newer in order to run the Redis Sentinel test"
echo "You need tcl 8.5 or newer in order to run the KeyDB Sentinel test"
exit 1
fi
$TCLSH tests/sentinel/run.tcl $*

View File

@ -125,6 +125,10 @@ DEBUG=-g -ggdb
ifneq ($(uname_S),Darwin)
FINAL_LIBS+=-latomic
endif
# Linux ARM32 needs -latomic at linking time
ifneq (,$(findstring armv,$(uname_M)))
FINAL_LIBS+=-latomic
endif
ifeq ($(uname_S),SunOS)
@ -340,8 +344,10 @@ REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX)
REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX)
REDIS_CHECK_AOF_NAME=keydb-check-aof$(PROG_SUFFIX)
KEYDB_DIAGNOSTIC_NAME=keydb-diagnostic-tool$(PROG_SUFFIX)
KEYDB_DIAGNOSTIC_OBJ=ae.o anet.o keydb-diagnostic-tool.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o keydb-diagnostic-tool.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME)
all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) $(KEYDB_DIAGNOSTIC_NAME)
@echo ""
@echo "Hint: It's a good idea to run 'make test' ;)"
@echo ""
@ -413,6 +419,10 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
$(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS)
# keydb-diagnostic-tool
$(KEYDB_DIAGNOSTIC_NAME): $(KEYDB_DIAGNOSTIC_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a $(FINAL_LIBS)
DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d)
-include $(DEP)
@ -429,7 +439,7 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ
$(KEYDB_AS) $< -o $@
clean:
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov KeyDB.info lcov-html Makefile.dep
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) $(KEYDB_DIAGNOSTIC_NAME) *.o *.gcda *.gcno *.gcov KeyDB.info lcov-html Makefile.dep
rm -f $(DEP)
.PHONY: clean
@ -492,4 +502,4 @@ install: all
@ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME)
uninstall:
rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)}
rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME),$(KEYDB_DIAGNOSTIC_NAME)}

View File

@ -173,12 +173,11 @@ void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
serverAssert(GlobalLocksAcquired());
fastlock_lock(&c->lock);
std::unique_lock<fastlock> ul(c->lock);
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(g_pserver->rgthreadvar[c->iel].unblocked_clients,c);
}
fastlock_unlock(&c->lock);
}
/* Unblock a client calling the right function depending on the kind
@ -792,4 +791,4 @@ void signalKeyAsReady(redisDb *db, sds key, int type) {
redisObjectStack o;
initStaticStringObject(o, key);
signalKeyAsReady(db, &o, type);
}
}

View File

@ -561,7 +561,7 @@ void clusterInit(void) {
serverAssert(serverTL == &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]);
if (createSocketAcceptHandler(&g_pserver->cfd, clusterAcceptHandler) != C_OK) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
serverPanic("Unrecoverable error creating KeyDB Cluster socket accept handler.");
}
/* The slots -> keys map is a radix tree. Initialize it here. */
@ -5567,9 +5567,10 @@ try_again:
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys into the array in successive
/* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,

View File

@ -1403,9 +1403,8 @@ void copyCommand(client *c) {
}
dbAdd(dst,newkey,newobj);
if (expire != nullptr) {
if (expire != nullptr) setExpire(c, dst, newkey, expire->duplicate());
}
if (expire != nullptr)
setExpire(c, dst, newkey, expire->duplicate());
/* OK! key copied */
signalModifiedKey(c,dst,c->argv[2]);
@ -1771,7 +1770,7 @@ int keyIsExpired(redisDb *db, robj *key) {
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (g_pserver->lua_caller) {
now = g_pserver->lua_time_start;
now = g_pserver->lua_time_snapshot;
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
@ -1832,14 +1831,17 @@ int expireIfNeeded(redisDb *db, robj *key) {
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
/* Delete the key */
if (g_pserver->lazyfree_lazy_expire) {
dbAsyncDelete(db,key);
} else {
dbSyncDelete(db,key);
}
g_pserver->stat_expiredkeys++;
propagateExpire(db,key,g_pserver->lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
signalModifiedKey(NULL,db,key);
return 1;
}
/* -----------------------------------------------------------------------------

View File

@ -1038,7 +1038,6 @@ void _serverPanic(const char *file, int line, const char *msg, ...) {
vsnprintf(fmtmsg,sizeof(fmtmsg),msg,ap);
va_end(ap);
g_fInCrash = true;
bugReportStart();
serverLog(LL_WARNING,"------------------------------------------------");
serverLog(LL_WARNING,"!!! Software Failure. Press left mouse button to continue");

View File

@ -821,8 +821,8 @@ void expireEntryFat::expireSubKey(const char *szSubkey, long long when)
fFound = true;
}
if (fFound) {
m_vecexpireEntries.erase(itr);
dictDelete(m_dictIndex, szSubkey);
m_vecexpireEntries.erase(itr);
break;
}
++itr;

View File

@ -281,7 +281,7 @@ uint32_t intsetLen(const intset *is) {
/* Return intset blob size in bytes. */
size_t intsetBlobLen(intset *is) {
return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
return sizeof(intset)+(size_t)intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
}
/* Validate the integrity of the data structure.

View File

@ -0,0 +1,967 @@
/* KeyDB diagnostic utility.
*
* Copyright (c) 2009-2021, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2021, EQ Alpha Technology Ltd. <john at eqalpha dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "fmacros.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <signal.h>
#include <assert.h>
#include <math.h>
#include <pthread.h>
#include <deque>
extern "C" {
#include <sds.h> /* Use hiredis sds. */
#include <sdscompat.h>
#include "hiredis.h"
}
#include "ae.h"
#include "adlist.h"
#include "dict.h"
#include "zmalloc.h"
#include "storage.h"
#include "atomicvar.h"
#include "crc16_slottable.h"
#define UNUSED(V) ((void) V)
#define RANDPTR_INITIAL_SIZE 8
#define MAX_LATENCY_PRECISION 3
#define MAX_THREADS 500
#define CLUSTER_SLOTS 16384
#define CLIENT_GET_EVENTLOOP(c) \
(c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
struct benchmarkThread;
struct clusterNode;
struct redisConfig;
int g_fTestMode = false;
static struct config {
aeEventLoop *el;
const char *hostip;
int hostport;
const char *hostsocket;
int numclients;
int liveclients;
int period_ms;
int requests;
int requests_issued;
int requests_finished;
int keysize;
int datasize;
int randomkeys;
int randomkeys_keyspacelen;
int keepalive;
int pipeline;
int showerrors;
long long start;
long long totlatency;
long long *latency;
const char *title;
list *clients;
int quiet;
int csv;
int loop;
int idlemode;
int dbnum;
sds dbnumstr;
char *tests;
char *auth;
const char *user;
int precision;
int max_threads;
struct benchmarkThread **threads;
int cluster_mode;
int cluster_node_count;
struct clusterNode **cluster_nodes;
struct redisConfig *redis_config;
int is_fetching_slots;
int is_updating_slots;
int slots_last_update;
int enable_tracking;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
pthread_mutex_t is_fetching_slots_mutex;
pthread_mutex_t is_updating_slots_mutex;
pthread_mutex_t updating_slots_mutex;
pthread_mutex_t slots_last_update_mutex;
} config;
typedef struct _client {
redisContext *context;
sds obuf;
char **randptr; /* Pointers to :rand: strings inside the command buf */
size_t randlen; /* Number of pointers in client->randptr */
size_t randfree; /* Number of unused pointers in client->randptr */
char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
size_t staglen; /* Number of pointers in client->stagptr */
size_t stagfree; /* Number of unused pointers in client->stagptr */
size_t written; /* Bytes of 'obuf' already written */
long long start; /* Start time of a request */
long long latency; /* Request latency */
int pending; /* Number of pending requests (replies to consume) */
int prefix_pending; /* If non-zero, number of pending prefix commands. Commands
such as auth and select are prefixed to the pipeline of
benchmark commands and discarded after the first send. */
int prefixlen; /* Size in bytes of the pending prefix commands */
int thread_id;
struct clusterNode *cluster_node;
int slots_last_update;
redisReply *lastReply;
} *client;
/* Threads. */
typedef struct benchmarkThread {
int index;
pthread_t thread;
aeEventLoop *el;
} benchmarkThread;
/* Cluster. */
typedef struct clusterNode {
char *ip;
int port;
sds name;
int flags;
sds replicate; /* Master ID if node is a replica */
int *slots;
int slots_count;
int current_slot_index;
int *updated_slots; /* Used by updateClusterSlotsConfiguration */
int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
int replicas_count;
sds *migrating; /* An array of sds where even strings are slots and odd
* strings are the destination node IDs. */
sds *importing; /* An array of sds where even strings are slots and odd
* strings are the source node IDs. */
int migrating_count; /* Length of the migrating array (migrating slots*2) */
int importing_count; /* Length of the importing array (importing slots*2) */
struct redisConfig *redis_config;
} clusterNode;
typedef struct redisConfig {
sds save;
sds appendonly;
} redisConfig;
int g_fInCrash = false;
/* Prototypes */
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
static benchmarkThread *createBenchmarkThread(int index);
static void freeBenchmarkThread(benchmarkThread *thread);
static void freeBenchmarkThreads();
static redisContext *getRedisContext(const char *ip, int port,
const char *hostsocket);
/* Implementation */
static long long ustime(void) {
struct timeval tv;
long long ust;
gettimeofday(&tv, NULL);
ust = ((long)tv.tv_sec)*1000000;
ust += tv.tv_usec;
return ust;
}
/* _serverAssert is needed by dict */
extern "C" void _serverAssert(const char *estr, const char *file, int line) {
fprintf(stderr, "=== ASSERTION FAILED ===");
fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr);
*((char*)-1) = 'x';
}
static redisContext *getRedisContext(const char *ip, int port,
const char *hostsocket)
{
redisContext *ctx = NULL;
redisReply *reply = NULL;
if (hostsocket == NULL)
ctx = redisConnect(ip, port);
else
ctx = redisConnectUnix(hostsocket);
if (ctx == NULL || ctx->err) {
fprintf(stderr,"Could not connect to Redis at ");
const char *err = (ctx != NULL ? ctx->errstr : "");
if (hostsocket == NULL)
fprintf(stderr,"%s:%d: %s\n",ip,port,err);
else
fprintf(stderr,"%s: %s\n",hostsocket,err);
goto cleanup;
}
if (config.auth == NULL)
return ctx;
if (config.user == NULL)
reply = (redisReply*)redisCommand(ctx,"AUTH %s", config.auth);
else
reply = (redisReply*)redisCommand(ctx,"AUTH %s %s", config.user, config.auth);
if (reply != NULL) {
if (reply->type == REDIS_REPLY_ERROR) {
if (hostsocket == NULL)
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
else
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
goto cleanup;
}
freeReplyObject(reply);
return ctx;
}
fprintf(stderr, "ERROR: failed to fetch reply from ");
if (hostsocket == NULL)
fprintf(stderr, "%s:%d\n", ip, port);
else
fprintf(stderr, "%s\n", hostsocket);
cleanup:
freeReplyObject(reply);
redisFree(ctx);
return NULL;
}
static void freeClient(client c) {
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
listNode *ln;
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
if (c->thread_id >= 0) {
int requests_finished = 0;
atomicGet(config.requests_finished, requests_finished);
if (requests_finished >= config.requests) {
aeStop(el);
}
}
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
zfree(c->stagptr);
zfree(c);
if (config.max_threads) pthread_mutex_lock(&(config.liveclients_mutex));
config.liveclients--;
ln = listSearchKey(config.clients,c);
assert(ln != NULL);
listDelNode(config.clients,ln);
if (config.max_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
}
static void freeAllClients(void) {
listNode *ln = config.clients->head, *next;
while(ln) {
next = ln->next;
freeClient((client)ln->value);
ln = next;
}
}
static void resetClient(client c) {
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
c->written = 0;
c->pending = config.pipeline;
}
static void randomizeClientKey(client c) {
size_t i;
for (i = 0; i < c->randlen; i++) {
char *p = c->randptr[i]+11;
size_t r = 0;
if (config.randomkeys_keyspacelen != 0)
r = random() % config.randomkeys_keyspacelen;
size_t j;
for (j = 0; j < 12; j++) {
*p = '0'+r%10;
r/=10;
p--;
}
}
}
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
client c = (client)privdata;
void *reply = NULL;
UNUSED(el);
UNUSED(fd);
UNUSED(mask);
/* Calculate latency only for the first read event. This means that the
* server already sent the reply and we need to parse it. Parsing overhead
* is not part of the latency, so calculate it only once, here. */
if (c->latency < 0) c->latency = ustime()-(c->start);
if (redisBufferRead(c->context) != REDIS_OK) {
fprintf(stderr,"Error: %s\n",c->context->errstr);
exit(1);
} else {
while(c->pending) {
if (redisGetReply(c->context,&reply) != REDIS_OK) {
fprintf(stderr,"Error: %s\n",c->context->errstr);
exit(1);
}
if (reply != NULL) {
if (reply == (void*)REDIS_REPLY_ERROR) {
fprintf(stderr,"Unexpected error reply, exiting...\n");
exit(1);
}
redisReply *r = (redisReply*)reply;
int is_err = (r->type == REDIS_REPLY_ERROR);
if (is_err && config.showerrors) {
/* TODO: static lasterr_time not thread-safe */
static time_t lasterr_time = 0;
time_t now = time(NULL);
if (lasterr_time != now) {
lasterr_time = now;
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
} else printf("Error from server: %s\n", r->str);
}
}
freeReplyObject(reply);
/* This is an OK for prefix commands such as auth and select.*/
if (c->prefix_pending > 0) {
c->prefix_pending--;
c->pending--;
/* Discard prefix commands on first response.*/
if (c->prefixlen > 0) {
size_t j;
sdsrange(c->obuf, c->prefixlen, -1);
/* We also need to fix the pointers to the strings
* we need to randomize. */
for (j = 0; j < c->randlen; j++)
c->randptr[j] -= c->prefixlen;
c->prefixlen = 0;
}
continue;
}
int requests_finished = 0;
atomicGetIncr(config.requests_finished, requests_finished, 1);
if (requests_finished < config.requests)
config.latency[requests_finished] = c->latency;
c->pending--;
if (c->pending == 0) {
resetClient(c);
break;
}
} else {
break;
}
}
}
}
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
client c = (client)privdata;
UNUSED(el);
UNUSED(fd);
UNUSED(mask);
/* Initialize request when nothing was written. */
if (c->written == 0) {
/* Really initialize: randomize keys and set start time. */
if (config.randomkeys) randomizeClientKey(c);
atomicGet(config.slots_last_update, c->slots_last_update);
c->start = ustime();
c->latency = -1;
}
if (sdslen(c->obuf) > c->written) {
void *ptr = c->obuf+c->written;
ssize_t nwritten = write(c->context->fd,ptr,sdslen(c->obuf)-c->written);
if (nwritten == -1) {
if (errno != EPIPE)
fprintf(stderr, "Writing to socket: %s\n", strerror(errno));
freeClient(c);
return;
}
c->written += nwritten;
if (sdslen(c->obuf) == c->written) {
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
}
}
}
/* Create a benchmark client, configured to send the command passed as 'cmd' of
* 'len' bytes.
*
* The command is copied N times in the client output buffer (that is reused
* again and again to send the request to the server) accordingly to the configured
* pipeline size.
*
* Also an initial SELECT command is prepended in order to make sure the right
* database is selected, if needed. The initial SELECT will be discarded as soon
* as the first reply is received.
*
* To create a client from scratch, the 'from' pointer is set to NULL. If instead
* we want to create a client using another client as reference, the 'from' pointer
* points to the client to use as reference. In such a case the following
* information is take from the 'from' client:
*
* 1) The command line to use.
* 2) The offsets of the __rand_int__ elements inside the command line, used
* for arguments randomization.
*
* Even when cloning another client, prefix commands are applied if needed.*/
static client createClient(const char *cmd, size_t len, client from, int thread_id) {
int j;
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
client c = (client)zmalloc(sizeof(struct _client), MALLOC_LOCAL);
const char *ip = NULL;
int port = 0;
c->cluster_node = NULL;
if (config.hostsocket == NULL || is_cluster_client) {
if (!is_cluster_client) {
ip = config.hostip;
port = config.hostport;
} else {
int node_idx = 0;
if (config.max_threads < config.cluster_node_count)
node_idx = config.liveclients % config.cluster_node_count;
else
node_idx = thread_id % config.cluster_node_count;
clusterNode *node = config.cluster_nodes[node_idx];
assert(node != NULL);
ip = (const char *) node->ip;
port = node->port;
c->cluster_node = node;
}
c->context = redisConnectNonBlock(ip,port);
} else {
c->context = redisConnectUnixNonBlock(config.hostsocket);
}
if (c->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
if (config.hostsocket == NULL || is_cluster_client)
fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
else
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
}
c->thread_id = thread_id;
/* Suppress hiredis cleanup of unused buffers for max speed. */
c->context->reader->maxbuf = 0;
/* Build the request buffer:
* Queue N requests accordingly to the pipeline size, or simply clone
* the example client buffer. */
c->obuf = sdsempty();
/* Prefix the request buffer with AUTH and/or SELECT commands, if applicable.
* These commands are discarded after the first response, so if the client is
* reused the commands will not be used again. */
c->prefix_pending = 0;
if (config.auth) {
char *buf = NULL;
int len;
if (config.user == NULL)
len = redisFormatCommand(&buf, "AUTH %s", config.auth);
else
len = redisFormatCommand(&buf, "AUTH %s %s",
config.user, config.auth);
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
c->prefix_pending++;
}
if (config.enable_tracking) {
char *buf = NULL;
int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
c->prefix_pending++;
}
/* If a DB number different than zero is selected, prefix our request
* buffer with the SELECT command, that will be discarded the first
* time the replies are received, so if the client is reused the
* SELECT command will not be used again. */
if (config.dbnum != 0 && !is_cluster_client) {
c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
(int)sdslen(config.dbnumstr),config.dbnumstr);
c->prefix_pending++;
}
c->prefixlen = sdslen(c->obuf);
/* Append the request itself. */
if (from) {
c->obuf = sdscatlen(c->obuf,
from->obuf+from->prefixlen,
sdslen(from->obuf)-from->prefixlen);
} else {
for (j = 0; j < config.pipeline; j++)
c->obuf = sdscatlen(c->obuf,cmd,len);
}
c->written = 0;
c->pending = config.pipeline+c->prefix_pending;
c->randptr = NULL;
c->randlen = 0;
c->stagptr = NULL;
c->staglen = 0;
/* Find substrings in the output buffer that need to be randomized. */
if (config.randomkeys) {
if (from) {
c->randlen = from->randlen;
c->randfree = 0;
c->randptr = (char**)zmalloc(sizeof(char*)*c->randlen, MALLOC_LOCAL);
/* copy the offsets. */
for (j = 0; j < (int)c->randlen; j++) {
c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf);
/* Adjust for the different select prefix length. */
c->randptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;
c->randlen = 0;
c->randfree = RANDPTR_INITIAL_SIZE;
c->randptr = (char**)zmalloc(sizeof(char*)*c->randfree, MALLOC_LOCAL);
while ((p = strstr(p,"__rand_int__")) != NULL) {
if (c->randfree == 0) {
c->randptr = (char**)zrealloc(c->randptr,sizeof(char*)*c->randlen*2, MALLOC_LOCAL);
c->randfree += c->randlen;
}
c->randptr[c->randlen++] = p;
c->randfree--;
p += 12; /* 12 is strlen("__rand_int__). */
}
}
}
/* If cluster mode is enabled, set slot hashtags pointers. */
if (config.cluster_mode) {
if (from) {
c->staglen = from->staglen;
c->stagfree = 0;
c->stagptr = (char**)zmalloc(sizeof(char*)*c->staglen, MALLOC_LOCAL);
/* copy the offsets. */
for (j = 0; j < (int)c->staglen; j++) {
c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
/* Adjust for the different select prefix length. */
c->stagptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;
c->staglen = 0;
c->stagfree = RANDPTR_INITIAL_SIZE;
c->stagptr = (char**)zmalloc(sizeof(char*)*c->stagfree, MALLOC_LOCAL);
while ((p = strstr(p,"{tag}")) != NULL) {
if (c->stagfree == 0) {
c->stagptr = (char**)zrealloc(c->stagptr,
sizeof(char*) * c->staglen*2, MALLOC_LOCAL);
c->stagfree += c->staglen;
}
c->stagptr[c->staglen++] = p;
c->stagfree--;
p += 5; /* 5 is strlen("{tag}"). */
}
}
}
aeEventLoop *el = NULL;
if (thread_id < 0) el = config.el;
else {
benchmarkThread *thread = config.threads[thread_id];
el = thread->el;
}
if (config.idlemode == 0)
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
listAddNodeTail(config.clients,c);
atomicIncr(config.liveclients, 1);
atomicGet(config.slots_last_update, c->slots_last_update);
return c;
}
static void initBenchmarkThreads() {
int i;
if (config.threads) freeBenchmarkThreads();
config.threads = (benchmarkThread**)zmalloc(config.max_threads * sizeof(benchmarkThread*), MALLOC_LOCAL);
for (i = 0; i < config.max_threads; i++) {
benchmarkThread *thread = createBenchmarkThread(i);
config.threads[i] = thread;
}
}
/* Thread functions. */
static benchmarkThread *createBenchmarkThread(int index) {
benchmarkThread *thread = (benchmarkThread*)zmalloc(sizeof(*thread), MALLOC_LOCAL);
if (thread == NULL) return NULL;
thread->index = index;
thread->el = aeCreateEventLoop(1024*10);
return thread;
}
static void freeBenchmarkThread(benchmarkThread *thread) {
if (thread->el) aeDeleteEventLoop(thread->el);
zfree(thread);
}
static void freeBenchmarkThreads() {
int i = 0;
for (; i < config.max_threads; i++) {
benchmarkThread *thread = config.threads[i];
if (thread) freeBenchmarkThread(thread);
}
zfree(config.threads);
config.threads = NULL;
}
static void *execBenchmarkThread(void *ptr) {
benchmarkThread *thread = (benchmarkThread *) ptr;
aeMain(thread->el);
return NULL;
}
void initConfigDefaults() {
config.numclients = 50;
config.requests = 100000;
config.liveclients = 0;
config.el = aeCreateEventLoop(1024*10);
config.keepalive = 1;
config.datasize = 3;
config.pipeline = 1;
config.period_ms = 5000;
config.showerrors = 0;
config.randomkeys = 0;
config.randomkeys_keyspacelen = 0;
config.quiet = 0;
config.csv = 0;
config.loop = 0;
config.idlemode = 0;
config.latency = NULL;
config.clients = listCreate();
config.hostip = "127.0.0.1";
config.hostport = 6379;
config.hostsocket = NULL;
config.tests = NULL;
config.dbnum = 0;
config.auth = NULL;
config.precision = 1;
config.max_threads = MAX_THREADS;
config.threads = NULL;
config.cluster_mode = 0;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
config.redis_config = NULL;
config.is_fetching_slots = 0;
config.is_updating_slots = 0;
config.slots_last_update = 0;
config.enable_tracking = 0;
}
/* Returns number of consumed options. */
int parseOptions(int argc, const char **argv) {
int i;
int lastarg;
int exit_status = 1;
for (i = 1; i < argc; i++) {
lastarg = (i == (argc-1));
if (!strcmp(argv[i],"-c") || !strcmp(argv[i],"--clients")) {
if (lastarg) goto invalid;
config.numclients = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--time")) {
if (lastarg) goto invalid;
config.period_ms = atoi(argv[++i]);
if (config.period_ms <= 0) {
printf("Warning: Invalid value for thread time. Defaulting to 5000ms.\n");
config.period_ms = 5000;
}
} else if (!strcmp(argv[i],"-h") || !strcmp(argv[i],"--host")) {
if (lastarg) goto invalid;
config.hostip = strdup(argv[++i]);
} else if (!strcmp(argv[i],"-p") || !strcmp(argv[i],"--port")) {
if (lastarg) goto invalid;
config.hostport = atoi(argv[++i]);
} else if (!strcmp(argv[i],"-s")) {
if (lastarg) goto invalid;
config.hostsocket = strdup(argv[++i]);
} else if (!strcmp(argv[i],"--password") ) {
if (lastarg) goto invalid;
config.auth = strdup(argv[++i]);
} else if (!strcmp(argv[i],"--user")) {
if (lastarg) goto invalid;
config.user = argv[++i];
} else if (!strcmp(argv[i],"--dbnum")) {
if (lastarg) goto invalid;
config.dbnum = atoi(argv[++i]);
config.dbnumstr = sdsfromlonglong(config.dbnum);
} else if (!strcmp(argv[i],"-t") || !strcmp(argv[i],"--threads")) {
if (lastarg) goto invalid;
config.max_threads = atoi(argv[++i]);
if (config.max_threads > MAX_THREADS) {
printf("Warning: Too many threads, limiting threads to %d.\n", MAX_THREADS);
config.max_threads = MAX_THREADS;
} else if (config.max_threads <= 0) {
printf("Warning: Invalid value for max threads. Defaulting to %d.\n", MAX_THREADS);
config.max_threads = MAX_THREADS;
}
} else if (!strcmp(argv[i],"--help")) {
exit_status = 0;
goto usage;
} else {
/* Assume the user meant to provide an option when the arg starts
* with a dash. We're done otherwise and should use the remainder
* as the command and arguments for running the benchmark. */
if (argv[i][0] == '-') goto invalid;
return i;
}
}
return i;
invalid:
printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]);
usage:
printf(
"Usage: keydb-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]\n\n"
" -h, --host <hostname> Server hostname (default 127.0.0.1)\n"
" -p, --port <port> Server port (default 6379)\n"
" -c <clients> Number of parallel connections (default 50)\n"
" -t, --threads <threads> Maximum number of threads to start before ending\n"
" --time <time> Time between spinning up new client threads, in milliseconds\n"
" --dbnum <db> Select the specified DB number (default 0)\n"
" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
" --password <password> Password for Redis Auth\n\n"
);
exit(exit_status);
}
int extractPropertyFromInfo(const char *info, const char *key, double &val) {
char *line = strstr((char*)info, key);
if (line == nullptr) return 1;
line += strlen(key) + 1; // Skip past key name and following colon
char *newline = strchr(line, '\n');
*newline = 0; // Terminate string after relevant line
val = strtod(line, nullptr);
return 0;
}
int extractPropertyFromInfo(const char *info, const char *key, unsigned int &val) {
char *line = strstr((char*)info, key);
if (line == nullptr) return 1;
line += strlen(key) + 1; // Skip past key name and following colon
char *newline = strchr(line, '\n');
*newline = 0; // Terminate string after relevant line
val = atoi(line);
return 0;
}
double getSelfCpuTime(struct rusage *self_ru) {
getrusage(RUSAGE_SELF, self_ru);
double user_time = self_ru->ru_utime.tv_sec + (self_ru->ru_utime.tv_usec / (double)1000000);
double system_time = self_ru->ru_stime.tv_sec + (self_ru->ru_stime.tv_usec / (double)1000000);
return user_time + system_time;
}
double getServerCpuTime(redisContext *ctx) {
redisReply *reply = (redisReply*)redisCommand(ctx, "INFO CPU");
if (reply->type != REDIS_REPLY_STRING) {
freeReplyObject(reply);
printf("Error executing INFO command. Exiting.\n");
return -1;
}
double used_cpu_user, used_cpu_sys;
if (extractPropertyFromInfo(reply->str, "used_cpu_user", used_cpu_user)) {
printf("Error reading user CPU usage from INFO command. Exiting.\n");
return -1;
}
if (extractPropertyFromInfo(reply->str, "used_cpu_sys", used_cpu_sys)) {
printf("Error reading system CPU usage from INFO command. Exiting.\n");
return -1;
}
freeReplyObject(reply);
return used_cpu_user + used_cpu_sys;
}
double getMean(std::deque<double> *q) {
double sum = 0;
for (long unsigned int i = 0; i < q->size(); i++) {
sum += (*q)[i];
}
return sum / q->size();
}
bool isAtFullLoad(double cpuPercent, unsigned int threads) {
return cpuPercent / threads >= 96;
}
int main(int argc, const char **argv) {
int i;
storage_init(NULL, 0);
srandom(time(NULL));
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
initConfigDefaults();
i = parseOptions(argc,argv);
argc -= i;
argv += i;
config.latency = (long long*)zmalloc(sizeof(long long)*config.requests, MALLOC_LOCAL);
if (config.max_threads > 0) {
int err = 0;
err |= pthread_mutex_init(&(config.requests_issued_mutex), NULL);
err |= pthread_mutex_init(&(config.requests_finished_mutex), NULL);
err |= pthread_mutex_init(&(config.liveclients_mutex), NULL);
err |= pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
err |= pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
err |= pthread_mutex_init(&(config.updating_slots_mutex), NULL);
err |= pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
if (err != 0)
{
perror("Failed to initialize mutex");
exit(EXIT_FAILURE);
}
}
const char *set_value = "abcdefghijklmnopqrstuvwxyz";
int self_threads = 0;
char command[63];
initBenchmarkThreads();
redisContext *ctx = getRedisContext(config.hostip, config.hostport, config.hostsocket);
double server_cpu_time, last_server_cpu_time = getServerCpuTime(ctx);
struct rusage self_ru;
double self_cpu_time, last_self_cpu_time = getSelfCpuTime(&self_ru);
double server_cpu_load, last_server_cpu_load = 0, self_cpu_load, server_cpu_gain;
std::deque<double> load_gain_history = {};
double current_gain_avg, peak_gain_avg = 0;
redisReply *reply = (redisReply*)redisCommand(ctx, "INFO CPU");
if (reply->type != REDIS_REPLY_STRING) {
freeReplyObject(reply);
printf("Error executing INFO command. Exiting.\r\n");
return 1;
}
unsigned int server_threads;
if (extractPropertyFromInfo(reply->str, "server_threads", server_threads)) {
printf("Error reading server threads from INFO command. Exiting.\r\n");
return 1;
}
freeReplyObject(reply);
printf("Server has %d threads.\nStarting...\n", server_threads);
fflush(stdout);
while (self_threads < config.max_threads) {
for (int i = 0; i < config.numclients; i++) {
sprintf(command, "SET %d %s\r\n", self_threads * config.numclients + i, set_value);
createClient(command, strlen(command), NULL,self_threads);
}
benchmarkThread *t = config.threads[self_threads];
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
fprintf(stderr, "FATAL: Failed to start thread %d. Exiting.\n", self_threads);
exit(1);
}
self_threads++;
usleep(config.period_ms * 1000);
server_cpu_time = getServerCpuTime(ctx);
self_cpu_time = getSelfCpuTime(&self_ru);
server_cpu_load = (server_cpu_time - last_server_cpu_time) * 100000 / config.period_ms;
self_cpu_load = (self_cpu_time - last_self_cpu_time) * 100000 / config.period_ms;
if (server_cpu_time < 0) {
break;
}
printf("%d threads, %d total clients. CPU Usage Self: %.1f%% (%.1f%% per thread), Server: %.1f%% (%.1f%% per thread)\r",
self_threads,
self_threads * config.numclients,
self_cpu_load,
self_cpu_load / self_threads,
server_cpu_load,
server_cpu_load / server_threads);
fflush(stdout);
server_cpu_gain = server_cpu_load - last_server_cpu_load;
load_gain_history.push_back(server_cpu_gain);
if (load_gain_history.size() > 5) {
load_gain_history.pop_front();
}
current_gain_avg = getMean(&load_gain_history);
if (current_gain_avg > peak_gain_avg) {
peak_gain_avg = current_gain_avg;
}
last_server_cpu_time = server_cpu_time;
last_self_cpu_time = self_cpu_time;
last_server_cpu_load = server_cpu_load;
if (isAtFullLoad(server_cpu_load, server_threads)) {
printf("\nServer is at full CPU load. If higher performance is expected, check server configuration.\n");
break;
}
if (current_gain_avg <= 0.05 * peak_gain_avg) {
printf("\nServer CPU load appears to have stagnated with increasing clients.\n"
"Server does not appear to be at full load. Check network for throughput.\n");
break;
}
if (self_threads * config.numclients > 2000) {
printf("\nClient limit of 2000 reached. Server is not at full load and appears to be increasing.\n"
"2000 clients should be more than enough to reach a bottleneck. Check all configuration.\n");
}
}
printf("Done.\n");
freeAllClients();
freeBenchmarkThreads();
return 0;
}

View File

@ -426,7 +426,7 @@ sds createLatencyReport(void) {
}
if (advise_slowlog_inspect) {
report = sdscat(report,"- Check your Slow Log to understand what are the commands you are running which are too slow to execute. Please check https://redis.io/commands/slowlog for more information.\n");
report = sdscat(report,"- Check your Slow Log to understand what are the commands you are running which are too slow to execute. Please check https://docs.keydb.dev/docs/commands#slowlog for more information.\n");
}
/* Intrinsic latency. */

View File

@ -39,12 +39,11 @@ void lazyfreeFreeSlotsMap(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
/* Release the rax mapping Redis Cluster keys to slots in the
* lazyfree thread. */
/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = (rax*)args[0];
size_t len = rt->numele;
raxFree(rt);
freeTrackingRadixTree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}

View File

@ -94,8 +94,8 @@ lwCanvas *lwCreateCanvas(int width, int height, int bgcolor) {
lwCanvas *canvas = zmalloc(sizeof(*canvas));
canvas->width = width;
canvas->height = height;
canvas->pixels = zmalloc(width*height);
memset(canvas->pixels,bgcolor,width*height);
canvas->pixels = zmalloc((size_t)width*height);
memset(canvas->pixels,bgcolor,(size_t)width*height);
return canvas;
}

View File

@ -71,7 +71,7 @@ void memtest_progress_start(char *title, int pass) {
printf("\x1b[H\x1b[2K"); /* Cursor home, clear current line. */
printf("%s [%d]\n", title, pass); /* Print title. */
progress_printed = 0;
progress_full = ws.ws_col*(ws.ws_row-3);
progress_full = (size_t)ws.ws_col*(ws.ws_row-3);
fflush(stdout);
}

View File

@ -6366,7 +6366,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
/* If the information is not available, the function will set the
* field to zero bytes, so that when the field can't be populated the
* function kinda remains predictable. */
if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof)
memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
else
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
@ -9425,6 +9425,7 @@ long moduleDefragGlobals(void) {
module->defrag_cb(&defrag_ctx);
defragged += defrag_ctx.defragged;
}
dictReleaseIterator(di);
return defragged;
}

View File

@ -902,7 +902,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
if (samples) asize += (double)elesize/samples*dictSize(d);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
intset *is = (intset*)ptrFromObj(o);
asize = sizeof(*o)+sizeof(*is)+is->encoding*is->length;
asize = sizeof(*o)+sizeof(*is)+(size_t)is->encoding*is->length;
} else {
serverPanic("Unknown set encoding");
}

View File

@ -103,7 +103,6 @@ static struct config {
int randomkeys_keyspacelen;
int keepalive;
int pipeline;
int showerrors;
long long start;
long long totlatency;
const char *title;
@ -313,7 +312,9 @@ static redisContext *getRedisContext(const char *ip, int port,
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
else
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
goto cleanup;
freeReplyObject(reply);
redisFree(ctx);
exit(1);
}
freeReplyObject(reply);
return ctx;
@ -370,9 +371,15 @@ fail:
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
else fprintf(stderr, "%s\n", hostsocket);
int abort_test = 0;
if (!strncmp(reply->str,"NOAUTH",5) ||
!strncmp(reply->str,"WRONGPASS",9) ||
!strncmp(reply->str,"NOPERM",5))
abort_test = 1;
freeReplyObject(reply);
redisFree(c);
freeRedisConfig(cfg);
if (abort_test) exit(1);
return NULL;
}
static void freeRedisConfig(redisConfig *cfg) {
@ -517,44 +524,39 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
exit(1);
}
redisReply *r = (redisReply*)reply;
int is_err = (r->type == REDIS_REPLY_ERROR);
if (is_err && config.showerrors) {
/* TODO: static lasterr_time not thread-safe */
static time_t lasterr_time = 0;
time_t now = time(NULL);
if (lasterr_time != now) {
lasterr_time = now;
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
if (r->type == REDIS_REPLY_ERROR) {
/* Try to update slots configuration if reply error is
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
* contain(s) the slot hash tag.
* If the error is not topology-update related then we
* immediately exit to avoid false results. */
if (c->cluster_node && c->staglen) {
int fetch_slots = 0, do_wait = 0;
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
fetch_slots = 1;
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
/* Usually the cluster is able to recover itself after
* a CLUSTERDOWN error, so try to sleep one second
* before requesting the new configuration. */
fetch_slots = 1;
do_wait = 1;
printf("Error from server %s:%d: %s.\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
}
if (do_wait) sleep(1);
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
exit(1);
} else {
if (c->cluster_node) {
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
} else printf("Error from server: %s\n", r->str);
}
}
/* Try to update slots configuration if reply error is
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
* contain(s) the slot hash tag. */
if (is_err && c->cluster_node && c->staglen) {
int fetch_slots = 0, do_wait = 0;
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
fetch_slots = 1;
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
/* Usually the cluster is able to recover itself after
* a CLUSTERDOWN error, so try to sleep one second
* before requesting the new configuration. */
fetch_slots = 1;
do_wait = 1;
printf("Error from server %s:%d: %s\n",
c->cluster_node->ip,
c->cluster_node->port,
r->str);
}
if (do_wait) sleep(1);
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
exit(1);
}
}
freeReplyObject(reply);
@ -1298,8 +1300,7 @@ static int fetchClusterSlotsConfiguration(client c) {
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
atomicSet(config.is_fetching_slots, 1);
if (config.showerrors)
printf("Cluster slots configuration changed, fetching new one...\n");
printf("WARNING: Cluster slots configuration changed, fetching new one...\n");
const char *errmsg = "Failed to update cluster slots configuration";
static dictType dtype = {
dictSdsHash, /* hash function */
@ -1475,7 +1476,8 @@ int parseOptions(int argc, const char **argv) {
} else if (!strcmp(argv[i],"-I")) {
config.idlemode = 1;
} else if (!strcmp(argv[i],"-e")) {
config.showerrors = 1;
printf("WARNING: -e option has been deprecated. "
"We now immediatly exit on error to avoid false results.\n");
} else if (!strcmp(argv[i],"-t")) {
if (lastarg) goto invalid;
/* We get the list of tests to run as a string in the form
@ -1578,8 +1580,6 @@ usage:
" is executed. Default tests use this to hit random keys in the\n"
" specified range.\n"
" -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
" -e If server replies with errors, show them on stdout.\n"
" (no more than 1 error per second is displayed)\n"
" -q Quiet. Just show query/sec values\n"
" --precision Number of decimal places to display in latency output (default 0)\n"
" --csv Output in CSV format\n"
@ -1707,7 +1707,6 @@ int main(int argc, const char **argv) {
config.keepalive = 1;
config.datasize = 3;
config.pipeline = 1;
config.showerrors = 0;
config.randomkeys = 0;
config.randomkeys_keyspacelen = 0;
config.quiet = 0;
@ -1790,8 +1789,9 @@ int main(int argc, const char **argv) {
} else {
config.redis_config =
getRedisConfig(config.hostip, config.hostport, config.hostsocket);
if (config.redis_config == NULL)
if (config.redis_config == NULL) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
}
if (config.num_threads > 0) {
pthread_mutex_init(&(config.liveclients_mutex), NULL);
@ -1954,8 +1954,8 @@ int main(int argc, const char **argv) {
}
if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag);
benchmark("LRANGE_500 (first 450 elements)",cmd,len);
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
benchmark("LRANGE_500 (first 500 elements)",cmd,len);
free(cmd);
}
@ -1983,6 +1983,7 @@ int main(int argc, const char **argv) {
} while(config.loop);
zfree(data);
zfree(data);
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
return 0;

View File

@ -39,12 +39,14 @@
static char error[1044];
static off_t epos;
static long long line = 1;
int consumeNewline(char *buf) {
if (strncmp(buf,"\r\n",2) != 0) {
ERROR("Expected \\r\\n, got: %02x%02x",buf[0],buf[1]);
return 0;
}
line += 1;
return 1;
}
@ -201,8 +203,8 @@ int redis_check_aof_main(int argc, char **argv) {
off_t pos = process(fp);
off_t diff = size-pos;
printf("AOF analyzed: size=%lld, ok_up_to=%lld, diff=%lld\n",
(long long) size, (long long) pos, (long long) diff);
printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n",
(long long) size, (long long) pos, line, (long long) diff);
if (diff > 0) {
if (fix) {
char buf[2];

View File

@ -250,7 +250,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
rdbstate.doing = RDB_CHECK_DOING_READ_LEN;
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
rdbCheckInfo("Selecting DB ID %d", dbid);
rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently

View File

@ -467,7 +467,7 @@ static void cliOutputHelp(int argc, char **argv) {
help = entry->org;
if (group == -1) {
/* Compare all arguments */
if (argc == entry->argc) {
if (argc <= entry->argc) {
for (j = 0; j < argc; j++) {
if (strcasecmp(argv[j],entry->argv[j]) != 0) break;
}
@ -648,7 +648,9 @@ static int cliConnect(int flags) {
cliRefreshPrompt();
}
if (config.hostsocket == NULL) {
/* Do not use hostsocket when we got redirected in cluster mode */
if (config.hostsocket == NULL ||
(config.cluster_mode && config.cluster_reissue_command)) {
context = redisConnect(config.hostip,config.hostport);
} else {
context = redisConnectUnix(config.hostsocket);
@ -4554,7 +4556,7 @@ static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) {
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
clusterManagerNode **nodeptr)
{
assert(array->nodes < (array->nodes + array->len));
assert(array->len > 0);
/* If the first node to be shifted is not NULL, decrement count. */
if (*array->nodes != NULL) array->count--;
/* Store the first node to be shifted into 'nodeptr'. */
@ -4567,7 +4569,7 @@ static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array,
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array,
clusterManagerNode *node)
{
assert(array->nodes < (array->nodes + array->len));
assert(array->len > 0);
assert(node != NULL);
assert(array->count < array->len);
array->nodes[array->count++] = node;
@ -5944,7 +5946,7 @@ void showLatencyDistSamples(struct distsamples *samples, long long tot) {
printf("\033[38;5;0m"); /* Set foreground color to black. */
for (j = 0; ; j++) {
int coloridx =
ceil((float) samples[j].count / tot * (spectrum_palette_size-1));
ceil((double) samples[j].count / tot * (spectrum_palette_size-1));
int color = spectrum_palette[coloridx];
printf("\033[48;5;%dm%c", (int)color, samples[j].character);
samples[j].count = 0;

View File

@ -31,6 +31,7 @@
#include "sha1.h"
#include "rand.h"
#include "cluster.h"
#include "monotonic.h"
extern "C" {
#include <lua.h>
@ -1437,7 +1438,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
/* This is the Lua script "count" hook that we use to detect scripts timeout. */
void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
long long elapsed = mstime() - g_pserver->lua_time_start;
long long elapsed = elapsedMs(g_pserver->lua_time_start);
UNUSED(ar);
UNUSED(lua);
@ -1588,7 +1589,8 @@ void evalGenericCommand(client *c, int evalsha) {
serverTL->in_eval = 1;
g_pserver->lua_caller = c;
g_pserver->lua_cur_script = funcname + 2;
g_pserver->lua_time_start = mstime();
g_pserver->lua_time_start = getMonotonicUs();
g_pserver->lua_time_snapshot = mstime();
g_pserver->lua_kill = 0;
if (g_pserver->lua_time_limit > 0 && ldb.active == 0) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
@ -2747,7 +2749,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
/* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
mstime_t elapsed = mstime() - g_pserver->lua_time_start;
mstime_t elapsed = elapsedMs(g_pserver->lua_time_start);
mstime_t timelimit = g_pserver->lua_time_limit ?
g_pserver->lua_time_limit : 5000;
if (elapsed >= timelimit) {
@ -2777,6 +2779,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
lua_pushstring(lua, "timeout during Lua debugging with client closing connection");
lua_error(lua);
}
g_pserver->lua_time_start = mstime();
g_pserver->lua_time_start = getMonotonicUs();
g_pserver->lua_time_snapshot = mstime();
}
}

View File

@ -4131,16 +4131,16 @@ void sentinelSetCommand(client *c) {
int numargs = j-old_j+1;
switch(numargs) {
case 2:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",ptrFromObj(c->argv[old_j]),
ptrFromObj(c->argv[old_j+1]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",szFromObj(c->argv[old_j]),
szFromObj(c->argv[old_j+1]));
break;
case 3:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",ptrFromObj(c->argv[old_j]),
ptrFromObj(c->argv[old_j+1]),
ptrFromObj(c->argv[old_j+2]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",szFromObj(c->argv[old_j]),
szFromObj(c->argv[old_j+1]),
szFromObj(c->argv[old_j+2]));
break;
default:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",ptrFromObj(c->argv[old_j]));
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",szFromObj(c->argv[old_j]));
break;
}
}

View File

@ -57,7 +57,6 @@
#include <limits.h>
#include <float.h>
#include <math.h>
#include <sys/resource.h>
#include <sys/utsname.h>
#include <locale.h>
#include <sys/socket.h>
@ -67,7 +66,6 @@
#include "aelocker.h"
#include "motd.h"
#include "t_nhash.h"
#include <sys/resource.h>
#ifdef __linux__
#include <sys/prctl.h>
#include <sys/mman.h>
@ -4301,6 +4299,8 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
int is_read_command = (c->cmd->flags & CMD_READONLY) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
@ -4515,7 +4515,7 @@ int processCommand(client *c, int callFlags) {
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
c->cmd->proc != resetCommand &&
c->cmd->proc != resetCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&
@ -4527,6 +4527,14 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
/* Prevent a replica from sending commands that access the keyspace.
* The main objective here is to prevent abuse of client pause check
* from which replicas are exempt. */
if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) {
rejectCommandFormat(c, "Replica can't interract with the keyspace");
return C_OK;
}
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&

View File

@ -1934,7 +1934,8 @@ struct redisServer {
::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
mstime_t lua_time_limit; /* Script timeout in milliseconds */
mstime_t lua_time_start; /* Start time of script, milliseconds time */
monotime lua_time_start; /* monotonic timer to detect timed-out script */
mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
int lua_random_dirty; /* True if a random command was called during the
@ -2296,6 +2297,7 @@ void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);

View File

@ -813,7 +813,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
deleted += deleted_from_lp;
/* Now we the entries/deleted counters. */
/* Now we update the entries/deleted counters. */
p = lpFirst(lp);
lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
p = lpNext(lp,p); /* Skip deleted field. */

View File

@ -810,7 +810,7 @@ void stralgoLCS(client *c) {
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
* we index it as LCS[j+(blen+1)*j] */
uint32_t *lcs = (uint32_t*)zmalloc((alen+1)*(blen+1)*sizeof(uint32_t));
uint32_t *lcs = (uint32_t*)zmalloc((size_t)(alen+1)*(blen+1)*sizeof(uint32_t));
#define LCS(A,B) lcs[(B)+((A)*(blen+1))]
/* Start building the LCS table. */

View File

@ -36,7 +36,7 @@ set ::run_matching {} ; # If non empty, only tests matching pattern are run.
if {[catch {cd tmp}]} {
puts "tmp directory not found."
puts "Please run this test from the Redis source root."
puts "Please run this test from the KeyDB source root."
exit 1
}
@ -303,7 +303,7 @@ proc pause_on_error {} {
set count 10
if {[lindex $argv 1] ne {}} {set count [lindex $argv 1]}
foreach_redis_id id {
puts "=== REDIS $id ===="
puts "=== KeyDB $id ===="
puts [exec tail -$count redis_$id/log.txt]
puts "---------------------\n"
}
@ -317,7 +317,7 @@ proc pause_on_error {} {
}
} elseif {$cmd eq {ls}} {
foreach_redis_id id {
puts -nonewline "Redis $id"
puts -nonewline "KeyDB $id"
set errcode [catch {
set str {}
append str "@[RI $id tcp_port]: "
@ -348,13 +348,13 @@ proc pause_on_error {} {
}
}
} elseif {$cmd eq {help}} {
puts "ls List Sentinel and Redis instances."
puts "ls List Sentinel and KeyDB instances."
puts "show-sentinel-logs \[N\] Show latest N lines of logs."
puts "show-keydb-logs \[N\] Show latest N lines of logs."
puts "S <id> cmd ... arg Call command in Sentinel <id>."
puts "R <id> cmd ... arg Call command in Redis <id>."
puts "R <id> cmd ... arg Call command in KeyDB <id>."
puts "SI <id> <field> Show Sentinel <id> INFO <field>."
puts "RI <id> <field> Show Redis <id> INFO <field>."
puts "RI <id> <field> Show KeyDB <id> INFO <field>."
puts "continue Resume test."
} else {
set errcode [catch {eval $line} retval]

View File

@ -158,6 +158,18 @@ tags {"aof"} {
assert_match "*not valid*" $result
}
test "Short read: Utility should show the abnormal line num in AOF" {
create_aof {
append_to_aof [formatCommand set foo hello]
append_to_aof "!!!"
}
catch {
exec src/keydb-check-aof $aof_path
} result
assert_match "*ok_up_to_line=8*" $result
}
test "Short read: Utility should be able to fix the AOF" {
set result [exec src/keydb-check-aof --fix $aof_path << "y\n"]
assert_match "*Successfully truncated AOF*" $result

View File

@ -39,6 +39,7 @@ proc show_cluster_status {} {
# all the lists are empty.
#
# regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
catch {
while 1 {
# Find the log with smallest time.
set empty 0
@ -67,6 +68,7 @@ proc show_cluster_status {} {
puts "\[$best port $R_port($best)\] [lindex $log($best) 0]"
set log($best) [lrange $log($best) 1 end]
}
}
}
}

View File

@ -395,7 +395,7 @@ start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percent
# if the current slab is lower in utilization the defragger would have ended up in stagnation,
# keept running and not move any allocation.
# this test is more consistent on a fresh server with no history
start_server {tags {"defrag"} overrides {save ""}} {
start_server {tags {"defrag"} overrides {save "" server-threads 1}} {
r flushdb
r config resetstat
r config set hz 100

View File

@ -25,7 +25,7 @@ test {CONFIG SET port number} {
test {CONFIG SET bind address} {
start_server {} {
# non-valid address
catch {r CONFIG SET bind "some.wrong.bind.address"} e
catch {r CONFIG SET bind "999.999.999.999"} e
assert_match {*Failed to bind to specified addresses*} $e
# make sure server still bound to the previous address
@ -33,4 +33,4 @@ test {CONFIG SET bind address} {
$rd PING
$rd close
}
}
}

View File

@ -62,55 +62,55 @@ start_server {overrides {save ""} tags {"other"}} {
} {*index is out of range*}
tags {consistency} {
if {$::accurate} {set numops 10000} else {set numops 1000}
test {Check consistency of different data types after a reload} {
r flushdb
createComplexDataset r $numops
set dump [csvdump r]
set sha1 [r debug digest]
r debug reload
set sha1_after [r debug digest]
if {$sha1 eq $sha1_after} {
set _ 1
} else {
set newdump [csvdump r]
puts "Consistency test failed!"
puts "You can inspect the two dumps in /tmp/repldump*.txt"
if {$::accurate} {set numops 10000} else {set numops 1000}
test {Check consistency of different data types after a reload} {
r flushdb
createComplexDataset r $numops
set dump [csvdump r]
set sha1 [r debug digest]
r debug reload
set sha1_after [r debug digest]
if {$sha1 eq $sha1_after} {
set _ 1
} else {
set newdump [csvdump r]
puts "Consistency test failed!"
puts "You can inspect the two dumps in /tmp/repldump*.txt"
set fd [open /tmp/repldump1.txt w]
puts $fd $dump
close $fd
set fd [open /tmp/repldump2.txt w]
puts $fd $newdump
close $fd
set fd [open /tmp/repldump1.txt w]
puts $fd $dump
close $fd
set fd [open /tmp/repldump2.txt w]
puts $fd $newdump
close $fd
set _ 0
}
} {1}
set _ 0
}
} {1}
test {Same dataset digest if saving/reloading as AOF?} {
r config set aof-use-rdb-preamble no
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
set sha1_after [r debug digest]
if {$sha1 eq $sha1_after} {
set _ 1
} else {
set newdump [csvdump r]
puts "Consistency test failed!"
puts "You can inspect the two dumps in /tmp/aofdump*.txt"
test {Same dataset digest if saving/reloading as AOF?} {
r config set aof-use-rdb-preamble no
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
set sha1_after [r debug digest]
if {$sha1 eq $sha1_after} {
set _ 1
} else {
set newdump [csvdump r]
puts "Consistency test failed!"
puts "You can inspect the two dumps in /tmp/aofdump*.txt"
set fd [open /tmp/aofdump1.txt w]
puts $fd $dump
close $fd
set fd [open /tmp/aofdump2.txt w]
puts $fd $newdump
close $fd
set fd [open /tmp/aofdump1.txt w]
puts $fd $dump
close $fd
set fd [open /tmp/aofdump2.txt w]
puts $fd $newdump
close $fd
set _ 0
}
} {1}
set _ 0
}
} {1}
}
test {EXPIRES after a reload (snapshot + append only file rewrite)} {

View File

@ -395,6 +395,17 @@ start_server {tags {"tracking network"}} {
assert {[lindex msg 2] eq {} }
}
test {Test ASYNC flushall} {
clean_all
r CLIENT TRACKING on REDIRECT $redir_id
r GET key1
r GET key2
assert_equal [s 0 tracking_total_keys] 2
$rd_sg FLUSHALL ASYNC
assert_equal [s 0 tracking_total_keys] 0
assert_equal [lindex [$rd_redirection read] 2] {}
}
# Keys are defined to be evicted 100 at a time by default.
# If after eviction the number of keys still surpasses the limit
# defined in tracking-table-max-keys, we increases eviction