diff --git a/src/ae.cpp b/src/ae.cpp index 4643b3999..7c4af1652 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -88,8 +88,7 @@ mutex_wrapper g_lock; #else fastlock g_lock("AE (global)"); #endif -readWriteLock forkLock("Fork (global)"); -readWriteLock *g_forkLock = &forkLock; +readWriteLock g_forkLock("Fork (global)"); thread_local aeEventLoop *g_eventLoopThisThread = NULL; /* Include the best multiplexing layer supported by this system. @@ -158,9 +157,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { if (cmd.fLock && !ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; @@ -169,9 +168,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostCppFunction: { if (cmd.fLock && !ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } (*cmd.pfn)(); @@ -557,9 +556,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { te->next->prev = te->prev; if (te->finalizerProc) { if (!ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); @@ -581,9 +580,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { if (te->when <= now) { if (!ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } int retval; @@ -609,9 +608,9 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma #define LOCK_IF_NECESSARY(fe, tsmask) \ std::unique_lock ulock(g_lock, std::defer_lock); \ if (!(fe->mask & tsmask)) { \ - g_forkLock->releaseRead(); \ + g_forkLock.releaseRead(); \ ulock.lock(); \ - g_forkLock->acquireRead(); \ + g_forkLock.acquireRead(); \ } int fired = 0; /* Number of events fired for current fd. */ @@ -725,9 +724,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } eventLoop->beforesleep(eventLoop); } @@ -740,9 +739,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } eventLoop->aftersleep(eventLoop); } @@ -818,9 +817,21 @@ void setAeLockSetThreadSpinWorker(spin_worker worker) tl_worker = worker; } +void aeThreadOnline() +{ + g_forkLock.acquireRead(); +} + void aeAcquireLock() { + g_forkLock.releaseRead(); g_lock.lock(tl_worker); + g_forkLock.acquireRead(); +} + +void aeAcquireForkLock() +{ + g_forkLock.upgradeWrite(); } int aeTryAcquireLock(int fWeak) @@ -828,6 +839,11 @@ int aeTryAcquireLock(int fWeak) return g_lock.try_lock(!!fWeak); } +void aeThreadOffline() +{ + g_forkLock.releaseRead(); +} + void aeReleaseLock() { g_lock.unlock(); @@ -838,6 +854,11 @@ void aeSetThreadOwnsLockOverride(int fOverride) fOwnLockOverride = fOverride; } +void aeReleaseForkLock() +{ + g_forkLock.downgradeWrite(); +} + int aeThreadOwnsLock() { return fOwnLockOverride || g_lock.fOwnLock(); diff --git a/src/ae.h b/src/ae.h index c22624ad6..cd513f652 100644 --- a/src/ae.h +++ b/src/ae.h @@ -164,9 +164,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait); void aeClosePipesForForkChild(aeEventLoop *eventLoop); void setAeLockSetThreadSpinWorker(spin_worker worker); +void aeThreadOnline(); void aeAcquireLock(); +void aeAcquireForkLock(); int aeTryAcquireLock(int fWeak); +void aeThreadOffline(); void aeReleaseLock(); +void aeReleaseForkLock(); int aeThreadOwnsLock(); void aeSetThreadOwnsLockOverride(int fOverride); int aeLockContested(int threshold); diff --git a/src/aelocker.h b/src/aelocker.h index dda2b3142..75a3cef53 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -34,9 +34,7 @@ public: clientNesting = c->lock.unlock_recursive(); fOwnClientLock = false; } - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive { aeReleaseLock(); @@ -54,9 +52,7 @@ public: else if (!m_fArmed) { m_fArmed = true; - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); } } diff --git a/src/networking.cpp b/src/networking.cpp index 73fd4332c..a7920b692 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1359,9 +1359,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) } rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); acceptCommonHandler(conn,flags,cip,ielCur); aeReleaseLock(); } diff --git a/src/replication.cpp b/src/replication.cpp index 8347b8228..74e77c307 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -5334,9 +5334,7 @@ struct RemoteMasterState ~RemoteMasterState() { - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); freeClient(cFake); aeReleaseLock(); } diff --git a/src/server.cpp b/src/server.cpp index 1e6052417..7a91fb97d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2973,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { g_pserver->garbageCollector.endEpoch(epoch); }, true /*fHiPri*/); } - g_forkLock->releaseRead(); + aeThreadOffline(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -2994,7 +2994,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); - g_forkLock->acquireRead(); + aeThreadOnline(); wakeTimeThread(); serverAssert(serverTL->gcEpoch.isReset()); @@ -6851,7 +6851,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } long long startWriteLock = ustime(); - g_forkLock->upgradeWrite(); + aeAcquireForkLock(); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ @@ -6861,7 +6861,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - g_forkLock->downgradeWrite(); + aeReleaseForkLock(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ @@ -7218,21 +7218,21 @@ void *timeThreadMain(void*) { delay.tv_sec = 0; delay.tv_nsec = 100; int cycle_count = 0; - g_forkLock->acquireRead(); + aeThreadOnline(); while (true) { { std::unique_lock lock(time_thread_mutex); if (sleeping_threads >= cserver.cthreads) { - g_forkLock->releaseRead(); + aeThreadOffline(); time_thread_cv.wait(lock); - g_forkLock->acquireRead(); + aeThreadOnline(); cycle_count = 0; } } updateCachedTime(); if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { - g_forkLock->releaseRead(); - g_forkLock->acquireRead(); + aeThreadOffline(); + aeThreadOnline(); cycle_count = 0; } #if defined(__APPLE__) @@ -7242,7 +7242,7 @@ void *timeThreadMain(void*) { #endif cycle_count++; } - g_forkLock->releaseRead(); + aeThreadOffline(); } void *workerThreadMain(void *parg) @@ -7260,7 +7260,7 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run - g_forkLock->acquireRead(); + aeThreadOnline(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { @@ -7269,7 +7269,7 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } - g_forkLock->releaseRead(); + aeThreadOffline(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); @@ -7418,8 +7418,8 @@ int main(int argc, char **argv) { g_pserver->sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; + aeThreadOnline(); aeAcquireLock(); // We own the lock on boot - g_forkLock->acquireRead(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); @@ -7653,8 +7653,8 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(g_pserver->server_cpulist); - g_forkLock->releaseRead(); aeReleaseLock(); //Finally we can dump the lock + aeThreadOffline(); moduleReleaseGIL(true); setOOMScoreAdj(-1); diff --git a/src/server.h b/src/server.h index 974661ad8..e3d7e90ae 100644 --- a/src/server.h +++ b/src/server.h @@ -2763,7 +2763,6 @@ typedef struct { *----------------------------------------------------------------------------*/ //extern struct redisServer server; -extern readWriteLock *g_forkLock; extern struct redisServerConst cserver; extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars extern struct sharedObjectsStruct shared;