hide forklock object in ae

This commit is contained in:
Malavan Sotheeswaran 2022-01-18 11:36:59 -08:00 committed by John Sully
parent 52402c4786
commit 4d4acaa155
7 changed files with 55 additions and 39 deletions

View File

@ -88,8 +88,7 @@ mutex_wrapper g_lock;
#else #else
fastlock g_lock("AE (global)"); fastlock g_lock("AE (global)");
#endif #endif
readWriteLock forkLock("Fork (global)"); readWriteLock g_forkLock("Fork (global)");
readWriteLock *g_forkLock = &forkLock;
thread_local aeEventLoop *g_eventLoopThisThread = NULL; thread_local aeEventLoop *g_eventLoopThisThread = NULL;
/* Include the best multiplexing layer supported by this system. /* Include the best multiplexing layer supported by this system.
@ -158,9 +157,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostFunction: case AE_ASYNC_OP::PostFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) { if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
((aePostFunctionProc*)cmd.proc)(cmd.clientData); ((aePostFunctionProc*)cmd.proc)(cmd.clientData);
break; break;
@ -169,9 +168,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostCppFunction: case AE_ASYNC_OP::PostCppFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) { if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
(*cmd.pfn)(); (*cmd.pfn)();
@ -557,9 +556,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
te->next->prev = te->prev; te->next->prev = te->prev;
if (te->finalizerProc) { if (te->finalizerProc) {
if (!ulock.owns_lock()) { if (!ulock.owns_lock()) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
te->finalizerProc(eventLoop, te->clientData); te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs(); now = getMonotonicUs();
@ -581,9 +580,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
if (te->when <= now) { if (te->when <= now) {
if (!ulock.owns_lock()) { if (!ulock.owns_lock()) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
int retval; int retval;
@ -609,9 +608,9 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
#define LOCK_IF_NECESSARY(fe, tsmask) \ #define LOCK_IF_NECESSARY(fe, tsmask) \
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \ std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
if (!(fe->mask & tsmask)) { \ if (!(fe->mask & tsmask)) { \
g_forkLock->releaseRead(); \ g_forkLock.releaseRead(); \
ulock.lock(); \ ulock.lock(); \
g_forkLock->acquireRead(); \ g_forkLock.acquireRead(); \
} }
int fired = 0; /* Number of events fired for current fd. */ int fired = 0; /* Number of events fired for current fd. */
@ -725,9 +724,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) { if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
eventLoop->beforesleep(eventLoop); eventLoop->beforesleep(eventLoop);
} }
@ -740,9 +739,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) { if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock->releaseRead(); g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead(); g_forkLock.acquireRead();
} }
eventLoop->aftersleep(eventLoop); eventLoop->aftersleep(eventLoop);
} }
@ -818,9 +817,21 @@ void setAeLockSetThreadSpinWorker(spin_worker worker)
tl_worker = worker; tl_worker = worker;
} }
void aeThreadOnline()
{
g_forkLock.acquireRead();
}
void aeAcquireLock() void aeAcquireLock()
{ {
g_forkLock.releaseRead();
g_lock.lock(tl_worker); g_lock.lock(tl_worker);
g_forkLock.acquireRead();
}
void aeAcquireForkLock()
{
g_forkLock.upgradeWrite();
} }
int aeTryAcquireLock(int fWeak) int aeTryAcquireLock(int fWeak)
@ -828,6 +839,11 @@ int aeTryAcquireLock(int fWeak)
return g_lock.try_lock(!!fWeak); return g_lock.try_lock(!!fWeak);
} }
void aeThreadOffline()
{
g_forkLock.releaseRead();
}
void aeReleaseLock() void aeReleaseLock()
{ {
g_lock.unlock(); g_lock.unlock();
@ -838,6 +854,11 @@ void aeSetThreadOwnsLockOverride(int fOverride)
fOwnLockOverride = fOverride; fOwnLockOverride = fOverride;
} }
void aeReleaseForkLock()
{
g_forkLock.downgradeWrite();
}
int aeThreadOwnsLock() int aeThreadOwnsLock()
{ {
return fOwnLockOverride || g_lock.fOwnLock(); return fOwnLockOverride || g_lock.fOwnLock();

View File

@ -164,9 +164,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
void aeClosePipesForForkChild(aeEventLoop *eventLoop); void aeClosePipesForForkChild(aeEventLoop *eventLoop);
void setAeLockSetThreadSpinWorker(spin_worker worker); void setAeLockSetThreadSpinWorker(spin_worker worker);
void aeThreadOnline();
void aeAcquireLock(); void aeAcquireLock();
void aeAcquireForkLock();
int aeTryAcquireLock(int fWeak); int aeTryAcquireLock(int fWeak);
void aeThreadOffline();
void aeReleaseLock(); void aeReleaseLock();
void aeReleaseForkLock();
int aeThreadOwnsLock(); int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(int fOverride); void aeSetThreadOwnsLockOverride(int fOverride);
int aeLockContested(int threshold); int aeLockContested(int threshold);

View File

@ -34,9 +34,7 @@ public:
clientNesting = c->lock.unlock_recursive(); clientNesting = c->lock.unlock_recursive();
fOwnClientLock = false; fOwnClientLock = false;
} }
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive
{ {
aeReleaseLock(); aeReleaseLock();
@ -54,9 +52,7 @@ public:
else if (!m_fArmed) else if (!m_fArmed)
{ {
m_fArmed = true; m_fArmed = true;
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
} }
} }

View File

@ -1359,9 +1359,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
} }
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
acceptCommonHandler(conn,flags,cip,ielCur); acceptCommonHandler(conn,flags,cip,ielCur);
aeReleaseLock(); aeReleaseLock();
} }

View File

@ -5334,9 +5334,7 @@ struct RemoteMasterState
~RemoteMasterState() ~RemoteMasterState()
{ {
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
freeClient(cFake); freeClient(cFake);
aeReleaseLock(); aeReleaseLock();
} }

View File

@ -2973,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
g_pserver->garbageCollector.endEpoch(epoch); g_pserver->garbageCollector.endEpoch(epoch);
}, true /*fHiPri*/); }, true /*fHiPri*/);
} }
g_forkLock->releaseRead(); aeThreadOffline();
/* Determine whether the modules are enabled before sleeping, and use that result /* Determine whether the modules are enabled before sleeping, and use that result
both here, and after wakeup to avoid double acquire or release of the GIL */ both here, and after wakeup to avoid double acquire or release of the GIL */
serverTL->modulesEnabledThisAeLoop = !!moduleCount(); serverTL->modulesEnabledThisAeLoop = !!moduleCount();
@ -2994,7 +2994,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
Otherwise you may double acquire the GIL and cause deadlocks in the module */ Otherwise you may double acquire the GIL and cause deadlocks in the module */
if (!ProcessingEventsWhileBlocked) { if (!ProcessingEventsWhileBlocked) {
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
g_forkLock->acquireRead(); aeThreadOnline();
wakeTimeThread(); wakeTimeThread();
serverAssert(serverTL->gcEpoch.isReset()); serverAssert(serverTL->gcEpoch.isReset());
@ -6851,7 +6851,7 @@ int redisFork(int purpose) {
openChildInfoPipe(); openChildInfoPipe();
} }
long long startWriteLock = ustime(); long long startWriteLock = ustime();
g_forkLock->upgradeWrite(); aeAcquireForkLock();
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
if ((childpid = fork()) == 0) { if ((childpid = fork()) == 0) {
/* Child */ /* Child */
@ -6861,7 +6861,7 @@ int redisFork(int purpose) {
closeChildUnusedResourceAfterFork(); closeChildUnusedResourceAfterFork();
} else { } else {
/* Parent */ /* Parent */
g_forkLock->downgradeWrite(); aeReleaseForkLock();
g_pserver->stat_total_forks++; g_pserver->stat_total_forks++;
g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
@ -7218,21 +7218,21 @@ void *timeThreadMain(void*) {
delay.tv_sec = 0; delay.tv_sec = 0;
delay.tv_nsec = 100; delay.tv_nsec = 100;
int cycle_count = 0; int cycle_count = 0;
g_forkLock->acquireRead(); aeThreadOnline();
while (true) { while (true) {
{ {
std::unique_lock<std::mutex> lock(time_thread_mutex); std::unique_lock<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads) { if (sleeping_threads >= cserver.cthreads) {
g_forkLock->releaseRead(); aeThreadOffline();
time_thread_cv.wait(lock); time_thread_cv.wait(lock);
g_forkLock->acquireRead(); aeThreadOnline();
cycle_count = 0; cycle_count = 0;
} }
} }
updateCachedTime(); updateCachedTime();
if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
g_forkLock->releaseRead(); aeThreadOffline();
g_forkLock->acquireRead(); aeThreadOnline();
cycle_count = 0; cycle_count = 0;
} }
#if defined(__APPLE__) #if defined(__APPLE__)
@ -7242,7 +7242,7 @@ void *timeThreadMain(void*) {
#endif #endif
cycle_count++; cycle_count++;
} }
g_forkLock->releaseRead(); aeThreadOffline();
} }
void *workerThreadMain(void *parg) void *workerThreadMain(void *parg)
@ -7260,7 +7260,7 @@ void *workerThreadMain(void *parg)
} }
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
g_forkLock->acquireRead(); aeThreadOnline();
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try try
{ {
@ -7269,7 +7269,7 @@ void *workerThreadMain(void *parg)
catch (ShutdownException) catch (ShutdownException)
{ {
} }
g_forkLock->releaseRead(); aeThreadOffline();
moduleReleaseGIL(true); moduleReleaseGIL(true);
serverAssert(!GlobalLocksAcquired()); serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el); aeDeleteEventLoop(el);
@ -7418,8 +7418,8 @@ int main(int argc, char **argv) {
g_pserver->sentinel_mode = checkForSentinelMode(argc,argv); g_pserver->sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig(); initServerConfig();
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN];
aeThreadOnline();
aeAcquireLock(); // We own the lock on boot aeAcquireLock(); // We own the lock on boot
g_forkLock->acquireRead();
ACLInit(); /* The ACL subsystem must be initialized ASAP because the ACLInit(); /* The ACL subsystem must be initialized ASAP because the
basic networking code and client creation depends on it. */ basic networking code and client creation depends on it. */
moduleInitModulesSystem(); moduleInitModulesSystem();
@ -7653,8 +7653,8 @@ int main(int argc, char **argv) {
} }
redisSetCpuAffinity(g_pserver->server_cpulist); redisSetCpuAffinity(g_pserver->server_cpulist);
g_forkLock->releaseRead();
aeReleaseLock(); //Finally we can dump the lock aeReleaseLock(); //Finally we can dump the lock
aeThreadOffline();
moduleReleaseGIL(true); moduleReleaseGIL(true);
setOOMScoreAdj(-1); setOOMScoreAdj(-1);

View File

@ -2763,7 +2763,6 @@ typedef struct {
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
//extern struct redisServer server; //extern struct redisServer server;
extern readWriteLock *g_forkLock;
extern struct redisServerConst cserver; extern struct redisServerConst cserver;
extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
extern struct sharedObjectsStruct shared; extern struct sharedObjectsStruct shared;