diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4d6c1c14c..d28d331db 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,3 +47,22 @@ jobs: - name: make run: make MALLOC=libc + build-centos7-jemalloc: + runs-on: ubuntu-latest + container: centos:7 + steps: + - uses: actions/checkout@v2 + - name: make + run: | + yum -y install gcc make + make + + build-centos6-jemalloc: + runs-on: ubuntu-latest + container: centos:6 + steps: + - uses: actions/checkout@v1 + - name: make + run: | + yum -y install gcc make + make diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 087b9f2ef..1ccd99805 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -124,12 +124,33 @@ jobs: - uses: actions/checkout@v2 - name: make run: | - yum -y install centos-release-scl - yum -y install devtoolset-7 - scl enable devtoolset-7 "make" + yum -y install gcc make + make - name: test run: | - yum -y install tcl + yum -y install which tcl + ./runtest --accurate --verbose + - name: module api test + run: ./runtest-moduleapi --verbose + - name: sentinel tests + run: ./runtest-sentinel + - name: cluster tests + run: ./runtest-cluster + + test-centos6-jemalloc: + runs-on: ubuntu-latest + if: github.repository == 'redis/redis' + container: centos:6 + timeout-minutes: 14400 + steps: + - uses: actions/checkout@v1 + - name: make + run: | + yum -y install gcc make + make + - name: test + run: | + yum -y install which tcl util-linux-ng ./runtest --accurate --verbose - name: module api test run: ./runtest-moduleapi --verbose diff --git a/src/Makefile b/src/Makefile index 9ff344c6a..6414b8c78 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram NODEPS:=clean distclean # Default settings -STD=-std=c11 -pedantic -DREDIS_STATIC='' +STD=-pedantic -DREDIS_STATIC='' ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions @@ -29,6 +29,16 @@ endif WARN=-Wall -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) +# Detect if the compiler supports C11 _Atomic +C11_ATOMIC := $(shell sh -c 'echo "\#include " > foo.c; \ + $(CC) -std=c11 -c foo.c -o foo.o &> /dev/null; \ + if [ -f foo.o ]; then echo "yes"; rm foo.o; fi; rm foo.c') +ifeq ($(C11_ATOMIC),yes) + STD+=-std=c11 +else + STD+=-std=c99 +endif + PREFIX?=/usr/local INSTALL_BIN=$(PREFIX)/bin INSTALL=install @@ -367,7 +377,7 @@ valgrind: $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" helgrind: - $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" + $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" REDIS_CFLAGS="-I/usr/local/include" REDIS_LDFLAGS="-L/usr/local/lib" src/help.h: @../utils/generate-command-help.rb > help.h diff --git a/src/atomicvar.h b/src/atomicvar.h index ecd26ad70..6ac04c605 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -1,5 +1,5 @@ -/* This file implements atomic counters using __atomic or __sync macros if - * available, otherwise synchronizing different threads using a mutex. +/* This file implements atomic counters using c11 _Atomic, __atomic or __sync + * macros if available, otherwise we will throw an error when compile. * * The exported interface is composed of three macros: * @@ -8,16 +8,8 @@ * atomicDecr(var,count) -- Decrement the atomic counter * atomicGet(var,dstvar) -- Fetch the atomic counter value * atomicSet(var,value) -- Set the atomic counter value - * - * The variable 'var' should also have a declared mutex with the same - * name and the "_mutex" postfix, for instance: - * - * long myvar; - * pthread_mutex_t myvar_mutex; - * atomicSet(myvar,12345); - * - * If atomic primitives are available (tested in config.h) the mutex - * is not used. + * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization + * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization * * Never use return value from the macros, instead use the AtomicGetIncr() * if you need to get the current value and increment it atomically, like @@ -58,17 +50,64 @@ */ #include +#include "config.h" #ifndef __ATOMIC_VAR_H #define __ATOMIC_VAR_H +/* Define redisAtomic for atomic variable. */ +#define redisAtomic + /* To test Redis with Helgrind (a Valgrind tool) it is useful to define * the following macro, so that __sync macros are used: those can be detected * by Helgrind (even if they are less efficient) so that no false positive * is reported. */ // #define __ATOMIC_VAR_FORCE_SYNC_MACROS -#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__ATOMIC_RELAXED) && !defined(__sun) && (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) +/* There will be many false positives if we test Redis with Helgrind, since + * Helgrind can't understand we have imposed ordering on the program, so + * we use macros in helgrind.h to tell Helgrind inter-thread happens-before + * relationship explicitly for avoiding false positives. + * + * For more details, please see: valgrind/helgrind.h and + * https://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.effective-use + * + * These macros take effect only when 'make helgrind', and you must first + * install Valgrind in the default path configuration. */ +#ifdef __ATOMIC_VAR_FORCE_SYNC_MACROS +#include +#else +#define ANNOTATE_HAPPENS_BEFORE(v) ((void) v) +#define ANNOTATE_HAPPENS_AFTER(v) ((void) v) +#endif + +#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__STDC_VERSION__) && \ + (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__) +/* Use '_Atomic' keyword if the compiler supports. */ +#undef redisAtomic +#define redisAtomic _Atomic +/* Implementation using _Atomic in C11. */ + +#include +#define atomicIncr(var,count) atomic_fetch_add_explicit(&var,(count),memory_order_relaxed) +#define atomicGetIncr(var,oldvalue_var,count) do { \ + oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \ +} while(0) +#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed) +#define atomicGet(var,dstvar) do { \ + dstvar = atomic_load_explicit(&var,memory_order_relaxed); \ +} while(0) +#define atomicSet(var,value) atomic_store_explicit(&var,value,memory_order_relaxed) +#define atomicGetWithSync(var,dstvar) do { \ + dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \ +} while(0) +#define atomicSetWithSync(var,value) \ + atomic_store_explicit(&var,value,memory_order_seq_cst) +#define REDIS_ATOMIC_API "c11-builtin" + +#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && !defined(__sun) && \ + (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) && \ + defined(__ATOMIC_RELAXED) && defined(__ATOMIC_SEQ_CST) /* Implementation using __atomic macros. */ #define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) @@ -80,6 +119,11 @@ dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \ } while(0) #define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED) +#define atomicGetWithSync(var,dstvar) do { \ + dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \ +} while(0) +#define atomicSetWithSync(var,value) \ + __atomic_store_n(&var,value,__ATOMIC_SEQ_CST) #define REDIS_ATOMIC_API "atomic-builtin" #elif defined(HAVE_ATOMIC) @@ -96,38 +140,19 @@ #define atomicSet(var,value) do { \ while(!__sync_bool_compare_and_swap(&var,var,value)); \ } while(0) +/* Actually the builtin issues a full memory barrier by default. */ +#define atomicGetWithSync(var,dstvar) { \ + dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \ + ANNOTATE_HAPPENS_AFTER(&var); \ +} while(0) +#define atomicSetWithSync(var,value) do { \ + ANNOTATE_HAPPENS_BEFORE(&var); \ + while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \ +} while(0) #define REDIS_ATOMIC_API "sync-builtin" #else -/* Implementation using pthread mutex. */ - -#define atomicIncr(var,count) do { \ - pthread_mutex_lock(&var ## _mutex); \ - var += (count); \ - pthread_mutex_unlock(&var ## _mutex); \ -} while(0) -#define atomicGetIncr(var,oldvalue_var,count) do { \ - pthread_mutex_lock(&var ## _mutex); \ - oldvalue_var = var; \ - var += (count); \ - pthread_mutex_unlock(&var ## _mutex); \ -} while(0) -#define atomicDecr(var,count) do { \ - pthread_mutex_lock(&var ## _mutex); \ - var -= (count); \ - pthread_mutex_unlock(&var ## _mutex); \ -} while(0) -#define atomicGet(var,dstvar) do { \ - pthread_mutex_lock(&var ## _mutex); \ - dstvar = var; \ - pthread_mutex_unlock(&var ## _mutex); \ -} while(0) -#define atomicSet(var,value) do { \ - pthread_mutex_lock(&var ## _mutex); \ - var = value; \ - pthread_mutex_unlock(&var ## _mutex); \ -} while(0) -#define REDIS_ATOMIC_API "pthread-mutex" +#error "Unable to determine atomic operations for your platform" #endif #endif /* __ATOMIC_VAR_H */ diff --git a/src/evict.c b/src/evict.c index 258e1a25e..0e8f818ff 100644 --- a/src/evict.c +++ b/src/evict.c @@ -79,7 +79,7 @@ unsigned int getLRUClock(void) { unsigned int LRU_CLOCK(void) { unsigned int lruclock; if (1000/server.hz <= LRU_CLOCK_RESOLUTION) { - lruclock = server.lruclock; + atomicGet(server.lruclock,lruclock); } else { lruclock = getLRUClock(); } diff --git a/src/lazyfree.c b/src/lazyfree.c index 821dc50df..b0fc26fcf 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -3,8 +3,7 @@ #include "atomicvar.h" #include "cluster.h" -static size_t lazyfree_objects = 0; -pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER; +static redisAtomic size_t lazyfree_objects = 0; /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { diff --git a/src/module.c b/src/module.c index 315161ed9..a501a4830 100644 --- a/src/module.c +++ b/src/module.c @@ -357,11 +357,6 @@ unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks /* Data structures related to the redis module users */ -/* This callback type is called by moduleNotifyUserChanged() every time - * a user authenticated via the module API is associated with a different - * user or gets disconnected. */ -typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); - /* This is the object returned by RM_CreateModuleUser(). The module API is * able to create users, set ACLs to such users, and later authenticate * clients using such newly created users. */ diff --git a/src/networking.c b/src/networking.c index 6becdc288..3fa298783 100644 --- a/src/networking.c +++ b/src/networking.c @@ -103,7 +103,8 @@ client *createClient(connection *conn) { } selectDb(c,0); - uint64_t client_id = ++server.next_client_id; + uint64_t client_id; + atomicGetIncr(server.next_client_id, client_id, 1); c->id = client_id; c->resp = 2; c->conn = conn; @@ -1314,7 +1315,7 @@ client *lookupClientByID(uint64_t id) { * thread safe. */ int writeToClient(client *c, int handler_installed) { /* Update total number of writes on server */ - server.stat_total_writes_processed++; + atomicIncr(server.stat_total_writes_processed, 1); ssize_t nwritten = 0, totwritten = 0; size_t objlen; @@ -1376,7 +1377,7 @@ int writeToClient(client *c, int handler_installed) { zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - server.stat_net_output_bytes += totwritten; + atomicIncr(server.stat_net_output_bytes, totwritten); if (nwritten == -1) { if (connGetState(c->conn) == CONN_STATE_CONNECTED) { nwritten = 0; @@ -1933,7 +1934,7 @@ void readQueryFromClient(connection *conn) { if (postponeClientRead(c)) return; /* Update total number of reads on server */ - server.stat_total_reads_processed++; + atomicIncr(server.stat_total_reads_processed, 1); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -1979,7 +1980,7 @@ void readQueryFromClient(connection *conn) { sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; - server.stat_net_input_bytes += nread; + atomicIncr(server.stat_net_input_bytes, nread); if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); @@ -2938,7 +2939,7 @@ int tio_debug = 0; pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; -_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; +redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ /* This is the list of clients each thread will serve when threaded I/O is @@ -2946,6 +2947,16 @@ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ * itself. */ list *io_threads_list[IO_THREADS_MAX_NUM]; +static inline unsigned long getIOPendingCount(int i) { + unsigned long count = 0; + atomicGetWithSync(io_threads_pending[i], count); + return count; +} + +static inline void setIOPendingCount(int i, unsigned long count) { + atomicSetWithSync(io_threads_pending[i], count); +} + void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is * used by the thread to just manipulate a single sub-array of clients. */ @@ -2959,17 +2970,17 @@ void *IOThreadMain(void *myid) { while(1) { /* Wait for start */ for (int j = 0; j < 1000000; j++) { - if (io_threads_pending[id] != 0) break; + if (getIOPendingCount(id) != 0) break; } /* Give the main thread a chance to stop this thread. */ - if (io_threads_pending[id] == 0) { + if (getIOPendingCount(id) == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; } - serverAssert(io_threads_pending[id] != 0); + serverAssert(getIOPendingCount(id) != 0); if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); @@ -2989,7 +3000,7 @@ void *IOThreadMain(void *myid) { } } listEmpty(io_threads_list[id]); - io_threads_pending[id] = 0; + setIOPendingCount(id, 0); if (tio_debug) printf("[%ld] Done\n", id); } @@ -3018,7 +3029,7 @@ void initThreadedIO(void) { /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); - io_threads_pending[i] = 0; + setIOPendingCount(i, 0); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); @@ -3124,7 +3135,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { io_threads_op = IO_THREADS_OP_WRITE; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); - io_threads_pending[j] = count; + setIOPendingCount(j, count); } /* Also use the main thread to process a slice of clients. */ @@ -3139,7 +3150,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) - pending += io_threads_pending[j]; + pending += getIOPendingCount(j); if (pending == 0) break; } if (tio_debug) printf("I/O WRITE All threads finshed\n"); @@ -3214,7 +3225,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { io_threads_op = IO_THREADS_OP_READ; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); - io_threads_pending[j] = count; + setIOPendingCount(j, count); } /* Also use the main thread to process a slice of clients. */ @@ -3229,7 +3240,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) - pending += io_threads_pending[j]; + pending += getIOPendingCount(j); if (pending == 0) break; } if (tio_debug) printf("I/O READ All threads finshed\n"); diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index a221ebdd2..7a7828184 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -76,11 +76,11 @@ static struct config { int hostport; const char *hostsocket; int numclients; - int liveclients; + redisAtomic int liveclients; int requests; - int requests_issued; - int requests_finished; - int previous_requests_finished; + redisAtomic int requests_issued; + redisAtomic int requests_finished; + redisAtomic int previous_requests_finished; int last_printed_bytes; long long previous_tick; int keysize; @@ -113,18 +113,12 @@ static struct config { struct redisConfig *redis_config; struct hdr_histogram* latency_histogram; struct hdr_histogram* current_sec_latency_histogram; - int is_fetching_slots; - int is_updating_slots; - int slots_last_update; + redisAtomic int is_fetching_slots; + redisAtomic int is_updating_slots; + redisAtomic int slots_last_update; int enable_tracking; - /* Thread mutexes to be used as fallbacks by atomicvar.h */ - pthread_mutex_t requests_issued_mutex; - pthread_mutex_t requests_finished_mutex; pthread_mutex_t liveclients_mutex; - pthread_mutex_t is_fetching_slots_mutex; pthread_mutex_t is_updating_slots_mutex; - pthread_mutex_t updating_slots_mutex; - pthread_mutex_t slots_last_update_mutex; } config; typedef struct _client { @@ -1669,13 +1663,8 @@ int main(int argc, const char **argv) { fprintf(stderr, "WARN: could not fetch server CONFIG\n"); } if (config.num_threads > 0) { - pthread_mutex_init(&(config.requests_issued_mutex), NULL); - pthread_mutex_init(&(config.requests_finished_mutex), NULL); pthread_mutex_init(&(config.liveclients_mutex), NULL); - pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL); pthread_mutex_init(&(config.is_updating_slots_mutex), NULL); - pthread_mutex_init(&(config.updating_slots_mutex), NULL); - pthread_mutex_init(&(config.slots_last_update_mutex), NULL); } if (config.keepalive == 0) { diff --git a/src/replication.c b/src/replication.c index 3f98b1062..be05254e8 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1032,7 +1032,7 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - server.stat_net_output_bytes += nwritten; + atomicIncr(server.stat_net_output_bytes, nwritten); sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); @@ -1061,7 +1061,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - server.stat_net_output_bytes += nwritten; + atomicIncr(server.stat_net_output_bytes, nwritten); if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; @@ -1102,7 +1102,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - server.stat_net_output_bytes += nwritten; + atomicIncr(server.stat_net_output_bytes, nwritten); if (slave->repldboff < server.rdb_pipe_bufflen) return; /* more data to write.. */ } @@ -1180,7 +1180,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ slave->repldboff = nwritten; - server.stat_net_output_bytes += nwritten; + atomicIncr(server.stat_net_output_bytes, nwritten); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1558,7 +1558,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - server.stat_net_input_bytes += nread; + atomicIncr(server.stat_net_input_bytes, nread); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ diff --git a/src/server.c b/src/server.c index e8c51ee86..31e082510 100644 --- a/src/server.c +++ b/src/server.c @@ -1764,7 +1764,8 @@ void databasesCron(void) { void updateCachedTime(int update_daylight_info) { server.ustime = ustime(); server.mstime = server.ustime / 1000; - server.unixtime = server.mstime / 1000; + time_t unixtime = server.mstime / 1000; + atomicSet(server.unixtime, unixtime); /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this @@ -1913,11 +1914,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } run_with_period(100) { + long long stat_net_input_bytes, stat_net_output_bytes; + atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); + atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); + trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(STATS_METRIC_NET_INPUT, - server.stat_net_input_bytes); + stat_net_input_bytes); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, - server.stat_net_output_bytes); + stat_net_output_bytes); } /* We have just LRU_BITS bits per object for LRU information. @@ -1931,7 +1936,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - server.lruclock = getLRUClock(); + unsigned int lruclock = getLRUClock(); + atomicSet(server.lruclock,lruclock); cronUpdateMemoryStats(); @@ -2443,7 +2449,8 @@ void initServerConfig(void) { server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.loading_process_events_interval_bytes = (1024*1024*2); - server.lruclock = getLRUClock(); + unsigned int lruclock = getLRUClock(); + atomicSet(server.lruclock,lruclock); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -2846,9 +2853,9 @@ void resetServerStats(void) { server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; server.stat_io_reads_processed = 0; - server.stat_total_reads_processed = 0; + atomicSet(server.stat_total_reads_processed, 0); server.stat_io_writes_processed = 0; - server.stat_total_writes_processed = 0; + atomicSet(server.stat_total_writes_processed, 0); for (j = 0; j < STATS_METRIC_COUNT; j++) { server.inst_metric[j].idx = 0; server.inst_metric[j].last_sample_time = mstime(); @@ -2856,8 +2863,8 @@ void resetServerStats(void) { memset(server.inst_metric[j].samples,0, sizeof(server.inst_metric[j].samples)); } - server.stat_net_input_bytes = 0; - server.stat_net_output_bytes = 0; + atomicSet(server.stat_net_input_bytes, 0); + atomicSet(server.stat_net_output_bytes, 0); server.stat_unexpected_error_replies = 0; server.aof_delayed_fsync = 0; server.blocked_last_cron = 0; @@ -4186,6 +4193,8 @@ sds genRedisInfoString(const char *section) { call_uname = 0; } + unsigned int lruclock; + atomicGet(server.lruclock,lruclock); info = sdscatfmt(info, "# Server\r\n" "redis_version:%s\r\n" @@ -4230,7 +4239,7 @@ sds genRedisInfoString(const char *section) { (int64_t)(uptime/(3600*24)), server.hz, server.config_hz, - server.lruclock, + lruclock, server.executable ? server.executable : "", server.configfile ? server.configfile : "", server.io_threads_active); @@ -4472,6 +4481,13 @@ sds genRedisInfoString(const char *section) { /* Stats */ if (allsections || defsections || !strcasecmp(section,"stats")) { + long long stat_total_reads_processed, stat_total_writes_processed; + long long stat_net_input_bytes, stat_net_output_bytes; + atomicGet(server.stat_total_reads_processed, stat_total_reads_processed); + atomicGet(server.stat_total_writes_processed, stat_total_writes_processed); + atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); + atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Stats\r\n" @@ -4513,8 +4529,8 @@ sds genRedisInfoString(const char *section) { server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), - server.stat_net_input_bytes, - server.stat_net_output_bytes, + stat_net_input_bytes, + stat_net_output_bytes, (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, server.stat_rejected_conn, @@ -4541,8 +4557,8 @@ sds genRedisInfoString(const char *section) { (unsigned long long) trackingGetTotalItems(), (unsigned long long) trackingGetTotalPrefixes(), server.stat_unexpected_error_replies, - server.stat_total_reads_processed, - server.stat_total_writes_processed, + stat_total_reads_processed, + stat_total_writes_processed, server.stat_io_reads_processed, server.stat_io_writes_processed); } diff --git a/src/server.h b/src/server.h index 8cba2f9b5..cac8544b9 100644 --- a/src/server.h +++ b/src/server.h @@ -34,6 +34,7 @@ #include "config.h" #include "solarisfixes.h" #include "rio.h" +#include "atomicvar.h" #include #include @@ -511,8 +512,10 @@ typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *val typedef size_t (*moduleTypeMemUsageFunc)(const void *value); typedef void (*moduleTypeFreeFunc)(void *value); -/* A callback that is called when the client authentication changes. This - * needs to be exposed since you can't cast a function pointer to (void *) */ +/* This callback type is called by moduleNotifyUserChanged() every time + * a user authenticated via the module API is associated with a different + * user or gets disconnected. This needs to be exposed since you can't cast + * a function pointer to (void *). */ typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); @@ -1063,7 +1066,7 @@ struct redisServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; - _Atomic unsigned int lruclock; /* Clock for LRU eviction */ + redisAtomic unsigned int lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -1111,7 +1114,7 @@ struct redisServer { mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ + redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ int gopher_enabled; /* If true the server will reply to gopher queries. Will still serve RESP2 queries. */ @@ -1160,8 +1163,8 @@ struct redisServer { long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ - _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ + redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */ + redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ @@ -1169,8 +1172,8 @@ struct redisServer { long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ - _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ + redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */ + redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -1193,7 +1196,7 @@ struct redisServer { int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ - _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */ + size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -1393,7 +1396,7 @@ struct redisServer { int list_max_ziplist_size; int list_compress_depth; /* time cache */ - _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ + redisAtomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ mstime_t mstime; /* 'unixtime' in milliseconds. */ diff --git a/src/zmalloc.c b/src/zmalloc.c index e36319241..cefa39aa8 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -74,8 +74,7 @@ void zlibc_free(void *ptr) { #define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n)) #define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n)) -static size_t used_memory = 0; -pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER; +static redisAtomic size_t used_memory = 0; static void zmalloc_default_oom(size_t size) { fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",