diff --git a/src/debug.c b/src/debug.c index 8bbb59b04..9e1d48bd0 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1822,12 +1822,14 @@ void closeDirectLogFiledes(int fd) { #ifdef HAVE_BACKTRACE #define BACKTRACE_MAX_SIZE 100 + #ifdef __linux__ #include #include #include -static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output); +#define TIDS_MAX_SIZE 50 +static size_t get_ready_to_signal_threads_tids(pid_t pid, int sig_num, pid_t tids[TIDS_MAX_SIZE]); #define MAX_BUFF_LENGTH 256 typedef struct { @@ -1837,38 +1839,48 @@ typedef struct { void *trace[BACKTRACE_MAX_SIZE]; } stacktrace_data; -__attribute__ ((noinline)) static void *collect_stacktrace_data(void) { +static stacktrace_data *stacktraces_mempool = NULL; +static redisAtomic size_t g_thread_ids = 0; + +static stacktrace_data *get_stack_trace_pool(void) { + size_t thread_id; + atomicGetIncr(g_thread_ids, thread_id, 1); + return stacktraces_mempool + thread_id; +} + +__attribute__ ((noinline)) static void collect_stacktrace_data(void) { /* allocate stacktrace_data struct */ - stacktrace_data *trace_data = zmalloc(sizeof(stacktrace_data)); + stacktrace_data *trace_data = get_stack_trace_pool(); /* Get the stack trace first! 
*/ trace_data->trace_size = backtrace(trace_data->trace, BACKTRACE_MAX_SIZE); - + /* get the thread name */ prctl(PR_GET_NAME, trace_data->thread_name); /* get the thread id */ trace_data->tid = syscall(SYS_gettid); - - /* return the trace data */ - return trace_data; } -__attribute__ ((noinline)) +__attribute__ ((noinline)) static void writeStacktraces(int fd, int uplevel) { /* get the list of all the process's threads that don't block or ignore the THREADS_SIGNAL */ pid_t pid = getpid(); - size_t len_tids = 0; - pid_t *tids = get_ready_to_signal_threads_tids(pid, THREADS_SIGNAL, &len_tids); + pid_t tids[TIDS_MAX_SIZE]; + size_t len_tids = get_ready_to_signal_threads_tids(pid, THREADS_SIGNAL, tids); + if (!len_tids) { + serverLogFromHandler(LL_WARNING, "writeStacktraces(): Failed to get the process's threads."); + } - /* This call returns either NULL or the stacktraces data from all tids */ - stacktrace_data **stacktraces_data = (stacktrace_data **)ThreadsManager_runOnThreads(tids, len_tids, collect_stacktrace_data); + stacktrace_data stacktraces[len_tids]; + stacktraces_mempool = stacktraces; + memset(stacktraces, 0, sizeof(stacktrace_data) * len_tids); - /* free tids */ - zfree(tids); + /* restart mempool iterator*/ + atomicSet(g_thread_ids, 0); - /* ThreadsManager_runOnThreads returns NULL if it is already running */ - if (!stacktraces_data) return; + /* ThreadsManager_runOnThreads returns 0 if it is already running */ + if (!ThreadsManager_runOnThreads(tids, len_tids, collect_stacktrace_data)) return; size_t skipped = 0; @@ -1876,21 +1888,21 @@ static void writeStacktraces(int fd, int uplevel) { pid_t calling_tid = syscall(SYS_gettid); /* for backtrace_data in backtraces_data: */ for (size_t i = 0; i < len_tids; i++) { - stacktrace_data *curr_stacktrace_data = stacktraces_data[i]; + stacktrace_data curr_stacktrace_data = stacktraces[i]; /*ThreadsManager_runOnThreads might fail to collect the thread's data */ - if (!curr_stacktrace_data) { + if (0 == 
curr_stacktrace_data.trace_size) { skipped++; continue; } /* stacktrace header includes the tid and the thread's name */ - snprintf(buff, MAX_BUFF_LENGTH, "\n%d %s", curr_stacktrace_data->tid, curr_stacktrace_data->thread_name); + snprintf(buff, MAX_BUFF_LENGTH, "\n%d %s", curr_stacktrace_data.tid, curr_stacktrace_data.thread_name); if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; /* skip kernel call to the signal handler, the signal handler and the callback addresses */ int curr_uplevel = 3; - if (curr_stacktrace_data->tid == calling_tid) { + if (curr_stacktrace_data.tid == calling_tid) { /* skip signal syscall and ThreadsManager_runOnThreads */ curr_uplevel += uplevel + 2; /* Add an indication to header of the thread that is handling the log file */ @@ -1903,11 +1915,8 @@ static void writeStacktraces(int fd, int uplevel) { if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; /* add the stacktrace */ - backtrace_symbols_fd(curr_stacktrace_data->trace+curr_uplevel, curr_stacktrace_data->trace_size-curr_uplevel, fd); - - zfree(curr_stacktrace_data); + backtrace_symbols_fd(curr_stacktrace_data.trace+curr_uplevel, curr_stacktrace_data.trace_size-curr_uplevel, fd); } - zfree(stacktraces_data); snprintf(buff, MAX_BUFF_LENGTH, "\n%zu/%zu expected stacktraces.\n", len_tids - skipped, len_tids); if (write(fd,buff,strlen(buff)) == -1) {/* Avoid warning. */}; @@ -2486,9 +2495,8 @@ void debugDelay(int usec) { #ifdef __linux__ /* =========================== Stacktrace Utils ============================ */ -#define TIDS_INITIAL_SIZE 50 -/** If it doesn't block and doesn't ignore, return 1 (the thread will handle the signal) +/** If it doesn't block and doesn't ignore, return 1 (the thread will handle the signal) * If thread tid blocks or ignores sig_num returns 0 (thread is not ready to catch the signal). 
* also returns 0 if something is wrong and prints a warning message to the log file **/ static int is_thread_ready_to_signal(pid_t pid, pid_t tid, int sig_num) { @@ -2527,23 +2535,21 @@ static int is_thread_ready_to_signal(pid_t pid, pid_t tid, int sig_num) { serverLog(LL_WARNING, "tid:%d: failed to find SigBlk or/and SigIgn field(s) in /proc/%d/task/%d/status file", tid, pid, tid); } - return ret; + return ret; } -/** Returns a list of all the process's (pid) threads that can receive signal sig_num. - * Also updates tids_len_output to the number of valid threads' ids in the returned array - * NOTE: It is the caller responsibility to free the returned array with zfree(). */ -static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *tids_len_output) { +/** Returns the number of the process's threads that can receive signal sig_num. + * Writes into tids the tids of these threads. + * If it fails, returns 0. +*/ +static size_t get_ready_to_signal_threads_tids(pid_t pid, int sig_num, pid_t tids[TIDS_MAX_SIZE]) { /* Initialize the path the process threads' directory. */ char path_buff[MAX_BUFF_LENGTH]; snprintf(path_buff, MAX_BUFF_LENGTH, "/proc/%d/task", pid); /* Get the directory handler. */ DIR *dir; - if (!(dir = opendir(path_buff))) return NULL; - - size_t tids_cap = TIDS_INITIAL_SIZE; - pid_t *tids = zmalloc(sizeof(pid_t) * tids_cap); + if (!(dir = opendir(path_buff))) return 0; size_t tids_count = 0; struct dirent *entry; @@ -2555,7 +2561,7 @@ static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *t if (entry->d_type == DT_DIR) { /* Skip irrelevant directories. */ if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) { - /* the thread's directory name is equivalent to its tid. */ + /* the thread's directory name is equivalent to its tid. 
*/ pid_t tid = atoi(entry->d_name); if(!is_thread_ready_to_signal(pid, tid, sig_num)) continue; @@ -2564,31 +2570,28 @@ static pid_t *get_ready_to_signal_threads_tids(pid_t pid, int sig_num, size_t *t current_thread_index = tids_count; } - /* increase tids capacity if needed */ - if(tids_count >= tids_cap) { - tids_cap *= 2; - tids = zrealloc(tids, sizeof(pid_t) * tids_cap); - } - /* save the thread id */ tids[tids_count++] = tid; } } + /* Stop if we reached the maximum threads number. */ + if(tids_count == TIDS_MAX_SIZE) { + serverLogFromHandler(LL_WARNING,"get_ready_to_signal_threads_tids(): Reached the limit of the tids buffer."); + break; + } } /* Swap the last tid with the the current thread id */ if(current_thread_index != -1) { pid_t last_tid = tids[tids_count - 1]; - + tids[tids_count - 1] = calling_tid; tids[current_thread_index] = last_tid; } - closedir(dir); - *tids_len_output = tids_count; - return tids; + return tids_count; } #endif /* __linux__ */ #endif /* HAVE_BACKTRACE */ diff --git a/src/threads_mngr.c b/src/threads_mngr.c index b6bb97a8a..af3fea940 100644 --- a/src/threads_mngr.c +++ b/src/threads_mngr.c @@ -49,8 +49,6 @@ static const clock_t RUN_ON_THREADS_TIMEOUT = 2; static run_on_thread_cb g_callback = NULL; static volatile size_t g_tids_len = 0; -static void **g_output_array = NULL; -static redisAtomic size_t g_thread_ids = 0; static redisAtomic size_t g_num_threads_done = 0; static sem_t wait_for_threads_sem; @@ -83,11 +81,11 @@ void ThreadsManager_init(void) { sigaction(SIGUSR2, &act, NULL); } -__attribute__ ((noinline)) -void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { +__attribute__ ((noinline)) +int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { /* Check if it is safe to start running. 
If not - return */ if(test_and_start() == IN_PROGRESS) { - return NULL; + return 0; } /* Update g_callback */ @@ -96,9 +94,6 @@ void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_c /* Set g_tids_len */ g_tids_len = tids_len; - /* Allocate the output buffer */ - g_output_array = zcalloc(sizeof(void*) * tids_len); - /* Initialize a semaphore that we will be waiting on for the threads use pshared = 0 to indicate the semaphore is shared between the process's threads (and not between processes), and value = 0 as the initial semaphore value. */ @@ -113,12 +108,10 @@ void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_c /* Wait for all the threads to write to the output array, or until timeout is reached */ wait_threads(); - void **ret = g_output_array; - /* Cleanups to allow next execution */ ThreadsManager_cleanups(); - return ret; + return 1; } /*============================ Internal functions implementations ========================== */ @@ -133,7 +126,7 @@ static int test_and_start(void) { return prev_state; } -__attribute__ ((noinline)) +__attribute__ ((noinline)) static void invoke_callback(int sig) { UNUSED(sig); @@ -143,10 +136,8 @@ static void invoke_callback(int sig) { return; } - if (g_output_array) { - size_t thread_id; - atomicGetIncr(g_thread_ids, thread_id, 1); - g_output_array[thread_id] = g_callback(); + if (g_callback) { + g_callback(); size_t curr_done_count; atomicIncrGet(g_num_threads_done, curr_done_count, 1); @@ -181,8 +172,6 @@ static void ThreadsManager_cleanups(void) { g_callback = NULL; g_tids_len = 0; - g_output_array = NULL; - g_thread_ids = 0; g_num_threads_done = 0; sem_destroy(&wait_for_threads_sem); @@ -197,12 +186,12 @@ void ThreadsManager_init(void) { /* DO NOTHING */ } -void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { +int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { /* DO NOTHING */ 
UNUSED(tids); UNUSED(tids_len); UNUSED(callback); - return NULL; + return 1; } #endif /* __linux__ */ diff --git a/src/threads_mngr.h b/src/threads_mngr.h index 2dc82ca0e..d84571c7f 100644 --- a/src/threads_mngr.h +++ b/src/threads_mngr.h @@ -42,7 +42,7 @@ #define THREADS_SIGNAL SIGUSR2 /* Callback signature */ -typedef void*(*run_on_thread_cb)(void); +typedef void(*run_on_thread_cb)(void); /* Register the process to THREADS_SIGNAL */ void ThreadsManager_init(void); @@ -62,15 +62,8 @@ void ThreadsManager_init(void); * * The function returns only when @param tids_len threads have returned from @param callback. * - * @return NULL If ThreadsManager_runOnThreads is already in the middle of execution. - * Otherwise, it returns an array of the threads return value from @param callback. - * NOTES: - * The indices of the outputs in the output array are NOT associated with the threads indices in @param tids. + * @return 1 if successful, 0 if ThreadsManager_runOnThreads is already in the middle of execution. * - * The returned array length will be @param tids_len, but some of the entries might be set to NULL if the - * invocation of @param callback was unsuccessful. - * - * The output array should be freed by the caller by calling zfree(). **/ -void **ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback); +int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback);