Merge branch 'unstable' of https://github.com/antirez/redis into MergeRedis

Note: some tests failing

Former-commit-id: 86d7276f24f0cf1a0eceb6cd00a6a0ae2a0fa520
This commit is contained in:
John Sully 2019-05-11 02:20:34 -04:00
commit 397e85befb
27 changed files with 296 additions and 141 deletions

1
deps/hiredis/read.c vendored
View File

@ -31,6 +31,7 @@
#include "fmacros.h" #include "fmacros.h"
#include <string.h> #include <string.h>
#include <strings.h>
#include <stdlib.h> #include <stdlib.h>
#ifndef _MSC_VER #ifndef _MSC_VER
#include <unistd.h> #include <unistd.h>

View File

@ -942,13 +942,7 @@ aof-use-rdb-preamble yes
lua-time-limit 5000 lua-time-limit 5000
################################ REDIS CLUSTER ############################### ################################ REDIS CLUSTER ###############################
#
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# WARNING EXPERIMENTAL: Redis Cluster is considered to be stable code, however
# in order to mark it as "mature" we need to wait for a non trivial percentage
# of users to deploy it in production.
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Normal Redis instances can't be part of a Redis Cluster; only nodes that are # Normal Redis instances can't be part of a Redis Cluster; only nodes that are
# started as cluster nodes can. In order to start a Redis instance as a # started as cluster nodes can. In order to start a Redis instance as a
# cluster node enable the cluster support uncommenting the following: # cluster node enable the cluster support uncommenting the following:

16
runtest-moduleapi Executable file
View File

@ -0,0 +1,16 @@
#!/bin/sh
TCL_VERSIONS="8.5 8.6"
TCLSH=""
for VERSION in $TCL_VERSIONS; do
TCL=`which tclsh$VERSION 2>/dev/null` && TCLSH=$TCL
done
if [ -z $TCLSH ]
then
echo "You need tcl 8.5 or newer in order to run the Redis test"
exit 1
fi
make -C tests/modules && \
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}"

View File

@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua
NODEPS:=clean distclean NODEPS:=clean distclean
# Default settings # Default settings
STD=-std=c99 -pedantic -DREDIS_STATIC='' STD=-std=c11 -pedantic -DREDIS_STATIC=''
CXX_STD=-std=c++14 -pedantic -fno-rtti -fno-exceptions -D__STDC_FORMAT_MACROS CXX_STD=-std=c++14 -pedantic -fno-rtti -fno-exceptions -D__STDC_FORMAT_MACROS
ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S))) ifneq (,$(findstring FreeBSD,$(uname_S)))

View File

@ -542,6 +542,8 @@ struct redisCommand *ACLLookupCommand(const char *name) {
* and command ID. */ * and command ID. */
void ACLResetSubcommandsForCommand(user *u, unsigned long id) { void ACLResetSubcommandsForCommand(user *u, unsigned long id) {
if (u->allowed_subcommands && u->allowed_subcommands[id]) { if (u->allowed_subcommands && u->allowed_subcommands[id]) {
for (int i = 0; u->allowed_subcommands[id][i]; i++)
sdsfree(u->allowed_subcommands[id][i]);
zfree(u->allowed_subcommands[id]); zfree(u->allowed_subcommands[id]);
u->allowed_subcommands[id] = NULL; u->allowed_subcommands[id] = NULL;
} }

View File

@ -199,6 +199,12 @@ ssize_t aofRewriteBufferWrite(int fd) {
* AOF file implementation * AOF file implementation
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
/* Return true if an AOf fsync is currently already in progress in a
* BIO thread. */
int aofFsyncInProgress(void) {
return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
/* Starts a background task that performs fsync() against the specified /* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */ * file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) { void aof_background_fsync(int fd) {
@ -337,10 +343,24 @@ void flushAppendOnlyFile(int force) {
int sync_in_progress = 0; int sync_in_progress = 0;
mstime_t latency; mstime_t latency;
if (sdslen(g_pserver->aof_buf) == 0) return; if (sdslen(g_pserver->aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC &&
g_pserver->aof_fsync_offset != g_pserver->aof_current_size &&
g_pserver->unixtime > g_pserver->aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC) if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; sync_in_progress = aofFsyncInProgress();
if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && !force) { if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing. /* With this append fsync policy we do background fsyncing.
@ -472,6 +492,7 @@ void flushAppendOnlyFile(int force) {
g_pserver->aof_buf = sdsempty(); g_pserver->aof_buf = sdsempty();
} }
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */ * children doing I/O in the background. */
if (g_pserver->aof_no_fsync_on_rewrite && if (g_pserver->aof_no_fsync_on_rewrite &&
@ -486,10 +507,14 @@ void flushAppendOnlyFile(int force) {
redis_fsync(g_pserver->aof_fd); /* Let's try to get this data on the disk */ redis_fsync(g_pserver->aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency); latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency); latencyAddSampleIfNeeded("aof-fsync-always",latency);
g_pserver->aof_fsync_offset = g_pserver->aof_current_size;
g_pserver->aof_last_fsync = g_pserver->unixtime; g_pserver->aof_last_fsync = g_pserver->unixtime;
} else if ((g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && } else if ((g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC &&
g_pserver->unixtime > g_pserver->aof_last_fsync)) { g_pserver->unixtime > g_pserver->aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(g_pserver->aof_fd); if (!sync_in_progress) {
aof_background_fsync(g_pserver->aof_fd);
g_pserver->aof_fsync_offset = g_pserver->aof_current_size;
}
g_pserver->aof_last_fsync = g_pserver->unixtime; g_pserver->aof_last_fsync = g_pserver->unixtime;
} }
} }
@ -703,6 +728,7 @@ int loadAppendOnlyFile(char *filename) {
* operation is received. */ * operation is received. */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
g_pserver->aof_current_size = 0; g_pserver->aof_current_size = 0;
g_pserver->aof_fsync_offset = g_pserver->aof_current_size;
fclose(fp); fclose(fp);
return C_ERR; return C_ERR;
} }
@ -842,6 +868,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
stopLoading(); stopLoading();
aofUpdateCurrentSize(); aofUpdateCurrentSize();
g_pserver->aof_rewrite_base_size = g_pserver->aof_current_size; g_pserver->aof_rewrite_base_size = g_pserver->aof_current_size;
g_pserver->aof_fsync_offset = g_pserver->aof_current_size;
return C_OK; return C_OK;
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */ readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */

View File

@ -995,12 +995,18 @@ void bitfieldCommand(client *c) {
/* Lookup for read is ok if key doesn't exit, but errors /* Lookup for read is ok if key doesn't exit, but errors
* if it's not a string. */ * if it's not a string. */
o = lookupKeyRead(c->db,c->argv[1]); o = lookupKeyRead(c->db,c->argv[1]);
if (o != nullptr && checkType(c,o,OBJ_STRING)) return; if (o != nullptr && checkType(c,o,OBJ_STRING)) {
zfree(ops);
return;
}
} else { } else {
/* Lookup by making room up to the farest bit reached by /* Lookup by making room up to the farest bit reached by
* this operation. */ * this operation. */
if ((o = lookupStringForBitCommand(c, if ((o = lookupStringForBitCommand(c,
highest_write_offset)) == nullptr) return; highest_write_offset)) == nullptr) {
zfree(ops);
return;
}
} }
addReplyArrayLen(c,numops); addReplyArrayLen(c,numops);

View File

@ -47,7 +47,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/ /* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref); void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
/* Defrag helper for generic allocations. /* Defrag helper for generic allocations.
* *
@ -353,7 +353,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
sdsele = (sds)ln->value; sdsele = (sds)ln->value;
if ((newsds = activeDefragSds(sdsele))) { if ((newsds = activeDefragSds(sdsele))) {
/* When defragging an sds value, we need to update the dict key */ /* When defragging an sds value, we need to update the dict key */
unsigned int hash = dictGetHash(d, sdsele); uint64_t hash = dictGetHash(d, sdsele);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged);
ln->value = newsds; ln->value = newsds;
defragged++; defragged++;
@ -390,7 +390,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
* moved. Return value is the the dictEntry if found, or NULL if not found. * moved. Return value is the the dictEntry if found, or NULL if not found.
* NOTE: this is very ugly code, but it let's us avoid the complication of * NOTE: this is very ugly code, but it let's us avoid the complication of
* doing a scan on another dict. */ * doing a scan on another dict. */
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) { dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) {
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
if (deref) { if (deref) {
dictEntry *de = *deref; dictEntry *de = *deref;

View File

@ -78,7 +78,7 @@ unsigned int getLRUClock(void) {
unsigned int LRU_CLOCK(void) { unsigned int LRU_CLOCK(void) {
unsigned int lruclock; unsigned int lruclock;
if (1000/g_pserver->hz <= LRU_CLOCK_RESOLUTION) { if (1000/g_pserver->hz <= LRU_CLOCK_RESOLUTION) {
atomicGet(g_pserver->lruclock,lruclock); lruclock = g_pserver->lruclock;
} else { } else {
lruclock = getLRUClock(); lruclock = getLRUClock();
} }

View File

@ -763,6 +763,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->usedby = listCreate(); module->usedby = listCreate();
module->usingMods = listCreate(); module->usingMods = listCreate();
module->filters = listCreate(); module->filters = listCreate();
module->in_call = 0;
ctx->module = module; ctx->module = module;
} }
@ -3786,14 +3787,7 @@ void moduleHandleBlockedClients(int iel) {
* replies to send to the client in a thread safe context. * replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and * We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */ * free the temporary client we just used for the replies. */
if (c) { if (c) AddReplyFromClient(c, bc->reply_client);
if (bc->reply_client->bufpos)
addReplyProto(c,bc->reply_client->buf,
bc->reply_client->bufpos);
if (listLength(bc->reply_client->reply))
listJoin(c->reply,bc->reply_client->reply);
c->reply_bytes += bc->reply_client->reply_bytes;
}
freeClient(bc->reply_client); freeClient(bc->reply_client);
if (c != NULL) { if (c != NULL) {
@ -3917,7 +3911,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
* in order to keep things like the currently selected database and similar * in order to keep things like the currently selected database and similar
* things. */ * things. */
ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN); ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN);
if (bc) selectDb(ctx->client,bc->dbid); if (bc) {
selectDb(ctx->client,bc->dbid);
ctx->client->id = bc->client->id;
}
return ctx; return ctx;
} }

View File

@ -13,7 +13,7 @@ endif
.SUFFIXES: .c .so .xo .o .SUFFIXES: .c .so .xo .o
all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so
.c.xo: .c.xo:
$(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
@ -47,11 +47,6 @@ hellodict.xo: ../redismodule.h
hellodict.so: hellodict.xo hellodict.so: hellodict.xo
hellofilter.xo: ../redismodule.h
hellofilter.so: hellofilter.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
testmodule.xo: ../redismodule.h testmodule.xo: ../redismodule.h
testmodule.so: testmodule.xo testmodule.so: testmodule.xo

View File

@ -30,6 +30,7 @@
#include "server.h" #include "server.h"
#include "atomicvar.h" #include "atomicvar.h"
#include <sys/socket.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <math.h> #include <math.h>
#include <ctype.h> #include <ctype.h>
@ -167,7 +168,7 @@ client *createClient(int fd, int iel) {
selectDb(c,0); selectDb(c,0);
uint64_t client_id; uint64_t client_id;
atomicGetIncr(g_pserver->next_client_id,client_id,1); client_id = g_pserver->next_client_id.fetch_add(1);
c->iel = iel; c->iel = iel;
fastlock_init(&c->lock); fastlock_init(&c->lock);
c->id = client_id; c->id = client_id;
@ -1011,6 +1012,19 @@ void addReplySubcommandSyntaxError(client *c) {
sdsfree(cmd); sdsfree(cmd);
} }
/* Append 'src' client output buffers into 'dst' client output buffers.
* This function clears the output buffers of 'src' */
void AddReplyFromClient(client *dst, client *src) {
if (prepareClientToWrite(dst, false) != C_OK)
return;
addReplyProto(dst,src->buf, src->bufpos);
if (listLength(src->reply))
listJoin(dst->reply,src->reply);
dst->reply_bytes += src->reply_bytes;
src->reply_bytes = 0;
src->bufpos = 0;
}
/* Copy 'src' client output buffers into 'dst' client output buffers. /* Copy 'src' client output buffers into 'dst' client output buffers.
* The function takes care of freeing the old output buffers of the * The function takes care of freeing the old output buffers of the
* destination client. */ * destination client. */
@ -1379,6 +1393,11 @@ void freeClient(client *c) {
* a context where calling freeClient() is not possible, because the client * a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */ * should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) { void freeClientAsync(client *c) {
/* We need to handle concurrent access to the server.clients_to_close list
* only in the freeClientAsync() function, since it's the only function that
* may access the list while Redis uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are
* idle. */
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
AeLocker lock; AeLocker lock;
lock.arm(nullptr); lock.arm(nullptr);
@ -1414,7 +1433,12 @@ client *lookupClientByID(uint64_t id) {
} }
/* Write data in output buffers to client. Return C_OK if the client /* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed. */ * is still valid after the call, C_ERR if it was freed because of some
* error.
*
* This function is called by threads, but always with handler_installed
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(int fd, client *c, int handler_installed) { int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0; ssize_t nwritten = 0, totwritten = 0;
clientReplyBlock *o; clientReplyBlock *o;
@ -1480,7 +1504,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
!(c->flags & CLIENT_SLAVE)) break; !(c->flags & CLIENT_SLAVE)) break;
} }
__atomic_fetch_add(&g_pserver->stat_net_output_bytes, totwritten, __ATOMIC_RELAXED); g_pserver->stat_net_output_bytes += totwritten;
if (nwritten == -1) { if (nwritten == -1) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
nwritten = 0; nwritten = 0;
@ -1951,13 +1975,48 @@ int processMultibulkBuffer(client *c) {
return C_ERR; return C_ERR;
} }
/* This function calls processCommand(), but also performs a few sub tasks
* that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
* 2. In the case of master clients, the replication offset is updated.
* 3. The client is reset unless there are reasons to avoid doing it.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c, int flags) {
int deadclient = 0;
serverTL->current_client = c;
if (processCommand(c, flags) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c);
}
}
if (serverTL->current_client == NULL) deadclient = 1;
serverTL->current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}
/* This function is called every time, in the client structure 'c', there is /* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket * more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be * or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */ * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c, int callFlags) { void processInputBuffer(client *c, int callFlags) {
AssertCorrectThread(c); AssertCorrectThread(c);
bool fFreed = false;
/* Keep processing while there is something in the input buffer */ /* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) { while(c->qb_pos < sdslen(c->querybuf)) {
@ -2003,48 +2062,38 @@ void processInputBuffer(client *c, int callFlags) {
} else { } else {
AeLocker locker; AeLocker locker;
locker.arm(c); locker.arm(c);
serverTL->current_client = c;
/* Only reset the client when the command was executed. */ /* We are finally ready to execute the command. */
if (processCommand(c, callFlags) == C_OK) { if (processCommandAndResetClient(c, callFlags) == C_ERR) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* If the client is no longer valid, we avoid exiting this
/* Update the applied replication offset of our master. */ * loop and trimming the client buffer later. So we return
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; * ASAP in that case. */
} return;
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
} }
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
if (serverTL->current_client == NULL) {
fFreed = true;
break;
}
serverTL->current_client = NULL;
} }
} }
/* Trim to pos */ /* Trim to pos */
if (!fFreed && c->qb_pos) { if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1); sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0; c->qb_pos = 0;
} }
} }
/* This is a wrapper for processInputBuffer that also cares about handling /* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-slaves, in case the client 'c' * the replication forwarding to the sub-replicas, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the * is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */ * raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) { void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) { if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c, CMD_CALL_FULL); processInputBuffer(c, CMD_CALL_FULL);
} else { } else {
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
size_t prev_offset = c->reploff; size_t prev_offset = c->reploff;
processInputBuffer(c, CMD_CALL_FULL); processInputBuffer(c, CMD_CALL_FULL);
size_t applied = c->reploff - prev_offset; size_t applied = c->reploff - prev_offset;
@ -2104,16 +2153,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} else { } else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
lock.unlock(); freeClientAsync(c);
aelock.arm(nullptr);
freeClient(c);
return; return;
} }
} else if (nread == 0) { } else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection"); serverLog(LL_VERBOSE, "Client closed connection");
lock.unlock(); freeClientAsync(c);
aelock.arm(nullptr);
freeClient(c);
return; return;
} else if (c->flags & CLIENT_MASTER) { } else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer /* Append the query buffer to the pending (not applied) buffer
@ -2133,10 +2178,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
bytes = sdscatrepr(bytes,c->querybuf,64); bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci); sdsfree(ci);
sdsfree(bytes); sdsfree(bytes);
lock.unlock(); freeClientAsync(c);
aelock.arm(nullptr);
freeClient(c);
return; return;
} }
@ -2900,3 +2943,4 @@ int processEventsWhileBlocked(int iel) {
aeAcquireLock(); aeAcquireLock();
return count; return count;
} }

View File

@ -258,6 +258,19 @@ static redisConfig *getRedisConfig(const char *ip, int port,
else fprintf(stderr,"%s: %s\n",hostsocket,err); else fprintf(stderr,"%s: %s\n",hostsocket,err);
goto fail; goto fail;
} }
if(config.auth){
void *authReply = NULL;
redisAppendCommand(c, "AUTH %s", config.auth);
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
if (reply) freeReplyObject(reply);
reply = ((redisReply *) authReply);
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "ERROR: %s\n", reply->str);
goto fail;
}
}
redisAppendCommand(c, "CONFIG GET %s", "save"); redisAppendCommand(c, "CONFIG GET %s", "save");
redisAppendCommand(c, "CONFIG GET %s", "appendonly"); redisAppendCommand(c, "CONFIG GET %s", "appendonly");
@ -1196,7 +1209,7 @@ static int fetchClusterSlotsConfiguration(client c) {
assert(reply->type == REDIS_REPLY_ARRAY); assert(reply->type == REDIS_REPLY_ARRAY);
for (i = 0; i < reply->elements; i++) { for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i]; redisReply *r = reply->element[i];
assert(r->type = REDIS_REPLY_ARRAY); assert(r->type == REDIS_REPLY_ARRAY);
assert(r->elements >= 3); assert(r->elements >= 3);
int from, to, slot; int from, to, slot;
from = r->element[0]->integer; from = r->element[0]->integer;
@ -1298,7 +1311,7 @@ int parseOptions(int argc, const char **argv) {
if (*p < '0' || *p > '9') goto invalid; if (*p < '0' || *p > '9') goto invalid;
} }
config.randomkeys = 1; config.randomkeys = 1;
config.randomkeys_keyspacelen = atoi(argv[++i]); config.randomkeys_keyspacelen = atoi(next);
if (config.randomkeys_keyspacelen < 0) if (config.randomkeys_keyspacelen < 0)
config.randomkeys_keyspacelen = 0; config.randomkeys_keyspacelen = 0;
} else if (!strcmp(argv[i],"-q")) { } else if (!strcmp(argv[i],"-q")) {

View File

@ -37,7 +37,7 @@
snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \ snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \
} }
static char error[1024]; static char error[1044];
static off_t epos; static off_t epos;
int consumeNewline(char *buf) { int consumeNewline(char *buf) {

View File

@ -1745,16 +1745,17 @@ void databasesCron(void) {
* every object access, and accuracy is not needed. To access a global var is * every object access, and accuracy is not needed. To access a global var is
* a lot faster than calling time(NULL) */ * a lot faster than calling time(NULL) */
void updateCachedTime(void) { void updateCachedTime(void) {
time_t unixtime = time(NULL); g_pserver->unixtime = time(NULL);
atomicSet(g_pserver->unixtime,unixtime);
g_pserver->mstime = mstime(); g_pserver->mstime = mstime();
/* To get information about daylight saving time, we need to call localtime_r /* To get information about daylight saving time, we need to call
* and cache the result. However calling localtime_r in this context is safe * localtime_r and cache the result. However calling localtime_r in this
* since we will never fork() while here, in the main thread. The logging * context is safe since we will never fork() while here, in the main
* function will call a thread safe version of localtime that has no locks. */ * thread. The logging function will call a thread safe version of
* localtime that has no locks. */
struct tm tm; struct tm tm;
localtime_r(&g_pserver->unixtime,&tm); time_t ut = g_pserver->unixtime;
localtime_r(&ut,&tm);
g_pserver->daylight_active = tm.tm_isdst; g_pserver->daylight_active = tm.tm_isdst;
} }
@ -1826,8 +1827,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* *
* Note that you can change the resolution altering the * Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */ * LRU_CLOCK_RESOLUTION define. */
unsigned long lruclock = getLRUClock(); g_pserver->lruclock = getLRUClock();
atomicSet(g_pserver->lruclock,lruclock);
/* Record the max memory used since the server was started. */ /* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > g_pserver->stat_peak_memory) if (zmalloc_used_memory() > g_pserver->stat_peak_memory)
@ -2119,6 +2119,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
aeAcquireLock(); aeAcquireLock();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN);
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
* time. */ * time. */
@ -2143,6 +2146,9 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(iel); handleClientsWithPendingWrites(iel);
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(iel);
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
* time. */ * time. */
@ -2287,10 +2293,6 @@ void initMasterInfo(redisMaster *master)
void initServerConfig(void) { void initServerConfig(void) {
int j; int j;
serverAssert(pthread_mutex_init(&g_pserver->next_client_id_mutex,NULL) == 0);
serverAssert(pthread_mutex_init(&g_pserver->lruclock_mutex,NULL) == 0);
serverAssert(pthread_mutex_init(&g_pserver->unixtime_mutex,NULL) == 0);
updateCachedTime(); updateCachedTime();
getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE);
g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0';
@ -2405,8 +2407,7 @@ void initServerConfig(void) {
g_pserver->lua_time_limit = LUA_SCRIPT_TIME_LIMIT; g_pserver->lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
g_pserver->fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; g_pserver->fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA;
unsigned int lruclock = getLRUClock(); g_pserver->lruclock = getLRUClock();
atomicSet(g_pserver->lruclock,lruclock);
resetServerSaveParams(); resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@ -3986,8 +3987,7 @@ sds genRedisInfoString(const char *section) {
call_uname = 0; call_uname = 0;
} }
unsigned int lruclock; unsigned int lruclock = g_pserver->lruclock.load();
atomicGet(g_pserver->lruclock,lruclock);
info = sdscatprintf(info, info = sdscatprintf(info,
"# Server\r\n" "# Server\r\n"
"redis_version:%s\r\n" "redis_version:%s\r\n"
@ -4299,8 +4299,8 @@ sds genRedisInfoString(const char *section) {
g_pserver->stat_numconnections, g_pserver->stat_numconnections,
g_pserver->stat_numcommands, g_pserver->stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND), getInstantaneousMetric(STATS_METRIC_COMMAND),
g_pserver->stat_net_input_bytes, g_pserver->stat_net_input_bytes.load(),
g_pserver->stat_net_output_bytes, g_pserver->stat_net_output_bytes.load(),
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
g_pserver->stat_rejected_conn, g_pserver->stat_rejected_conn,
@ -4955,8 +4955,6 @@ int main(int argc, char **argv) {
return sha1Test(argc, argv); return sha1Test(argc, argv);
} else if (!strcasecmp(argv[2], "util")) { } else if (!strcasecmp(argv[2], "util")) {
return utilTest(argc, argv); return utilTest(argc, argv);
} else if (!strcasecmp(argv[2], "sds")) {
return sdsTest(argc, argv);
} else if (!strcasecmp(argv[2], "endianconv")) { } else if (!strcasecmp(argv[2], "endianconv")) {
return endianconvTest(argc, argv); return endianconvTest(argc, argv);
} else if (!strcasecmp(argv[2], "crc64")) { } else if (!strcasecmp(argv[2], "crc64")) {

View File

@ -49,6 +49,7 @@
#include <pthread.h> #include <pthread.h>
#include <syslog.h> #include <syslog.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <atomic>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#include <lua.h> #include <lua.h>
@ -150,6 +151,8 @@ public:
#define CONFIG_DEFAULT_TCP_BACKLOG 511 /* TCP listen backlog. */ #define CONFIG_DEFAULT_TCP_BACKLOG 511 /* TCP listen backlog. */
#define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* Default client timeout: infinite */ #define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* Default client timeout: infinite */
#define CONFIG_DEFAULT_DBNUM 16 #define CONFIG_DEFAULT_DBNUM 16
#define CONFIG_DEFAULT_IO_THREADS_NUM 1 /* Single threaded by default */
#define CONFIG_DEFAULT_IO_THREADS_DO_READS 0 /* Read + parse from threads? */
#define CONFIG_MAX_LINE 1024 #define CONFIG_MAX_LINE 1024
#define CRON_DBS_PER_CALL 16 #define CRON_DBS_PER_CALL 16
#define NET_MAX_WRITES_PER_EVENT (1024*64) #define NET_MAX_WRITES_PER_EVENT (1024*64)
@ -1236,7 +1239,7 @@ struct redisServer {
struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS]; struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS];
unsigned int lruclock; /* Clock for LRU eviction */ std::atomic<unsigned int> lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */ int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */ int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@ -1268,7 +1271,7 @@ struct redisServer {
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */
uint64_t next_client_id; /* Next client unique ID. Incremental. */ std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */ int protected_mode; /* Don't accept external connections. */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */ int loading; /* We are loading data from disk if true */
@ -1305,8 +1308,8 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
long long stat_net_input_bytes; /* Bytes read from network. */ std::atomic<long long> stat_net_input_bytes; /* Bytes read from network. */
long long stat_net_output_bytes; /* Bytes written to network. */ std::atomic<long long> stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
/* The following two are used to track instantaneous metrics, like /* The following two are used to track instantaneous metrics, like
@ -1317,7 +1320,6 @@ struct redisServer {
long long samples[STATS_METRIC_SAMPLES]; long long samples[STATS_METRIC_SAMPLES];
int idx; int idx;
} inst_metric[STATS_METRIC_COUNT]; } inst_metric[STATS_METRIC_COUNT];
/* AOF persistence */ /* AOF persistence */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */ int aof_fsync; /* Kind of fsync() policy */
@ -1327,6 +1329,7 @@ struct redisServer {
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */ off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */ pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
@ -1464,10 +1467,10 @@ struct redisServer {
int list_max_ziplist_size; int list_max_ziplist_size;
int list_compress_depth; int list_compress_depth;
/* time cache */ /* time cache */
time_t unixtime; /* Unix time sampled every cron cycle. */ std::atomic<time_t> unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */ time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */ int daylight_active; /* Currently in daylight saving time. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ long long mstime; /* 'unixtime' with milliseconds resolution. */
/* Pubsub */ /* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */ dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */ list *pubsub_patterns; /* A list of pubsub_patterns */
@ -1526,12 +1529,6 @@ struct redisServer {
int bug_report_start; /* True if bug report header was already logged. */ int bug_report_start; /* True if bug report header was already logged. */
int watchdog_period; /* Software watchdog period in ms. 0 = off */ int watchdog_period; /* Software watchdog period in ms. 0 = off */
/* Mutexes used to protect atomic variables when atomic builtins are
* not available. */
pthread_mutex_t lruclock_mutex;
pthread_mutex_t next_client_id_mutex;
pthread_mutex_t unixtime_mutex;
int fActiveReplica; /* Can this replica also be a master? */ int fActiveReplica; /* Can this replica also be a master? */
struct fastlock flock; struct fastlock flock;
@ -1540,6 +1537,9 @@ struct redisServer {
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Lower 20 bits: a counter incrementing for each command executed in the same millisecond
// Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition
uint64_t mvcc_tstamp; uint64_t mvcc_tstamp;
/* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */
}; };
typedef struct pubsubPattern { typedef struct pubsubPattern {
@ -1709,6 +1709,7 @@ void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len); void addReplyProto(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj_roptr obj); void addReplyBulk(client *c, robj_roptr obj);
void AddReplyFromClient(client *c, client *src);
void addReplyBulkCString(client *c, const char *s); void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll); void addReplyBulkLongLong(client *c, long long ll);
@ -1764,6 +1765,7 @@ int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c); void linkClient(client *c);
void protectClient(client *c); void protectClient(client *c);
void unprotectClient(client *c); void unprotectClient(client *c);
void initThreadedIO(void);
// Special Thread-safe addReply() commands for posting messages to clients from a different thread // Special Thread-safe addReply() commands for posting messages to clients from a different thread
void addReplyAsync(client *c, robj_roptr obj); void addReplyAsync(client *c, robj_roptr obj);

View File

@ -617,7 +617,7 @@ void rpoplpushCommand(client *c) {
* the AOF and replication channel. * the AOF and replication channel.
* *
* The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
* 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
* we can propagate the command properly. * we can propagate the command properly.
* *
* The function returns C_OK if we are able to serve the client, otherwise * The function returns C_OK if we are able to serve the client, otherwise

View File

@ -492,14 +492,14 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
streamEncodeID(si->start_key,start); streamEncodeID(si->start_key,start);
} else { } else {
si->start_key[0] = 0; si->start_key[0] = 0;
si->start_key[0] = 0; si->start_key[1] = 0;
} }
if (end) { if (end) {
streamEncodeID(si->end_key,end); streamEncodeID(si->end_key,end);
} else { } else {
si->end_key[0] = UINT64_MAX; si->end_key[0] = UINT64_MAX;
si->end_key[0] = UINT64_MAX; si->end_key[1] = UINT64_MAX;
} }
/* Seek the correct node in the radix tree. */ /* Seek the correct node in the radix tree. */

View File

@ -576,7 +576,7 @@ void zipEntry(unsigned char *p, zlentry *e) {
/* Create a new empty ziplist. */ /* Create a new empty ziplist. */
unsigned char *ziplistNew(void) { unsigned char *ziplistNew(void) {
unsigned int bytes = ZIPLIST_HEADER_SIZE+1; unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes, MALLOC_SHARED); unsigned char *zl = zmalloc(bytes, MALLOC_SHARED);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);

View File

@ -166,12 +166,15 @@ start_server {} {
# Pick a random slave # Pick a random slave
set slave_id [expr {($master_id+1)%5}] set slave_id [expr {($master_id+1)%5}]
set sync_count [status $R($master_id) sync_full] set sync_count [status $R($master_id) sync_full]
set sync_partial [status $R($master_id) sync_partial_ok]
catch { catch {
$R($slave_id) config rewrite $R($slave_id) config rewrite
$R($slave_id) debug restart $R($slave_id) debug restart
} }
# note: just waiting for connected_slaves==4 has a race condition since
# we might do the check before the master realized that the slave disconnected
wait_for_condition 50 2000 { wait_for_condition 50 2000 {
[status $R($master_id) connected_slaves] == 4 [status $R($master_id) sync_partial_ok] == $sync_partial + 1
} else { } else {
fail "Replica not reconnecting" fail "Replica not reconnecting"
} }

View File

@ -79,6 +79,32 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
stop_bg_complex_data $load_handle0 stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1 stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2 stop_bg_complex_data $load_handle2
# Wait for the slave to reach the "online"
# state from the POV of the master.
set retry 5000
while {$retry} {
set info [$master info]
if {[string match {*slave0:*state=online*} $info]} {
break
} else {
incr retry -1
after 100
}
}
if {$retry == 0} {
error "assertion:Slave not correctly synchronized"
}
# Wait that slave acknowledge it is online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues. (-LOADING error)
wait_for_condition 5000 100 {
[lindex [$slave role] 3] eq {connected}
} else {
fail "Slave still not connected after some time"
}
set retry 10 set retry 10
while {$retry && ([$master debug digest] ne [$slave debug digest])}\ while {$retry && ([$master debug digest] ne [$slave debug digest])}\
{ {

24
tests/modules/Makefile Normal file
View File

@ -0,0 +1,24 @@
# find the OS
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -shared
else
SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
.SUFFIXES: .c .so .xo .o
all: commandfilter.so
.c.xo:
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
commandfilter.xo: ../../src/redismodule.h
commandfilter.so: commandfilter.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc

View File

@ -1,18 +1,18 @@
#define REDISMODULE_EXPERIMENTAL_API #define REDISMODULE_EXPERIMENTAL_API
#include "../redismodule.h" #include "redismodule.h"
#include <string.h> #include <string.h>
static RedisModuleString *log_key_name; static RedisModuleString *log_key_name;
static const char log_command_name[] = "hellofilter.log"; static const char log_command_name[] = "commandfilter.log";
static const char ping_command_name[] = "hellofilter.ping"; static const char ping_command_name[] = "commandfilter.ping";
static const char unregister_command_name[] = "hellofilter.unregister"; static const char unregister_command_name[] = "commandfilter.unregister";
static int in_log_command = 0; static int in_log_command = 0;
static RedisModuleCommandFilter *filter = NULL; static RedisModuleCommandFilter *filter = NULL;
int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int CommandFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{ {
(void) argc; (void) argc;
(void) argv; (void) argv;
@ -23,7 +23,7 @@ int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
return REDISMODULE_OK; return REDISMODULE_OK;
} }
int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int CommandFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{ {
(void) argc; (void) argc;
(void) argv; (void) argv;
@ -39,7 +39,7 @@ int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
return REDISMODULE_OK; return REDISMODULE_OK;
} }
int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) int CommandFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{ {
RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); RedisModuleString *s = RedisModule_CreateString(ctx, "", 0);
@ -74,9 +74,9 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
return REDISMODULE_OK; return REDISMODULE_OK;
} }
void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) void CommandFilter_CommandFilter(RedisModuleCommandFilterCtx *filter)
{ {
if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ if (in_log_command) return; /* don't process our own RM_Call() from CommandFilter_LogCommand() */
/* Fun manipulations: /* Fun manipulations:
* - Remove @delme * - Remove @delme
@ -117,7 +117,7 @@ void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter)
} }
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) if (RedisModule_Init(ctx,"commandfilter",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR; == REDISMODULE_ERR) return REDISMODULE_ERR;
if (argc != 2) { if (argc != 2) {
@ -130,18 +130,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModule_StringToLongLong(argv[1], &noself); RedisModule_StringToLongLong(argv[1], &noself);
if (RedisModule_CreateCommand(ctx,log_command_name, if (RedisModule_CreateCommand(ctx,log_command_name,
HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) CommandFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,ping_command_name, if (RedisModule_CreateCommand(ctx,ping_command_name,
HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) CommandFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,unregister_command_name, if (RedisModule_CreateCommand(ctx,unregister_command_name,
HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) CommandFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;
if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, if ((filter = RedisModule_RegisterCommandFilter(ctx, CommandFilter_CommandFilter,
noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) noself ? REDISMODULE_CMDFILTER_NOSELF : 0))
== NULL) return REDISMODULE_ERR; == NULL) return REDISMODULE_ERR;

View File

@ -63,7 +63,6 @@ set ::all_tests {
unit/lazyfree unit/lazyfree
unit/wait unit/wait
unit/pendingquerybuf unit/pendingquerybuf
modules/commandfilter
} }
# Index to the next test to run in the ::all_tests list. # Index to the next test to run in the ::all_tests list.
set ::next_test 0 set ::next_test 0
@ -504,7 +503,7 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--only}} { } elseif {$opt eq {--only}} {
lappend ::only_tests $arg lappend ::only_tests $arg
incr j incr j
} elseif {$opt eq {--skiptill}} { } elseif {$opt eq {--skip-till}} {
set ::skip_till $arg set ::skip_till $arg
incr j incr j
} elseif {$opt eq {--list-tests}} { } elseif {$opt eq {--list-tests}} {

View File

@ -108,4 +108,11 @@ start_server {tags {"acl"}} {
assert_match {*+debug|segfault*} $cmdstr assert_match {*+debug|segfault*} $cmdstr
assert_match {*+acl*} $cmdstr assert_match {*+acl*} $cmdstr
} }
test {ACL #5998 regression: memory leaks adding / removing subcommands} {
r AUTH default ""
r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c
r ACL setuser newuser -debug
# The test framework will detect a leak if any.
}
} }

View File

@ -161,7 +161,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
} }
# make sure master doesn't disconnect slave because of timeout # make sure master doesn't disconnect slave because of timeout
$master config set repl-timeout 300 ;# 5 minutes $master config set repl-timeout 1200 ;# 20 minutes (for valgrind and slow machines)
$master config set maxmemory-policy allkeys-random $master config set maxmemory-policy allkeys-random
$master config set client-output-buffer-limit "replica 100000000 100000000 300" $master config set client-output-buffer-limit "replica 100000000 100000000 300"
$master config set repl-backlog-size [expr {10*1024}] $master config set repl-backlog-size [expr {10*1024}]
@ -212,7 +212,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
assert {[$master dbsize] == 100} assert {[$master dbsize] == 100}
assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers
assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB set delta_max [expr {$cmd_count / 2}] ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
assert {$delta < $delta_max && $delta > -$delta_max}
$master client kill type slave $master client kill type slave
set killed_used [s -1 used_memory] set killed_used [s -1 used_memory]
@ -221,7 +222,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}]
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
assert {$killed_slave_buf == 0} assert {$killed_slave_buf == 0}
assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max}
} }
# unfreeze slave process (after the 'test' succeeded or failed, but before we attempt to terminate the server # unfreeze slave process (after the 'test' succeeded or failed, but before we attempt to terminate the server

View File

@ -1,4 +1,4 @@
set testmodule [file normalize src/modules/hellofilter.so] set testmodule [file normalize tests/modules/commandfilter.so]
start_server {tags {"modules"}} { start_server {tags {"modules"}} {
r module load $testmodule log-key 0 r module load $testmodule log-key 0
@ -27,7 +27,7 @@ start_server {tags {"modules"}} {
test {Command Filter applies on RM_Call() commands} { test {Command Filter applies on RM_Call() commands} {
r del log-key r del log-key
r hellofilter.ping r commandfilter.ping
r lrange log-key 0 -1 r lrange log-key 0 -1
} "{ping @log}" } "{ping @log}"
@ -39,13 +39,13 @@ start_server {tags {"modules"}} {
test {Command Filter applies on Lua redis.call() that calls a module} { test {Command Filter applies on Lua redis.call() that calls a module} {
r del log-key r del log-key
r eval "redis.call('hellofilter.ping')" 0 r eval "redis.call('commandfilter.ping')" 0
r lrange log-key 0 -1 r lrange log-key 0 -1
} "{ping @log}" } "{ping @log}"
test {Command Filter is unregistered implicitly on module unload} { test {Command Filter is unregistered implicitly on module unload} {
r del log-key r del log-key
r module unload hellofilter r module unload commandfilter
r set mykey @log r set mykey @log
r lrange log-key 0 -1 r lrange log-key 0 -1
} {} } {}
@ -59,14 +59,14 @@ start_server {tags {"modules"}} {
assert_equal "{set mykey @log}" [r lrange log-key 0 -1] assert_equal "{set mykey @log}" [r lrange log-key 0 -1]
# Unregister # Unregister
r hellofilter.unregister r commandfilter.unregister
r del log-key r del log-key
r set mykey @log r set mykey @log
r lrange log-key 0 -1 r lrange log-key 0 -1
} {} } {}
r module unload hellofilter r module unload commandfilter
r module load $testmodule log-key 1 r module load $testmodule log-key 1
test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} {
@ -74,10 +74,10 @@ start_server {tags {"modules"}} {
assert_equal "{set mykey @log}" [r lrange log-key 0 -1] assert_equal "{set mykey @log}" [r lrange log-key 0 -1]
r del log-key r del log-key
r hellofilter.ping r commandfilter.ping
assert_equal {} [r lrange log-key 0 -1] assert_equal {} [r lrange log-key 0 -1]
r eval "redis.call('hellofilter.ping')" 0 r eval "redis.call('commandfilter.ping')" 0
assert_equal {} [r lrange log-key 0 -1] assert_equal {} [r lrange log-key 0 -1]
} }