diff --git a/src/Makefile b/src/Makefile index ecbd2753d..68b80a2a7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -345,7 +345,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/atomicvar.h b/src/atomicvar.h index 2c2969c33..1cbfd5b9d 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -1,16 +1,41 @@ /* This file implements atomic counters using c11 _Atomic, __atomic or __sync * macros if available, otherwise we will throw an error when compile. * - * The exported interface is composed of three macros: + * The exported interface is composed of the following macros: * * atomicIncr(var,count) -- Increment the atomic counter * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter + * atomicIncrGet(var,newvalue_var,count) -- Increment and get the atomic counter new value * atomicDecr(var,count) -- Decrement the atomic counter * atomicGet(var,dstvar) -- Fetch the atomic counter value * atomicSet(var,value) -- Set the atomic counter value * atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization * atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization - * + * + * Atomic operations on flags. + * Flag type can be int, long, long long or their unsigned counterparts. + * The value of the flag can be 1 or 0. + * + * atomicFlagGetSet(var,oldvalue_var) -- Get and set the atomic counter value + * + * NOTE1: __atomic* and _Atomic implementations can be actually elaborated to support any value by changing the + * hardcoded new value passed to __atomic_exchange* from 1 to @param count + * i.e oldvalue_var = atomic_exchange_explicit(&var, count). + * However, in order to be compatible with the __sync functions family, we can use only 0 and 1. + * The only exchange alternative suggested by __sync is __sync_lock_test_and_set, + * But as described by the gnu manual for __sync_lock_test_and_set(): + * https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html + * "A target may support reduced functionality here by which the only valid value to store is the immediate constant 1. The exact value + * actually stored in *ptr is implementation defined." + * Hence, we can't rely on it for a any value other than 1. + * We eventually chose to implement this method with __sync_val_compare_and_swap since it satisfies functionality needed for atomicFlagGetSet + * (if the flag was 0 -> set to 1, if it's already 1 -> do nothing, but the final result is that the flag is set), + * and also it has a full barrier (__sync_lock_test_and_set has acquire barrier). + * + * NOTE2: Unlike other atomic type, which aren't guaranteed to be lock free, c11 atmoic_flag does. + * To check whether a type is lock free, atomic_is_lock_free() can be used. + * It can be considered to limit the flag type to atomic_flag to improve performance. + * * Never use return value from the macros, instead use the AtomicGetIncr() * if you need to get the current value and increment it atomically, like * in the following example: @@ -93,6 +118,8 @@ #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \ } while(0) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = atomicIncr(var,count) + count #define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed) #define atomicGet(var,dstvar) do { \ dstvar = atomic_load_explicit(&var,memory_order_relaxed); \ @@ -103,6 +130,8 @@ } while(0) #define atomicSetWithSync(var,value) \ atomic_store_explicit(&var,value,memory_order_seq_cst) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = atomic_exchange_explicit(&var,1,memory_order_relaxed) #define REDIS_ATOMIC_API "c11-builtin" #elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && \ @@ -111,6 +140,8 @@ /* Implementation using __atomic macros. */ #define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \ } while(0) @@ -124,12 +155,16 @@ } while(0) #define atomicSetWithSync(var,value) \ __atomic_store_n(&var,value,__ATOMIC_SEQ_CST) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = __atomic_exchange_n(&var,1,__ATOMIC_RELAXED) #define REDIS_ATOMIC_API "atomic-builtin" #elif defined(HAVE_ATOMIC) /* Implementation using __sync macros. */ #define atomicIncr(var,count) __sync_add_and_fetch(&var,(count)) +#define atomicIncrGet(var, newvalue_var, count) \ + newvalue_var = __sync_add_and_fetch(&var,(count)) #define atomicGetIncr(var,oldvalue_var,count) do { \ oldvalue_var = __sync_fetch_and_add(&var,(count)); \ } while(0) @@ -149,6 +184,8 @@ ANNOTATE_HAPPENS_BEFORE(&var); \ while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \ } while(0) +#define atomicFlagGetSet(var,oldvalue_var) \ + oldvalue_var = __sync_val_compare_and_swap(&var,0,1) #define REDIS_ATOMIC_API "sync-builtin" #else diff --git a/src/debug.c b/src/debug.c index 1e915a1e1..540768018 100644 --- a/src/debug.c +++ b/src/debug.c @@ -36,6 +36,7 @@ #include "quicklist.h" #include "fpconv_dtoa.h" #include "cluster.h" +#include "threads_mngr.h" #include #include @@ -75,6 +76,7 @@ void bugReportStart(void); void printCrashReport(void); void bugReportEnd(int killViaSignal, int sig); void logStackTrace(void *eip, int uplevel); +void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret); /* ================================= Debugging ============================== */ @@ -1819,6 +1821,102 @@ void closeDirectLogFiledes(int fd) { } #ifdef HAVE_BACKTRACE +#define BACKTRACE_MAX_SIZE 100 +#ifdef __linux__ +#include +#include +#include + +static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output); + +#define MAX_BUFF_LENGTH 256 +typedef struct { + char thread_name[16]; + int trace_size; + pid_t tid; + void *trace[BACKTRACE_MAX_SIZE]; +} stacktrace_data; + +static void *collect_stacktrace_data(void) { + /* allocate stacktrace_data struct */ + stacktrace_data *trace_data = zmalloc(sizeof(stacktrace_data)); + + /* Get the stack trace first! */ + trace_data->trace_size = backtrace(trace_data->trace, BACKTRACE_MAX_SIZE); + + /* get the thread name */ + prctl(PR_GET_NAME, trace_data->thread_name); + + /* get the thread id */ + trace_data->tid = syscall(SYS_gettid); + + /* return the trace data */ + return trace_data; +} + +static void writeStacktraces(int fd, int uplevel) { + /* get the list of all the process's threads that don't block or ignore the THREADS_SIGNAL */ + pid_t pid = getpid(); + size_t len_tids = 0; + pid_t *tids = get_ready_to_signal_threads_tids(pid, THREADS_SIGNAL, &len_tids); + + /* This call returns either NULL or the stacktraces data from all tids */ + stacktrace_data **stacktraces_data = (stacktrace_data **)ThreadsManager_runOnThreads(tids, len_tids, collect_stacktrace_data); + + /* free tids */ + zfree(tids); + + /* ThreadsManager_runOnThreads returns NULL if it is already running */ + if (!stacktraces_data) return; + + + char buff[MAX_BUFF_LENGTH]; + pid_t calling_tid = syscall(SYS_gettid); + /* for backtrace_data in backtraces_data: */ + for (size_t i = 0; i < len_tids; i++) { + stacktrace_data *curr_stacktrace_data = stacktraces_data[i]; + /*ThreadsManager_runOnThreads might fail to collect the thread's data */ + if (!curr_stacktrace_data) continue; + + /* stacktrace header includes the tid and the thread's name */ + snprintf(buff, MAX_BUFF_LENGTH, "\n%d %s", curr_stacktrace_data->tid, curr_stacktrace_data->thread_name); + if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; + + /* skip kernel call to the signal handler, the signal handler and the callback addresses */ + int curr_uplevel = 3; + + if (curr_stacktrace_data->tid == calling_tid) { + /* skip signal syscall and ThreadsManager_runOnThreads */ + curr_uplevel += uplevel + 2; + /* Add an indication to header of the thread that is handling the log file */ + snprintf(buff, MAX_BUFF_LENGTH, " *\n"); + } else { + /* just add a new line */ + snprintf(buff, MAX_BUFF_LENGTH, "\n"); + } + + if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; + + /* add the stacktrace */ + backtrace_symbols_fd(curr_stacktrace_data->trace+curr_uplevel, curr_stacktrace_data->trace_size-curr_uplevel, fd); + + zfree(curr_stacktrace_data); + } + zfree(stacktraces_data); +} + +#else /* __linux__*/ + +static void writeStacktraces(int fd, int uplevel) { + void *trace[BACKTRACE_MAX_SIZE]; + + int trace_size = backtrace(trace, BACKTRACE_MAX_SIZE); + + char *msg = "\nBacktrace:\n"; + if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */}; + backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd); +} +#endif /* __linux__ */ /* Logs the stack trace using the backtrace() call. This function is designed * to be called from signal handlers safely. @@ -1826,16 +1924,12 @@ void closeDirectLogFiledes(int fd) { * The uplevel argument indicates how many of the calling functions to skip. */ void logStackTrace(void *eip, int uplevel) { - void *trace[100]; - int trace_size = 0, fd = openDirectLogFiledes(); + int fd = openDirectLogFiledes(); char *msg; uplevel++; /* skip this function */ if (fd == -1) return; /* If we can't log there is anything to do. */ - /* Get the stack trace first! */ - trace_size = backtrace(trace, 100); - msg = "\n------ STACK TRACE ------\n"; if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */}; @@ -1847,9 +1941,8 @@ void logStackTrace(void *eip, int uplevel) { } /* Write symbols to log file */ - msg = "\nBacktrace:\n"; - if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */}; - backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd); + ++uplevel; + writeStacktraces(fd, uplevel); /* Cleanup */ closeDirectLogFiledes(fd); @@ -2184,6 +2277,17 @@ static void sigsegvHandler(int sig, siginfo_t *info, void *secret) { bugReportEnd(1, sig); } +void setupDebugSigHandlers(void) { + setupSigSegvHandler(); + + struct sigaction act; + + sigemptyset(&act.sa_mask); + act.sa_flags = SA_SIGINFO; + act.sa_sigaction = sigalrmSignalHandler; + sigaction(SIGALRM, &act, NULL); +} + void setupSigSegvHandler(void) { /* Initialize the signal handler lock. Attempting to initialize an already initialized mutex or mutexattr results in undefined behavior. */ @@ -2305,16 +2409,21 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) { /* =========================== Software Watchdog ============================ */ #include -void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) { +void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret) { #ifdef HAVE_BACKTRACE ucontext_t *uc = (ucontext_t*) secret; #else (void)secret; #endif - UNUSED(info); UNUSED(sig); - serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---"); + /* SIGALRM can be sent explicitly to the process calling kill() to get the stacktraces, + or every watchdog_period interval. In the last case, si_pid is not set */ + if(info->si_pid == 0) { + serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---"); + } else { + serverLogFromHandler(LL_WARNING, "\nReceived SIGALRM"); + } #ifdef HAVE_BACKTRACE logStackTrace(getAndSetMcontextEip(uc, NULL), 1); #else @@ -2338,25 +2447,10 @@ void watchdogScheduleSignal(int period) { setitimer(ITIMER_REAL, &it, NULL); } void applyWatchdogPeriod(void) { - struct sigaction act; - /* Disable watchdog when period is 0 */ if (server.watchdog_period == 0) { watchdogScheduleSignal(0); /* Stop the current timer. */ - - /* Set the signal handler to SIG_IGN, this will also remove pending - * signals from the queue. */ - sigemptyset(&act.sa_mask); - act.sa_flags = 0; - act.sa_handler = SIG_IGN; - sigaction(SIGALRM, &act, NULL); } else { - /* Setup the signal handler. */ - sigemptyset(&act.sa_mask); - act.sa_flags = SA_SIGINFO; - act.sa_sigaction = watchdogSignalHandler; - sigaction(SIGALRM, &act, NULL); - /* If the configured period is smaller than twice the timer period, it is * too short for the software watchdog to work reliably. Fix it now * if needed. */ @@ -2374,3 +2468,114 @@ void debugDelay(int usec) { if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0; if (usec) usleep(usec); } + +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + +/* =========================== Stacktrace Utils ============================ */ +#define TIDS_INITIAL_SIZE 50 + +/** If it doesn't block and doesn't ignore, return 1 (the thread will handle the signal) + * If thread tid blocks or ignores sig_num returns 0 (thread is not ready to catch the signal). + * also returns 0 if something is wrong and prints a warning message to the log file **/ +static int is_thread_ready_to_signal(pid_t pid, pid_t tid, int sig_num) { + /* open the threads status file */ + char buff[MAX_BUFF_LENGTH]; + snprintf(buff, MAX_BUFF_LENGTH, "/proc/%d/task/%d/status", pid, tid); + FILE *thread_status_file = fopen(buff, "r"); + if (thread_status_file == NULL) { + serverLog(LL_WARNING, + "tid:%d: failed to open /proc/%d/task/%d/status file", tid, pid, tid); + return 0; + } + + int ret = 1; + size_t field_name_len = strlen("SigBlk:"); /* SigIgn has the same length */ + char *line = NULL; + size_t fields_count = 2; + while ((line = fgets(buff, MAX_BUFF_LENGTH, thread_status_file)) && fields_count) { + /* iterate the file until we reach SigBlk or SigIgn field line */ + if (!strncmp(buff, "SigBlk:", field_name_len) || !strncmp(buff, "SigIgn:", field_name_len)) { + /* check if the signal exist in the mask */ + unsigned long sig_mask = strtoul(buff + field_name_len, NULL, 16); + if(sig_mask & sig_num) { /* if the signal is blocked/ignored return 0 */ + ret = 0; + break; + } + --fields_count; + } + } + + fclose(thread_status_file); + + /* if we reached EOF, it means we haven't found SigBlk or/and SigIgn, something is wrong */ + if (line == NULL) { + ret = 0; + serverLog(LL_WARNING, + "tid:%d: failed to find SigBlk or/and SigIgn field(s) in /proc/%d/task/%d/status file", tid, pid, tid); + } + return ret; +} + +/** Returns a list of all the process's (pid) threads that can receive signal sig_num. + * Also updates tids_len_output to the number of valid threads' ids in the returned array + * NOTE: It is the caller responsibility to free the returned array with zfree(). */ +static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output) { + /* Initialize the path the process threads' directory. */ + char path_buff[MAX_BUFF_LENGTH]; + snprintf(path_buff, MAX_BUFF_LENGTH, "/proc/%d/task", pid); + + /* Get the directory handler. */ + DIR *dir; + if (!(dir = opendir(path_buff))) return NULL; + + size_t tids_cap = TIDS_INITIAL_SIZE; + pid_t *tids = zmalloc(sizeof(pid_t) * tids_cap); + + size_t tids_count = 0; + struct dirent *entry; + pid_t calling_tid = syscall(SYS_gettid); + int current_thread_index = -1; + + /* Each thread is represented by a directory */ + while ((entry = readdir(dir)) != NULL) { + if (entry->d_type == DT_DIR) { + /* Skip irrelevant directories. */ + if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) { + /* the thread's directory name is equivalent to its tid. */ + pid_t tid = atoi(entry->d_name); + + if(!is_thread_ready_to_signal(pid, tid, sig_num)) continue; + + if(tid == calling_tid) { + current_thread_index = tids_count; + } + + /* increase tids capacity if needed */ + if(tids_count >= tids_cap) { + tids_cap *= 2; + tids = zrealloc(tids, sizeof(pid_t) * tids_cap); + } + + /* save the thread id */ + tids[tids_count++] = tid; + } + } + } + + /* Swap the last tid with the the current thread id */ + if(current_thread_index != -1) { + pid_t last_tid = tids[tids_count - 1]; + + tids[tids_count - 1] = calling_tid; + tids[current_thread_index] = last_tid; + } + + + closedir(dir); + + *tids_len_output = tids_count; + return tids; +} +#endif /* __linux__ */ +#endif /* HAVE_BACKTRACE */ diff --git a/src/server.c b/src/server.c index 929f9b7cd..6b62056d3 100644 --- a/src/server.c +++ b/src/server.c @@ -38,6 +38,7 @@ #include "functions.h" #include "hdr_histogram.h" #include "syscheck.h" +#include "threads_mngr.h" #include #include @@ -2563,6 +2564,7 @@ void initServer(void) { signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); + ThreadsManager_init(); makeThreadKillable(); if (server.syslog_enabled) { @@ -6526,7 +6528,7 @@ void setupSignalHandlers(void) { sigaction(SIGTERM, &act, NULL); sigaction(SIGINT, &act, NULL); - setupSigSegvHandler(); + setupDebugSigHandlers(); } /* This is the signal handler for children process. It is currently useful diff --git a/src/server.h b/src/server.h index 4634cbdf5..ac324eb6c 100644 --- a/src/server.h +++ b/src/server.h @@ -3707,6 +3707,7 @@ void _serverPanic(const char *file, int line, const char *msg, ...) void _serverPanic(const char *file, int line, const char *msg, ...); #endif void serverLogObjectDebugInfo(const robj *o); +void setupDebugSigHandlers(void); void setupSigSegvHandler(void); void removeSigSegvHandlers(void); const char *getSafeInfoString(const char *s, size_t len, char **tmp); diff --git a/src/threads_mngr.c b/src/threads_mngr.c new file mode 100644 index 000000000..cb93dcf33 --- /dev/null +++ b/src/threads_mngr.c @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "threads_mngr.h" +/* Anti-warning macro... */ +#define UNUSED(V) ((void) V) + +#ifdef __linux__ +#include "zmalloc.h" +#include "atomicvar.h" +#include "server.h" + +#include +#include +#include +#include +#include + +#define IN_PROGRESS 1 +static const clock_t RUN_ON_THREADS_TIMEOUT = 2; + +/*================================= Globals ================================= */ + +static run_on_thread_cb g_callback = NULL; +static volatile size_t g_tids_len = 0; +static void **g_output_array = NULL; +static redisAtomic size_t g_thread_ids = 0; +static redisAtomic size_t g_num_threads_done = 0; + +static sem_t wait_for_threads_sem; + +/* This flag is set while ThreadsManager_runOnThreads is running */ +static redisAtomic int g_in_progress = 0; + +/*============================ Internal prototypes ========================== */ + +static void invoke_callback(int sig); +/* returns 0 if it is safe to start, IN_PROGRESS otherwise. */ +static int test_and_start(void); +static void wait_threads(void); +/* Clean up global variable. +Assuming we are under the g_in_progress protection, this is not a thread-safe function */ +static void ThreadsManager_cleanups(void); + +/*============================ API functions implementations ========================== */ + +void ThreadsManager_init(void) { + /* Register signal handler */ + struct sigaction act; + sigemptyset(&act.sa_mask); + /* Not setting SA_RESTART flag means that If a signal handler is invoked while a + system call or library function call is blocked, use the default behavior + i.e., the call fails with the error EINTR */ + act.sa_flags = 0; + act.sa_handler = invoke_callback; + sigaction(SIGUSR2, &act, NULL); +} + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* Check if it is safe to start running. If not - return */ + if(test_and_start() == IN_PROGRESS) { + return NULL; + } + + /* Update g_callback */ + g_callback = callback; + + /* Set g_tids_len */ + g_tids_len = tids_len; + + /* Allocate the output buffer */ + g_output_array = zmalloc(sizeof(void*) * tids_len); + + /* Initialize a semaphore that we will be waiting on for the threads + use pshared = 0 to indicate the semaphore is shared between the process's threads (and not between processes), + and value = 0 as the initial semaphore value. */ + sem_init(&wait_for_threads_sem, 0, 0); + + /* Send signal to all the threads in tids */ + pid_t pid = getpid(); + for (size_t i = 0; i < tids_len ; ++i) { + syscall(SYS_tgkill, pid, tids[i], THREADS_SIGNAL); + } + + /* Wait for all the threads to write to the output array, or until timeout is reached */ + wait_threads(); + + void **ret = g_output_array; + + /* Cleanups to allow next execution */ + ThreadsManager_cleanups(); + + return ret; +} + +/*============================ Internal functions implementations ========================== */ + + +static int test_and_start(void) { + /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ + int prev_state; + atomicFlagGetSet(g_in_progress, prev_state); + + /* If prev_state is 1, g_in_progress was on. */ + return prev_state; +} + +static void invoke_callback(int sig) { + UNUSED(sig); + + size_t thread_id; + atomicGetIncr(g_thread_ids, thread_id, 1); + g_output_array[thread_id] = g_callback(); + size_t curr_done_count; + atomicIncrGet(g_num_threads_done, curr_done_count, 1); + + /* last thread shuts down the light */ + if (curr_done_count == g_tids_len) { + sem_post(&wait_for_threads_sem); + } +} + +static void wait_threads(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + + /* calculate relative time until timeout */ + ts.tv_sec += RUN_ON_THREADS_TIMEOUT; + + int status = 0; + + /* lock the semaphore until the semaphore value rises above zero or a signal + handler interrupts the call. In the later case continue to wait. */ + while ((status = sem_timedwait(&wait_for_threads_sem, &ts)) == -1 && errno == EINTR) { + serverLog(LL_WARNING, "threads_mngr: waiting for threads' output was interrupted by signal. Continue waiting."); + continue; + } + + if (status == -1) { + if (errno == ETIMEDOUT) { + serverLog(LL_WARNING, "threads_mngr: waiting for threads' output timed out"); + } + } +} + +static void ThreadsManager_cleanups(void) { + g_callback = NULL; + g_tids_len = 0; + g_output_array = NULL; + g_thread_ids = 0; + g_num_threads_done = 0; + sem_destroy(&wait_for_threads_sem); + + /* Lastly, turn off g_in_progress */ + atomicSet(g_in_progress, 0); +} +#else + +void ThreadsManager_init(void) { + /* DO NOTHING */ +} + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* DO NOTHING */ + UNUSED(tids); + UNUSED(tids_len); + UNUSED(callback); + return NULL; +} + +#endif /* __linux__ */ diff --git a/src/threads_mngr.h b/src/threads_mngr.h new file mode 100644 index 000000000..2dc82ca0e --- /dev/null +++ b/src/threads_mngr.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021, Redis Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once + +#include "fmacros.h" + +#include +#include + +/** This is an API to invoke callback on a list of threads using a user defined signal handler. + * NOTE: This is API is only supported only in linux systems. + * Calling the functions below on any other system does nothing. +*/ + +#define THREADS_SIGNAL SIGUSR2 + +/* Callback signature */ +typedef void*(*run_on_thread_cb)(void); + +/* Register the process to THREADS_SIGNAL */ +void ThreadsManager_init(void); + +/** @brief Invoke callback by each thread in tids. + * + * @param tids An array of threads that need to invoke callback. + * @param tids_len The number of threads in @param tids. + * @param callback A callback to be invoked by each thread in @param tids. + * + * NOTES: + * It is assumed that all the threads don't block or ignore THREADS_SIGNAL. + * + * It is safe to include the calling thread in @param tids. However, be aware that subsequent tids will + * not be signaled until the calling thread returns from the callback invocation. + * Hence, it is recommended to place the calling thread last in @param tids. + * + * The function returns only when @param tids_len threads have returned from @param callback. + * + * @return NULL If ThreadsManager_runOnThreads is already in the middle of execution. + * Otherwise, it returns an array of the threads return value from @param callback. + * NOTES: + * The indices of the outputs in the output array are NOT associated with the threads indices in @param tids. + * + * The returned array length will be @param tids_len, but some of the entries might be set to NULL if the + * invocation of @param callback was unsuccessful. + * + * The output array should be freed by the caller by calling zfree(). +**/ + +void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback); diff --git a/tests/integration/logging.tcl b/tests/integration/logging.tcl index 4f8639be0..7bd2eb83c 100644 --- a/tests/integration/logging.tcl +++ b/tests/integration/logging.tcl @@ -2,11 +2,13 @@ tags {"external:skip"} { set system_name [string tolower [exec uname -s]] set backtrace_supported 0 +set threads_mngr_supported 0 # We only support darwin or Linux with glibc if {$system_name eq {darwin}} { set backtrace_supported 1 } elseif {$system_name eq {linux}} { + set threads_mngr_supported 1 # Avoid the test on libmusl, which does not support backtrace # and on static binaries (ldd exit code 1) where we can't detect libmusl catch { @@ -23,6 +25,17 @@ if {$backtrace_supported} { test "Server is able to generate a stack trace on selected systems" { r config set watchdog-period 200 r debug sleep 1 + if {$threads_mngr_supported} { + assert_equal [count_log_message 0 "failed to open /proc/"] 0 + assert_equal [count_log_message 0 "failed to find SigBlk or/and SigIgn"] 0 + assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output was interrupted by signal"] 0 + assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output timed out"] 0 + assert_equal [count_log_message 0 "bioProcessBackgroundJobs"] 3 + } + + set pattern "*redis-server *main*" + set res [wait_for_log_messages 0 \"$pattern\" 0 100 100] + if {$::verbose} { puts $res } set pattern "*debugCommand*" set res [wait_for_log_messages 0 \"$pattern\" 0 100 100] if {$::verbose} { puts $res }