diff --git a/src/fastlock.cpp b/src/fastlock.cpp
index 71a49a1e8..75a0f8381 100644
--- a/src/fastlock.cpp
+++ b/src/fastlock.cpp
@@ -36,6 +36,7 @@
 #include
 #include
 #include
+#include <map>
 #ifdef __linux__
 #include
 #endif
@@ -125,6 +126,60 @@
 #endif
 
+#pragma weak _serverPanic
+extern "C" void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...)
+{
+    *((char*)-1) = 'x';
+}
+
+class DeadlockDetector
+{
+    std::map<pid_t, fastlock*> m_mapwait;
+    fastlock m_lock;
+public:
+    void registerwait(fastlock *lock, pid_t thispid)
+    {
+        if (lock == &m_lock)
+            return;
+        fastlock_lock(&m_lock);
+        m_mapwait.insert(std::make_pair(thispid, lock));
+
+        // Detect cycles
+        pid_t pidCheck = thispid;
+        for (;;)
+        {
+            auto itr = m_mapwait.find(pidCheck);
+            if (itr == m_mapwait.end())
+                break;
+            pidCheck = itr->second->m_pidOwner;
+            if (pidCheck == thispid)
+                _serverPanic(__FILE__, __LINE__, "Deadlock detected");
+        }
+        fastlock_unlock(&m_lock);
+    }
+
+    void clearwait(fastlock *lock, pid_t thispid)
+    {
+        if (lock == &m_lock)
+            return;
+        fastlock_lock(&m_lock);
+        m_mapwait.erase(thispid);
+        fastlock_unlock(&m_lock);
+    }
+};
+
+DeadlockDetector g_dlock;
+
+extern "C" void registerwait(fastlock *lock, pid_t thispid)
+{
+    g_dlock.registerwait(lock, thispid);
+}
+
+extern "C" void clearwait(fastlock *lock, pid_t thispid)
+{
+    g_dlock.clearwait(lock, thispid);
+}
+
 static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
 
 uint64_t g_longwaits = 0;
@@ -184,34 +239,41 @@ extern "C" void fastlock_lock(struct fastlock *lock)
         return;
     }
 
+    int tid = gettid();
     unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
 #ifdef __linux__
     unsigned mask = (1U << (myticket % 32));
 #endif
     int cloops = 0;
     ticket ticketT;
-    for (;;)
+
+    __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
+    if ((ticketT.u & 0xffff) != myticket)
     {
-        __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
-        if ((ticketT.u & 0xffff) == myticket)
-            break;
+        registerwait(lock, tid);
+        for (;;)
+        {
+            __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
+            if ((ticketT.u & 0xffff) == myticket)
+                break;
 #if defined(__i386__) || defined(__amd64__)
-        __asm__ ("pause");
+            __asm__ ("pause");
 #endif
-        if ((++cloops % 1024*1024) == 0)
-        {
+            if ((++cloops % 1024*1024) == 0)
+            {
 #ifdef __linux__
-            __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
-            futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask);
-            __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
+                __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
+                futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask);
+                __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
 #endif
-            __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
+                __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
+            }
         }
+        clearwait(lock, tid);
     }
 
     lock->m_depth = 1;
-    int tid = gettid();
     __atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE);
     ANNOTATE_RWLOCK_ACQUIRED(lock, true);
     std::atomic_thread_fence(std::memory_order_acquire);
diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm
index baf33654f..c054e6bc6 100644
--- a/src/fastlock_x64.asm
+++ b/src/fastlock_x64.asm
@@ -3,6 +3,8 @@ section .text
 extern gettid
 extern sched_yield
 extern g_longwaits
+extern registerwait
+extern clearwait
 
 ; This is the first use of assembly in this codebase, a valid question is WHY?
 ; The spinlock we implement here is performance critical, and simply put GCC
@@ -31,11 +33,24 @@ fastlock_lock:
 	inc             eax                ; we want to add one
 	lock xadd       [rdi+2], ax        ; do the xadd, ax contains the value before the addition
 	; ax now contains the ticket
+	mov             edx, [rdi]
+	cmp             dx, ax             ; is our ticket up?
+	je              .LLocked           ; no need to loop
+	; Lock is contended, so inform the deadlock detector
+	push            rax
+	push            rdi
+	push            rsi
+	call            registerwait
+	pop             rsi
+	pop             rdi
+	pop             rax
+	; OK Start the wait loop
+	xor             ecx, ecx
 ALIGN 16
 .LLoop:
 	mov             edx, [rdi]
 	cmp             dx, ax             ; is our ticket up?
-	je              .LLocked           ; leave the loop
+	je              .LExitLoop         ; leave the loop
 	pause
 	add             ecx, 1000h         ; Have we been waiting a long time? (oflow if we have)
 	                                   ; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
@@ -69,6 +84,13 @@ ALIGN 16
 	xor             ecx, ecx           ; Reset our loop counter
 	jmp             .LLoop             ; Get back in the game
ALIGN 16
+.LExitLoop:
+	push            rsi
+	push            rdi
+	call            clearwait
+	pop             rdi
+	pop             rsi
+ALIGN 16
 .LLocked:
 	mov             [rdi+4], esi       ; lock->m_pidOwner = gettid()
 	inc             dword [rdi+8]      ; lock->m_depth++
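
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the wait-for cycle check that DeadlockDetector::registerwait performs. The names SimpleLock, wouldDeadlock and g_mapwait, and the use of plain ints for thread ids, are hypothetical simplifications; the real detector guards its map with its own fastlock and calls _serverPanic instead of returning a result.

// Standalone sketch of the wait-for cycle check (hypothetical names, not the patch code).
#include <cstdio>
#include <map>

struct SimpleLock {
    int owner = 0;               // tid currently holding the lock (0 = unowned)
};

// tid -> lock that thread is currently blocked on
static std::map<int, SimpleLock*> g_mapwait;

// Returns true if registering `tid` as a waiter on `lock` closes a cycle.
bool wouldDeadlock(SimpleLock *lock, int tid)
{
    g_mapwait[tid] = lock;
    int pidCheck = tid;
    for (;;)
    {
        auto itr = g_mapwait.find(pidCheck);
        if (itr == g_mapwait.end())
            break;                        // chain ends: that owner is not blocked on anything
        pidCheck = itr->second->owner;    // follow lock -> owning thread
        if (pidCheck == tid)
            return true;                  // walked back to ourselves: deadlock cycle
    }
    return false;
}

int main()
{
    SimpleLock a, b;
    a.owner = 1;                          // thread 1 holds A
    b.owner = 2;                          // thread 2 holds B
    wouldDeadlock(&b, 1);                 // thread 1 blocks on B: no cycle yet
    // Thread 2 blocking on A closes the cycle 2 -> A -> 1 -> B -> 2.
    std::printf("deadlock: %s\n", wouldDeadlock(&a, 2) ? "yes" : "no");
    return 0;
}

In the patch itself, registerwait is only invoked on the contended path (the fast path where the ticket is immediately available skips it), and clearwait removes the map entry once the ticket comes up, so the map only ever holds threads that are currently blocked.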