From 52402c47868f75b0b93c4d0b70f4d8766bf767b7 Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 14 Jan 2022 18:50:20 +0000 Subject: [PATCH] fork lock for all threads, use fastlock for readwritelock --- src/ae.cpp | 42 ++++++++++++++++++++++++++++++++-------- src/aelocker.h | 4 ++++ src/module.cpp | 2 +- src/networking.cpp | 2 ++ src/readwritelock.h | 47 +++++++++++++++------------------------------ src/replication.cpp | 2 ++ src/server.cpp | 16 ++++++++------- 7 files changed, 67 insertions(+), 48 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 9c1155a2f..4643b3999 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -53,6 +53,7 @@ #include "zmalloc.h" #include "config.h" #include "serverassert.h" +#include "server.h" #ifdef USE_MUTEX thread_local int cOwnLock = 0; @@ -87,6 +88,8 @@ mutex_wrapper g_lock; #else fastlock g_lock("AE (global)"); #endif +readWriteLock forkLock("Fork (global)"); +readWriteLock *g_forkLock = &forkLock; thread_local aeEventLoop *g_eventLoopThisThread = NULL; /* Include the best multiplexing layer supported by this system. @@ -154,16 +157,22 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { - if (cmd.fLock && !ulock.owns_lock()) + if (cmd.fLock && !ulock.owns_lock()) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; } case AE_ASYNC_OP::PostCppFunction: { - if (cmd.fLock && !ulock.owns_lock()) + if (cmd.fLock && !ulock.owns_lock()) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } (*cmd.pfn)(); delete cmd.pfn; @@ -547,7 +556,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) { if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { - if (!ulock.owns_lock()) ulock.lock(); + if (!ulock.owns_lock()) { + g_forkLock->releaseRead(); + ulock.lock(); + g_forkLock->acquireRead(); + } te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } @@ -567,7 +580,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) { } if (te->when <= now) { - if (!ulock.owns_lock()) ulock.lock(); + if (!ulock.owns_lock()) { + g_forkLock->releaseRead(); + ulock.lock(); + g_forkLock->acquireRead(); + } int retval; id = te->id; @@ -591,8 +608,11 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma { #define LOCK_IF_NECESSARY(fe, tsmask) \ std::unique_lock ulock(g_lock, std::defer_lock); \ - if (!(fe->mask & tsmask)) \ - ulock.lock() + if (!(fe->mask & tsmask)) { \ + g_forkLock->releaseRead(); \ + ulock.lock(); \ + g_forkLock->acquireRead(); \ + } int fired = 0; /* Number of events fired for current fd. */ @@ -704,8 +724,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) + if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } eventLoop->beforesleep(eventLoop); } @@ -716,8 +739,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) + if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } eventLoop->aftersleep(eventLoop); } diff --git a/src/aelocker.h b/src/aelocker.h index 75a3cef53..dda2b3142 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -34,7 +34,9 @@ public: clientNesting = c->lock.unlock_recursive(); fOwnClientLock = false; } + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive { aeReleaseLock(); @@ -52,7 +54,9 @@ public: else if (!m_fArmed) { m_fArmed = true; + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); } } diff --git a/src/module.cpp b/src/module.cpp index 6f4315c32..cf3c81c1a 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -365,7 +365,7 @@ typedef struct RedisModuleCommandFilter { static list *moduleCommandFilters; /* Module GIL Variables */ -static readWriteLock s_moduleGIL; +static readWriteLock s_moduleGIL("Module GIL"); thread_local bool g_fModuleThread = false; typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); diff --git a/src/networking.cpp b/src/networking.cpp index a7920b692..73fd4332c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1359,7 +1359,9 @@ void acceptOnThread(connection *conn, int flags, char *cip) } rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); acceptCommonHandler(conn,flags,cip,ielCur); aeReleaseLock(); } diff --git a/src/readwritelock.h b/src/readwritelock.h index d03e1c82b..a7318a29f 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -2,22 +2,24 @@ #include class readWriteLock { - std::mutex m_readLock; - std::recursive_mutex m_writeLock; - std::condition_variable m_cv; + fastlock m_readLock; + fastlock m_writeLock; + std::condition_variable_any m_cv; int m_readCount = 0; int m_writeCount = 0; bool m_writeWaiting = false; public: + readWriteLock(const char *name) : m_readLock(name), m_writeLock(name) {} + void acquireRead() { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); while (m_writeCount > 0 || m_writeWaiting) m_cv.wait(rm); m_readCount++; } bool tryAcquireRead() { - std::unique_lock rm(m_readLock, std::defer_lock); + std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; if (m_writeCount > 0 || m_writeWaiting) @@ -27,7 +29,7 @@ public: } void acquireWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); m_writeWaiting = true; while (m_readCount > 0) m_cv.wait(rm); @@ -43,24 +45,12 @@ public: } void upgradeWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); - m_writeWaiting = true; - while (m_readCount > 1) - m_cv.wait(rm); - if (exclusive) { - /* Another thread might have the write lock while we have the read lock - but won't be able to release it until they can acquire the read lock - so release the read lock and try again instead of waiting to avoid deadlock */ - while(!m_writeLock.try_lock()) - m_cv.wait(rm); - } - m_writeCount++; - m_readCount--; - m_writeWaiting = false; + releaseRead(); + acquireWrite(exclusive); } bool tryAcquireWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock, std::defer_lock); + std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; if (m_readCount > 0) @@ -73,14 +63,13 @@ public: } void releaseRead() { - std::unique_lock rm(m_readLock); - serverAssert(m_readCount > 0); + std::unique_lock rm(m_readLock); m_readCount--; m_cv.notify_all(); } void releaseWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); serverAssert(m_writeCount > 0); if (exclusive) m_writeLock.unlock(); @@ -89,14 +78,8 @@ public: } void downgradeWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); - serverAssert(m_writeCount > 0); - if (exclusive) - m_writeLock.unlock(); - m_writeCount--; - while (m_writeCount > 0 || m_writeWaiting) - m_cv.wait(rm); - m_readCount++; + releaseWrite(exclusive); + acquireRead(); } bool hasReader() { diff --git a/src/replication.cpp b/src/replication.cpp index 74e77c307..8347b8228 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -5334,7 +5334,9 @@ struct RemoteMasterState ~RemoteMasterState() { + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); freeClient(cFake); aeReleaseLock(); } diff --git a/src/server.cpp b/src/server.cpp index 99cbec3f0..1e6052417 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -92,10 +92,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /* Global vars */ namespace GlobalHidden { struct redisServer server; /* Server global state */ -readWriteLock forkLock; } redisServer *g_pserver = &GlobalHidden::server; -readWriteLock *g_forkLock = &GlobalHidden::forkLock; struct redisServerConst cserver; thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars std::mutex time_thread_mutex; @@ -2975,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { g_pserver->garbageCollector.endEpoch(epoch); }, true /*fHiPri*/); } - + g_forkLock->releaseRead(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -2995,8 +2993,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { Don't check here that modules are enabled, rather use the result from beforeSleep Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { - wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); + g_forkLock->acquireRead(); + wakeTimeThread(); serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); @@ -6852,7 +6851,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } long long startWriteLock = ustime(); - g_forkLock->acquireWrite(); + g_forkLock->upgradeWrite(); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ @@ -6862,7 +6861,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - g_forkLock->releaseWrite(); + g_forkLock->downgradeWrite(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ @@ -7261,6 +7260,7 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run + g_forkLock->acquireRead(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { @@ -7269,6 +7269,7 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } + g_forkLock->releaseRead(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); @@ -7418,7 +7419,7 @@ int main(int argc, char **argv) { initServerConfig(); serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; aeAcquireLock(); // We own the lock on boot - + g_forkLock->acquireRead(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); @@ -7652,6 +7653,7 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(g_pserver->server_cpulist); + g_forkLock->releaseRead(); aeReleaseLock(); //Finally we can dump the lock moduleReleaseGIL(true);