Merge branch 'unstable' into RELEASE_5

Former-commit-id: f8220a658e0cf569b94ee5c5fbfeac474c1ef803
John Sully 2019-10-24 23:39:18 -04:00
commit 3eae7183d3
15 changed files with 284 additions and 178 deletions

View File

@@ -47,8 +47,6 @@ endif
 USEASM?=true
 ifneq ($(SANITIZE),)
-CC=clang
-CXX=clang++
 CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
 CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
 LDFLAGS+= -fsanitize=$(SANITIZE)
@@ -190,7 +188,7 @@ endif
 REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
 REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS)
-REDIS_NASM=$(QUIET_CC)nasm -felf64
+KEYDB_AS=$(QUIET_CC) as --64 -g
 REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS)
 REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
@@ -295,7 +293,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c
	$(REDIS_CXX) -c $<
 %.o: %.asm .make-prerequisites
-	$(REDIS_NASM) $<
+	$(KEYDB_AS) $< -o $@
 clean:
	rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark

View File

@@ -80,7 +80,7 @@ public:
 mutex_wrapper g_lock;
 #else
-fastlock g_lock;
+fastlock g_lock("AE (global)");
 #endif
 thread_local aeEventLoop *g_eventLoopThisThread = NULL;
@@ -327,7 +327,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
     for (i = 0; i < setsize; i++)
         eventLoop->events[i].mask = AE_NONE;
-    fastlock_init(&eventLoop->flock);
+    fastlock_init(&eventLoop->flock, "event loop");
     int rgfd[2];
     if (pipe(rgfd) < 0)
         goto err;

View File

@@ -678,7 +678,7 @@ client *createFakeClient(void) {
     c->puser = NULL;
     listSetFreeMethod(c->reply,freeClientReplyValue);
     listSetDupMethod(c->reply,dupClientReplyValue);
-    fastlock_init(&c->lock);
+    fastlock_init(&c->lock, "fake client");
     fastlock_lock(&c->lock);
     initClientMultiState(c);
     return c;

View File

@@ -643,10 +643,6 @@ void keysCommand(client *c) {
     unsigned long numkeys = 0;
     void *replylen = addReplyDeferredLen(c);
-#ifdef MULTITHREADED_KEYS
-    aeReleaseLock();
-#endif
     di = dictGetSafeIterator(c->db->pdict);
     allkeys = (pattern[0] == '*' && pattern[1] == '\0');
     while((de = dictNext(di)) != NULL) {
@@ -664,14 +660,6 @@ void keysCommand(client *c) {
         }
     }
     dictReleaseIterator(di);
     setDeferredArrayLen(c,replylen,numkeys);
-#ifdef MULTITHREADED_KEYS
-    fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks
-    AeLocker lock;
-    lock.arm(c);
-    fastlock_lock(&c->db->lock); // we still need the DB lock
-    lock.release();
-#endif
 }
/* This callback is used by scanGenericCommand in order to collect elements /* This callback is used by scanGenericCommand in order to collect elements

View File

@@ -55,7 +55,7 @@ typedef ucontext_t sigcontext_t;
 #endif
 #endif
-bool g_fInCrash = false;
+int g_fInCrash = false;
 /* ================================= Debugging ============================== */

View File

@@ -36,10 +36,13 @@
 #include <assert.h>
 #include <pthread.h>
 #include <limits.h>
+#include <map>
 #ifdef __linux__
 #include <linux/futex.h>
 #endif
 #include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
 #ifdef __APPLE__
 #include <TargetConditionals.h>
@@ -57,6 +60,7 @@
 #define UNUSED(x) ((void)x)
 #endif
+extern int g_fInCrash;
 /****************************************************
  *
@@ -125,6 +129,80 @@
 #endif
+#pragma weak _serverPanic
+extern "C" __attribute__((weak)) void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...)
+{
+    *((char*)-1) = 'x';
+}
+
+#pragma weak serverLog
+__attribute__((weak)) void serverLog(int , const char *fmt, ...)
+{
+    va_list args;
+    va_start(args, fmt);
+    vprintf(fmt, args);
+    va_end(args);
+    printf("\n");
+}
+
+class DeadlockDetector
+{
+    std::map<pid_t, fastlock *> m_mapwait;
+    fastlock m_lock { "deadlock detector" };
+public:
+    void registerwait(fastlock *lock, pid_t thispid)
+    {
+        if (lock == &m_lock || g_fInCrash)
+            return;
+        fastlock_lock(&m_lock);
+        m_mapwait.insert(std::make_pair(thispid, lock));
+
+        // Detect cycles
+        pid_t pidCheck = thispid;
+        size_t cchecks = 0;
+        for (;;)
+        {
+            auto itr = m_mapwait.find(pidCheck);
+            if (itr == m_mapwait.end())
+                break;
+            pidCheck = itr->second->m_pidOwner;
+            if (pidCheck == thispid)
+            {
+                // Deadlock detected, printout some debugging info and crash
+                serverLog(3 /*LL_WARNING*/, "\n\n");
+                serverLog(3 /*LL_WARNING*/, "!!! ERROR: Deadlock detected !!!");
+                pidCheck = thispid;
+                for (;;)
+                {
+                    auto itr = m_mapwait.find(pidCheck);
+                    serverLog(3 /* LL_WARNING */, "\t%d: (%p) %s", pidCheck, itr->second, itr->second->szName);
+                    pidCheck = itr->second->m_pidOwner;
+                    if (pidCheck == thispid)
+                        break;
+                }
+                serverLog(3 /*LL_WARNING*/, "!!! KeyDB Will Now Crash !!!");
+                _serverPanic(__FILE__, __LINE__, "Deadlock detected");
+            }
+
+            if (cchecks > m_mapwait.size())
+                break;  // There is a cycle but we're not in it
+            ++cchecks;
+        }
+        fastlock_unlock(&m_lock);
+    }
+
+    void clearwait(fastlock *lock, pid_t thispid)
+    {
+        if (lock == &m_lock || g_fInCrash)
+            return;
+        fastlock_lock(&m_lock);
+        m_mapwait.erase(thispid);
+        fastlock_unlock(&m_lock);
+    }
+};
+
+DeadlockDetector g_dlock;
+
 static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
 uint64_t g_longwaits = 0;
@@ -135,7 +213,6 @@ uint64_t fastlock_getlongwaitcount()
     return rval;
 }
-#ifndef ASM_SPINLOCK
 #ifdef __linux__
 static int futex(volatile unsigned *uaddr, int futex_op, int val,
     const struct timespec *timeout, int val3)
@@ -144,7 +221,6 @@ static int futex(volatile unsigned *uaddr, int futex_op, int val,
         timeout, uaddr, val3);
 }
 #endif
-#endif
 extern "C" pid_t gettid()
 {
@@ -163,13 +239,26 @@ extern "C" pid_t gettid()
     return pidCache;
 }
-extern "C" void fastlock_init(struct fastlock *lock)
+extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask)
+{
+#ifdef __linux__
+    g_dlock.registerwait(lock, pid);
+    __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
+    futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask);
+    __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
+    g_dlock.clearwait(lock, pid);
+#endif
+    __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
+}
+
+extern "C" void fastlock_init(struct fastlock *lock, const char *name)
 {
     lock->m_ticket.m_active = 0;
     lock->m_ticket.m_avail = 0;
     lock->m_depth = 0;
     lock->m_pidOwner = -1;
     lock->futex = 0;
+    lock->szName = name;
     ANNOTATE_RWLOCK_CREATE(lock);
 }
@@ -184,12 +273,12 @@ extern "C" void fastlock_lock(struct fastlock *lock)
         return;
     }
+    int tid = gettid();
     unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
-#ifdef __linux__
     unsigned mask = (1U << (myticket % 32));
-#endif
     int cloops = 0;
     ticket ticketT;
     for (;;)
     {
         __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
@@ -201,17 +290,11 @@ extern "C" void fastlock_lock(struct fastlock *lock)
 #endif
         if ((++cloops % 1024*1024) == 0)
         {
-#ifdef __linux__
-            __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
-            futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask);
-            __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
-#endif
-            __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
+            fastlock_sleep(lock, tid, ticketT.u, mask);
         }
     }
     lock->m_depth = 1;
-    int tid = gettid();
     __atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE);
     ANNOTATE_RWLOCK_ACQUIRED(lock, true);
     std::atomic_thread_fence(std::memory_order_acquire);
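The new machinery above is easiest to see end to end with a toy reproduction. The following sketch is not part of the commit; it assumes the KeyDB fastlock sources and Linux (registerwait only runs on the futex sleep path, so detection kicks in once a waiter has spun long enough to fall into fastlock_sleep). Two threads take two named locks in opposite order: when the second waiter registers, the detector walks the owner chain, logs both lock names via the weak serverLog, and calls _serverPanic instead of hanging forever.

// deadlock_demo.cpp -- illustrative only, not part of this commit
#include <chrono>
#include <thread>
#include "fastlock.h"

fastlock g_lockA("demo lock A");
fastlock g_lockB("demo lock B");

int main()
{
    std::thread t1([] {
        g_lockA.lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        g_lockB.lock();      // waits on B while owning A
        g_lockB.unlock();
        g_lockA.unlock();
    });
    std::thread t2([] {
        g_lockB.lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        g_lockA.lock();      // waits on A while owning B -> cycle, detector panics
        g_lockA.unlock();
        g_lockB.unlock();
    });
    t1.join();
    t2.join();
    return 0;
}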

View File

@@ -7,7 +7,7 @@ extern "C" {
 /* Begin C API */
 struct fastlock;
-void fastlock_init(struct fastlock *lock);
+void fastlock_init(struct fastlock *lock, const char *name);
 void fastlock_lock(struct fastlock *lock);
 int fastlock_trylock(struct fastlock *lock, int fWeak);
 void fastlock_unlock(struct fastlock *lock);
@@ -45,24 +45,25 @@ struct fastlock
     volatile int m_pidOwner;
     volatile int m_depth;
     unsigned futex;
+    const char *szName;
 #ifdef __cplusplus
-    fastlock()
+    fastlock(const char *name)
     {
-        fastlock_init(this);
+        fastlock_init(this, name);
     }
-    void lock()
+    inline void lock()
     {
         fastlock_lock(this);
     }
-    bool try_lock(bool fWeak = false)
+    inline bool try_lock(bool fWeak = false)
     {
         return !!fastlock_trylock(this, fWeak);
     }
-    void unlock()
+    inline void unlock()
     {
         fastlock_unlock(this);
     }
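Since fastlock_init() and the C++ constructor both gain a name parameter, every call site has to pass a label. A minimal sketch of the updated usage (the lock names here are invented for illustration, assuming the header above):

#include <mutex>
#include "fastlock.h"

static fastlock g_cfgLock("config");        // C++ usage: the constructor forwards the name

void example()
{
    struct fastlock lk;                     // C-style usage
    fastlock_init(&lk, "example lock");     // the name is the new second argument
    fastlock_lock(&lk);
    fastlock_unlock(&lk);

    std::lock_guard<fastlock> guard(g_cfgLock);  // lock()/unlock() satisfy BasicLockable
}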

View File

@@ -1,156 +1,160 @@
-section .text
-extern gettid
-extern sched_yield
-extern g_longwaits
+.intel_syntax noprefix
+.text
+.extern gettid
+.extern fastlock_sleep
-; This is the first use of assembly in this codebase, a valid question is WHY?
-; The spinlock we implement here is performance critical, and simply put GCC
-; emits awful code. The original C code is left in fastlock.cpp for reference
-; and x-plat.
+# This is the first use of assembly in this codebase, a valid question is WHY?
+# The spinlock we implement here is performance critical, and simply put GCC
+# emits awful code. The original C code is left in fastlock.cpp for reference
+# and x-plat.
-ALIGN 16
-global fastlock_lock
+.ALIGN 16
+.global fastlock_lock
+.type fastlock_lock,@function
 fastlock_lock:
-; RDI points to the struct:
-; uint16_t active
-; uint16_t avail
-; int32_t m_pidOwner
-; int32_t m_depth
-; First get our TID and put it in ecx
-push rdi ; we need our struct pointer (also balance the stack for the call)
-call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining)
-mov esi, eax ; back it up in esi
-pop rdi ; get our pointer back
-cmp [rdi+4], esi ; Is the TID we got back the owner of the lock?
-je .LLocked ; Don't spin in that case
-xor eax, eax ; eliminate partial register dependency
-inc eax ; we want to add one
-lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition
-; ax now contains the ticket
-ALIGN 16
+.cfi_startproc
+.cfi_def_cfa rsp, 8
+# RDI points to the struct:
+# uint16_t active
+# uint16_t avail
+# int32_t m_pidOwner
+# int32_t m_depth
+# First get our TID and put it in ecx
+push rdi # we need our struct pointer (also balance the stack for the call)
+.cfi_adjust_cfa_offset 8
+call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
+mov esi, eax # back it up in esi
+pop rdi # get our pointer back
+.cfi_adjust_cfa_offset -8
+cmp [rdi+4], esi # Is the TID we got back the owner of the lock?
+je .LLocked # Don't spin in that case
+xor eax, eax # eliminate partial register dependency
+inc eax # we want to add one
+lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition
+# ax now contains the ticket
+# OK Start the wait loop
+xor ecx, ecx
+.ALIGN 16
 .LLoop:
 mov edx, [rdi]
-cmp dx, ax ; is our ticket up?
-je .LLocked ; leave the loop
+cmp dx, ax # is our ticket up?
+je .LLocked # leave the loop
 pause
-add ecx, 1000h ; Have we been waiting a long time? (oflow if we have)
-; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
-jnc .LLoop ; If so, give up our timeslice to someone who's doing real work
-; Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"
-; But the compiler doesn't know that we rarely hit this, and when we do we know the lock is
-; taking a long time to be released anyways. We optimize for the common case of short
-; lock intervals. That's why we're using a spinlock in the first place
-; If we get here we're going to sleep in the kernel with a futex
+add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have)
+# 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
+jnc .LLoop # If so, give up our timeslice to someone who's doing real work
+# Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"
+# But the compiler doesn't know that we rarely hit this, and when we do we know the lock is
+# taking a long time to be released anyways. We optimize for the common case of short
+# lock intervals. That's why we're using a spinlock in the first place
+# If we get here we're going to sleep in the kernel with a futex
+push rdi
 push rsi
 push rax
-; Setup the syscall args
-; rdi ARG1 futex (already in rdi)
-mov esi, (9 | 128) ; rsi ARG2 FUTEX_WAIT_BITSET_PRIVATE
-; rdx ARG3 ticketT.u (already in edx)
-xor r10d, r10d ; r10 ARG4 NULL
-mov r8, rdi ; r8 ARG5 dup rdi
-xor r9d, r9d
-bts r9d, eax ; r9 ARG6 mask
-mov eax, 202 ; sys_futex
-; Do the syscall
-lock or [rdi+12], r9d ; inform the unlocking thread we're waiting
-syscall ; wait for the futex
-not r9d ; convert our flag into a mask of bits not to touch
-lock and [rdi+12], r9d ; clear the flag in the futex control mask
-; cleanup and continue
-mov rcx, g_longwaits
-inc qword [rcx] ; increment our long wait counter
+.cfi_adjust_cfa_offset 24
+# Setup the syscall args
+# rdi ARG1 futex (already in rdi)
+# rsi ARG2 tid (already in esi)
+# rdx ARG3 ticketT.u (already in edx)
+bts ecx, eax # rcx ARG4 mask
+call fastlock_sleep
+# cleanup and continue
 pop rax
 pop rsi
-xor ecx, ecx ; Reset our loop counter
-jmp .LLoop ; Get back in the game
-ALIGN 16
+pop rdi
+.cfi_adjust_cfa_offset -24
+xor ecx, ecx # Reset our loop counter
+jmp .LLoop # Get back in the game
+.ALIGN 16
 .LLocked:
-mov [rdi+4], esi ; lock->m_pidOwner = gettid()
-inc dword [rdi+8] ; lock->m_depth++
+mov [rdi+4], esi # lock->m_pidOwner = gettid()
+inc dword ptr [rdi+8] # lock->m_depth++
 ret
-ALIGN 16
-global fastlock_trylock
+.cfi_endproc
+.ALIGN 16
+.global fastlock_trylock
+.type fastlock_trylock,@function
 fastlock_trylock:
-; RDI points to the struct:
-; uint16_t active
-; uint16_t avail
-; int32_t m_pidOwner
-; int32_t m_depth
-; First get our TID and put it in ecx
-push rdi ; we need our struct pointer (also balance the stack for the call)
-call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining)
-mov esi, eax ; back it up in esi
-pop rdi ; get our pointer back
-cmp [rdi+4], esi ; Is the TID we got back the owner of the lock?
-je .LRecursive ; Don't spin in that case
-mov eax, [rdi] ; get both active and avail counters
-mov ecx, eax ; duplicate in ecx
-ror ecx, 16 ; swap upper and lower 16-bits
-cmp eax, ecx ; are the upper and lower 16-bits the same?
-jnz .LAlreadyLocked ; If not return failure
-; at this point we know eax+ecx have [avail][active] and they are both the same
-add ecx, 10000h ; increment avail, ecx is now our wanted value
-lock cmpxchg [rdi], ecx ; If rdi still contains the value in eax, put in ecx (inc avail)
-jnz .LAlreadyLocked ; If Z is not set then someone locked it while we were preparing
+# RDI points to the struct:
+# uint16_t active
+# uint16_t avail
+# int32_t m_pidOwner
+# int32_t m_depth
+# First get our TID and put it in ecx
+push rdi # we need our struct pointer (also balance the stack for the call)
+call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
+mov esi, eax # back it up in esi
+pop rdi # get our pointer back
+cmp [rdi+4], esi # Is the TID we got back the owner of the lock?
+je .LRecursive # Don't spin in that case
+mov eax, [rdi] # get both active and avail counters
+mov ecx, eax # duplicate in ecx
+ror ecx, 16 # swap upper and lower 16-bits
+cmp eax, ecx # are the upper and lower 16-bits the same?
+jnz .LAlreadyLocked # If not return failure
+# at this point we know eax+ecx have [avail][active] and they are both the same
+add ecx, 0x10000 # increment avail, ecx is now our wanted value
+lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail)
+jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing
 xor eax, eax
-inc eax ; return SUCCESS! (eax=1)
-mov [rdi+4], esi ; lock->m_pidOwner = gettid()
-mov dword [rdi+8], eax ; lock->m_depth = 1
+inc eax # return SUCCESS! (eax=1)
+mov [rdi+4], esi # lock->m_pidOwner = gettid()
+mov dword ptr [rdi+8], eax # lock->m_depth = 1
 ret
-ALIGN 16
+.ALIGN 16
 .LRecursive:
 xor eax, eax
-inc eax ; return SUCCESS! (eax=1)
-inc dword [rdi+8] ; lock->m_depth++
+inc eax # return SUCCESS! (eax=1)
+inc dword ptr [rdi+8] # lock->m_depth++
 ret
-ALIGN 16
+.ALIGN 16
 .LAlreadyLocked:
-xor eax, eax ; return 0;
+xor eax, eax # return 0
 ret
-ALIGN 16
-global fastlock_unlock
+.ALIGN 16
+.global fastlock_unlock
 fastlock_unlock:
-; RDI points to the struct:
-; uint16_t active
-; uint16_t avail
-; int32_t m_pidOwner
-; int32_t m_depth
+# RDI points to the struct:
+# uint16_t active
+# uint16_t avail
+# int32_t m_pidOwner
+# int32_t m_depth
 push r11
-sub dword [rdi+8], 1 ; decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
-jnz .LDone ; if depth is non-zero this is a recursive unlock, and we still hold it
-mov dword [rdi+4], -1 ; pidOwner = -1 (we don't own it anymore)
-mov ecx, [rdi] ; get current active (this one)
-inc ecx ; bump it to the next thread
-mov [rdi], cx ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
-; At this point the lock is removed, however we must wake up any pending futexs
-mov r9d, 1 ; eax is the bitmask for 2 threads
-rol r9d, cl ; place the mask in the right spot for the next 2 threads
-ALIGN 16
+sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
+jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it
+mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore)
+mov ecx, [rdi] # get current active (this one)
+inc ecx # bump it to the next thread
+mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
+# At this point the lock is removed, however we must wake up any pending futexs
+mov r9d, 1 # eax is the bitmask for 2 threads
+rol r9d, cl # place the mask in the right spot for the next 2 threads
+.ALIGN 16
 .LRetryWake:
-mov r11d, [rdi+12] ; load the futex mask
-and r11d, r9d ; are any threads waiting on a futex?
-jz .LDone ; if not we're done.
-; we have to wake the futexs
-; rdi ARG1 futex (already in rdi)
-mov esi, (10 | 128) ; rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
-mov edx, 0x7fffffff ; rdx ARG3 INT_MAX (number of threads to wake)
-xor r10d, r10d ; r10 ARG4 NULL
-mov r8, rdi ; r8 ARG5 dup rdi
-; r9 ARG6 mask (already set above)
-mov eax, 202 ; sys_futex
+mov r11d, [rdi+12] # load the futex mask
+and r11d, r9d # are any threads waiting on a futex?
+jz .LDone # if not we're done.
+# we have to wake the futexs
+# rdi ARG1 futex (already in rdi)
+mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
+mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake)
+xor r10d, r10d # r10 ARG4 NULL
+mov r8, rdi # r8 ARG5 dup rdi
+# r9 ARG6 mask (already set above)
+mov eax, 202 # sys_futex
 syscall
-cmp eax, 1 ; did we wake as many as we expected?
+cmp eax, 1 # did we wake as many as we expected?
 jnz .LRetryWake
 .LDone:
 pop r11
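For readers who prefer not to follow the assembly, the wake loop at the end of fastlock_unlock corresponds roughly to the following C++ (a sketch only, Linux-specific, assuming the struct layout used above: the ticket pair at offset 0 and the futex wait-bit word at offset 12):

#include <climits>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include "fastlock.h"

static void wake_next_waiter(struct fastlock *lock, unsigned next_ticket)
{
    unsigned mask = 1U << (next_ticket % 32);             // same bit the waiter registered
    for (;;) {
        if (!(__atomic_load_n(&lock->futex, __ATOMIC_ACQUIRE) & mask))
            return;                                       // nobody is sleeping on our bit
        long woken = syscall(SYS_futex, &lock->m_ticket.u,
                             FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG,  // (10 | 128) in the asm
                             INT_MAX, nullptr, nullptr, mask);
        if (woken == 1)
            return;                                       // the next ticket holder is awake
    }
}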

View File

@@ -116,7 +116,7 @@ client *createClient(int fd, int iel) {
     uint64_t client_id;
     client_id = g_pserver->next_client_id.fetch_add(1);
     c->iel = iel;
-    fastlock_init(&c->lock);
+    fastlock_init(&c->lock, "client");
     c->id = client_id;
     c->resp = 2;
     c->fd = fd;
@@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) {
 int prepareClientToWrite(client *c, bool fAsync) {
     fAsync = fAsync && !FCorrectThread(c);  // Not async if we're on the right thread
     serverAssert(FCorrectThread(c) || fAsync);
+    if (FCorrectThread(c)) {
         serverAssert(c->fd <= 0 || c->lock.fOwnLock());
+    } else {
+        serverAssert(GlobalLocksAcquired());
+    }
     if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
                                                     // do not install a write handler
@@ -1509,7 +1513,6 @@ int writeToClient(int fd, client *c, int handler_installed) {
     } else {
         serverLog(LL_VERBOSE,
             "Error writing to client: %s", strerror(errno));
-        lock.unlock();
         freeClientAsync(c);
         return C_ERR;
@@ -1528,7 +1531,6 @@ int writeToClient(int fd, client *c, int handler_installed) {
     /* Close connection after entire reply has been sent. */
     if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
-        lock.unlock();
         freeClientAsync(c);
         return C_ERR;
     }
@@ -3000,6 +3002,12 @@ int processEventsWhileBlocked(int iel) {
     int iterations = 4; /* See the function top-comment. */
     int count = 0;
+    client *c = serverTL->current_client;
+    if (c != nullptr)
+    {
+        serverAssert(c->flags & CLIENT_PROTECTED);
+        c->lock.unlock();
+    }
     aeReleaseLock();
     while (iterations--) {
         int events = 0;
@@ -3008,7 +3016,11 @@ int processEventsWhileBlocked(int iel) {
         if (!events) break;
         count += events;
     }
-    aeAcquireLock();
+    AeLocker locker;
+    if (c != nullptr)
+        c->lock.lock();
+    locker.arm(c);
+    locker.release();
     return count;
 }

View File

@@ -173,6 +173,8 @@ typedef struct redisConfig {
     sds appendonly;
 } redisConfig;
+int g_fInCrash = false;
 /* Prototypes */
 static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 static void createMissingClients(client c);

View File

@@ -90,6 +90,8 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
 int *spectrum_palette;
 int spectrum_palette_size;
+int g_fInCrash = 0;
 /*------------------------------------------------------------------------------
  * Utility functions
  *--------------------------------------------------------------------------- */

View File

@@ -384,7 +384,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
-        std::unique_lock<decltype(replica->lock)> lock(replica->lock);
+        if (replica->flags & CLIENT_CLOSE_ASAP) continue;
+        std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock);
+        // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
+        if (FCorrectThread(replica))
+            lock.lock();
         if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
         {
             replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
@@ -433,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
     while((ln = listNext(&li))) {
         client *replica = (client*)ln->value;
-        std::lock_guard<decltype(replica->lock)> ulock(replica->lock);
+        std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock);
+        if (FCorrectThread(replica))
+            ulock.lock();
         if (FMasterHost(replica))
             continue; // Active Active case, don't feed back
@@ -482,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
     listRewind(monitors,&li);
     while((ln = listNext(&li))) {
         client *monitor = (client*)ln->value;
-        std::lock_guard<decltype(monitor->lock)> lock(monitor->lock);
+        std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock);
+        // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
+        if (FCorrectThread(c))
+            lock.lock();
         addReplyAsync(monitor,cmdobj);
     }
     decrRefCount(cmdobj);
@@ -1205,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
             }
             else
             {
-                aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{
+                aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
+                    // Because the client could have been closed while the lambda waited to run we need to
+                    // verify the replica is still connected
+                    listIter li;
+                    listNode *ln;
+                    listRewind(g_pserver->slaves,&li);
+                    bool fFound = false;
+                    while ((ln = listNext(&li))) {
+                        if (listNodeValue(ln) == replica) {
+                            fFound = true;
+                            break;
+                        }
+                    }
+                    if (!fFound)
+                        return;
                     aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
                     if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
                         freeClient(replica);
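The recurring pattern in this file, constructing the std::unique_lock deferred and only locking when we are on the replica's own event-loop thread, can be captured in one helper. The helper name below is invented for illustration; the rule itself comes from the comments above (cross-thread writers rely on the global lock and the AddReply*Async() path):

std::unique_lock<fastlock> lockClientIfLocal(client *c)
{
    std::unique_lock<fastlock> lk(c->lock, std::defer_lock);
    if (FCorrectThread(c))
        lk.lock();                          // same thread: safe to take the per-client lock
    else
        serverAssert(GlobalLocksAcquired()); // other thread: must already hold the global lock
    return lk;
}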

View File

@@ -2878,7 +2878,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
         exit(1);
     }
-    fastlock_init(&pvar->lockPendingWrite);
+    fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite");
     if (!fMain)
     {
@@ -2925,8 +2925,6 @@ void initServer(void) {
     signal(SIGPIPE, SIG_IGN);
     setupSignalHandlers();
-    fastlock_init(&g_pserver->flock);
     g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL);
     /* Create the Redis databases, and initialize other internal state. */
@@ -3706,7 +3704,6 @@ int processCommand(client *c, int callFlags) {
         queueMultiCommand(c);
         addReply(c,shared.queued);
     } else {
-        std::unique_lock<decltype(c->db->lock)> ulock(c->db->lock);
         call(c,callFlags);
         c->woff = g_pserver->master_repl_offset;
         if (listLength(g_pserver->ready_keys))

View File

@@ -1057,8 +1057,6 @@ typedef struct redisDb {
     long long last_expire_set;  /* when the last expire was set */
     double avg_ttl;             /* Average TTL, just for stats */
     list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
-    fastlock lock;
 } redisDb;
 /* Client MULTI/EXEC state */
@@ -1437,7 +1435,7 @@ struct redisServerThreadVars {
                                        client blocked on a module command needs
                                        to be processed. */
     client *lua_client = nullptr;   /* The "fake client" to query Redis from Lua */
-    struct fastlock lockPendingWrite;
+    struct fastlock lockPendingWrite { "thread pending write" };
     char neterr[ANET_ERR_LEN];      /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
 };
@@ -1819,8 +1817,6 @@ struct redisServer {
     int fActiveReplica;             /* Can this replica also be a master? */
-    struct fastlock flock;
     // Format:
     // Lower 20 bits: a counter incrementing for each command executed in the same millisecond
     // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition
@@ -2799,7 +2795,7 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len);
 int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
 int moduleGILAcquiredByModule(void);
-extern bool g_fInCrash;
+extern int g_fInCrash;
 static inline int GlobalLocksAcquired(void)  // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
 {
     return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash;

View File

@@ -6,7 +6,7 @@ if {$system_name eq {linux} || $system_name eq {darwin}} {
     test "Server is able to generate a stack trace on selected systems" {
         r config set watchdog-period 200
         r debug sleep 1
-        set pattern "*debugCommand*"
+        set pattern "*watchdogSignalHandler*"
         set retry 10
         while {$retry} {
             set result [exec tail -100 < [srv 0 stdout]]