From b84d9671ec4a987f51c007eee1bbb66ad3e7d996 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 22 Oct 2019 00:43:32 -0400 Subject: [PATCH 01/21] Implement deadlock detection Former-commit-id: fa797408d9c5d5f12053641144fe1a8b24f66185 --- src/fastlock.cpp | 86 +++++++++++++++++++++++++++++++++++++------- src/fastlock_x64.asm | 24 ++++++++++++- 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 71a49a1e8..75a0f8381 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #ifdef __linux__ #include #endif @@ -125,6 +126,60 @@ #endif +#pragma weak _serverPanic +extern "C" void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...) +{ + *((char*)-1) = 'x'; +} + +class DeadlockDetector +{ + std::map m_mapwait; + fastlock m_lock; +public: + void registerwait(fastlock *lock, pid_t thispid) + { + if (lock == &m_lock) + return; + fastlock_lock(&m_lock); + m_mapwait.insert(std::make_pair(thispid, lock)); + + // Detect cycles + pid_t pidCheck = thispid; + for (;;) + { + auto itr = m_mapwait.find(pidCheck); + if (itr == m_mapwait.end()) + break; + pidCheck = itr->second->m_pidOwner; + if (pidCheck == thispid) + _serverPanic(__FILE__, __LINE__, "Deadlock detected"); + } + fastlock_unlock(&m_lock); + } + + void clearwait(fastlock *lock, pid_t thispid) + { + if (lock == &m_lock) + return; + fastlock_lock(&m_lock); + m_mapwait.erase(thispid); + fastlock_unlock(&m_lock); + } +}; + +DeadlockDetector g_dlock; + +extern "C" void registerwait(fastlock *lock, pid_t thispid) +{ + g_dlock.registerwait(lock, thispid); +} + +extern "C" void clearwait(fastlock *lock, pid_t thispid) +{ + g_dlock.clearwait(lock, thispid); +} + static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; @@ -184,34 +239,41 @@ extern "C" void fastlock_lock(struct fastlock *lock) return; } + int tid = gettid(); unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); #ifdef __linux__ unsigned mask = (1U << (myticket % 32)); #endif int cloops = 0; ticket ticketT; - for (;;) + + __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); + if ((ticketT.u & 0xffff) != myticket) { - __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); - if ((ticketT.u & 0xffff) == myticket) - break; + registerwait(lock, tid); + for (;;) + { + __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); + if ((ticketT.u & 0xffff) == myticket) + break; #if defined(__i386__) || defined(__amd64__) - __asm__ ("pause"); + __asm__ ("pause"); #endif - if ((++cloops % 1024*1024) == 0) - { + if ((++cloops % 1024*1024) == 0) + { #ifdef __linux__ - __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); - futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask); - __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); + __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); + futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask); + __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); #endif - __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); + } } + clearwait(lock, tid); } lock->m_depth = 1; - int tid = gettid(); __atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE); ANNOTATE_RWLOCK_ACQUIRED(lock, true); std::atomic_thread_fence(std::memory_order_acquire); diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index baf33654f..c054e6bc6 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -3,6 +3,8 @@ section .text extern gettid extern sched_yield extern g_longwaits +extern registerwait +extern clearwait ; This is the first use of assembly in this codebase, a valid question is WHY? ; The spinlock we implement here is performance critical, and simply put GCC @@ -31,11 +33,24 @@ fastlock_lock: inc eax ; we want to add one lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition ; ax now contains the ticket + mov edx, [rdi] + cmp dx, ax ; is our ticket up? + je .LLocked ; no need to loop + ; Lock is contended, so inform the deadlock detector + push rax + push rdi + push rsi + call registerwait + pop rsi + pop rdi + pop rax + ; OK Start the wait loop + xor ecx, ecx ALIGN 16 .LLoop: mov edx, [rdi] cmp dx, ax ; is our ticket up? - je .LLocked ; leave the loop + je .LExitLoop ; leave the loop pause add ecx, 1000h ; Have we been waiting a long time? (oflow if we have) ; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) @@ -69,6 +84,13 @@ ALIGN 16 xor ecx, ecx ; Reset our loop counter jmp .LLoop ; Get back in the game ALIGN 16 +.LExitLoop: + push rsi + push rdi + call clearwait + pop rdi + pop rsi +ALIGN 16 .LLocked: mov [rdi+4], esi ; lock->m_pidOwner = gettid() inc dword [rdi+8] ; lock->m_depth++ From 1c1260d71fa6bffc9d78f028af56fc36f93a0226 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 22 Oct 2019 21:34:51 -0400 Subject: [PATCH 02/21] Optimize deadlock detection, fix callstack for ASM, and annotate locks Note: This change moves our assembly code to use the GNU Assembler because NASM seems to be incapable of emitting the necessary debug information for callstack unwinding to work. Former-commit-id: 600fc241cfe79b9b32ac6010c6ea0c66747f0f15 --- src/Makefile | 4 +- src/ae.cpp | 4 +- src/aof.cpp | 2 +- src/fastlock.cpp | 91 +++++++++------ src/fastlock.h | 13 ++- src/fastlock_x64.asm | 260 ++++++++++++++++++++----------------------- src/networking.cpp | 2 +- src/server.cpp | 4 +- src/server.h | 6 +- 9 files changed, 192 insertions(+), 194 deletions(-) diff --git a/src/Makefile b/src/Makefile index be4c07369..ac28ca80e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -190,7 +190,7 @@ endif REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS) -REDIS_NASM=$(QUIET_CC)nasm -felf64 +KEYDB_AS=$(QUIET_CC) as --64 -g REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS) REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) @@ -295,7 +295,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c $(REDIS_CXX) -c $< %.o: %.asm .make-prerequisites - $(REDIS_NASM) $< + $(KEYDB_AS) $< -o $@ clean: rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark diff --git a/src/ae.cpp b/src/ae.cpp index f636078b1..0deec264f 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -80,7 +80,7 @@ public: mutex_wrapper g_lock; #else -fastlock g_lock; +fastlock g_lock("AE (global)"); #endif thread_local aeEventLoop *g_eventLoopThisThread = NULL; @@ -327,7 +327,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; - fastlock_init(&eventLoop->flock); + fastlock_init(&eventLoop->flock, "event loop"); int rgfd[2]; if (pipe(rgfd) < 0) goto err; diff --git a/src/aof.cpp b/src/aof.cpp index e18bce652..719b72ed9 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -678,7 +678,7 @@ client *createFakeClient(void) { c->puser = NULL; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); - fastlock_init(&c->lock); + fastlock_init(&c->lock, "fake client"); fastlock_lock(&c->lock); initClientMultiState(c); return c; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 75a0f8381..1bbc2d6d5 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -41,6 +41,7 @@ #include #endif #include +#include #ifdef __APPLE__ #include @@ -127,15 +128,25 @@ #endif #pragma weak _serverPanic -extern "C" void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...) +extern "C" __attribute__((weak)) void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...) { *((char*)-1) = 'x'; } +#pragma weak serverLog +__attribute__((weak)) void serverLog(int , const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + vprintf(fmt, args); + va_end(args); + printf("\n"); +} + class DeadlockDetector { std::map m_mapwait; - fastlock m_lock; + fastlock m_lock { "deadlock detector" }; public: void registerwait(fastlock *lock, pid_t thispid) { @@ -146,6 +157,7 @@ public: // Detect cycles pid_t pidCheck = thispid; + size_t cchecks = 0; for (;;) { auto itr = m_mapwait.find(pidCheck); @@ -153,7 +165,26 @@ public: break; pidCheck = itr->second->m_pidOwner; if (pidCheck == thispid) + { + // Deadlock detected, printout some debugging info and crash + serverLog(3 /*LL_WARNING*/, "\n\n"); + serverLog(3 /*LL_WARNING*/, "!!! ERROR: Deadlock detected !!!"); + pidCheck = thispid; + for (;;) + { + auto itr = m_mapwait.find(pidCheck); + serverLog(3 /* LL_WARNING */, "\t%d: (%p) %s", pidCheck, itr->second, itr->second->szName); + pidCheck = itr->second->m_pidOwner; + if (pidCheck == thispid) + break; + } + serverLog(3 /*LL_WARNING*/, "!!! KeyDB Will Now Crash !!!"); _serverPanic(__FILE__, __LINE__, "Deadlock detected"); + } + + if (cchecks > m_mapwait.size()) + break; // There is a cycle but we're not in it + ++cchecks; } fastlock_unlock(&m_lock); } @@ -170,16 +201,6 @@ public: DeadlockDetector g_dlock; -extern "C" void registerwait(fastlock *lock, pid_t thispid) -{ - g_dlock.registerwait(lock, thispid); -} - -extern "C" void clearwait(fastlock *lock, pid_t thispid) -{ - g_dlock.clearwait(lock, thispid); -} - static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; @@ -190,7 +211,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifndef ASM_SPINLOCK #ifdef __linux__ static int futex(volatile unsigned *uaddr, int futex_op, int val, const struct timespec *timeout, int val3) @@ -199,7 +219,6 @@ static int futex(volatile unsigned *uaddr, int futex_op, int val, timeout, uaddr, val3); } #endif -#endif extern "C" pid_t gettid() { @@ -218,13 +237,26 @@ extern "C" pid_t gettid() return pidCache; } -extern "C" void fastlock_init(struct fastlock *lock) +extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) +{ +#ifdef __linux__ + g_dlock.registerwait(lock, pid); + __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); + futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask); + __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); + g_dlock.clearwait(lock, pid); +#endif + __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); +} + +extern "C" void fastlock_init(struct fastlock *lock, const char *name) { lock->m_ticket.m_active = 0; lock->m_ticket.m_avail = 0; lock->m_depth = 0; lock->m_pidOwner = -1; lock->futex = 0; + lock->szName = name; ANNOTATE_RWLOCK_CREATE(lock); } @@ -241,36 +273,23 @@ extern "C" void fastlock_lock(struct fastlock *lock) int tid = gettid(); unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); -#ifdef __linux__ unsigned mask = (1U << (myticket % 32)); -#endif int cloops = 0; ticket ticketT; - __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); - if ((ticketT.u & 0xffff) != myticket) + for (;;) { - registerwait(lock, tid); - for (;;) - { - __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); - if ((ticketT.u & 0xffff) == myticket) - break; + __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); + if ((ticketT.u & 0xffff) == myticket) + break; #if defined(__i386__) || defined(__amd64__) - __asm__ ("pause"); + __asm__ ("pause"); #endif - if ((++cloops % 1024*1024) == 0) - { -#ifdef __linux__ - __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); - futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask); - __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); -#endif - __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); - } + if ((++cloops % 1024*1024) == 0) + { + fastlock_sleep(lock, tid, ticketT.u, mask); } - clearwait(lock, tid); } lock->m_depth = 1; diff --git a/src/fastlock.h b/src/fastlock.h index c7a40bdf3..0117049a6 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -7,7 +7,7 @@ extern "C" { /* Begin C API */ struct fastlock; -void fastlock_init(struct fastlock *lock); +void fastlock_init(struct fastlock *lock, const char *name); void fastlock_lock(struct fastlock *lock); int fastlock_trylock(struct fastlock *lock, int fWeak); void fastlock_unlock(struct fastlock *lock); @@ -45,24 +45,25 @@ struct fastlock volatile int m_pidOwner; volatile int m_depth; unsigned futex; + const char *szName; #ifdef __cplusplus - fastlock() + fastlock(const char *name) { - fastlock_init(this); + fastlock_init(this, name); } - void lock() + inline void lock() { fastlock_lock(this); } - bool try_lock(bool fWeak = false) + inline bool try_lock(bool fWeak = false) { return !!fastlock_trylock(this, fWeak); } - void unlock() + inline void unlock() { fastlock_unlock(this); } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index c054e6bc6..6c9df490e 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -1,178 +1,160 @@ -section .text +.intel_syntax noprefix +.text -extern gettid -extern sched_yield -extern g_longwaits -extern registerwait -extern clearwait +.extern gettid +.extern fastlock_sleep -; This is the first use of assembly in this codebase, a valid question is WHY? -; The spinlock we implement here is performance critical, and simply put GCC -; emits awful code. The original C code is left in fastlock.cpp for reference -; and x-plat. +# This is the first use of assembly in this codebase, a valid question is WHY? +# The spinlock we implement here is performance critical, and simply put GCC +# emits awful code. The original C code is left in fastlock.cpp for reference +# and x-plat. -ALIGN 16 -global fastlock_lock +.ALIGN 16 +.global fastlock_lock +.type fastlock_lock,@function fastlock_lock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + .cfi_startproc + .cfi_def_cfa rsp, 8 + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth - ; First get our TID and put it in ecx - push rdi ; we need our struct pointer (also balance the stack for the call) - call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining) - mov esi, eax ; back it up in esi - pop rdi ; get our pointer back + # First get our TID and put it in ecx + push rdi # we need our struct pointer (also balance the stack for the call) + .cfi_adjust_cfa_offset 8 + call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) + mov esi, eax # back it up in esi + pop rdi # get our pointer back + .cfi_adjust_cfa_offset -8 - cmp [rdi+4], esi ; Is the TID we got back the owner of the lock? - je .LLocked ; Don't spin in that case + cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + je .LLocked # Don't spin in that case - xor eax, eax ; eliminate partial register dependency - inc eax ; we want to add one - lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition - ; ax now contains the ticket - mov edx, [rdi] - cmp dx, ax ; is our ticket up? - je .LLocked ; no need to loop - ; Lock is contended, so inform the deadlock detector - push rax - push rdi - push rsi - call registerwait - pop rsi - pop rdi - pop rax - ; OK Start the wait loop + xor eax, eax # eliminate partial register dependency + inc eax # we want to add one + lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition + # ax now contains the ticket + # OK Start the wait loop xor ecx, ecx -ALIGN 16 +.ALIGN 16 .LLoop: mov edx, [rdi] - cmp dx, ax ; is our ticket up? - je .LExitLoop ; leave the loop + cmp dx, ax # is our ticket up? + je .LLocked # leave the loop pause - add ecx, 1000h ; Have we been waiting a long time? (oflow if we have) - ; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) - jnc .LLoop ; If so, give up our timeslice to someone who's doing real work - ; Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" - ; But the compiler doesn't know that we rarely hit this, and when we do we know the lock is - ; taking a long time to be released anyways. We optimize for the common case of short - ; lock intervals. That's why we're using a spinlock in the first place - ; If we get here we're going to sleep in the kernel with a futex + add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) + # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) + jnc .LLoop # If so, give up our timeslice to someone who's doing real work + # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" + # But the compiler doesn't know that we rarely hit this, and when we do we know the lock is + # taking a long time to be released anyways. We optimize for the common case of short + # lock intervals. That's why we're using a spinlock in the first place + # If we get here we're going to sleep in the kernel with a futex + push rdi push rsi push rax - ; Setup the syscall args - ; rdi ARG1 futex (already in rdi) - mov esi, (9 | 128) ; rsi ARG2 FUTEX_WAIT_BITSET_PRIVATE - ; rdx ARG3 ticketT.u (already in edx) - xor r10d, r10d ; r10 ARG4 NULL - mov r8, rdi ; r8 ARG5 dup rdi - xor r9d, r9d - bts r9d, eax ; r9 ARG6 mask - mov eax, 202 ; sys_futex - ; Do the syscall - lock or [rdi+12], r9d ; inform the unlocking thread we're waiting - syscall ; wait for the futex - not r9d ; convert our flag into a mask of bits not to touch - lock and [rdi+12], r9d ; clear the flag in the futex control mask - ; cleanup and continue - mov rcx, g_longwaits - inc qword [rcx] ; increment our long wait counter + .cfi_adjust_cfa_offset 24 + # Setup the syscall args + + # rdi ARG1 futex (already in rdi) + # rsi ARG2 tid (already in esi) + # rdx ARG3 ticketT.u (already in edx) + bts ecx, eax # rcx ARG4 mask + call fastlock_sleep + # cleanup and continue pop rax pop rsi - xor ecx, ecx ; Reset our loop counter - jmp .LLoop ; Get back in the game -ALIGN 16 -.LExitLoop: - push rsi - push rdi - call clearwait pop rdi - pop rsi -ALIGN 16 + .cfi_adjust_cfa_offset -24 + xor ecx, ecx # Reset our loop counter + jmp .LLoop # Get back in the game +.ALIGN 16 .LLocked: - mov [rdi+4], esi ; lock->m_pidOwner = gettid() - inc dword [rdi+8] ; lock->m_depth++ + mov [rdi+4], esi # lock->m_pidOwner = gettid() + inc dword ptr [rdi+8] # lock->m_depth++ ret +.cfi_endproc -ALIGN 16 -global fastlock_trylock +.ALIGN 16 +.global fastlock_trylock +.type fastlock_trylock,@function fastlock_trylock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth - ; First get our TID and put it in ecx - push rdi ; we need our struct pointer (also balance the stack for the call) - call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining) - mov esi, eax ; back it up in esi - pop rdi ; get our pointer back + # First get our TID and put it in ecx + push rdi # we need our struct pointer (also balance the stack for the call) + call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) + mov esi, eax # back it up in esi + pop rdi # get our pointer back - cmp [rdi+4], esi ; Is the TID we got back the owner of the lock? - je .LRecursive ; Don't spin in that case + cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + je .LRecursive # Don't spin in that case - mov eax, [rdi] ; get both active and avail counters - mov ecx, eax ; duplicate in ecx - ror ecx, 16 ; swap upper and lower 16-bits - cmp eax, ecx ; are the upper and lower 16-bits the same? - jnz .LAlreadyLocked ; If not return failure + mov eax, [rdi] # get both active and avail counters + mov ecx, eax # duplicate in ecx + ror ecx, 16 # swap upper and lower 16-bits + cmp eax, ecx # are the upper and lower 16-bits the same? + jnz .LAlreadyLocked # If not return failure - ; at this point we know eax+ecx have [avail][active] and they are both the same - add ecx, 10000h ; increment avail, ecx is now our wanted value - lock cmpxchg [rdi], ecx ; If rdi still contains the value in eax, put in ecx (inc avail) - jnz .LAlreadyLocked ; If Z is not set then someone locked it while we were preparing + # at this point we know eax+ecx have [avail][active] and they are both the same + add ecx, 0x10000 # increment avail, ecx is now our wanted value + lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail) + jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing xor eax, eax - inc eax ; return SUCCESS! (eax=1) - mov [rdi+4], esi ; lock->m_pidOwner = gettid() - mov dword [rdi+8], eax ; lock->m_depth = 1 + inc eax # return SUCCESS! (eax=1) + mov [rdi+4], esi # lock->m_pidOwner = gettid() + mov dword ptr [rdi+8], eax # lock->m_depth = 1 ret -ALIGN 16 +.ALIGN 16 .LRecursive: xor eax, eax - inc eax ; return SUCCESS! (eax=1) - inc dword [rdi+8] ; lock->m_depth++ + inc eax # return SUCCESS! (eax=1) + inc dword ptr [rdi+8] # lock->m_depth++ ret -ALIGN 16 +.ALIGN 16 .LAlreadyLocked: - xor eax, eax ; return 0; + xor eax, eax # return 0 ret -ALIGN 16 -global fastlock_unlock +.ALIGN 16 +.global fastlock_unlock fastlock_unlock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth push r11 - sub dword [rdi+8], 1 ; decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state - jnz .LDone ; if depth is non-zero this is a recursive unlock, and we still hold it - mov dword [rdi+4], -1 ; pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi] ; get current active (this one) - inc ecx ; bump it to the next thread - mov [rdi], cx ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable) - ; At this point the lock is removed, however we must wake up any pending futexs - mov r9d, 1 ; eax is the bitmask for 2 threads - rol r9d, cl ; place the mask in the right spot for the next 2 threads -ALIGN 16 + sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state + jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it + mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore) + mov ecx, [rdi] # get current active (this one) + inc ecx # bump it to the next thread + mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + # At this point the lock is removed, however we must wake up any pending futexs + mov r9d, 1 # eax is the bitmask for 2 threads + rol r9d, cl # place the mask in the right spot for the next 2 threads +.ALIGN 16 .LRetryWake: - mov r11d, [rdi+12] ; load the futex mask - and r11d, r9d ; are any threads waiting on a futex? - jz .LDone ; if not we're done. - ; we have to wake the futexs - ; rdi ARG1 futex (already in rdi) - mov esi, (10 | 128) ; rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE - mov edx, 0x7fffffff ; rdx ARG3 INT_MAX (number of threads to wake) - xor r10d, r10d ; r10 ARG4 NULL - mov r8, rdi ; r8 ARG5 dup rdi - ; r9 ARG6 mask (already set above) - mov eax, 202 ; sys_futex + mov r11d, [rdi+12] # load the futex mask + and r11d, r9d # are any threads waiting on a futex? + jz .LDone # if not we're done. + # we have to wake the futexs + # rdi ARG1 futex (already in rdi) + mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE + mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake) + xor r10d, r10d # r10 ARG4 NULL + mov r8, rdi # r8 ARG5 dup rdi + # r9 ARG6 mask (already set above) + mov eax, 202 # sys_futex syscall - cmp eax, 1 ; did we wake as many as we expected? + cmp eax, 1 # did we wake as many as we expected? jnz .LRetryWake .LDone: pop r11 diff --git a/src/networking.cpp b/src/networking.cpp index 636e95c62..6aa7e109f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -116,7 +116,7 @@ client *createClient(int fd, int iel) { uint64_t client_id; client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; - fastlock_init(&c->lock); + fastlock_init(&c->lock, "client"); c->id = client_id; c->resp = 2; c->fd = fd; diff --git a/src/server.cpp b/src/server.cpp index c33ff389b..34af382df 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2878,7 +2878,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) exit(1); } - fastlock_init(&pvar->lockPendingWrite); + fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite"); if (!fMain) { @@ -2925,8 +2925,6 @@ void initServer(void) { signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); - fastlock_init(&g_pserver->flock); - g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL); /* Create the Redis databases, and initialize other internal state. */ diff --git a/src/server.h b/src/server.h index cd2734a1d..19958e180 100644 --- a/src/server.h +++ b/src/server.h @@ -1044,7 +1044,7 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { redisDb() - : expireitr(nullptr) + : expireitr(nullptr), lock("redisDB") {}; dict *pdict; /* The keyspace for this DB */ expireset *setexpire; @@ -1437,7 +1437,7 @@ struct redisServerThreadVars { client blocked on a module command needs to be processed. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ - struct fastlock lockPendingWrite; + struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; }; @@ -1819,8 +1819,6 @@ struct redisServer { int fActiveReplica; /* Can this replica also be a master? */ - struct fastlock flock; - // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition From 03769b5c172d08eb4df3abf545a01a86331f9d55 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 22 Oct 2019 23:26:37 -0400 Subject: [PATCH 03/21] Remove race conditions Former-commit-id: 5a8cb77d0df7f319809ff965a72fe46925f49289 --- src/debug.cpp | 2 +- src/fastlock.cpp | 5 +++-- src/networking.cpp | 4 ---- src/redis-benchmark.cpp | 2 ++ src/redis-cli.c | 2 ++ src/server.h | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 3a4520776..234f197be 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -55,7 +55,7 @@ typedef ucontext_t sigcontext_t; #endif #endif -bool g_fInCrash = false; +int g_fInCrash = false; /* ================================= Debugging ============================== */ diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 1bbc2d6d5..7a946fa8f 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -59,6 +59,7 @@ #define UNUSED(x) ((void)x) #endif +extern int g_fInCrash; /**************************************************** * @@ -150,7 +151,7 @@ class DeadlockDetector public: void registerwait(fastlock *lock, pid_t thispid) { - if (lock == &m_lock) + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); m_mapwait.insert(std::make_pair(thispid, lock)); @@ -191,7 +192,7 @@ public: void clearwait(fastlock *lock, pid_t thispid) { - if (lock == &m_lock) + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); m_mapwait.erase(thispid); diff --git a/src/networking.cpp b/src/networking.cpp index 6aa7e109f..45a66e387 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1509,7 +1509,6 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); - lock.unlock(); freeClientAsync(c); return C_ERR; @@ -1528,7 +1527,6 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - lock.unlock(); freeClientAsync(c); return C_ERR; } @@ -3000,7 +2998,6 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; - aeReleaseLock(); while (iterations--) { int events = 0; events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); @@ -3008,7 +3005,6 @@ int processEventsWhileBlocked(int iel) { if (!events) break; count += events; } - aeAcquireLock(); return count; } diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 17866bec1..4f59c0a15 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -173,6 +173,8 @@ typedef struct redisConfig { sds appendonly; } redisConfig; +int g_fInCrash = false; + /* Prototypes */ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); static void createMissingClients(client c); diff --git a/src/redis-cli.c b/src/redis-cli.c index eae4a1d0e..f2c255c35 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -90,6 +90,8 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253 int *spectrum_palette; int spectrum_palette_size; +int g_fInCrash = false; + /*------------------------------------------------------------------------------ * Utility functions *--------------------------------------------------------------------------- */ diff --git a/src/server.h b/src/server.h index 19958e180..7c90a88c8 100644 --- a/src/server.h +++ b/src/server.h @@ -2797,7 +2797,7 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); int moduleGILAcquiredByModule(void); -extern bool g_fInCrash; +extern int g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate { return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash; From 6dc17bbe1f93aa0330bdf7fa297c3a2f690811d2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 23 Oct 2019 13:19:57 -0400 Subject: [PATCH 04/21] processEventsWhileBlocked needs to release the lock in a safe way Former-commit-id: 1a70af2ae13962db038b0635cc29488019323538 --- src/networking.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 45a66e387..0ebe1575e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2998,6 +2998,14 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; + client *c = serverTL->current_client; + if (c != nullptr) + { + serverAssert(c->flags & CLIENT_PROTECTED); + c->lock.unlock(); + c->db->lock.unlock(); + } + aeReleaseLock(); while (iterations--) { int events = 0; events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); @@ -3005,6 +3013,13 @@ int processEventsWhileBlocked(int iel) { if (!events) break; count += events; } + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + if (c != nullptr) + c->db->lock.lock(); + locker.release(); return count; } From 5f930a4f794370a28797ba991593430c86be7258 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 23 Oct 2019 13:20:23 -0400 Subject: [PATCH 05/21] If a replica is about to be closed, don't wait on its lock (potential deadlock) Former-commit-id: 4986dc6da9855ba14d760a89f13ec40c8bea4baf --- src/replication.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.cpp b/src/replication.cpp index eb8dec503..5abfd2514 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -384,6 +384,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; std::unique_locklock)> lock(replica->lock); if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) { From b14b11c87c26064a0b76e8591827db81fb1ecc86 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 23 Oct 2019 13:31:39 -0400 Subject: [PATCH 06/21] Fix build break in TravisCI Former-commit-id: 2da09e07321e114faa04f84ef7a50908ecac5eea --- src/fastlock.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 7a946fa8f..c74ca8358 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -42,6 +42,7 @@ #endif #include #include +#include #ifdef __APPLE__ #include From 86c1f7aaf7039ccea1ebfa816db09b98c07d5e02 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 23 Oct 2019 13:38:07 -0400 Subject: [PATCH 07/21] Build break, ensure C99 compatibility Former-commit-id: ca4ee4e3e4e28e2a186ac782ab0052c56a798ed2 --- src/redis-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index f2c255c35..ea920569d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -90,7 +90,7 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253 int *spectrum_palette; int spectrum_palette_size; -int g_fInCrash = false; +int g_fInCrash = 0; /*------------------------------------------------------------------------------ * Utility functions From 7a50cebac48602dffbbea1550fb8c7e46713bd96 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 24 Oct 2019 20:18:11 -0400 Subject: [PATCH 08/21] Remove unlocked KEYS command support, we will do this with snapshotting Former-commit-id: 4396682c07d4df3fdca01d1299ad171e310a9fc7 --- src/db.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 0d5042bab..d8aa0dc47 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -643,8 +643,6 @@ void keysCommand(client *c) { unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); - aeReleaseLock(); - di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { @@ -662,12 +660,6 @@ void keysCommand(client *c) { } dictReleaseIterator(di); setDeferredArrayLen(c,replylen,numkeys); - - fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks - AeLocker lock; - lock.arm(c); - fastlock_lock(&c->db->lock); // we still need the DB lock - lock.release(); } /* This callback is used by scanGenericCommand in order to collect elements From 8f015eaebbdbcd5d9c2d8613fb15b9537a84355b Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 24 Oct 2019 20:18:48 -0400 Subject: [PATCH 09/21] Remove the DB lock, its unnecessary Former-commit-id: 631f863dd89cd642e2023beabf8b31cdc84bbdff --- src/networking.cpp | 3 --- src/server.cpp | 1 - src/server.h | 4 +--- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 0ebe1575e..34dea1452 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3003,7 +3003,6 @@ int processEventsWhileBlocked(int iel) { { serverAssert(c->flags & CLIENT_PROTECTED); c->lock.unlock(); - c->db->lock.unlock(); } aeReleaseLock(); while (iterations--) { @@ -3017,8 +3016,6 @@ int processEventsWhileBlocked(int iel) { if (c != nullptr) c->lock.lock(); locker.arm(c); - if (c != nullptr) - c->db->lock.lock(); locker.release(); return count; } diff --git a/src/server.cpp b/src/server.cpp index 34af382df..0f0dfe122 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3704,7 +3704,6 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { - std::unique_lockdb->lock)> ulock(c->db->lock); call(c,callFlags); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) diff --git a/src/server.h b/src/server.h index 7c90a88c8..276c939c3 100644 --- a/src/server.h +++ b/src/server.h @@ -1044,7 +1044,7 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { redisDb() - : expireitr(nullptr), lock("redisDB") + : expireitr(nullptr) {}; dict *pdict; /* The keyspace for this DB */ expireset *setexpire; @@ -1057,8 +1057,6 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ - - fastlock lock; } redisDb; /* Client MULTI/EXEC state */ From 1e952f28c2964fc6a15d98787f8f005363a2a7c4 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 24 Oct 2019 23:05:39 -0400 Subject: [PATCH 10/21] Sanitizers work with GCC too Former-commit-id: 549cad378d9faccae0c47917d000c0cad3561351 --- src/Makefile | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Makefile b/src/Makefile index ac28ca80e..9d4c22cf3 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,8 +47,6 @@ endif USEASM?=true ifneq ($(SANITIZE),) - CC=clang - CXX=clang++ CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE LDFLAGS+= -fsanitize=$(SANITIZE) From bd9ea70609643e6c7ac906455f24252ee19b9e60 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 24 Oct 2019 23:07:02 -0400 Subject: [PATCH 11/21] Two fixes: 1) Remove race conditions by not locking clients when async writing. 2) Don't derefence dangling pointers in lambda Former-commit-id: cb93752aff4c67d4475e9ed17833335716c45744 --- src/networking.cpp | 6 +++++- src/replication.cpp | 32 +++++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 34dea1452..d32912bd8 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread serverAssert(FCorrectThread(c) || fAsync); - serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + if (FCorrectThread(c)) { + serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + } else { + serverAssert(GlobalLocksAcquired()); + } if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer. // do not install a write handler diff --git a/src/replication.cpp b/src/replication.cpp index 5abfd2514..d2f948567 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -385,7 +385,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_locklock)> lock(replica->lock); + std::unique_locklock)> lock(replica->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(replica)) + lock.lock(); if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) { replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; @@ -434,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *replica = (client*)ln->value; - std::lock_guardlock)> ulock(replica->lock); + std::unique_locklock)> ulock(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ulock.lock(); if (FMasterHost(replica)) continue; // Active Active case, don't feed back @@ -483,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; - std::lock_guardlock)> lock(monitor->lock); + std::unique_locklock)> lock(monitor->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(c)) + lock.lock(); addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); @@ -1206,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } else { - aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{ + aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { + // Because the client could have been closed while the lambda waited to run we need to + // verify the replica is still connected + listIter li; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -3379,4 +3401,4 @@ void updateMasterAuth() if (cserver.default_masteruser) mi->masteruser = zstrdup(cserver.default_masteruser); } -} \ No newline at end of file +} From df8a20a744e406e92856d28fae0ba4a50cfd1579 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 24 Oct 2019 23:30:08 -0400 Subject: [PATCH 12/21] Signals can happen on any thread, so look for the signal handler not the command on the callstack Former-commit-id: f1d2b2945007f8811528b197480e255c6b35559c --- tests/integration/logging.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/logging.tcl b/tests/integration/logging.tcl index c1f4854d4..9c8cbe8ba 100644 --- a/tests/integration/logging.tcl +++ b/tests/integration/logging.tcl @@ -6,7 +6,7 @@ if {$system_name eq {linux} || $system_name eq {darwin}} { test "Server is able to generate a stack trace on selected systems" { r config set watchdog-period 200 r debug sleep 1 - set pattern "*debugCommand*" + set pattern "*watchdogSignalHandler*" set retry 10 while {$retry} { set result [exec tail -100 < [srv 0 stdout]] From 2194d9147682975e7f82539ce2f119bdccc8969f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 02:44:14 -0400 Subject: [PATCH 13/21] Fix potential race in pubsub Former-commit-id: 427c5999f167256dc3a6a0f49ad28313f700a155 --- src/pubsub.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 46677487f..0c657a588 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -301,9 +301,11 @@ int pubsubPublishMessage(robj *channel, robj *message) { client *c = reinterpret_cast(ln->value); if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored continue; - fastlock_lock(&c->lock); + if (FCorrectThread(c)) + fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); - fastlock_unlock(&c->lock); + if (FCorrectThread(c)) + fastlock_unlock(&c->lock); receivers++; } } @@ -321,10 +323,12 @@ int pubsubPublishMessage(robj *channel, robj *message) { { if (pat->pclient->flags & CLIENT_CLOSE_ASAP) continue; - fastlock_lock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); - fastlock_unlock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_unlock(&pat->pclient->lock); receivers++; } } From 75b22fafc2b74f935b3117cb3baf5360b413941e Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 4 Nov 2019 17:31:29 -0800 Subject: [PATCH 14/21] Fix test failure with tcl8.5 (test only issue) Former-commit-id: 5e40ea6ee7f0f34a9e11fdd7518d81383dd73c41 --- tests/unit/bitops.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/bitops.tcl b/tests/unit/bitops.tcl index f8a5cbe18..a280721f0 100644 --- a/tests/unit/bitops.tcl +++ b/tests/unit/bitops.tcl @@ -238,7 +238,7 @@ start_server {tags {"bitops"}} { r set a "abcdefg" r bitop lshift x a 8 r get x - } "\x00abcdefg" + } "\000abcdefg" test {BITOP lshift char} { r set a "\xAA" From 451c6e81f4c4a83160cb14f5556a334c61eaff26 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:34:13 -0500 Subject: [PATCH 15/21] Improve AE Assert message Former-commit-id: cb0fc7cca2406cf24fc238d6b6e1247c60d86704 --- src/ae.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index 0deec264f..87212f704 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -84,7 +84,7 @@ fastlock g_lock("AE (global)"); #endif thread_local aeEventLoop *g_eventLoopThisThread = NULL; -#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0) +#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0) /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ From 93844525eafa8986f9f20109116cf6df0001bf92 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:49:32 -0500 Subject: [PATCH 16/21] killing clients should take effect ASAP Former-commit-id: d0ccb074d5451cd457fe88efeb007cdb9746cb7f --- src/networking.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index d32912bd8..a14994fbc 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2521,9 +2521,17 @@ NULL close_this_client = 1; } else { if (FCorrectThread(client)) + { freeClient(client); + } else + { + int iel = client->iel; freeClientAsync(client); + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + freeClientsInAsyncFreeQueue(iel); + }); + } } killed++; } From b8cc7e2b9c9223de33c1d6513decb80261998461 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 19:57:29 -0500 Subject: [PATCH 17/21] Debug sleep should apply to all threads Former-commit-id: 41b678814b2c2ff93935b57e630028aaf2e9ae62 --- src/debug.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/debug.cpp b/src/debug.cpp index 234f197be..9854d0fd4 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -595,9 +595,19 @@ NULL double dtime = strtod(szFromObj(c->argv[2]),NULL); long long utime = dtime*1000000; struct timespec tv; - tv.tv_sec = utime / 1000000; tv.tv_nsec = (utime % 1000000) * 1000; + + // Ensure all threads sleep + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (iel == ielFromEventLoop(serverTL->el)) + continue; // we will sleep ourselves below + aePostFunction(g_pserver->rgthreadvar[iel].el, [tv]{ + nanosleep(&tv, NULL); + }); + } + nanosleep(&tv, NULL); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"set-active-expire") && From a098681bbf13f382145945d946544ac6d11b8055 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 14 Nov 2019 20:14:24 -0500 Subject: [PATCH 18/21] Cluster multithreading fixes Former-commit-id: 3dd78a6101df0a980e520dcb55d80651bfc5a3a7 --- src/cluster.cpp | 38 ++++++++++++++++++++++++++++++++------ src/networking.cpp | 24 +++++++++++++++++------- src/server.cpp | 18 ++++++++++++------ src/server.h | 5 +++-- tests/cluster/run.tcl | 2 ++ 5 files changed, 66 insertions(+), 21 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index 6ba5cfef7..3fd513221 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) { * This function will just make sure that the original node associated * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { + if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN) + { + // we can't perform this operation on this thread, queue it on the main thread + if (link->node) + link->node->link = NULL; + link->node = nullptr; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + freeClusterLink(link); + }); + return; + } if (link->fd != -1) { aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE); } @@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) { * consumed by write(). We don't try to optimize this for speed too much * as this is a very low traffic channel. */ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN); clusterLink *link = (clusterLink*) privdata; ssize_t nwritten; UNUSED(el); UNUSED(mask); - nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf)); + // We're about to release the lock, so the link's sndbuf needs to be owned fully by us + // allocate a new one in case anyone tries to write while we're waiting + sds sndbuf = link->sndbuf; + link->sndbuf = sdsempty(); + + aeReleaseLock(); + nwritten = write(fd, sndbuf, sdslen(sndbuf)); + aeAcquireLock(); + if (nwritten <= 0) { serverLog(LL_DEBUG,"I/O error writing to node link: %s", (nwritten == -1) ? strerror(errno) : "short write"); + sdsfree(sndbuf); handleLinkIOError(link); return; } - sdsrange(link->sndbuf,nwritten,-1); + sdsrange(sndbuf,nwritten,-1); + // Restore our send buffer, ensuring any unsent data is first + sndbuf = sdscat(sndbuf, link->sndbuf); + sdsfree(link->sndbuf); + link->sndbuf = sndbuf; if (sdslen(link->sndbuf) == 0) - aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE); + aeDeleteFileEvent(el, link->fd, AE_WRITABLE); } /* Read data. Try to read the first field of the header first to check the @@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * the link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { + serverAssert(GlobalLocksAcquired()); if (sdslen(link->sndbuf) == 0 && msglen != 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER, - clusterWriteHandler,link); + aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER, + clusterWriteHandler,link,false); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); @@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) { void resetManualFailover(void) { if (g_pserver->cluster->mf_end && clientsArePaused()) { g_pserver->clients_pause_end_time = 0; - clientsArePaused(); /* Just use the side effect of the function. */ + unpauseClientsIfNecessary(); } g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */ g_pserver->cluster->mf_can_start = 0; diff --git a/src/networking.cpp b/src/networking.cpp index a14994fbc..ef3ee2683 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2960,38 +2960,48 @@ void flushSlavesOutputBuffers(void) { * than the time left for the previous pause, no change is made to the * left duration. */ void pauseClients(mstime_t end) { - if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time) + serverAssert(GlobalLocksAcquired()); + if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time) g_pserver->clients_pause_end_time = end; - g_pserver->clients_paused = 1; + + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + g_pserver->rgthreadvar[iel].clients_paused = true; + } } /* Return non-zero if clients are currently paused. As a side effect the * function checks if the pause time was reached and clear it. */ int clientsArePaused(void) { - if (g_pserver->clients_paused && + return serverTL->clients_paused; +} + +void unpauseClientsIfNecessary() +{ + serverAssert(GlobalLocksAcquired()); + if (serverTL->clients_paused && g_pserver->clients_pause_end_time < g_pserver->mstime) { - aeAcquireLock(); listNode *ln; listIter li; client *c; - g_pserver->clients_paused = 0; + serverTL->clients_paused = 0; /* Put all the clients in the unblocked clients queue in order to * force the re-processing of the input buffer if any. */ listRewind(g_pserver->clients,&li); while ((ln = listNext(&li)) != NULL) { c = (client*)listNodeValue(ln); + if (!FCorrectThread(c)) + continue; /* Don't touch slaves and blocked clients. * The latter pending requests will be processed when unblocked. */ if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue; queueClientForReprocessing(c); } - aeReleaseLock(); } - return g_pserver->clients_paused; } /* This function is called by Redis in order to process a few events from diff --git a/src/server.cpp b/src/server.cpp index 0f0dfe122..f01c8bb00 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1692,6 +1692,9 @@ void clientsCron(int iel) { fastlock_unlock(&c->lock); } } + + /* Free any pending clients */ + freeClientsInAsyncFreeQueue(iel); } /* This function handles 'background' operations we are required to do @@ -1812,6 +1815,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Update the time cache. */ updateCachedTime(); + /* Unpause clients if enough time has elapsed */ + unpauseClientsIfNecessary(); + g_pserver->hz = g_pserver->config_hz; /* Adapt the g_pserver->hz value to the number of configured clients. If we have * many clients, we want to call serverCron() with an higher frequency. */ @@ -1819,7 +1825,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { while (listLength(g_pserver->clients) / g_pserver->hz > MAX_CLIENTS_PER_CLOCK_TICK) { - g_pserver->hz *= 2; + g_pserver->hz += g_pserver->hz; // *= 2 if (g_pserver->hz > CONFIG_MAX_HZ) { g_pserver->hz = CONFIG_MAX_HZ; break; @@ -2019,9 +2025,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Clear the paused clients flag if needed. */ - clientsArePaused(); /* Don't check return value, just use the side effect.*/ - /* Replication cron function -- used to reconnect to master, * detect transfer failures, start background RDB transfers and so forth. */ run_with_period(1000) replicationCron(); @@ -2078,6 +2081,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData { processUnblockedClients(iel); } + + /* Unpause clients if enough time has elapsed */ + unpauseClientsIfNecessary(); ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves clientsCron(iel); @@ -2871,6 +2877,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->cclients = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; + pvar->clients_paused = 0; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", @@ -2967,7 +2974,6 @@ void initServer(void) { g_pserver->ready_keys = listCreate(); g_pserver->clients_waiting_acks = listCreate(); g_pserver->get_ack_from_slaves = 0; - g_pserver->clients_paused = 0; cserver.system_memory_size = zmalloc_get_memory_size(); createSharedObjects(); @@ -4088,7 +4094,7 @@ sds genRedisInfoString(const char *section) { g_pserver->port, (intmax_t)uptime, (intmax_t)(uptime/(3600*24)), - g_pserver->hz, + g_pserver->hz.load(), g_pserver->config_hz, (unsigned long) lruclock, cserver.executable ? cserver.executable : "", diff --git a/src/server.h b/src/server.h index 276c939c3..29c2db301 100644 --- a/src/server.h +++ b/src/server.h @@ -1426,6 +1426,7 @@ struct redisServerThreadVars { aeEventLoop *el; int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ + int clients_paused; /* True if clients are currently paused */ std::vector clients_pending_write; /* There is to write or install handler. */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; @@ -1518,7 +1519,7 @@ struct redisServer { int config_hz; /* Configured HZ value. May be different than the actual 'hz' field value if dynamic-hz is enabled. */ - int hz; /* serverCron() calls frequency in hertz */ + std::atomic hz; /* serverCron() calls frequency in hertz */ redisDb *db; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ @@ -1553,7 +1554,6 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ rax *clients_index; /* Active clients dictionary by client ID. */ - int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ std::atomic next_client_id; /* Next client unique ID. Incremental. */ @@ -2042,6 +2042,7 @@ void disconnectSlavesExcept(unsigned char *uuid); int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen); void pauseClients(mstime_t duration); int clientsArePaused(void); +void unpauseClientsIfNecessary(); int processEventsWhileBlocked(int iel); int handleClientsWithPendingWrites(int iel); int clientHasPendingReplies(client *c); diff --git a/tests/cluster/run.tcl b/tests/cluster/run.tcl index 93603ddc9..3d96e6c41 100644 --- a/tests/cluster/run.tcl +++ b/tests/cluster/run.tcl @@ -14,6 +14,8 @@ proc main {} { spawn_instance redis $::redis_base_port $::instances_count { "cluster-enabled yes" "appendonly yes" + "testmode yes" + "server-threads 3" } run_tests cleanup From baffeff5c7bdb5797475c6c33e429e3c0af3cf42 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 14:52:12 -0500 Subject: [PATCH 19/21] Improve perf of reading cluster bitfield Former-commit-id: 9371c005aa7ffc2060b1b787e4268bc25336ca15 --- src/cluster.cpp | 91 ++++++++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index 3fd513221..f6a6e03dc 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4180,57 +4180,76 @@ void clusterReplyMultiBulkSlots(client *c) { dictIterator *di = dictGetSafeIterator(g_pserver->cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = (clusterNode*)dictGetVal(de); - int j = 0, start = -1; + int start = -1; /* Skip slaves (that are iterated when producing the output of their * master) and masters not serving any slot. */ if (!nodeIsMaster(node) || node->numslots == 0) continue; + + static_assert((CLUSTER_SLOTS % (sizeof(uint32_t)*8)) == 0, "code below assumes the bitfield is a multiple of sizeof(unsinged)"); - for (j = 0; j < CLUSTER_SLOTS; j++) { - int bit, i; - - if ((bit = clusterNodeGetSlotBit(node,j)) != 0) { - if (start == -1) start = j; + for (unsigned iw = 0; iw < (CLUSTER_SLOTS/sizeof(uint32_t)/8); ++iw) + { + uint32_t wordCur = reinterpret_cast(node->slots)[iw]; + if (iw != ((CLUSTER_SLOTS/sizeof(uint32_t)/8)-1)) + { + if (start == -1 && wordCur == 0) + continue; + if (start != -1 && (wordCur+1)==0) + continue; } - if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { - int nested_elements = 3; /* slots (2) + master addr (1). */ - void *nested_replylen = addReplyDeferredLen(c); - if (bit && j == CLUSTER_SLOTS-1) j++; - - /* If slot exists in output map, add to it's list. - * else, create a new output map for this slot */ - if (start == j-1) { - addReplyLongLong(c, start); /* only one slot; low==high */ - addReplyLongLong(c, start); - } else { - addReplyLongLong(c, start); /* low */ - addReplyLongLong(c, j-1); /* high */ + unsigned ibitStartLoop = iw*sizeof(uint32_t)*8; + + for (unsigned j = ibitStartLoop; j < (iw+1)*sizeof(uint32_t)*8; j++) { + int i; + int bit = (int)(wordCur & 1); + wordCur >>= 1; + if (bit != 0) { + if (start == -1) start = j; } - start = -1; + if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) { + int nested_elements = 3; /* slots (2) + master addr (1). */ + void *nested_replylen = addReplyDeferredLen(c); - /* First node reply position is always the master */ - addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->ip); - addReplyLongLong(c, node->port); - addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + if (bit && j == CLUSTER_SLOTS-1) j++; - /* Remaining nodes in reply are replicas for slot range */ - for (i = 0; i < node->numslaves; i++) { - /* This loop is copy/pasted from clusterGenNodeDescription() - * with modifications for per-slot node aggregation */ - if (nodeFailed(node->slaves[i])) continue; + /* If slot exists in output map, add to it's list. + * else, create a new output map for this slot */ + if (start == j-1) { + addReplyLongLong(c, start); /* only one slot; low==high */ + addReplyLongLong(c, start); + } else { + addReplyLongLong(c, start); /* low */ + addReplyLongLong(c, j-1); /* high */ + } + start = -1; + + /* First node reply position is always the master */ addReplyArrayLen(c, 3); - addReplyBulkCString(c, node->slaves[i]->ip); - addReplyLongLong(c, node->slaves[i]->port); - addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN); - nested_elements++; + addReplyBulkCString(c, node->ip); + addReplyLongLong(c, node->port); + addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + + /* Remaining nodes in reply are replicas for slot range */ + for (i = 0; i < node->numslaves; i++) { + /* This loop is copy/pasted from clusterGenNodeDescription() + * with modifications for per-slot node aggregation */ + if (nodeFailed(node->slaves[i])) continue; + addReplyArrayLen(c, 3); + addReplyBulkCString(c, node->slaves[i]->ip); + addReplyLongLong(c, node->slaves[i]->port); + addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN); + nested_elements++; + } + setDeferredArrayLen(c, nested_replylen, nested_elements); + num_masters++; } - setDeferredArrayLen(c, nested_replylen, nested_elements); - num_masters++; } } + serverAssert(start == -1); } + dictReleaseIterator(di); setDeferredArrayLen(c, slot_replylen, num_masters); } From a27c1c9f0a09b614309c08c67eb238c3e4ebcb26 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 15:39:47 -0500 Subject: [PATCH 20/21] Revert "Debug sleep should apply to all threads" This reverts commit b8cc7e2b9c9223de33c1d6513decb80261998461 [formerly 41b678814b2c2ff93935b57e630028aaf2e9ae62]. Former-commit-id: 3ae75c2d2bd952d0a075b9ba257a08f962fe0739 --- src/debug.cpp | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 9854d0fd4..234f197be 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -595,19 +595,9 @@ NULL double dtime = strtod(szFromObj(c->argv[2]),NULL); long long utime = dtime*1000000; struct timespec tv; + tv.tv_sec = utime / 1000000; tv.tv_nsec = (utime % 1000000) * 1000; - - // Ensure all threads sleep - for (int iel = 0; iel < cserver.cthreads; ++iel) - { - if (iel == ielFromEventLoop(serverTL->el)) - continue; // we will sleep ourselves below - aePostFunction(g_pserver->rgthreadvar[iel].el, [tv]{ - nanosleep(&tv, NULL); - }); - } - nanosleep(&tv, NULL); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"set-active-expire") && From 72bbf16c2f36a9188e8a5a54fd878d60931d0f54 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 17 Nov 2019 16:06:49 -0500 Subject: [PATCH 21/21] Fix failure to wakeup from futex sleep due to fastlock_unlock reading the wrong offset in the asm version. Also fix false sharing in spinlock Former-commit-id: 4c8603815cf525c75dcc360fddeab9ca6fe70ae6 --- src/fastlock.cpp | 5 +++- src/fastlock.h | 10 +++++--- src/fastlock_x64.asm | 54 ++++++++++++++++++++++++-------------------- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index c74ca8358..4a64b20fc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -258,7 +258,10 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name) lock->m_depth = 0; lock->m_pidOwner = -1; lock->futex = 0; - lock->szName = name; + int cch = strlen(name); + cch = std::min(cch, sizeof(lock->szName)-1); + memcpy(lock->szName, name, cch); + lock->szName[cch] = '\0'; ANNOTATE_RWLOCK_CREATE(lock); } diff --git a/src/fastlock.h b/src/fastlock.h index 0117049a6..44c09ac17 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -1,5 +1,6 @@ #pragma once #include +#include #ifdef __cplusplus extern "C" { @@ -40,12 +41,13 @@ struct ticket struct fastlock { - volatile struct ticket m_ticket; - volatile int m_pidOwner; volatile int m_depth; + char szName[56]; + /* Volatile data on seperate cache line */ + volatile struct ticket m_ticket; unsigned futex; - const char *szName; + char padding[56]; // ensure ticket and futex are on their own independent cache line #ifdef __cplusplus fastlock(const char *name) @@ -81,3 +83,5 @@ struct fastlock bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only #endif }; + +static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct"); \ No newline at end of file diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 6c9df490e..80791a6de 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -16,10 +16,11 @@ fastlock_lock: .cfi_startproc .cfi_def_cfa rsp, 8 # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail # First get our TID and put it in ecx push rdi # we need our struct pointer (also balance the stack for the call) @@ -29,18 +30,18 @@ fastlock_lock: pop rdi # get our pointer back .cfi_adjust_cfa_offset -8 - cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LLocked # Don't spin in that case xor eax, eax # eliminate partial register dependency inc eax # we want to add one - lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition + lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition # ax now contains the ticket # OK Start the wait loop xor ecx, ecx .ALIGN 16 .LLoop: - mov edx, [rdi] + mov edx, [rdi+64] cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause @@ -72,8 +73,8 @@ fastlock_lock: jmp .LLoop # Get back in the game .ALIGN 16 .LLocked: - mov [rdi+4], esi # lock->m_pidOwner = gettid() - inc dword ptr [rdi+8] # lock->m_depth++ + mov [rdi], esi # lock->m_pidOwner = gettid() + inc dword ptr [rdi+4] # lock->m_depth++ ret .cfi_endproc @@ -82,10 +83,11 @@ fastlock_lock: .type fastlock_trylock,@function fastlock_trylock: # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail # First get our TID and put it in ecx push rdi # we need our struct pointer (also balance the stack for the call) @@ -93,29 +95,29 @@ fastlock_trylock: mov esi, eax # back it up in esi pop rdi # get our pointer back - cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LRecursive # Don't spin in that case - mov eax, [rdi] # get both active and avail counters + mov eax, [rdi+64] # get both active and avail counters mov ecx, eax # duplicate in ecx ror ecx, 16 # swap upper and lower 16-bits cmp eax, ecx # are the upper and lower 16-bits the same? jnz .LAlreadyLocked # If not return failure # at this point we know eax+ecx have [avail][active] and they are both the same - add ecx, 0x10000 # increment avail, ecx is now our wanted value - lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail) - jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing + add ecx, 0x10000 # increment avail, ecx is now our wanted value + lock cmpxchg [rdi+64], ecx # If rdi still contains the value in eax, put in ecx (inc avail) + jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing xor eax, eax - inc eax # return SUCCESS! (eax=1) - mov [rdi+4], esi # lock->m_pidOwner = gettid() - mov dword ptr [rdi+8], eax # lock->m_depth = 1 + inc eax # return SUCCESS! (eax=1) + mov [rdi], esi # lock->m_pidOwner = gettid() + mov dword ptr [rdi+4], eax # lock->m_depth = 1 ret .ALIGN 16 .LRecursive: xor eax, eax inc eax # return SUCCESS! (eax=1) - inc dword ptr [rdi+8] # lock->m_depth++ + inc dword ptr [rdi+4] # lock->m_depth++ ret .ALIGN 16 .LAlreadyLocked: @@ -126,23 +128,25 @@ fastlock_trylock: .global fastlock_unlock fastlock_unlock: # RDI points to the struct: - # uint16_t active - # uint16_t avail # int32_t m_pidOwner # int32_t m_depth + # [rdi+64] ... + # uint16_t active + # uint16_t avail push r11 - sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state + sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it - mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi] # get current active (this one) + mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore) + mov ecx, [rdi+64] # get current active (this one) inc ecx # bump it to the next thread - mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) # At this point the lock is removed, however we must wake up any pending futexs mov r9d, 1 # eax is the bitmask for 2 threads rol r9d, cl # place the mask in the right spot for the next 2 threads + add rdi, 64 # rdi now points to the token .ALIGN 16 .LRetryWake: - mov r11d, [rdi+12] # load the futex mask + mov r11d, [rdi+4] # load the futex mask and r11d, r9d # are any threads waiting on a futex? jz .LDone # if not we're done. # we have to wake the futexs