Print stack trace from all threads in crash report (#12453)

In this PR we are adding the functionality to collect all the process's threads' backtraces.

## Changes made in this PR

### **introduce threads mngr API**
The **threads mngr API** which has 2 abilities:
* `ThreadsManager_init() `- register to SIGUSR2. called on the server start-up.
* ` ThreadsManager_runOnThreads()` - receives a list of a pid_t and a callback, tells every
  thread in the list to invoke the callback, and returns the output collected by each invocation.
**Elaborating atomicvar API**
* `atomicIncrGet(var,newvalue_var,count) `-- Increment and get the atomic counter new value
* `atomicFlagGetSet` -- Get and set the atomic counter value to 1

### **Always set SIGALRM handler**
SIGALRM handler prints the process's stacktrace to the log file. Up until now, it was set only if the
`server.watchdog_period` > 0. This can be also useful if debugging is needed. However, in situations
where the server can't get requests, (a deadlock, for example) we weren't able to change the signal handler.
To make it available at run time we set SIGALRM handler on server startup. The signal handler name was
changed to a more general `sigalrmSignalHandler`.

### **Print all the process' threads' stacktraces**

`logStackTrace()` now calls `writeStacktraces()`, instead of logging the current thread stacktrace.
`writeStacktraces()`:
* On Linux systems we use the threads manager API to collect the backtraces of all the process' threads.
  To get the `tids` list (threads ids) we read the `/proc/<redis-server-pid>/tasks` file which includes a list of directories.
  Each directory name corresponds to one tid (including the main thread). For each thread, we also need to check if it
  can get the signal from the threads manager (meaning it is not blocking/ignoring that signal). We send the threads
  manager this tids list and `collect_stacktrace_data()` callback, which collects the thread's backtrace addresses,
  its name, and tid.
* On other systems, the behavior remained as it was (writing only the current thread stacktrace to the log file).

## compatibility notes
1. **The threads mngr API is only supported in linux.** 
2. glibc earlier than 2.3 We use `syscall(SYS_gettid)` and `syscall(SYS_tgkill...)` because their dedicated
  alternatives (`gettid()` and `tgkill`) were added in glibc 2.3.

## Output example

Each thread backtrace will have the following format:
`<tid> <thread_name> [additional_info]`
* **tid**: as read from the `/proc/<redis-server-pid>/tasks` file
* **thread_name**: the tread name as it is registered in the os/
* **additional_info**: Sometimes we want to add specific information about one of the threads. currently.
  it is only used to mark the thread that handles the backtraces collection by adding "*".
  In case of crash - this also indicates which thread caused the crash. The handling thread in won't
  necessarily appear first.

```
------ STACK TRACE ------
EIP:
/lib/aarch64-linux-gnu/libc.so.6(epoll_pwait+0x9c)[0xffffb9295ebc]

67089 redis-server *
linux-vdso.so.1(__kernel_rt_sigreturn+0x0)[0xffffb9437790]
/lib/aarch64-linux-gnu/libc.so.6(epoll_pwait+0x9c)[0xffffb9295ebc]
redis-server *:6379(+0x75e0c)[0xaaaac2fe5e0c]
redis-server *:6379(aeProcessEvents+0x18c)[0xaaaac2fe6c00]
redis-server *:6379(aeMain+0x24)[0xaaaac2fe7038]
redis-server *:6379(main+0xe0c)[0xaaaac3001afc]
/lib/aarch64-linux-gnu/libc.so.6(+0x273fc)[0xffffb91d73fc]
/lib/aarch64-linux-gnu/libc.so.6(__libc_start_main+0x98)[0xffffb91d74cc]
redis-server *:6379(_start+0x30)[0xaaaac2fe0370]

67093 bio_lazy_free
/lib/aarch64-linux-gnu/libc.so.6(+0x79dfc)[0xffffb9229dfc]
/lib/aarch64-linux-gnu/libc.so.6(pthread_cond_wait+0x208)[0xffffb922c8fc]
redis-server *:6379(bioProcessBackgroundJobs+0x174)[0xaaaac30976e8]
/lib/aarch64-linux-gnu/libc.so.6(+0x7d5c8)[0xffffb922d5c8]
/lib/aarch64-linux-gnu/libc.so.6(+0xe5d1c)[0xffffb9295d1c]

67091 bio_close_file
/lib/aarch64-linux-gnu/libc.so.6(+0x79dfc)[0xffffb9229dfc]
/lib/aarch64-linux-gnu/libc.so.6(pthread_cond_wait+0x208)[0xffffb922c8fc]
redis-server *:6379(bioProcessBackgroundJobs+0x174)[0xaaaac30976e8]
/lib/aarch64-linux-gnu/libc.so.6(+0x7d5c8)[0xffffb922d5c8]
/lib/aarch64-linux-gnu/libc.so.6(+0xe5d1c)[0xffffb9295d1c]

67092 bio_aof
/lib/aarch64-linux-gnu/libc.so.6(+0x79dfc)[0xffffb9229dfc]
/lib/aarch64-linux-gnu/libc.so.6(pthread_cond_wait+0x208)[0xffffb922c8fc]
redis-server *:6379(bioProcessBackgroundJobs+0x174)[0xaaaac30976e8]
/lib/aarch64-linux-gnu/libc.so.6(+0x7d5c8)[0xffffb922d5c8]
/lib/aarch64-linux-gnu/libc.so.6(+0xe5d1c)[0xffffb9295d1c]
67089:signal-handler (1693824528) --------
```
This commit is contained in:
meiravgri 2023-09-24 09:47:23 +03:00 committed by GitHub
parent 2aad03fa39
commit cc2be63997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 561 additions and 30 deletions

View File

@ -345,7 +345,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View File

@ -1,16 +1,41 @@
/* This file implements atomic counters using c11 _Atomic, __atomic or __sync
* macros if available, otherwise we will throw an error when compile.
*
* The exported interface is composed of three macros:
* The exported interface is composed of the following macros:
*
* atomicIncr(var,count) -- Increment the atomic counter
* atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter
* atomicIncrGet(var,newvalue_var,count) -- Increment and get the atomic counter new value
* atomicDecr(var,count) -- Decrement the atomic counter
* atomicGet(var,dstvar) -- Fetch the atomic counter value
* atomicSet(var,value) -- Set the atomic counter value
* atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization
* atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization
*
*
* Atomic operations on flags.
* Flag type can be int, long, long long or their unsigned counterparts.
* The value of the flag can be 1 or 0.
*
* atomicFlagGetSet(var,oldvalue_var) -- Get and set the atomic counter value
*
* NOTE1: __atomic* and _Atomic implementations can be actually elaborated to support any value by changing the
* hardcoded new value passed to __atomic_exchange* from 1 to @param count
* i.e oldvalue_var = atomic_exchange_explicit(&var, count).
* However, in order to be compatible with the __sync functions family, we can use only 0 and 1.
* The only exchange alternative suggested by __sync is __sync_lock_test_and_set,
* But as described by the gnu manual for __sync_lock_test_and_set():
* https://gcc.gnu.org/onlinedocs/gcc/_005f_005fsync-Builtins.html
* "A target may support reduced functionality here by which the only valid value to store is the immediate constant 1. The exact value
* actually stored in *ptr is implementation defined."
* Hence, we can't rely on it for a any value other than 1.
* We eventually chose to implement this method with __sync_val_compare_and_swap since it satisfies functionality needed for atomicFlagGetSet
* (if the flag was 0 -> set to 1, if it's already 1 -> do nothing, but the final result is that the flag is set),
* and also it has a full barrier (__sync_lock_test_and_set has acquire barrier).
*
* NOTE2: Unlike other atomic type, which aren't guaranteed to be lock free, c11 atmoic_flag does.
* To check whether a type is lock free, atomic_is_lock_free() can be used.
* It can be considered to limit the flag type to atomic_flag to improve performance.
*
* Never use return value from the macros, instead use the AtomicGetIncr()
* if you need to get the current value and increment it atomically, like
* in the following example:
@ -93,6 +118,8 @@
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \
} while(0)
#define atomicIncrGet(var, newvalue_var, count) \
newvalue_var = atomicIncr(var,count) + count
#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed)
#define atomicGet(var,dstvar) do { \
dstvar = atomic_load_explicit(&var,memory_order_relaxed); \
@ -103,6 +130,8 @@
} while(0)
#define atomicSetWithSync(var,value) \
atomic_store_explicit(&var,value,memory_order_seq_cst)
#define atomicFlagGetSet(var,oldvalue_var) \
oldvalue_var = atomic_exchange_explicit(&var,1,memory_order_relaxed)
#define REDIS_ATOMIC_API "c11-builtin"
#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && \
@ -111,6 +140,8 @@
/* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
#define atomicIncrGet(var, newvalue_var, count) \
newvalue_var = __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \
} while(0)
@ -124,12 +155,16 @@
} while(0)
#define atomicSetWithSync(var,value) \
__atomic_store_n(&var,value,__ATOMIC_SEQ_CST)
#define atomicFlagGetSet(var,oldvalue_var) \
oldvalue_var = __atomic_exchange_n(&var,1,__ATOMIC_RELAXED)
#define REDIS_ATOMIC_API "atomic-builtin"
#elif defined(HAVE_ATOMIC)
/* Implementation using __sync macros. */
#define atomicIncr(var,count) __sync_add_and_fetch(&var,(count))
#define atomicIncrGet(var, newvalue_var, count) \
newvalue_var = __sync_add_and_fetch(&var,(count))
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = __sync_fetch_and_add(&var,(count)); \
} while(0)
@ -149,6 +184,8 @@
ANNOTATE_HAPPENS_BEFORE(&var); \
while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \
} while(0)
#define atomicFlagGetSet(var,oldvalue_var) \
oldvalue_var = __sync_val_compare_and_swap(&var,0,1)
#define REDIS_ATOMIC_API "sync-builtin"
#else

View File

@ -36,6 +36,7 @@
#include "quicklist.h"
#include "fpconv_dtoa.h"
#include "cluster.h"
#include "threads_mngr.h"
#include <arpa/inet.h>
#include <signal.h>
@ -75,6 +76,7 @@ void bugReportStart(void);
void printCrashReport(void);
void bugReportEnd(int killViaSignal, int sig);
void logStackTrace(void *eip, int uplevel);
void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret);
/* ================================= Debugging ============================== */
@ -1819,6 +1821,102 @@ void closeDirectLogFiledes(int fd) {
}
#ifdef HAVE_BACKTRACE
#define BACKTRACE_MAX_SIZE 100
#ifdef __linux__
#include <sys/prctl.h>
#include <sys/syscall.h>
#include <dirent.h>
static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output);
#define MAX_BUFF_LENGTH 256
typedef struct {
char thread_name[16];
int trace_size;
pid_t tid;
void *trace[BACKTRACE_MAX_SIZE];
} stacktrace_data;
static void *collect_stacktrace_data(void) {
/* allocate stacktrace_data struct */
stacktrace_data *trace_data = zmalloc(sizeof(stacktrace_data));
/* Get the stack trace first! */
trace_data->trace_size = backtrace(trace_data->trace, BACKTRACE_MAX_SIZE);
/* get the thread name */
prctl(PR_GET_NAME, trace_data->thread_name);
/* get the thread id */
trace_data->tid = syscall(SYS_gettid);
/* return the trace data */
return trace_data;
}
static void writeStacktraces(int fd, int uplevel) {
/* get the list of all the process's threads that don't block or ignore the THREADS_SIGNAL */
pid_t pid = getpid();
size_t len_tids = 0;
pid_t *tids = get_ready_to_signal_threads_tids(pid, THREADS_SIGNAL, &len_tids);
/* This call returns either NULL or the stacktraces data from all tids */
stacktrace_data **stacktraces_data = (stacktrace_data **)ThreadsManager_runOnThreads(tids, len_tids, collect_stacktrace_data);
/* free tids */
zfree(tids);
/* ThreadsManager_runOnThreads returns NULL if it is already running */
if (!stacktraces_data) return;
char buff[MAX_BUFF_LENGTH];
pid_t calling_tid = syscall(SYS_gettid);
/* for backtrace_data in backtraces_data: */
for (size_t i = 0; i < len_tids; i++) {
stacktrace_data *curr_stacktrace_data = stacktraces_data[i];
/*ThreadsManager_runOnThreads might fail to collect the thread's data */
if (!curr_stacktrace_data) continue;
/* stacktrace header includes the tid and the thread's name */
snprintf(buff, MAX_BUFF_LENGTH, "\n%d %s", curr_stacktrace_data->tid, curr_stacktrace_data->thread_name);
if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */};
/* skip kernel call to the signal handler, the signal handler and the callback addresses */
int curr_uplevel = 3;
if (curr_stacktrace_data->tid == calling_tid) {
/* skip signal syscall and ThreadsManager_runOnThreads */
curr_uplevel += uplevel + 2;
/* Add an indication to header of the thread that is handling the log file */
snprintf(buff, MAX_BUFF_LENGTH, " *\n");
} else {
/* just add a new line */
snprintf(buff, MAX_BUFF_LENGTH, "\n");
}
if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */};
/* add the stacktrace */
backtrace_symbols_fd(curr_stacktrace_data->trace+curr_uplevel, curr_stacktrace_data->trace_size-curr_uplevel, fd);
zfree(curr_stacktrace_data);
}
zfree(stacktraces_data);
}
#else /* __linux__*/
static void writeStacktraces(int fd, int uplevel) {
void *trace[BACKTRACE_MAX_SIZE];
int trace_size = backtrace(trace, BACKTRACE_MAX_SIZE);
char *msg = "\nBacktrace:\n";
if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */};
backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd);
}
#endif /* __linux__ */
/* Logs the stack trace using the backtrace() call. This function is designed
* to be called from signal handlers safely.
@ -1826,16 +1924,12 @@ void closeDirectLogFiledes(int fd) {
* The uplevel argument indicates how many of the calling functions to skip.
*/
void logStackTrace(void *eip, int uplevel) {
void *trace[100];
int trace_size = 0, fd = openDirectLogFiledes();
int fd = openDirectLogFiledes();
char *msg;
uplevel++; /* skip this function */
if (fd == -1) return; /* If we can't log there is anything to do. */
/* Get the stack trace first! */
trace_size = backtrace(trace, 100);
msg = "\n------ STACK TRACE ------\n";
if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */};
@ -1847,9 +1941,8 @@ void logStackTrace(void *eip, int uplevel) {
}
/* Write symbols to log file */
msg = "\nBacktrace:\n";
if (write(fd,msg,strlen(msg)) == -1) {/* Avoid warning. */};
backtrace_symbols_fd(trace+uplevel, trace_size-uplevel, fd);
++uplevel;
writeStacktraces(fd, uplevel);
/* Cleanup */
closeDirectLogFiledes(fd);
@ -2184,6 +2277,17 @@ static void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
bugReportEnd(1, sig);
}
void setupDebugSigHandlers(void) {
setupSigSegvHandler();
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = sigalrmSignalHandler;
sigaction(SIGALRM, &act, NULL);
}
void setupSigSegvHandler(void) {
/* Initialize the signal handler lock.
Attempting to initialize an already initialized mutex or mutexattr results in undefined behavior. */
@ -2305,16 +2409,21 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) {
/* =========================== Software Watchdog ============================ */
#include <sys/time.h>
void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) {
void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret) {
#ifdef HAVE_BACKTRACE
ucontext_t *uc = (ucontext_t*) secret;
#else
(void)secret;
#endif
UNUSED(info);
UNUSED(sig);
serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---");
/* SIGALRM can be sent explicitly to the process calling kill() to get the stacktraces,
or every watchdog_period interval. In the last case, si_pid is not set */
if(info->si_pid == 0) {
serverLogFromHandler(LL_WARNING,"\n--- WATCHDOG TIMER EXPIRED ---");
} else {
serverLogFromHandler(LL_WARNING, "\nReceived SIGALRM");
}
#ifdef HAVE_BACKTRACE
logStackTrace(getAndSetMcontextEip(uc, NULL), 1);
#else
@ -2338,25 +2447,10 @@ void watchdogScheduleSignal(int period) {
setitimer(ITIMER_REAL, &it, NULL);
}
void applyWatchdogPeriod(void) {
struct sigaction act;
/* Disable watchdog when period is 0 */
if (server.watchdog_period == 0) {
watchdogScheduleSignal(0); /* Stop the current timer. */
/* Set the signal handler to SIG_IGN, this will also remove pending
* signals from the queue. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = SIG_IGN;
sigaction(SIGALRM, &act, NULL);
} else {
/* Setup the signal handler. */
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = watchdogSignalHandler;
sigaction(SIGALRM, &act, NULL);
/* If the configured period is smaller than twice the timer period, it is
* too short for the software watchdog to work reliably. Fix it now
* if needed. */
@ -2374,3 +2468,114 @@ void debugDelay(int usec) {
if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0;
if (usec) usleep(usec);
}
#ifdef HAVE_BACKTRACE
#ifdef __linux__
/* =========================== Stacktrace Utils ============================ */
#define TIDS_INITIAL_SIZE 50
/** If it doesn't block and doesn't ignore, return 1 (the thread will handle the signal)
* If thread tid blocks or ignores sig_num returns 0 (thread is not ready to catch the signal).
* also returns 0 if something is wrong and prints a warning message to the log file **/
static int is_thread_ready_to_signal(pid_t pid, pid_t tid, int sig_num) {
/* open the threads status file */
char buff[MAX_BUFF_LENGTH];
snprintf(buff, MAX_BUFF_LENGTH, "/proc/%d/task/%d/status", pid, tid);
FILE *thread_status_file = fopen(buff, "r");
if (thread_status_file == NULL) {
serverLog(LL_WARNING,
"tid:%d: failed to open /proc/%d/task/%d/status file", tid, pid, tid);
return 0;
}
int ret = 1;
size_t field_name_len = strlen("SigBlk:"); /* SigIgn has the same length */
char *line = NULL;
size_t fields_count = 2;
while ((line = fgets(buff, MAX_BUFF_LENGTH, thread_status_file)) && fields_count) {
/* iterate the file until we reach SigBlk or SigIgn field line */
if (!strncmp(buff, "SigBlk:", field_name_len) || !strncmp(buff, "SigIgn:", field_name_len)) {
/* check if the signal exist in the mask */
unsigned long sig_mask = strtoul(buff + field_name_len, NULL, 16);
if(sig_mask & sig_num) { /* if the signal is blocked/ignored return 0 */
ret = 0;
break;
}
--fields_count;
}
}
fclose(thread_status_file);
/* if we reached EOF, it means we haven't found SigBlk or/and SigIgn, something is wrong */
if (line == NULL) {
ret = 0;
serverLog(LL_WARNING,
"tid:%d: failed to find SigBlk or/and SigIgn field(s) in /proc/%d/task/%d/status file", tid, pid, tid);
}
return ret;
}
/** Returns a list of all the process's (pid) threads that can receive signal sig_num.
* Also updates tids_len_output to the number of valid threads' ids in the returned array
* NOTE: It is the caller responsibility to free the returned array with zfree(). */
static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output) {
/* Initialize the path the process threads' directory. */
char path_buff[MAX_BUFF_LENGTH];
snprintf(path_buff, MAX_BUFF_LENGTH, "/proc/%d/task", pid);
/* Get the directory handler. */
DIR *dir;
if (!(dir = opendir(path_buff))) return NULL;
size_t tids_cap = TIDS_INITIAL_SIZE;
pid_t *tids = zmalloc(sizeof(pid_t) * tids_cap);
size_t tids_count = 0;
struct dirent *entry;
pid_t calling_tid = syscall(SYS_gettid);
int current_thread_index = -1;
/* Each thread is represented by a directory */
while ((entry = readdir(dir)) != NULL) {
if (entry->d_type == DT_DIR) {
/* Skip irrelevant directories. */
if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) {
/* the thread's directory name is equivalent to its tid. */
pid_t tid = atoi(entry->d_name);
if(!is_thread_ready_to_signal(pid, tid, sig_num)) continue;
if(tid == calling_tid) {
current_thread_index = tids_count;
}
/* increase tids capacity if needed */
if(tids_count >= tids_cap) {
tids_cap *= 2;
tids = zrealloc(tids, sizeof(pid_t) * tids_cap);
}
/* save the thread id */
tids[tids_count++] = tid;
}
}
}
/* Swap the last tid with the the current thread id */
if(current_thread_index != -1) {
pid_t last_tid = tids[tids_count - 1];
tids[tids_count - 1] = calling_tid;
tids[current_thread_index] = last_tid;
}
closedir(dir);
*tids_len_output = tids_count;
return tids;
}
#endif /* __linux__ */
#endif /* HAVE_BACKTRACE */

View File

@ -38,6 +38,7 @@
#include "functions.h"
#include "hdr_histogram.h"
#include "syscheck.h"
#include "threads_mngr.h"
#include <time.h>
#include <signal.h>
@ -2563,6 +2564,7 @@ void initServer(void) {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
ThreadsManager_init();
makeThreadKillable();
if (server.syslog_enabled) {
@ -6526,7 +6528,7 @@ void setupSignalHandlers(void) {
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
setupSigSegvHandler();
setupDebugSigHandlers();
}
/* This is the signal handler for children process. It is currently useful

View File

@ -3707,6 +3707,7 @@ void _serverPanic(const char *file, int line, const char *msg, ...)
void _serverPanic(const char *file, int line, const char *msg, ...);
#endif
void serverLogObjectDebugInfo(const robj *o);
void setupDebugSigHandlers(void);
void setupSigSegvHandler(void);
void removeSigSegvHandlers(void);
const char *getSafeInfoString(const char *s, size_t len, char **tmp);

197
src/threads_mngr.c Normal file
View File

@ -0,0 +1,197 @@
/*
* Copyright (c) 2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "threads_mngr.h"
/* Anti-warning macro... */
#define UNUSED(V) ((void) V)
#ifdef __linux__
#include "zmalloc.h"
#include "atomicvar.h"
#include "server.h"
#include <signal.h>
#include <time.h>
#include <errno.h>
#include <semaphore.h>
#include <sys/syscall.h>
#define IN_PROGRESS 1
static const clock_t RUN_ON_THREADS_TIMEOUT = 2;
/*================================= Globals ================================= */
static run_on_thread_cb g_callback = NULL;
static volatile size_t g_tids_len = 0;
static void **g_output_array = NULL;
static redisAtomic size_t g_thread_ids = 0;
static redisAtomic size_t g_num_threads_done = 0;
static sem_t wait_for_threads_sem;
/* This flag is set while ThreadsManager_runOnThreads is running */
static redisAtomic int g_in_progress = 0;
/*============================ Internal prototypes ========================== */
static void invoke_callback(int sig);
/* returns 0 if it is safe to start, IN_PROGRESS otherwise. */
static int test_and_start(void);
static void wait_threads(void);
/* Clean up global variable.
Assuming we are under the g_in_progress protection, this is not a thread-safe function */
static void ThreadsManager_cleanups(void);
/*============================ API functions implementations ========================== */
void ThreadsManager_init(void) {
/* Register signal handler */
struct sigaction act;
sigemptyset(&act.sa_mask);
/* Not setting SA_RESTART flag means that If a signal handler is invoked while a
system call or library function call is blocked, use the default behavior
i.e., the call fails with the error EINTR */
act.sa_flags = 0;
act.sa_handler = invoke_callback;
sigaction(SIGUSR2, &act, NULL);
}
void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) {
/* Check if it is safe to start running. If not - return */
if(test_and_start() == IN_PROGRESS) {
return NULL;
}
/* Update g_callback */
g_callback = callback;
/* Set g_tids_len */
g_tids_len = tids_len;
/* Allocate the output buffer */
g_output_array = zmalloc(sizeof(void*) * tids_len);
/* Initialize a semaphore that we will be waiting on for the threads
use pshared = 0 to indicate the semaphore is shared between the process's threads (and not between processes),
and value = 0 as the initial semaphore value. */
sem_init(&wait_for_threads_sem, 0, 0);
/* Send signal to all the threads in tids */
pid_t pid = getpid();
for (size_t i = 0; i < tids_len ; ++i) {
syscall(SYS_tgkill, pid, tids[i], THREADS_SIGNAL);
}
/* Wait for all the threads to write to the output array, or until timeout is reached */
wait_threads();
void **ret = g_output_array;
/* Cleanups to allow next execution */
ThreadsManager_cleanups();
return ret;
}
/*============================ Internal functions implementations ========================== */
static int test_and_start(void) {
/* atomicFlagGetSet sets the variable to 1 and returns the previous value */
int prev_state;
atomicFlagGetSet(g_in_progress, prev_state);
/* If prev_state is 1, g_in_progress was on. */
return prev_state;
}
static void invoke_callback(int sig) {
UNUSED(sig);
size_t thread_id;
atomicGetIncr(g_thread_ids, thread_id, 1);
g_output_array[thread_id] = g_callback();
size_t curr_done_count;
atomicIncrGet(g_num_threads_done, curr_done_count, 1);
/* last thread shuts down the light */
if (curr_done_count == g_tids_len) {
sem_post(&wait_for_threads_sem);
}
}
static void wait_threads(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
/* calculate relative time until timeout */
ts.tv_sec += RUN_ON_THREADS_TIMEOUT;
int status = 0;
/* lock the semaphore until the semaphore value rises above zero or a signal
handler interrupts the call. In the later case continue to wait. */
while ((status = sem_timedwait(&wait_for_threads_sem, &ts)) == -1 && errno == EINTR) {
serverLog(LL_WARNING, "threads_mngr: waiting for threads' output was interrupted by signal. Continue waiting.");
continue;
}
if (status == -1) {
if (errno == ETIMEDOUT) {
serverLog(LL_WARNING, "threads_mngr: waiting for threads' output timed out");
}
}
}
static void ThreadsManager_cleanups(void) {
g_callback = NULL;
g_tids_len = 0;
g_output_array = NULL;
g_thread_ids = 0;
g_num_threads_done = 0;
sem_destroy(&wait_for_threads_sem);
/* Lastly, turn off g_in_progress */
atomicSet(g_in_progress, 0);
}
#else
void ThreadsManager_init(void) {
/* DO NOTHING */
}
void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) {
/* DO NOTHING */
UNUSED(tids);
UNUSED(tids_len);
UNUSED(callback);
return NULL;
}
#endif /* __linux__ */

76
src/threads_mngr.h Normal file
View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include "fmacros.h"
#include <sys/types.h>
#include <unistd.h>
/** This is an API to invoke callback on a list of threads using a user defined signal handler.
* NOTE: This is API is only supported only in linux systems.
* Calling the functions below on any other system does nothing.
*/
#define THREADS_SIGNAL SIGUSR2
/* Callback signature */
typedef void*(*run_on_thread_cb)(void);
/* Register the process to THREADS_SIGNAL */
void ThreadsManager_init(void);
/** @brief Invoke callback by each thread in tids.
*
* @param tids An array of threads that need to invoke callback.
* @param tids_len The number of threads in @param tids.
* @param callback A callback to be invoked by each thread in @param tids.
*
* NOTES:
* It is assumed that all the threads don't block or ignore THREADS_SIGNAL.
*
* It is safe to include the calling thread in @param tids. However, be aware that subsequent tids will
* not be signaled until the calling thread returns from the callback invocation.
* Hence, it is recommended to place the calling thread last in @param tids.
*
* The function returns only when @param tids_len threads have returned from @param callback.
*
* @return NULL If ThreadsManager_runOnThreads is already in the middle of execution.
* Otherwise, it returns an array of the threads return value from @param callback.
* NOTES:
* The indices of the outputs in the output array are NOT associated with the threads indices in @param tids.
*
* The returned array length will be @param tids_len, but some of the entries might be set to NULL if the
* invocation of @param callback was unsuccessful.
*
* The output array should be freed by the caller by calling zfree().
**/
void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback);

View File

@ -2,11 +2,13 @@ tags {"external:skip"} {
set system_name [string tolower [exec uname -s]]
set backtrace_supported 0
set threads_mngr_supported 0
# We only support darwin or Linux with glibc
if {$system_name eq {darwin}} {
set backtrace_supported 1
} elseif {$system_name eq {linux}} {
set threads_mngr_supported 1
# Avoid the test on libmusl, which does not support backtrace
# and on static binaries (ldd exit code 1) where we can't detect libmusl
catch {
@ -23,6 +25,17 @@ if {$backtrace_supported} {
test "Server is able to generate a stack trace on selected systems" {
r config set watchdog-period 200
r debug sleep 1
if {$threads_mngr_supported} {
assert_equal [count_log_message 0 "failed to open /proc/"] 0
assert_equal [count_log_message 0 "failed to find SigBlk or/and SigIgn"] 0
assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output was interrupted by signal"] 0
assert_equal [count_log_message 0 "threads_mngr: waiting for threads' output timed out"] 0
assert_equal [count_log_message 0 "bioProcessBackgroundJobs"] 3
}
set pattern "*redis-server *main*"
set res [wait_for_log_messages 0 \"$pattern\" 0 100 100]
if {$::verbose} { puts $res }
set pattern "*debugCommand*"
set res [wait_for_log_messages 0 \"$pattern\" 0 100 100]
if {$::verbose} { puts $res }