diff --git a/deps/hiredis/async_private.h b/deps/hiredis/async_private.h index b9d23fffd..ea0558d42 100644 --- a/deps/hiredis/async_private.h +++ b/deps/hiredis/async_private.h @@ -51,7 +51,7 @@ #define _EL_CLEANUP(ctx) do { \ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ ctx->ev.cleanup = NULL; \ - } while(0); + } while(0) static inline void refreshTimeout(redisAsyncContext *ctx) { #define REDIS_TIMER_ISSET(tvp) \ diff --git a/redis.conf b/redis.conf index 849f171bc..a5062fda9 100644 --- a/redis.conf +++ b/redis.conf @@ -196,9 +196,12 @@ tcp-keepalive 300 # # tls-cluster yes -# Explicitly specify TLS versions to support. Allowed values are case insensitive -# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1) or -# any combination. To enable only TLSv1.2 and TLSv1.3, use: +# By default, only TLSv1.2 and TLSv1.3 are enabled and it is highly recommended +# that older formally deprecated versions are kept disabled to reduce the attack surface. +# You can explicitly specify TLS versions to support. +# Allowed values are case insensitive and include "TLSv1", "TLSv1.1", "TLSv1.2", +# "TLSv1.3" (OpenSSL >= 1.1.1) or any combination. +# To enable only TLSv1.2 and TLSv1.3, use: # # tls-protocols "TLSv1.2 TLSv1.3" @@ -688,7 +691,7 @@ replica-priority 100 # Redis implements server assisted support for client side caching of values. # This is implemented using an invalidation table that remembers, using -# 16 millions of slots, what clients may have certain subsets of keys. In turn +# a radix key indexed by key name, what clients have which keys. In turn # this is used in order to send invalidation messages to clients. Please # check this page to understand more about the feature: # @@ -1973,3 +1976,10 @@ jemalloc-bg-thread yes # # Set bgsave child process to cpu affinity 1,10,11 # bgsave_cpulist 1,10-11 + +# In some cases redis will emit warnings and even refuse to start if it detects +# that the system is in bad state, it is possible to suppress these warnings +# by setting the following config which takes a space delimited list of warnings +# to suppress +# +# ignore-warnings ARM64-COW-BUG diff --git a/src/Makefile b/src/Makefile index 0329da8c9..3bc9f11c0 100644 --- a/src/Makefile +++ b/src/Makefile @@ -115,8 +115,18 @@ else ifeq ($(uname_S),Darwin) # Darwin FINAL_LIBS+= -ldl + # Homebrew's OpenSSL is not linked to /usr/local to avoid + # conflicts with the system's LibreSSL installation so it + # must be referenced explicitly during build. +ifeq ($(uname_M),arm64) + # Homebrew arm64 uses /opt/homebrew as HOMEBREW_PREFIX + OPENSSL_CFLAGS=-I/opt/homebrew/opt/openssl/include + OPENSSL_LDFLAGS=-L/opt/homebrew/opt/openssl/lib +else + # Homebrew x86/ppc uses /usr/local as HOMEBREW_PREFIX OPENSSL_CFLAGS=-I/usr/local/opt/openssl/include OPENSSL_LDFLAGS=-L/usr/local/opt/openssl/lib +endif else ifeq ($(uname_S),AIX) # AIX @@ -260,11 +270,11 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) -REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o +REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) -REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o +REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o REDIS_CHECK_RDB_NAME=redis-check-rdb$(PROG_SUFFIX) REDIS_CHECK_AOF_NAME=redis-check-aof$(PROG_SUFFIX) @@ -336,7 +346,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS) -dict-benchmark: dict.c zmalloc.c sds.c siphash.c +dict-benchmark: dict.c zmalloc.c sds.c siphash.c mt19937-64.c $(REDIS_CC) $(FINAL_CFLAGS) $^ -D DICT_BENCHMARK_MAIN -o $@ $(FINAL_LIBS) DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) @@ -405,8 +415,8 @@ install: all $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN) $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN) $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN) - $(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN) - $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN) + @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_CHECK_RDB_NAME) + @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_CHECK_AOF_NAME) @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME) uninstall: diff --git a/src/acl.c b/src/acl.c index a1a7c4237..14d023cc3 100644 --- a/src/acl.c +++ b/src/acl.c @@ -174,15 +174,15 @@ sds ACLHashPassword(unsigned char *cleartext, size_t len) { return sdsnewlen(hex,HASH_PASSWORD_LEN); } -/* Given a hash and the hash length, returns C_OK if it is a valid password +/* Given a hash and the hash length, returns C_OK if it is a valid password * hash, or C_ERR otherwise. */ int ACLCheckPasswordHash(unsigned char *hash, int hashlen) { if (hashlen != HASH_PASSWORD_LEN) { - return C_ERR; + return C_ERR; } - + /* Password hashes can only be characters that represent - * hexadecimal values, which are numbers and lowercase + * hexadecimal values, which are numbers and lowercase * characters 'a' through 'f'. */ for(int i = 0; i < HASH_PASSWORD_LEN; i++) { char c = hash[i]; @@ -2184,18 +2184,30 @@ void aclCommand(client *c) { } } else if (c->argc == 2 && !strcasecmp(sub,"help")) { const char *help[] = { -"LOAD -- Reload users from the ACL file.", -"SAVE -- Save the current config to the ACL file.", -"LIST -- Show user details in config file format.", -"USERS -- List all the registered usernames.", -"SETUSER [attribs ...] -- Create or modify a user.", -"GETUSER -- Get the user details.", -"DELUSER [...] -- Delete a list of users.", -"CAT -- List available categories.", -"CAT -- List commands inside category.", -"GENPASS [] -- Generate a secure user password.", -"WHOAMI -- Return the current connection username.", -"LOG [ | RESET] -- Show the ACL log entries.", +"CAT []", +" List all commands that belong to , or all command categories", +" when no category is specified.", +"DELUSER [ ...]", +" Delete a list of users.", +"GETUSER ", +" Get the user's details.", +"GENPASS []", +" Generate a secure 256-bit user password. The optional `bits` argument can", +" be used to specify a different size.", +"LIST", +" Show users details in config file format.", +"LOAD", +" Reload users from the ACL file.", +"LOG [ | RESET]", +" Show the ACL log entries.", +"SAVE", +" Save the current config to the ACL file.", +"SETUSER [ ...]", +" Create or modify a user with the specified attributes.", +"USERS", +" List all the registered usernames.", +"WHOAMI", +" Return the current connection username.", NULL }; addReplyHelp(c,help); @@ -2224,7 +2236,7 @@ void addReplyCommandCategories(client *c, struct redisCommand *cmd) { void authCommand(client *c) { /* Only two or three argument forms are allowed. */ if (c->argc > 3) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } diff --git a/src/aof.c b/src/aof.c index 79b2f1284..d3191277f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -206,29 +206,27 @@ int aofFsyncInProgress(void) { /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { - bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); + bioCreateFsyncJob(fd); } /* Kills an AOFRW child process if exists */ void killAppendOnlyChild(void) { int statloc; /* No AOFRW child? return. */ - if (server.aof_child_pid == -1) return; + if (server.child_type != CHILD_TYPE_AOF) return; /* Kill AOFRW child, wait for child exit. */ serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld", - (long) server.aof_child_pid); - if (kill(server.aof_child_pid,SIGUSR1) != -1) { - while(wait3(&statloc,0,NULL) != server.aof_child_pid); + (long) server.child_pid); + if (kill(server.child_pid,SIGUSR1) != -1) { + while(wait3(&statloc,0,NULL) != server.child_pid); } /* Reset the buffer accumulating changes while the child saves. */ aofRewriteBufferReset(); - aofRemoveTempFile(server.aof_child_pid); - server.aof_child_pid = -1; + aofRemoveTempFile(server.child_pid); + resetChildState(); server.aof_rewrite_time_start = -1; /* Close pipes used for IPC between the two processes. */ aofClosePipes(); - closeChildInfoPipe(); - updateDictResizePolicy(); } /* Called when the user switches from "appendonly yes" to "appendonly no" @@ -265,14 +263,14 @@ int startAppendOnly(void) { strerror(errno)); return C_ERR; } - if (hasActiveChildProcess() && server.aof_child_pid == -1) { + if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) { server.aof_rewrite_scheduled = 1; serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible."); } else { /* If there is a pending AOF rewrite, we need to switch it off and * start a new one: the old one cannot be reused because it is not * accumulating the AOF buffer. */ - if (server.aof_child_pid != -1) { + if (server.child_type == CHILD_TYPE_AOF) { serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now."); killAppendOnlyChild(); } @@ -646,7 +644,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ - if (server.aof_child_pid != -1) + if (server.child_type == CHILD_TYPE_AOF) aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); @@ -1427,6 +1425,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { dictEntry *de; size_t processed = 0; int j; + long key_count = 0; + long long cow_updated_time = 0; for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; @@ -1486,6 +1486,19 @@ int rewriteAppendOnlyFileRio(rio *aof) { processed = aof->processed_bytes; aofReadDiffFromParent(); } + + /* Update COW info every 1 second (approximately). + * in order to avoid calling mstime() on each iteration, we will + * check the diff every 1024 keys */ + if ((key_count & 1023) == 0) { + key_count = 0; + long long now = mstime(); + if (now - cow_updated_time >= 1000) { + sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + cow_updated_time = now; + } + } + key_count++; } dictReleaseIterator(di); di = NULL; @@ -1579,8 +1592,31 @@ int rewriteAppendOnlyFile(char *filename) { serverLog(LL_NOTICE, "Concatenating %.2f MB of AOF diff received from parent.", (double) sdslen(server.aof_child_diff) / (1024*1024)); - if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) - goto werr; + + /* Now we write the entire AOF buffer we received from the parent + * via the pipe during the life of this fork child. + * once a second, we'll take a break and send updated COW info to the parent */ + size_t bytes_to_write = sdslen(server.aof_child_diff); + const char *buf = server.aof_child_diff; + long long cow_updated_time = mstime(); + + while (bytes_to_write) { + /* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */ + size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20); + + if (rioWrite(&aof,buf,chunk_size) == 0) + goto werr; + + bytes_to_write -= chunk_size; + buf += chunk_size; + + /* Update COW info */ + long long now = mstime(); + if (now - cow_updated_time >= 1000) { + sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + cow_updated_time = now; + } + } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp)) goto werr; @@ -1703,7 +1739,6 @@ int rewriteAppendOnlyFileBackground(void) { if (hasActiveChildProcess()) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR; - openChildInfoPipe(); if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) { char tmpfile[256]; @@ -1712,7 +1747,7 @@ int rewriteAppendOnlyFileBackground(void) { redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { - sendChildCOWInfo(CHILD_TYPE_AOF, "AOF rewrite"); + sendChildCOWInfo(CHILD_TYPE_AOF, 1, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); @@ -1720,7 +1755,6 @@ int rewriteAppendOnlyFileBackground(void) { } else { /* Parent */ if (childpid == -1) { - closeChildInfoPipe(); serverLog(LL_WARNING, "Can't rewrite append only file in background: fork: %s", strerror(errno)); @@ -1731,8 +1765,7 @@ int rewriteAppendOnlyFileBackground(void) { "Background append only file rewriting started by pid %ld",(long) childpid); server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); - server.aof_child_pid = childpid; - updateDictResizePolicy(); + /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start @@ -1745,7 +1778,7 @@ int rewriteAppendOnlyFileBackground(void) { } void bgrewriteaofCommand(client *c) { - if (server.aof_child_pid != -1) { + if (server.child_type == CHILD_TYPE_AOF) { addReplyError(c,"Background append only file rewriting already in progress"); } else if (hasActiveChildProcess()) { server.aof_rewrite_scheduled = 1; @@ -1803,7 +1836,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { * rewritten AOF. */ latencyStartMonitor(latency); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", - (int)server.aof_child_pid); + (int)server.child_pid); newfd = open(tmpfile,O_WRONLY|O_APPEND); if (newfd == -1) { serverLog(LL_WARNING, @@ -1909,7 +1942,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_state = AOF_ON; /* Asynchronously close the overwritten AOF. */ - if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); + if (oldfd != -1) bioCreateCloseJob(oldfd); serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); @@ -1931,8 +1964,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { cleanup: aofClosePipes(); aofRewriteBufferReset(); - aofRemoveTempFile(server.aof_child_pid); - server.aof_child_pid = -1; + aofRemoveTempFile(server.child_pid); server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ diff --git a/src/bio.c b/src/bio.c index a11bcb18b..c6e17f49d 100644 --- a/src/bio.c +++ b/src/bio.c @@ -78,15 +78,13 @@ static unsigned long long bio_pending[BIO_NUM_OPS]; * file as the API does not expose the internals at all. */ struct bio_job { time_t time; /* Time at which the job was created. */ - /* Job specific arguments pointers. If we need to pass more than three - * arguments we can just pass a pointer to a structure or alike. */ - void *arg1, *arg2, *arg3; + /* Job specific arguments.*/ + int fd; /* Fd for file based background jobs */ + lazy_free_fn *free_fn; /* Function that will free the provided arguments */ + void *free_args[]; /* List of arguments to be passed to the free function */ }; void *bioProcessBackgroundJobs(void *arg); -void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); -void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ @@ -128,13 +126,8 @@ void bioInit(void) { } } -void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { - struct bio_job *job = zmalloc(sizeof(*job)); - +void bioSubmitJob(int type, struct bio_job *job) { job->time = time(NULL); - job->arg1 = arg1; - job->arg2 = arg2; - job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; @@ -142,6 +135,35 @@ void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { pthread_mutex_unlock(&bio_mutex[type]); } +void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { + va_list valist; + /* Allocate memory for the job structure and all required + * arguments */ + struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); + job->free_fn = free_fn; + + va_start(valist, arg_count); + for (int i = 0; i < arg_count; i++) { + job->free_args[i] = va_arg(valist, void *); + } + va_end(valist); + bioSubmitJob(BIO_LAZY_FREE, job); +} + +void bioCreateCloseJob(int fd) { + struct bio_job *job = zmalloc(sizeof(*job)); + job->fd = fd; + + bioSubmitJob(BIO_CLOSE_FILE, job); +} + +void bioCreateFsyncJob(int fd) { + struct bio_job *job = zmalloc(sizeof(*job)); + job->fd = fd; + + bioSubmitJob(BIO_AOF_FSYNC, job); +} + void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; @@ -196,20 +218,11 @@ void *bioProcessBackgroundJobs(void *arg) { /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { - close((long)job->arg1); + close(job->fd); } else if (type == BIO_AOF_FSYNC) { - redis_fsync((long)job->arg1); + redis_fsync(job->fd); } else if (type == BIO_LAZY_FREE) { - /* What we free changes depending on what arguments are set: - * arg1 -> free the object at pointer. - * arg2 & arg3 -> free two dictionaries (a Redis DB). - * only arg3 -> free the radix tree. */ - if (job->arg1) - lazyfreeFreeObjectFromBioThread(job->arg1); - else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); - else if (job->arg3) - lazyfreeFreeSlotsMapFromBioThread(job->arg3); + job->free_fn(job->free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/bio.h b/src/bio.h index 6c2155941..1e6e97297 100644 --- a/src/bio.h +++ b/src/bio.h @@ -30,13 +30,17 @@ #ifndef __BIO_H #define __BIO_H +typedef void lazy_free_fn(void *args[]); + /* Exported API */ void bioInit(void); -void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); unsigned long long bioPendingJobsOfType(int type); unsigned long long bioWaitStepOfType(int type); time_t bioOlderJobOfType(int type); void bioKillThreads(void); +void bioCreateCloseJob(int fd); +void bioCreateFsyncJob(int fd); +void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); /* Background job opcodes */ #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ diff --git a/src/bitops.c b/src/bitops.c index 5e996679b..afd79ad88 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -611,7 +611,7 @@ void bitopCommand(client *c) { else if((opname[0] == 'n' || opname[0] == 'N') && !strcasecmp(opname,"not")) op = BITOP_NOT; else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } @@ -813,7 +813,7 @@ void bitcountCommand(client *c) { end = strlen-1; } else { /* Syntax error. */ - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } @@ -878,7 +878,7 @@ void bitposCommand(client *c) { end = strlen-1; } else { /* Syntax error. */ - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } @@ -970,7 +970,7 @@ void bitfieldGeneric(client *c, int flags) { } continue; } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); zfree(ops); return; } diff --git a/src/blocked.c b/src/blocked.c index d85723458..46935c79f 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -89,6 +89,12 @@ void blockClient(client *c, int btype) { server.blocked_clients++; server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); + if (btype == BLOCKED_PAUSE) { + listAddNodeTail(server.paused_clients, c); + c->paused_list_node = listLast(server.paused_clients); + /* Mark this client to execute its command */ + c->flags |= CLIENT_PENDING_COMMAND; + } } /* This function is called in the beforeSleep() function of the event loop @@ -110,6 +116,11 @@ void processUnblockedClients(void) { * client is not blocked before to proceed, but things may change and * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { + /* If we have a queued command, execute it now. */ + if (processPendingCommandsAndResetClient(c) == C_ERR) { + continue; + } + /* Then process client if it has more data in it's buffer. */ if (c->querybuf && sdslen(c->querybuf) > 0) { processInputBuffer(c); } @@ -154,6 +165,9 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); + } else if (c->btype == BLOCKED_PAUSE) { + listDelNode(server.paused_clients,c->paused_list_node); + c->paused_list_node = NULL; } else { serverPanic("Unknown btype in unblockClient()."); } @@ -200,9 +214,16 @@ void disconnectAllBlockedClients(void) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED) { - addReplySds(c,sdsnew( + /* PAUSED clients are an exception, when they'll be unblocked, the + * command processing will start from scratch, and the command will + * be either executed or rejected. (unlike LIST blocked clients for + * which the command is already in progress in a way. */ + if (c->btype == BLOCKED_PAUSE) + continue; + + addReplyError(c, "-UNBLOCKED force unblock from blocking operation, " - "instance state changed (master -> replica?)\r\n")); + "instance state changed (master -> replica?)"); unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } diff --git a/src/childinfo.c b/src/childinfo.c index d11aa7bcf..cae73fe46 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -30,6 +30,12 @@ #include "server.h" #include +typedef struct { + int process_type; /* AOF or RDB child? */ + int on_exit; /* COW size of active or exited child */ + size_t cow_size; /* Copy on write size. */ +} child_info_data; + /* Open a child-parent channel used in order to move information about the * RDB / AOF saving process from the child to the parent (for instance * the amount of copy on write memory used) */ @@ -41,7 +47,7 @@ void openChildInfoPipe(void) { } else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) { closeChildInfoPipe(); } else { - memset(&server.child_info_data,0,sizeof(server.child_info_data)); + server.child_info_nread = 0; } } @@ -54,34 +60,76 @@ void closeChildInfoPipe(void) { close(server.child_info_pipe[1]); server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; + server.child_info_nread = 0; } } -/* Send COW data to parent. The child should call this function after populating - * the corresponding fields it want to sent (according to the process type). */ -void sendChildInfo(int ptype) { +/* Send COW data to parent. */ +void sendChildInfo(int process_type, int on_exit, size_t cow_size) { if (server.child_info_pipe[1] == -1) return; - server.child_info_data.magic = CHILD_INFO_MAGIC; - server.child_info_data.process_type = ptype; - ssize_t wlen = sizeof(server.child_info_data); - if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) { + + child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size}; + ssize_t wlen = sizeof(buffer); + + if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) { /* Nothing to do on error, this will be detected by the other side. */ } } -/* Receive COW data from parent. */ -void receiveChildInfo(void) { - if (server.child_info_pipe[0] == -1) return; - ssize_t wlen = sizeof(server.child_info_data); - if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen && - server.child_info_data.magic == CHILD_INFO_MAGIC) - { - if (server.child_info_data.process_type == CHILD_TYPE_RDB) { - server.stat_rdb_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_AOF) { - server.stat_aof_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_MODULE) { - server.stat_module_cow_bytes = server.child_info_data.cow_size; - } +/* Update COW data. */ +void updateChildInfo(int process_type, int on_exit, size_t cow_size) { + if (!on_exit) { + server.stat_current_cow_bytes = cow_size; + return; + } + + if (process_type == CHILD_TYPE_RDB) { + server.stat_rdb_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_AOF) { + server.stat_aof_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_MODULE) { + server.stat_module_cow_bytes = cow_size; + } +} + +/* Read COW info data from the pipe. + * if complete data read into the buffer, process type, copy-on-write type and copy-on-write size + * are stored into *process_type, *on_exit and *cow_size respectively and returns 1. + * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */ +int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) { + /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */ + static child_info_data buffer; + ssize_t wlen = sizeof(buffer); + + /* Do not overlap */ + if (server.child_info_nread == wlen) server.child_info_nread = 0; + + int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread); + if (nread > 0) { + server.child_info_nread += nread; + } + + /* We have complete child info */ + if (server.child_info_nread == wlen) { + *process_type = buffer.process_type; + *on_exit = buffer.on_exit; + *cow_size = buffer.cow_size; + return 1; + } else { + return 0; + } +} + +/* Receive COW data from child. */ +void receiveChildInfo(void) { + if (server.child_info_pipe[0] == -1) return; + + int process_type; + int on_exit; + size_t cow_size; + + /* Drain the pipe and update child info so that we get the final message. */ + while (readChildInfo(&process_type, &on_exit, &cow_size)) { + updateChildInfo(process_type, on_exit, cow_size); } } diff --git a/src/cluster.c b/src/cluster.c index 8651a81d3..78c36e8d1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -47,7 +47,7 @@ clusterNode *myself = NULL; clusterNode *createClusterNode(char *nodename, int flags); -int clusterAddNode(clusterNode *node); +void clusterAddNode(clusterNode *node); void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void clusterReadHandler(connection *conn); void clusterSendPing(clusterLink *link, int type); @@ -961,12 +961,12 @@ void freeClusterNode(clusterNode *n) { } /* Add a node to the nodes hash table */ -int clusterAddNode(clusterNode *node) { +void clusterAddNode(clusterNode *node) { int retval; retval = dictAdd(server.cluster->nodes, sdsnewlen(node->name,CLUSTER_NAMELEN), node); - return (retval == DICT_OK) ? C_OK : C_ERR; + serverAssert(retval == DICT_OK); } /* Remove a node from the cluster. The function performs the high level @@ -2164,7 +2164,7 @@ int clusterProcessPacket(clusterLink *link) { resetManualFailover(); server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); + pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); /* We need to send a ping message to the replica, as it would carry @@ -3421,9 +3421,8 @@ void clusterHandleSlaveMigration(int max_slaves) { * The function can be used both to initialize the manual failover state at * startup or to abort a manual failover in progress. */ void resetManualFailover(void) { - if (server.cluster->mf_end && clientsArePaused()) { - server.clients_pause_end_time = 0; - clientsArePaused(); /* Just use the side effect of the function. */ + if (server.cluster->mf_end) { + checkClientPauseTimeoutAndReturnIfPaused(); } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; @@ -4357,28 +4356,49 @@ void clusterCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"ADDSLOTS [slot ...] -- Assign slots to current node.", -"BUMPEPOCH -- Advance the cluster config epoch.", -"COUNT-failure-reports -- Return number of failure reports for .", -"COUNTKEYSINSLOT - Return the number of keys in .", -"DELSLOTS [slot ...] -- Delete slots information from current node.", -"FAILOVER [force|takeover] -- Promote current replica node to being a master.", -"FORGET -- Remove a node from the cluster.", -"GETKEYSINSLOT -- Return key names stored by current node in a slot.", -"FLUSHSLOTS -- Delete current node own slots information.", -"INFO - Return information about the cluster.", -"KEYSLOT -- Return the hash slot for .", -"MEET [bus-port] -- Connect nodes into a working cluster.", -"MYID -- Return the node id.", -"NODES -- Return cluster configuration seen by node. Output format:", -" ... ", -"REPLICATE -- Configure current node as replica to .", -"RESET [hard|soft] -- Reset current node (default: soft).", -"SET-config-epoch - Set config epoch of current node.", -"SETSLOT (importing|migrating|stable|node ) -- Set slot state.", -"REPLICAS -- Return replicas.", -"SAVECONFIG - Force saving cluster configuration on disk.", -"SLOTS -- Return information about slots range mappings. Each range is made of:", +"ADDSLOTS [ ...]", +" Assign slots to current node.", +"BUMPEPOCH", +" Advance the cluster config epoch.", +"COUNT-FAILURE-REPORTS ", +" Return number of failure reports for .", +"COUNTKEYSINSLOT ", +" Return the number of keys in .", +"DELSLOTS [ ...]", +" Delete slots information from current node.", +"FAILOVER [FORCE|TAKEOVER]", +" Promote current replica node to being a master.", +"FORGET ", +" Remove a node from the cluster.", +"GETKEYSINSLOT ", +" Return key names stored by current node in a slot.", +"FLUSHSLOTS", +" Delete current node own slots information.", +"INFO", +" Return information about the cluster.", +"KEYSLOT ", +" Return the hash slot for .", +"MEET []", +" Connect nodes into a working cluster.", +"MYID", +" Return the node id.", +"NODES", +" Return cluster configuration seen by node. Output format:", +" ...", +"REPLICATE ", +" Configure current node as replica to .", +"RESET [HARD|SOFT]", +" Reset current node (default: soft).", +"SET-CONFIG-EPOCH ", +" Set config epoch of current node.", +"SETSLOT (IMPORTING|MIGRATING|STABLE|NODE )", +" Set slot state.", +"REPLICAS ", +" Return replicas.", +"SAVECONFIG", +" Force saving cluster configuration on disk.", +"SLOTS", +" Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL }; @@ -4820,7 +4840,7 @@ NULL takeover = 1; force = 1; /* Takeover also implies force. */ } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -4911,7 +4931,7 @@ NULL } else if (!strcasecmp(c->argv[2]->ptr,"soft")) { hard = 0; } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -5049,7 +5069,7 @@ void restoreCommand(client *c) { } j++; /* Consume additional arg. */ } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -5057,7 +5077,7 @@ void restoreCommand(client *c) { /* Make sure this key does not already exist here... */ robj *key = c->argv[1]; if (!replace && lookupKeyWrite(c->db,key) != NULL) { - addReply(c,shared.busykeyerr); + addReplyErrorObject(c,shared.busykeyerr); return; } @@ -5170,8 +5190,7 @@ migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long ti conn = server.tls_cluster ? connCreateTLS() : connCreateSocket(); if (connBlockingConnect(conn, c->argv[1]->ptr, atoi(c->argv[2]->ptr), timeout) != C_OK) { - addReplySds(c, - sdsnew("-IOERR error or timeout connecting to the client\r\n")); + addReplyError(c,"-IOERR error or timeout connecting to the client"); connClose(conn); sdsfree(name); return NULL; @@ -5259,14 +5278,14 @@ void migrateCommand(client *c) { replace = 1; } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { if (!moreargs) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } j++; password = c->argv[j]->ptr; } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) { if (moreargs < 2) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } username = c->argv[++j]->ptr; @@ -5282,7 +5301,7 @@ void migrateCommand(client *c) { num_keys = c->argc - j - 1; break; /* All the remaining args are keys. */ } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -5763,7 +5782,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * cluster is down. */ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; - } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand) + } else if ((cmd->flags & CMD_WRITE) && !(cmd->proc == evalCommand) && !(cmd->proc == evalShaCommand)) { /* The cluster is configured to allow read only commands @@ -5812,11 +5831,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Handle the read-only client case reading from a slave: if this * node is a slave and the request is about a hash slot our master * is serving, we can reply without redirection. */ - int is_readonly_command = (c->cmd->flags & CMD_READONLY) || - (c->cmd->proc == execCommand && !(c->mstate.cmd_inv_flags & CMD_READONLY)); + int is_write_command = (c->cmd->flags & CMD_WRITE) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); if (c->flags & CLIENT_READONLY && - (is_readonly_command || cmd->proc == evalCommand || - cmd->proc == evalShaCommand) && + (!is_write_command || cmd->proc == evalCommand || cmd->proc == evalShaCommand) && nodeIsSlave(myself) && myself->slaveof == n) { @@ -5838,23 +5856,23 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * be set to the hash slot that caused the redirection. */ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { if (error_code == CLUSTER_REDIR_CROSS_SLOT) { - addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); + addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot"); } else if (error_code == CLUSTER_REDIR_UNSTABLE) { /* The request spawns multiple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ - addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); + addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot"); } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { - addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n")); + addReplyError(c,"-CLUSTERDOWN The cluster is down"); } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { - addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down and only accepts read commands\r\n")); + addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands"); } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { - addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n")); + addReplyError(c,"-CLUSTERDOWN Hash slot not served"); } else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK) { - addReplySds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d\r\n", + addReplyErrorSds(c,sdscatprintf(sdsempty(), + "-%s %d %s:%d", (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot,n->ip,n->port)); } else { @@ -5901,7 +5919,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { /* if the client is read-only and attempting to access key that our * replica can handle, allow it. */ if ((c->flags & CLIENT_READONLY) && - (c->lastcmd->flags & CMD_READONLY) && + !(c->lastcmd->flags & CMD_WRITE) && nodeIsSlave(myself) && myself->slaveof == node) { node = myself; diff --git a/src/cluster.h b/src/cluster.h index 7e5f79c87..d58f350ce 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -40,7 +40,7 @@ typedef struct clusterLink { sds sndbuf; /* Packet send buffer */ char *rcvbuf; /* Packet reception buffer */ size_t rcvbuf_len; /* Used size of rcvbuf */ - size_t rcvbuf_alloc; /* Used size of rcvbuf */ + size_t rcvbuf_alloc; /* Allocated size of rcvbuf */ struct clusterNode *node; /* Node related to this link if any, or NULL */ } clusterLink; diff --git a/src/config.c b/src/config.c index c858df3f3..2e109dbae 100644 --- a/src/config.c +++ b/src/config.c @@ -166,6 +166,15 @@ typedef struct stringConfigData { be stored as a NULL value. */ } stringConfigData; +typedef struct sdsConfigData { + sds *config; /* Pointer to the server config this value is stored in. */ + const char *default_value; /* Default value of the config on rewrite. */ + int (*is_valid_fn)(sds val, char **err); /* Optional function to check validity of new value (generic doc above) */ + int (*update_fn)(sds val, sds prev, char **err); /* Optional function to apply new value at runtime (generic doc above) */ + int convert_empty_to_null; /* Boolean indicating if empty SDS strings should + be stored as a NULL value. */ +} sdsConfigData; + typedef struct enumConfigData { int *config; /* The pointer to the server config this value is stored in */ configEnum *enum_value; /* The underlying enum type this data represents */ @@ -212,6 +221,7 @@ typedef struct numericConfigData { typedef union typeData { boolConfigData yesno; stringConfigData string; + sdsConfigData sds; enumConfigData enumd; numericConfigData numeric; } typeData; @@ -512,7 +522,7 @@ void loadServerConfigFromString(char *config) { } server.repl_state = REPL_STATE_CONNECT; } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { - if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { + if (sdslen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; goto loaderr; } @@ -524,10 +534,10 @@ void loadServerConfigFromString(char *config) { sdsfree(server.requirepass); server.requirepass = NULL; if (sdslen(argv[1])) { - sds aclop = sdscatprintf(sdsempty(),">%s",argv[1]); + sds aclop = sdscatlen(sdsnew(">"), argv[1], sdslen(argv[1])); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); - server.requirepass = sdsnew(argv[1]); + server.requirepass = sdsdup(argv[1]); } else { ACLSetUser(DefaultUser,"nopass",-1); } @@ -751,10 +761,10 @@ void configSetCommand(client *c) { sdsfree(server.requirepass); server.requirepass = NULL; if (sdslen(o->ptr)) { - sds aclop = sdscatprintf(sdsempty(),">%s",(char*)o->ptr); + sds aclop = sdscatlen(sdsnew(">"), o->ptr, sdslen(o->ptr)); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); - server.requirepass = sdsnew(o->ptr); + server.requirepass = sdsdup(o->ptr); } else { ACLSetUser(DefaultUser,"nopass",-1); } @@ -905,7 +915,7 @@ badfmt: /* Bad format errors */ addReplyBulkCString(c,_var ? _var : ""); \ matches++; \ } \ -} while(0); +} while(0) #define config_get_bool_field(_name,_var) do { \ if (stringmatch(pattern,_name,1)) { \ @@ -913,7 +923,7 @@ badfmt: /* Bad format errors */ addReplyBulkCString(c,_var ? "yes" : "no"); \ matches++; \ } \ -} while(0); +} while(0) #define config_get_numerical_field(_name,_var) do { \ if (stringmatch(pattern,_name,1)) { \ @@ -922,8 +932,7 @@ badfmt: /* Bad format errors */ addReplyBulkCString(c,buf); \ matches++; \ } \ -} while(0); - +} while(0) void configGetCommand(client *c) { robj *o = c->argv[2]; @@ -1330,6 +1339,28 @@ void rewriteConfigStringOption(struct rewriteConfigState *state, const char *opt rewriteConfigRewriteLine(state,option,line,force); } +/* Rewrite a SDS string option. */ +void rewriteConfigSdsOption(struct rewriteConfigState *state, const char *option, sds value, const sds defvalue) { + int force = 1; + sds line; + + /* If there is no value set, we don't want the SDS option + * to be present in the configuration at all. */ + if (value == NULL) { + rewriteConfigMarkAsProcessed(state, option); + return; + } + + /* Set force to zero if the value is set to its default. */ + if (defvalue && sdscmp(value, defvalue) == 0) force = 0; + + line = sdsnew(option); + line = sdscatlen(line, " ", 1); + line = sdscatrepr(line, value, sdslen(value)); + + rewriteConfigRewriteLine(state, option, line, force); +} + /* Rewrite a numerical (long long range) option. */ void rewriteConfigNumericalOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) { int force = value != defvalue; @@ -1802,22 +1833,14 @@ static void boolConfigRewrite(typeData data, const char *name, struct rewriteCon /* String Configs */ static void stringConfigInit(typeData data) { - if (data.string.convert_empty_to_null) { - *data.string.config = data.string.default_value ? zstrdup(data.string.default_value) : NULL; - } else { - *data.string.config = zstrdup(data.string.default_value); - } + *data.string.config = (data.string.convert_empty_to_null && !data.string.default_value) ? NULL : zstrdup(data.string.default_value); } static int stringConfigSet(typeData data, sds value, int update, char **err) { if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err)) return 0; char *prev = *data.string.config; - if (data.string.convert_empty_to_null) { - *data.string.config = value[0] ? zstrdup(value) : NULL; - } else { - *data.string.config = zstrdup(value); - } + *data.string.config = (data.string.convert_empty_to_null && !value[0]) ? NULL : zstrdup(value); if (update && data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) { zfree(*data.string.config); *data.string.config = prev; @@ -1835,6 +1858,38 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC rewriteConfigStringOption(state, name,*(data.string.config), data.string.default_value); } +/* SDS Configs */ +static void sdsConfigInit(typeData data) { + *data.sds.config = (data.sds.convert_empty_to_null && !data.sds.default_value) ? NULL: sdsnew(data.sds.default_value); +} + +static int sdsConfigSet(typeData data, sds value, int update, char **err) { + if (data.sds.is_valid_fn && !data.sds.is_valid_fn(value, err)) + return 0; + sds prev = *data.sds.config; + *data.sds.config = (data.sds.convert_empty_to_null && (sdslen(value) == 0)) ? NULL : sdsdup(value); + if (update && data.sds.update_fn && !data.sds.update_fn(*data.sds.config, prev, err)) { + sdsfree(*data.sds.config); + *data.sds.config = prev; + return 0; + } + sdsfree(prev); + return 1; +} + +static void sdsConfigGet(client *c, typeData data) { + if (*data.sds.config) { + addReplyBulkSds(c, sdsdup(*data.sds.config)); + } else { + addReplyBulkCString(c, ""); + } +} + +static void sdsConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { + rewriteConfigSdsOption(state, name, *(data.sds.config), data.sds.default_value ? sdsnew(data.sds.default_value) : NULL); +} + + #define ALLOW_EMPTY_STRING 0 #define EMPTY_STRING_IS_NULL 1 @@ -1850,6 +1905,18 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC } \ } +#define createSDSConfig(name, alias, modifiable, empty_to_null, config_addr, default, is_valid, update) { \ + embedCommonConfig(name, alias, modifiable) \ + embedConfigInterface(sdsConfigInit, sdsConfigSet, sdsConfigGet, sdsConfigRewrite) \ + .data.sds = { \ + .config = &(config_addr), \ + .default_value = (default), \ + .is_valid_fn = (is_valid), \ + .update_fn = (update), \ + .convert_empty_to_null = (empty_to_null), \ + } \ +} + /* Enum configs */ static void enumConfigInit(typeData data) { *data.enumd.config = data.enumd.default_value; @@ -2349,7 +2416,6 @@ standardConfig configs[] = { createStringConfig("pidfile", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.pidfile, NULL, NULL, NULL), createStringConfig("replica-announce-ip", "slave-announce-ip", MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.slave_announce_ip, NULL, NULL, NULL), createStringConfig("masteruser", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.masteruser, NULL, NULL, NULL), - createStringConfig("masterauth", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.masterauth, NULL, NULL, NULL), createStringConfig("cluster-announce-ip", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_ip, NULL, NULL, NULL), createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL), createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL), @@ -2358,6 +2424,10 @@ standardConfig configs[] = { createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL), createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL), createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL), + createStringConfig("ignore-warnings", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.ignore_warnings, "", NULL, NULL), + + /* SDS Configs */ + createSDSConfig("masterauth", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.masterauth, NULL, NULL, NULL), /* Enum Configs */ createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, server.supervised_mode, SUPERVISED_NONE, NULL, NULL), @@ -2477,12 +2547,17 @@ void configCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"GET -- Return parameters matching the glob-like and their values.", -"SET -- Set parameter to value.", -"RESETSTAT -- Reset statistics reported by INFO.", -"REWRITE -- Rewrite the configuration file.", +"GET ", +" Return parameters matching the glob-like and their values.", +"SET ", +" Set the configuration to .", +"RESETSTAT", +" Reset statistics reported by the INFO command.", +"REWRITE", +" Rewrite the configuration file.", NULL }; + addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr,"set") && c->argc == 4) { configSetCommand(c); @@ -2491,6 +2566,7 @@ NULL } else if (!strcasecmp(c->argv[1]->ptr,"resetstat") && c->argc == 2) { resetServerStats(); resetCommandTableStats(); + resetErrorTableStats(); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"rewrite") && c->argc == 2) { if (server.configfile == NULL) { diff --git a/src/crc64.c b/src/crc64.c index 4cbc019f6..6c9432c4a 100644 --- a/src/crc64.c +++ b/src/crc64.c @@ -134,7 +134,7 @@ int crc64Test(int argc, char *argv[]) { printf("[calcula]: e9c6d914c4b8d9ca == %016" PRIx64 "\n", (uint64_t)_crc64(0, "123456789", 9)); printf("[64speed]: e9c6d914c4b8d9ca == %016" PRIx64 "\n", - (uint64_t)crc64(0, "123456789", 9)); + (uint64_t)crc64(0, (unsigned char*)"123456789", 9)); char li[] = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed " "do eiusmod tempor incididunt ut labore et dolore magna " "aliqua. Ut enim ad minim veniam, quis nostrud exercitation " @@ -146,7 +146,7 @@ int crc64Test(int argc, char *argv[]) { printf("[calcula]: c7794709e69683b3 == %016" PRIx64 "\n", (uint64_t)_crc64(0, li, sizeof(li))); printf("[64speed]: c7794709e69683b3 == %016" PRIx64 "\n", - (uint64_t)crc64(0, li, sizeof(li))); + (uint64_t)crc64(0, (unsigned char*)li, sizeof(li))); return 0; } diff --git a/src/crcspeed.c b/src/crcspeed.c index d4955bfc9..67cb8fd9f 100644 --- a/src/crcspeed.c +++ b/src/crcspeed.c @@ -35,7 +35,8 @@ void crcspeed64little_init(crcfn64 crcfn, uint64_t table[8][256]) { /* generate CRCs for all single byte sequences */ for (int n = 0; n < 256; n++) { - table[0][n] = crcfn(0, &n, 1); + unsigned char v = n; + table[0][n] = crcfn(0, &v, 1); } /* generate nested CRC table for future slice-by-8 lookup */ diff --git a/src/db.c b/src/db.c index 5045935c3..5d63566a7 100644 --- a/src/db.c +++ b/src/db.c @@ -433,7 +433,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { /* Make sure the WATCHed keys are affected by the FLUSH* commands. * Note that we need to call the function while the keys are still * there. */ - signalFlushedDb(dbnum); + signalFlushedDb(dbnum, async); /* Empty redis database structure. */ removed = emptyDbStructure(server.db, dbnum, async, callback); @@ -572,9 +572,20 @@ void signalModifiedKey(client *c, redisDb *db, robj *key) { trackingInvalidateKey(c,key); } -void signalFlushedDb(int dbid) { - touchWatchedKeysOnFlush(dbid); - trackingInvalidateKeysOnFlush(dbid); +void signalFlushedDb(int dbid, int async) { + int startdb, enddb; + if (dbid == -1) { + startdb = 0; + enddb = server.dbnum-1; + } else { + startdb = enddb = dbid; + } + + for (int j = startdb; j <= enddb; j++) { + touchAllWatchedKeysInDb(&server.db[j], NULL); + } + + trackingInvalidateKeysOnFlush(async); } /*----------------------------------------------------------------------------- @@ -593,7 +604,7 @@ int getFlushCommandFlags(client *c, int *flags) { /* Parse the optional ASYNC option. */ if (c->argc > 1) { if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return C_ERR; } *flags = EMPTYDB_ASYNC; @@ -606,7 +617,7 @@ int getFlushCommandFlags(client *c, int *flags) { /* Flushes the whole server data set. */ void flushAllDataAndResetRDB(int flags) { server.dirty += emptyDb(-1,flags,NULL); - if (server.rdb_child_pid != -1) killRDBChild(); + if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); if (server.saveparamslen > 0) { /* Normally rdbSave() will reset dirty, but we don't want this here * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ @@ -616,6 +627,9 @@ void flushAllDataAndResetRDB(int flags) { rdbSave(server.rdb_filename,rsiptr); server.dirty = saved_dirty; } + + /* Without that extra dirty++, when db was already empty, FLUSHALL will + * not be replicated nor put into the AOF. */ server.dirty++; #if defined(USE_JEMALLOC) /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. @@ -839,7 +853,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { } if (count < 1) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); goto cleanup; } @@ -858,7 +872,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { typename = c->argv[i+1]->ptr; i+= 2; } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); goto cleanup; } } @@ -1047,7 +1061,7 @@ void shutdownCommand(client *c) { int flags = 0; if (c->argc > 2) { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } else if (c->argc == 2) { if (!strcasecmp(c->argv[1]->ptr,"nosave")) { @@ -1055,7 +1069,7 @@ void shutdownCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr,"save")) { flags |= SHUTDOWN_SAVE; } else { - addReply(c,shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -1141,7 +1155,7 @@ void moveCommand(client *c) { /* If the user is moving using as target the same * DB as the source DB it is probably an error. */ if (src == dst) { - addReply(c,shared.sameobjecterr); + addReplyErrorObject(c,shared.sameobjecterr); return; } @@ -1205,7 +1219,7 @@ void copyCommand(client *c) { selectDb(c,srcid); /* Back to the source DB */ j++; /* Consume additional arg. */ } else { - addReply(c, shared.syntaxerr); + addReplyErrorObject(c,shared.syntaxerr); return; } } @@ -1221,7 +1235,7 @@ void copyCommand(client *c) { robj *key = c->argv[1]; robj *newkey = c->argv[2]; if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) { - addReply(c,shared.sameobjecterr); + addReplyErrorObject(c,shared.sameobjecterr); return; } @@ -1328,9 +1342,14 @@ int dbSwapDatabases(long id1, long id2) { * However normally we only do this check for efficiency reasons * in dbAdd() when a list is created. So here we need to rescan * the list of clients blocked on lists and signal lists as ready - * if needed. */ + * if needed. + * + * Also the swapdb should make transaction fail if there is any + * client watching keys */ scanDatabaseForReadyLists(db1); + touchAllWatchedKeysInDb(db1, db2); scanDatabaseForReadyLists(db2); + touchAllWatchedKeysInDb(db2, db1); return C_OK; } @@ -1501,6 +1520,12 @@ int expireIfNeeded(redisDb *db, robj *key) { * we think the key is expired at this time. */ if (server.masterhost != NULL) return 1; + /* If clients are paused, we keep the current dataset constant, + * but return to the client what we believe is the right state. Typically, + * at the end of the pause we will properly expire the key OR we will + * have failed over and the new primary will send us the expire. */ + if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; + /* Delete the key */ server.stat_expiredkeys++; propagateExpire(db,key,server.lazyfree_lazy_expire); diff --git a/src/debug.c b/src/debug.c index 6ce8b3bdc..a725e5a30 100644 --- a/src/debug.c +++ b/src/debug.c @@ -381,39 +381,88 @@ void mallctl_string(client *c, robj **argv, int argc) { void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"ASSERT -- Crash by assertion failed.", -"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", -"CRASH-AND-RECOVER -- Hard crash and restart after delay.", -"DIGEST -- Output a hex signature representing the current DB content.", -"DIGEST-VALUE ... -- Output a hex signature of the values of all the specified keys.", -"DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false]", -"ERROR -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", -"LOG -- write message to the server log.", -"LEAK -- Create a memory leak of the input string.", -"HTSTATS -- Return hash table statistics of the specified Redis database.", -"HTSTATS-KEY -- Like htstats but for the hash table stored as key's value.", -"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", -"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", -"OBJECT -- Show low level info about key and associated value.", -"OOM -- Crash the server simulating an out-of-memory error.", -"PANIC -- Crash the server simulating a panic.", -"POPULATE [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", -"RELOAD [MERGE] [NOFLUSH] [NOSAVE] -- Save the RDB on disk and reload it back in memory. By default it will save the RDB file and load it back. With the NOFLUSH option the current database is not removed before loading the new one, but conflicts in keys will kill the server with an exception. When MERGE is used, conflicting keys will be loaded (the key in the loaded RDB file will win). When NOSAVE is used, the server will not save the current dataset in the RDB file before loading. Use DEBUG RELOAD NOSAVE when you want just to load the RDB file you placed in the Redis working directory in order to replace the current dataset in memory. Use DEBUG RELOAD NOSAVE NOFLUSH MERGE when you want to add what is in the current RDB file placed in the Redis current directory, with the current memory content. Use DEBUG RELOAD when you want to verify Redis is able to persist the current dataset in the RDB file, flush the memory content, and load it back.", -"RESTART -- Graceful restart: save config, db, restart.", -"SDSLEN -- Show low level SDS string info representing key and value.", -"SEGFAULT -- Crash the server with sigsegv.", -"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", -"SET-SKIP-CHECKSUM-VALIDATION <0|1> -- Enables or disables checksum checks for rdb or RESTORE payload.", -"AOF-FLUSH-SLEEP -- Server will sleep before flushing the AOF, this is used for testing", -"SLEEP -- Stop the server for . Decimals allowed.", -"STRUCTSIZE -- Return the size of different Redis core C structures.", -"ZIPLIST -- Show low level info about the ziplist encoding.", -"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", -"CONFIG-REWRITE-FORCE-ALL -- Like CONFIG REWRITE but writes all configuration options, including keywords not listed in original configuration file or default values.", +"AOF-FLUSH-SLEEP ", +" Server will sleep before flushing the AOF, this is used for testing.", +"ASSERT", +" Crash by assertion failed.", +"CHANGE-REPL-ID" +" Change the replication IDs of the instance.", +" Dangerous: should be used only for testing the replication subsystem.", +"CONFIG-REWRITE-FORCE-ALL", +" Like CONFIG REWRITE but writes all configuration options, including", +" keywords not listed in original configuration file or default values.", +"CRASH-AND-RECOVER ", +" Hard crash and restart after a delay.", +"DIGEST", +" Output a hex signature representing the current DB content.", +"DIGEST-VALUE [ ...]", +" Output a hex signature of the values of all the specified keys.", +"ERROR ", +" Return a Redis protocol error with as message. Useful for clients", +" unit tests to simulate Redis errors.", +"LOG ", +" Write to the server log.", +"HTSTATS ", +" Return hash table statistics of the specified Redis database.", +"HTSTATS-KEY ", +" Like HTSTATS but for the hash table stored at 's value.", +"LOADAOF", +" Flush the AOF buffers on disk and reload the AOF in memory.", +"LUA-ALWAYS-REPLICATE-COMMANDS <0|1>", +" Setting it to 1 makes Lua replication defaulting to replicating single", +" commands, without the script having to enable effects replication.", #ifdef USE_JEMALLOC -"MALLCTL [] -- Get or set a malloc tunning integer.", -"MALLCTL-STR [] -- Get or set a malloc tunning string.", +"MALLCTL []", +" Get or set a malloc tuning integer.", +"MALLCTL-STR []", +" Get or set a malloc tuning string.", #endif +"OBJECT ", +" Show low level info about `key` and associated value.", +"OOM", +" Crash the server simulating an out-of-memory error.", +"PANIC", +" Crash the server simulating a panic.", +"POPULATE [] []", +" Create string keys named key:. If is specified then", +" it is used instead of the 'key' prefix.", +"DEBUG PROTOCOL ", +" Reply with a test value of the specified type. can be: string,", +" integer, double, bignum, null, array, set, map, attrib, push, verbatim,", +" true, false.", +"RELOAD [option ...]", +" Save the RDB on disk and reload it back to memory. Valid