diff --git a/deps/hiredis/read.c b/deps/hiredis/read.c index c75c3435f..cc0f3cc72 100644 --- a/deps/hiredis/read.c +++ b/deps/hiredis/read.c @@ -31,6 +31,7 @@ #include "fmacros.h" #include +#include #include #ifndef _MSC_VER #include diff --git a/redis.conf b/redis.conf index 99f66a667..2a10eabe5 100644 --- a/redis.conf +++ b/redis.conf @@ -942,13 +942,7 @@ aof-use-rdb-preamble yes lua-time-limit 5000 ################################ REDIS CLUSTER ############################### -# -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# WARNING EXPERIMENTAL: Redis Cluster is considered to be stable code, however -# in order to mark it as "mature" we need to wait for a non trivial percentage -# of users to deploy it in production. -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# + # Normal Redis instances can't be part of a Redis Cluster; only nodes that are # started as cluster nodes can. In order to start a Redis instance as a # cluster node enable the cluster support uncommenting the following: diff --git a/runtest-moduleapi b/runtest-moduleapi new file mode 100755 index 000000000..84cdb9bb8 --- /dev/null +++ b/runtest-moduleapi @@ -0,0 +1,16 @@ +#!/bin/sh +TCL_VERSIONS="8.5 8.6" +TCLSH="" + +for VERSION in $TCL_VERSIONS; do + TCL=`which tclsh$VERSION 2>/dev/null` && TCLSH=$TCL +done + +if [ -z $TCLSH ] +then + echo "You need tcl 8.5 or newer in order to run the Redis test" + exit 1 +fi + +make -C tests/modules && \ +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" diff --git a/src/Makefile b/src/Makefile index 6ba411ecc..aa7b204ff 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -STD=-std=c99 -pedantic -DREDIS_STATIC='' +STD=-std=c11 -pedantic -DREDIS_STATIC='' CXX_STD=-std=c++14 -pedantic -fno-rtti -fno-exceptions -D__STDC_FORMAT_MACROS ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) diff --git a/src/acl.cpp b/src/acl.cpp index d7d352d42..259d9fb61 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -542,6 +542,8 @@ struct redisCommand *ACLLookupCommand(const char *name) { * and command ID. */ void ACLResetSubcommandsForCommand(user *u, unsigned long id) { if (u->allowed_subcommands && u->allowed_subcommands[id]) { + for (int i = 0; u->allowed_subcommands[id][i]; i++) + sdsfree(u->allowed_subcommands[id][i]); zfree(u->allowed_subcommands[id]); u->allowed_subcommands[id] = NULL; } diff --git a/src/aof.cpp b/src/aof.cpp index 19c6c4a12..bfebf5a5c 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -199,6 +199,12 @@ ssize_t aofRewriteBufferWrite(int fd) { * AOF file implementation * ------------------------------------------------------------------------- */ +/* Return true if an AOf fsync is currently already in progress in a + * BIO thread. */ +int aofFsyncInProgress(void) { + return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; +} + /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { @@ -337,10 +343,24 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; - if (sdslen(g_pserver->aof_buf) == 0) return; + if (sdslen(g_pserver->aof_buf) == 0) { + /* Check if we need to do fsync even the aof buffer is empty, + * because previously in AOF_FSYNC_EVERYSEC mode, fsync is + * called only when aof buffer is not empty, so if users + * stop write commands before fsync called in one second, + * the data in page cache cannot be flushed in time. */ + if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && + g_pserver->aof_fsync_offset != g_pserver->aof_current_size && + g_pserver->unixtime > g_pserver->aof_last_fsync && + !(sync_in_progress = aofFsyncInProgress())) { + goto try_fsync; + } else { + return; + } + } if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC) - sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; + sync_in_progress = aofFsyncInProgress(); if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. @@ -472,6 +492,7 @@ void flushAppendOnlyFile(int force) { g_pserver->aof_buf = sdsempty(); } +try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (g_pserver->aof_no_fsync_on_rewrite && @@ -486,10 +507,14 @@ void flushAppendOnlyFile(int force) { redis_fsync(g_pserver->aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; g_pserver->aof_last_fsync = g_pserver->unixtime; } else if ((g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && g_pserver->unixtime > g_pserver->aof_last_fsync)) { - if (!sync_in_progress) aof_background_fsync(g_pserver->aof_fd); + if (!sync_in_progress) { + aof_background_fsync(g_pserver->aof_fd); + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; + } g_pserver->aof_last_fsync = g_pserver->unixtime; } } @@ -703,6 +728,7 @@ int loadAppendOnlyFile(char *filename) { * operation is received. */ if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { g_pserver->aof_current_size = 0; + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; fclose(fp); return C_ERR; } @@ -842,6 +868,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ stopLoading(); aofUpdateCurrentSize(); g_pserver->aof_rewrite_base_size = g_pserver->aof_current_size; + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; return C_OK; readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */ diff --git a/src/bitops.cpp b/src/bitops.cpp index 98e8b9bc7..02034f377 100644 --- a/src/bitops.cpp +++ b/src/bitops.cpp @@ -995,12 +995,18 @@ void bitfieldCommand(client *c) { /* Lookup for read is ok if key doesn't exit, but errors * if it's not a string. */ o = lookupKeyRead(c->db,c->argv[1]); - if (o != nullptr && checkType(c,o,OBJ_STRING)) return; + if (o != nullptr && checkType(c,o,OBJ_STRING)) { + zfree(ops); + return; + } } else { /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, - highest_write_offset)) == nullptr) return; + highest_write_offset)) == nullptr) { + zfree(ops); + return; + } } addReplyArrayLen(c,numops); diff --git a/src/defrag.cpp b/src/defrag.cpp index 64f7a3bb2..2e9abd290 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -47,7 +47,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged); +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); /* Defrag helper for generic allocations. * @@ -353,7 +353,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { sdsele = (sds)ln->value; if ((newsds = activeDefragSds(sdsele))) { /* When defragging an sds value, we need to update the dict key */ - unsigned int hash = dictGetHash(d, sdsele); + uint64_t hash = dictGetHash(d, sdsele); replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); ln->value = newsds; defragged++; @@ -390,7 +390,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { * moved. Return value is the the dictEntry if found, or NULL if not found. * NOTE: this is very ugly code, but it let's us avoid the complication of * doing a scan on another dict. */ -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) { +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); if (deref) { dictEntry *de = *deref; diff --git a/src/evict.cpp b/src/evict.cpp index f7b99f389..4be6bf761 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -78,7 +78,7 @@ unsigned int getLRUClock(void) { unsigned int LRU_CLOCK(void) { unsigned int lruclock; if (1000/g_pserver->hz <= LRU_CLOCK_RESOLUTION) { - atomicGet(g_pserver->lruclock,lruclock); + lruclock = g_pserver->lruclock; } else { lruclock = getLRUClock(); } diff --git a/src/module.cpp b/src/module.cpp index 04ca21a97..56a951977 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -763,6 +763,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->usedby = listCreate(); module->usingMods = listCreate(); module->filters = listCreate(); + module->in_call = 0; ctx->module = module; } @@ -3786,14 +3787,7 @@ void moduleHandleBlockedClients(int iel) { * replies to send to the client in a thread safe context. * We need to glue such replies to the client output buffer and * free the temporary client we just used for the replies. */ - if (c) { - if (bc->reply_client->bufpos) - addReplyProto(c,bc->reply_client->buf, - bc->reply_client->bufpos); - if (listLength(bc->reply_client->reply)) - listJoin(c->reply,bc->reply_client->reply); - c->reply_bytes += bc->reply_client->reply_bytes; - } + if (c) AddReplyFromClient(c, bc->reply_client); freeClient(bc->reply_client); if (c != NULL) { @@ -3917,7 +3911,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * in order to keep things like the currently selected database and similar * things. */ ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN); - if (bc) selectDb(ctx->client,bc->dbid); + if (bc) { + selectDb(ctx->client,bc->dbid); + ctx->client->id = bc->client->id; + } return ctx; } diff --git a/src/modules/Makefile b/src/modules/Makefile index 537aa0daf..4f6b50f2e 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so +all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -47,11 +47,6 @@ hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo -hellofilter.xo: ../redismodule.h - -hellofilter.so: hellofilter.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc - testmodule.xo: ../redismodule.h testmodule.so: testmodule.xo diff --git a/src/networking.cpp b/src/networking.cpp index 29d17c8a3..0c70922cd 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "atomicvar.h" +#include #include #include #include @@ -167,7 +168,7 @@ client *createClient(int fd, int iel) { selectDb(c,0); uint64_t client_id; - atomicGetIncr(g_pserver->next_client_id,client_id,1); + client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; fastlock_init(&c->lock); c->id = client_id; @@ -1011,6 +1012,19 @@ void addReplySubcommandSyntaxError(client *c) { sdsfree(cmd); } +/* Append 'src' client output buffers into 'dst' client output buffers. + * This function clears the output buffers of 'src' */ +void AddReplyFromClient(client *dst, client *src) { + if (prepareClientToWrite(dst, false) != C_OK) + return; + addReplyProto(dst,src->buf, src->bufpos); + if (listLength(src->reply)) + listJoin(dst->reply,src->reply); + dst->reply_bytes += src->reply_bytes; + src->reply_bytes = 0; + src->bufpos = 0; +} + /* Copy 'src' client output buffers into 'dst' client output buffers. * The function takes care of freeing the old output buffers of the * destination client. */ @@ -1379,6 +1393,11 @@ void freeClient(client *c) { * a context where calling freeClient() is not possible, because the client * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { + /* We need to handle concurrent access to the server.clients_to_close list + * only in the freeClientAsync() function, since it's the only function that + * may access the list while Redis uses I/O threads. All the other accesses + * are in the context of the main thread while the other threads are + * idle. */ if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; AeLocker lock; lock.arm(nullptr); @@ -1414,7 +1433,12 @@ client *lookupClientByID(uint64_t id) { } /* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed. */ + * is still valid after the call, C_ERR if it was freed because of some + * error. + * + * This function is called by threads, but always with handler_installed + * set to 0. So when handler_installed is set to 0 the function must be + * thread safe. */ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; clientReplyBlock *o; @@ -1480,7 +1504,7 @@ int writeToClient(int fd, client *c, int handler_installed) { !(c->flags & CLIENT_SLAVE)) break; } - __atomic_fetch_add(&g_pserver->stat_net_output_bytes, totwritten, __ATOMIC_RELAXED); + g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; @@ -1951,13 +1975,48 @@ int processMultibulkBuffer(client *c) { return C_ERR; } +/* This function calls processCommand(), but also performs a few sub tasks + * that are useful in that context: + * + * 1. It sets the current client to the client 'c'. + * 2. In the case of master clients, the replication offset is updated. + * 3. The client is reset unless there are reasons to avoid doing it. + * + * The function returns C_ERR in case the client was freed as a side effect + * of processing the command, otherwise C_OK is returned. */ +int processCommandAndResetClient(client *c, int flags) { + int deadclient = 0; + serverTL->current_client = c; + if (processCommand(c, flags) == C_OK) { + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. + * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } + } + if (serverTL->current_client == NULL) deadclient = 1; + serverTL->current_client = NULL; + /* freeMemoryIfNeeded may flush slave output buffers. This may + * result into a slave, that may be the active client, to be + * freed. */ + return deadclient ? C_ERR : C_OK; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c, int callFlags) { AssertCorrectThread(c); - bool fFreed = false; /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { @@ -2003,48 +2062,38 @@ void processInputBuffer(client *c, int callFlags) { } else { AeLocker locker; locker.arm(c); - serverTL->current_client = c; - /* Only reset the client when the command was executed. */ - if (processCommand(c, callFlags) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) - resetClient(c); + /* We are finally ready to execute the command. */ + if (processCommandAndResetClient(c, callFlags) == C_ERR) { + /* If the client is no longer valid, we avoid exiting this + * loop and trimming the client buffer later. So we return + * ASAP in that case. */ + return; } - /* freeMemoryIfNeeded may flush slave output buffers. This may - * result into a slave, that may be the active client, to be - * freed. */ - if (serverTL->current_client == NULL) { - fFreed = true; - break; - } - serverTL->current_client = NULL; } } /* Trim to pos */ - if (!fFreed && c->qb_pos) { + if (c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } } /* This is a wrapper for processInputBuffer that also cares about handling - * the replication forwarding to the sub-slaves, in case the client 'c' + * the replication forwarding to the sub-replicas, in case the client 'c' * is flagged as master. Usually you want to call this instead of the * raw processInputBuffer(). */ void processInputBufferAndReplicate(client *c) { if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c, CMD_CALL_FULL); } else { + /* If the client is a master we need to compute the difference + * between the applied offset before and after processing the buffer, + * to understand how much of the replication stream was actually + * applied to the master state: this quantity, and its corresponding + * part of the replication stream, will be propagated to the + * sub-replicas and to the replication backlog. */ size_t prev_offset = c->reploff; processInputBuffer(c, CMD_CALL_FULL); size_t applied = c->reploff - prev_offset; @@ -2104,16 +2153,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -2133,10 +2178,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); - sdsfree(bytes); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + sdsfree(bytes); + freeClientAsync(c); return; } @@ -2900,3 +2943,4 @@ int processEventsWhileBlocked(int iel) { aeAcquireLock(); return count; } + diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 8d50303ee..45170f037 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -258,6 +258,19 @@ static redisConfig *getRedisConfig(const char *ip, int port, else fprintf(stderr,"%s: %s\n",hostsocket,err); goto fail; } + + if(config.auth){ + void *authReply = NULL; + redisAppendCommand(c, "AUTH %s", config.auth); + if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; + if (reply) freeReplyObject(reply); + reply = ((redisReply *) authReply); + if (reply->type == REDIS_REPLY_ERROR) { + fprintf(stderr, "ERROR: %s\n", reply->str); + goto fail; + } + } + redisAppendCommand(c, "CONFIG GET %s", "save"); redisAppendCommand(c, "CONFIG GET %s", "appendonly"); @@ -1196,7 +1209,7 @@ static int fetchClusterSlotsConfiguration(client c) { assert(reply->type == REDIS_REPLY_ARRAY); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; - assert(r->type = REDIS_REPLY_ARRAY); + assert(r->type == REDIS_REPLY_ARRAY); assert(r->elements >= 3); int from, to, slot; from = r->element[0]->integer; @@ -1298,7 +1311,7 @@ int parseOptions(int argc, const char **argv) { if (*p < '0' || *p > '9') goto invalid; } config.randomkeys = 1; - config.randomkeys_keyspacelen = atoi(argv[++i]); + config.randomkeys_keyspacelen = atoi(next); if (config.randomkeys_keyspacelen < 0) config.randomkeys_keyspacelen = 0; } else if (!strcmp(argv[i],"-q")) { diff --git a/src/redis-check-aof.cpp b/src/redis-check-aof.cpp index cb707b9ac..de9ab1f77 100644 --- a/src/redis-check-aof.cpp +++ b/src/redis-check-aof.cpp @@ -37,7 +37,7 @@ snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \ } -static char error[1024]; +static char error[1044]; static off_t epos; int consumeNewline(char *buf) { diff --git a/src/server.cpp b/src/server.cpp index 7a308fea3..4d10b478f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1745,16 +1745,17 @@ void databasesCron(void) { * every object access, and accuracy is not needed. To access a global var is * a lot faster than calling time(NULL) */ void updateCachedTime(void) { - time_t unixtime = time(NULL); - atomicSet(g_pserver->unixtime,unixtime); + g_pserver->unixtime = time(NULL); g_pserver->mstime = mstime(); - /* To get information about daylight saving time, we need to call localtime_r - * and cache the result. However calling localtime_r in this context is safe - * since we will never fork() while here, in the main thread. The logging - * function will call a thread safe version of localtime that has no locks. */ + /* To get information about daylight saving time, we need to call + * localtime_r and cache the result. However calling localtime_r in this + * context is safe since we will never fork() while here, in the main + * thread. The logging function will call a thread safe version of + * localtime that has no locks. */ struct tm tm; - localtime_r(&g_pserver->unixtime,&tm); + time_t ut = g_pserver->unixtime; + localtime_r(&ut,&tm); g_pserver->daylight_active = tm.tm_isdst; } @@ -1826,8 +1827,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - unsigned long lruclock = getLRUClock(); - atomicSet(g_pserver->lruclock,lruclock); + g_pserver->lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > g_pserver->stat_peak_memory) @@ -2119,6 +2119,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); aeAcquireLock(); + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); + /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ @@ -2143,6 +2146,9 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(iel); + /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ @@ -2287,10 +2293,6 @@ void initMasterInfo(redisMaster *master) void initServerConfig(void) { int j; - serverAssert(pthread_mutex_init(&g_pserver->next_client_id_mutex,NULL) == 0); - serverAssert(pthread_mutex_init(&g_pserver->lruclock_mutex,NULL) == 0); - serverAssert(pthread_mutex_init(&g_pserver->unixtime_mutex,NULL) == 0); - updateCachedTime(); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; @@ -2405,8 +2407,7 @@ void initServerConfig(void) { g_pserver->lua_time_limit = LUA_SCRIPT_TIME_LIMIT; g_pserver->fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; - unsigned int lruclock = getLRUClock(); - atomicSet(g_pserver->lruclock,lruclock); + g_pserver->lruclock = getLRUClock(); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -3986,8 +3987,7 @@ sds genRedisInfoString(const char *section) { call_uname = 0; } - unsigned int lruclock; - atomicGet(g_pserver->lruclock,lruclock); + unsigned int lruclock = g_pserver->lruclock.load(); info = sdscatprintf(info, "# Server\r\n" "redis_version:%s\r\n" @@ -4299,8 +4299,8 @@ sds genRedisInfoString(const char *section) { g_pserver->stat_numconnections, g_pserver->stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), - g_pserver->stat_net_input_bytes, - g_pserver->stat_net_output_bytes, + g_pserver->stat_net_input_bytes.load(), + g_pserver->stat_net_output_bytes.load(), (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, g_pserver->stat_rejected_conn, @@ -4955,8 +4955,6 @@ int main(int argc, char **argv) { return sha1Test(argc, argv); } else if (!strcasecmp(argv[2], "util")) { return utilTest(argc, argv); - } else if (!strcasecmp(argv[2], "sds")) { - return sdsTest(argc, argv); } else if (!strcasecmp(argv[2], "endianconv")) { return endianconvTest(argc, argv); } else if (!strcasecmp(argv[2], "crc64")) { diff --git a/src/server.h b/src/server.h index e1db21e47..f315bdcd8 100644 --- a/src/server.h +++ b/src/server.h @@ -49,6 +49,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -150,6 +151,8 @@ public: #define CONFIG_DEFAULT_TCP_BACKLOG 511 /* TCP listen backlog. */ #define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* Default client timeout: infinite */ #define CONFIG_DEFAULT_DBNUM 16 +#define CONFIG_DEFAULT_IO_THREADS_NUM 1 /* Single threaded by default */ +#define CONFIG_DEFAULT_IO_THREADS_DO_READS 0 /* Read + parse from threads? */ #define CONFIG_MAX_LINE 1024 #define CRON_DBS_PER_CALL 16 #define NET_MAX_WRITES_PER_EVENT (1024*64) @@ -1236,7 +1239,7 @@ struct redisServer { struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS]; - unsigned int lruclock; /* Clock for LRU eviction */ + std::atomic lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -1268,7 +1271,7 @@ struct redisServer { mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - uint64_t next_client_id; /* Next client unique ID. Incremental. */ + std::atomic next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1305,8 +1308,8 @@ struct redisServer { long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - long long stat_net_input_bytes; /* Bytes read from network. */ - long long stat_net_output_bytes; /* Bytes written to network. */ + std::atomic stat_net_input_bytes; /* Bytes read from network. */ + std::atomic stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ /* The following two are used to track instantaneous metrics, like @@ -1317,7 +1320,6 @@ struct redisServer { long long samples[STATS_METRIC_SAMPLES]; int idx; } inst_metric[STATS_METRIC_COUNT]; - /* AOF persistence */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ @@ -1327,6 +1329,7 @@ struct redisServer { off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ + off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ @@ -1464,10 +1467,10 @@ struct redisServer { int list_max_ziplist_size; int list_compress_depth; /* time cache */ - time_t unixtime; /* Unix time sampled every cron cycle. */ - time_t timezone; /* Cached timezone. As set by tzset(). */ - int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ + std::atomic unixtime; /* Unix time sampled every cron cycle. */ + time_t timezone; /* Cached timezone. As set by tzset(). */ + int daylight_active; /* Currently in daylight saving time. */ + long long mstime; /* 'unixtime' with milliseconds resolution. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1526,12 +1529,6 @@ struct redisServer { int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ - /* Mutexes used to protect atomic variables when atomic builtins are - * not available. */ - pthread_mutex_t lruclock_mutex; - pthread_mutex_t next_client_id_mutex; - pthread_mutex_t unixtime_mutex; - int fActiveReplica; /* Can this replica also be a master? */ struct fastlock flock; @@ -1540,6 +1537,9 @@ struct redisServer { // Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition uint64_t mvcc_tstamp; + + /* System hardware info */ + size_t system_memory_size; /* Total memory in system as reported by OS */ }; typedef struct pubsubPattern { @@ -1709,6 +1709,7 @@ void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); +void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); @@ -1764,6 +1765,7 @@ int writeToClient(int fd, client *c, int handler_installed); void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); +void initThreadedIO(void); // Special Thread-safe addReply() commands for posting messages to clients from a different thread void addReplyAsync(client *c, robj_roptr obj); diff --git a/src/t_list.cpp b/src/t_list.cpp index a65aea8ad..b072bf957 100644 --- a/src/t_list.cpp +++ b/src/t_list.cpp @@ -617,7 +617,7 @@ void rpoplpushCommand(client *c) { * the AOF and replication channel. * * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that + * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that * we can propagate the command properly. * * The function returns C_OK if we are able to serve the client, otherwise diff --git a/src/t_stream.cpp b/src/t_stream.cpp index d67dd3f39..d2b2cbbf0 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -492,14 +492,14 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI streamEncodeID(si->start_key,start); } else { si->start_key[0] = 0; - si->start_key[0] = 0; + si->start_key[1] = 0; } if (end) { streamEncodeID(si->end_key,end); } else { si->end_key[0] = UINT64_MAX; - si->end_key[0] = UINT64_MAX; + si->end_key[1] = UINT64_MAX; } /* Seek the correct node in the radix tree. */ diff --git a/src/ziplist.c b/src/ziplist.c index 3b618e63a..4d2e89a45 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -576,7 +576,7 @@ void zipEntry(unsigned char *p, zlentry *e) { /* Create a new empty ziplist. */ unsigned char *ziplistNew(void) { - unsigned int bytes = ZIPLIST_HEADER_SIZE+1; + unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE; unsigned char *zl = zmalloc(bytes, MALLOC_SHARED); ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 2530511c9..9a833457e 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -166,12 +166,15 @@ start_server {} { # Pick a random slave set slave_id [expr {($master_id+1)%5}] set sync_count [status $R($master_id) sync_full] + set sync_partial [status $R($master_id) sync_partial_ok] catch { $R($slave_id) config rewrite $R($slave_id) debug restart } + # note: just waiting for connected_slaves==4 has a race condition since + # we might do the check before the master realized that the slave disconnected wait_for_condition 50 2000 { - [status $R($master_id) connected_slaves] == 4 + [status $R($master_id) sync_partial_ok] == $sync_partial + 1 } else { fail "Replica not reconnecting" } diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index a3bce2a4c..bf8682446 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -79,6 +79,32 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec stop_bg_complex_data $load_handle0 stop_bg_complex_data $load_handle1 stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait that slave acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + set retry 10 while {$retry && ([$master debug digest] ne [$slave debug digest])}\ { diff --git a/tests/modules/Makefile b/tests/modules/Makefile new file mode 100644 index 000000000..014d20afa --- /dev/null +++ b/tests/modules/Makefile @@ -0,0 +1,24 @@ + +# find the OS +uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') + +# Compile flags for linux / osx +ifeq ($(uname_S),Linux) + SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -shared +else + SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup +endif + +.SUFFIXES: .c .so .xo .o + +all: commandfilter.so + +.c.xo: + $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ + +commandfilter.xo: ../../src/redismodule.h + +commandfilter.so: commandfilter.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/src/modules/hellofilter.c b/tests/modules/commandfilter.c similarity index 80% rename from src/modules/hellofilter.c rename to tests/modules/commandfilter.c index 448e12983..d25d49c44 100644 --- a/src/modules/hellofilter.c +++ b/tests/modules/commandfilter.c @@ -1,18 +1,18 @@ #define REDISMODULE_EXPERIMENTAL_API -#include "../redismodule.h" +#include "redismodule.h" #include static RedisModuleString *log_key_name; -static const char log_command_name[] = "hellofilter.log"; -static const char ping_command_name[] = "hellofilter.ping"; -static const char unregister_command_name[] = "hellofilter.unregister"; +static const char log_command_name[] = "commandfilter.log"; +static const char ping_command_name[] = "commandfilter.ping"; +static const char unregister_command_name[] = "commandfilter.unregister"; static int in_log_command = 0; static RedisModuleCommandFilter *filter = NULL; -int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -23,7 +23,7 @@ int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, return REDISMODULE_OK; } -int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -39,7 +39,7 @@ int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a return REDISMODULE_OK; } -int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); @@ -74,9 +74,9 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar return REDISMODULE_OK; } -void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) +void CommandFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) { - if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ + if (in_log_command) return; /* don't process our own RM_Call() from CommandFilter_LogCommand() */ /* Fun manipulations: * - Remove @delme @@ -117,7 +117,7 @@ void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) + if (RedisModule_Init(ctx,"commandfilter",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (argc != 2) { @@ -130,18 +130,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_StringToLongLong(argv[1], &noself); if (RedisModule_CreateCommand(ctx,log_command_name, - HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,ping_command_name, - HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,unregister_command_name, - HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, + if ((filter = RedisModule_RegisterCommandFilter(ctx, CommandFilter_CommandFilter, noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) == NULL) return REDISMODULE_ERR; diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d2f281526..1442067f5 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,7 +63,6 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf - modules/commandfilter } # Index to the next test to run in the ::all_tests list. set ::next_test 0 @@ -504,7 +503,7 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--only}} { lappend ::only_tests $arg incr j - } elseif {$opt eq {--skiptill}} { + } elseif {$opt eq {--skip-till}} { set ::skip_till $arg incr j } elseif {$opt eq {--list-tests}} { diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 82c75f82d..058441433 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -108,4 +108,11 @@ start_server {tags {"acl"}} { assert_match {*+debug|segfault*} $cmdstr assert_match {*+acl*} $cmdstr } + + test {ACL #5998 regression: memory leaks adding / removing subcommands} { + r AUTH default "" + r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c + r ACL setuser newuser -debug + # The test framework will detect a leak if any. + } } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 1def57af5..0f64ddc18 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -161,7 +161,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} } # make sure master doesn't disconnect slave because of timeout - $master config set repl-timeout 300 ;# 5 minutes + $master config set repl-timeout 1200 ;# 20 minutes (for valgrind and slow machines) $master config set maxmemory-policy allkeys-random $master config set client-output-buffer-limit "replica 100000000 100000000 300" $master config set repl-backlog-size [expr {10*1024}] @@ -212,7 +212,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} assert {[$master dbsize] == 100} assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers - assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + set delta_max [expr {$cmd_count / 2}] ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta < $delta_max && $delta > -$delta_max} $master client kill type slave set killed_used [s -1 used_memory] @@ -221,7 +222,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] assert {$killed_slave_buf == 0} - assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max} } # unfreeze slave process (after the 'test' succeeded or failed, but before we attempt to terminate the server diff --git a/tests/modules/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl similarity index 86% rename from tests/modules/commandfilter.tcl rename to tests/unit/moduleapi/commandfilter.tcl index 1e5c41d2b..6078f64f2 100644 --- a/tests/modules/commandfilter.tcl +++ b/tests/unit/moduleapi/commandfilter.tcl @@ -1,4 +1,4 @@ -set testmodule [file normalize src/modules/hellofilter.so] +set testmodule [file normalize tests/modules/commandfilter.so] start_server {tags {"modules"}} { r module load $testmodule log-key 0 @@ -27,7 +27,7 @@ start_server {tags {"modules"}} { test {Command Filter applies on RM_Call() commands} { r del log-key - r hellofilter.ping + r commandfilter.ping r lrange log-key 0 -1 } "{ping @log}" @@ -39,13 +39,13 @@ start_server {tags {"modules"}} { test {Command Filter applies on Lua redis.call() that calls a module} { r del log-key - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 r lrange log-key 0 -1 } "{ping @log}" test {Command Filter is unregistered implicitly on module unload} { r del log-key - r module unload hellofilter + r module unload commandfilter r set mykey @log r lrange log-key 0 -1 } {} @@ -59,14 +59,14 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] # Unregister - r hellofilter.unregister + r commandfilter.unregister r del log-key r set mykey @log r lrange log-key 0 -1 } {} - r module unload hellofilter + r module unload commandfilter r module load $testmodule log-key 1 test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { @@ -74,10 +74,10 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] r del log-key - r hellofilter.ping + r commandfilter.ping assert_equal {} [r lrange log-key 0 -1] - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 assert_equal {} [r lrange log-key 0 -1] }