hide forklock object in ae
This commit is contained in:
parent
a352731178
commit
f35baf8e7d
53
src/ae.cpp
53
src/ae.cpp
@ -88,8 +88,7 @@ mutex_wrapper g_lock;
|
||||
#else
|
||||
fastlock g_lock("AE (global)");
|
||||
#endif
|
||||
readWriteLock forkLock("Fork (global)");
|
||||
readWriteLock *g_forkLock = &forkLock;
|
||||
readWriteLock g_forkLock("Fork (global)");
|
||||
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
||||
|
||||
/* Include the best multiplexing layer supported by this system.
|
||||
@ -158,9 +157,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||
case AE_ASYNC_OP::PostFunction:
|
||||
{
|
||||
if (cmd.fLock && !ulock.owns_lock()) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
((aePostFunctionProc*)cmd.proc)(cmd.clientData);
|
||||
break;
|
||||
@ -169,9 +168,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||
case AE_ASYNC_OP::PostCppFunction:
|
||||
{
|
||||
if (cmd.fLock && !ulock.owns_lock()) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
(*cmd.pfn)();
|
||||
|
||||
@ -557,9 +556,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||
te->next->prev = te->prev;
|
||||
if (te->finalizerProc) {
|
||||
if (!ulock.owns_lock()) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
te->finalizerProc(eventLoop, te->clientData);
|
||||
now = getMonotonicUs();
|
||||
@ -581,9 +580,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||
|
||||
if (te->when <= now) {
|
||||
if (!ulock.owns_lock()) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
int retval;
|
||||
|
||||
@ -609,9 +608,9 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
|
||||
#define LOCK_IF_NECESSARY(fe, tsmask) \
|
||||
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
|
||||
if (!(fe->mask & tsmask)) { \
|
||||
g_forkLock->releaseRead(); \
|
||||
g_forkLock.releaseRead(); \
|
||||
ulock.lock(); \
|
||||
g_forkLock->acquireRead(); \
|
||||
g_forkLock.acquireRead(); \
|
||||
}
|
||||
|
||||
int fired = 0; /* Number of events fired for current fd. */
|
||||
@ -725,9 +724,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) {
|
||||
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
eventLoop->beforesleep(eventLoop);
|
||||
}
|
||||
@ -740,9 +739,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
|
||||
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||
if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock.releaseRead();
|
||||
ulock.lock();
|
||||
g_forkLock->acquireRead();
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
eventLoop->aftersleep(eventLoop);
|
||||
}
|
||||
@ -818,9 +817,21 @@ void setAeLockSetThreadSpinWorker(spin_worker worker)
|
||||
tl_worker = worker;
|
||||
}
|
||||
|
||||
void aeThreadOnline()
|
||||
{
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
|
||||
void aeAcquireLock()
|
||||
{
|
||||
g_forkLock.releaseRead();
|
||||
g_lock.lock(tl_worker);
|
||||
g_forkLock.acquireRead();
|
||||
}
|
||||
|
||||
void aeAcquireForkLock()
|
||||
{
|
||||
g_forkLock.upgradeWrite();
|
||||
}
|
||||
|
||||
int aeTryAcquireLock(int fWeak)
|
||||
@ -828,6 +839,11 @@ int aeTryAcquireLock(int fWeak)
|
||||
return g_lock.try_lock(!!fWeak);
|
||||
}
|
||||
|
||||
void aeThreadOffline()
|
||||
{
|
||||
g_forkLock.releaseRead();
|
||||
}
|
||||
|
||||
void aeReleaseLock()
|
||||
{
|
||||
g_lock.unlock();
|
||||
@ -838,6 +854,11 @@ void aeSetThreadOwnsLockOverride(int fOverride)
|
||||
fOwnLockOverride = fOverride;
|
||||
}
|
||||
|
||||
void aeReleaseForkLock()
|
||||
{
|
||||
g_forkLock.downgradeWrite();
|
||||
}
|
||||
|
||||
int aeThreadOwnsLock()
|
||||
{
|
||||
return fOwnLockOverride || g_lock.fOwnLock();
|
||||
|
4
src/ae.h
4
src/ae.h
@ -164,9 +164,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
|
||||
void aeClosePipesForForkChild(aeEventLoop *eventLoop);
|
||||
|
||||
void setAeLockSetThreadSpinWorker(spin_worker worker);
|
||||
void aeThreadOnline();
|
||||
void aeAcquireLock();
|
||||
void aeAcquireForkLock();
|
||||
int aeTryAcquireLock(int fWeak);
|
||||
void aeThreadOffline();
|
||||
void aeReleaseLock();
|
||||
void aeReleaseForkLock();
|
||||
int aeThreadOwnsLock();
|
||||
void aeSetThreadOwnsLockOverride(int fOverride);
|
||||
int aeLockContested(int threshold);
|
||||
|
@ -34,9 +34,7 @@ public:
|
||||
clientNesting = c->lock.unlock_recursive();
|
||||
fOwnClientLock = false;
|
||||
}
|
||||
g_forkLock->releaseRead();
|
||||
aeAcquireLock();
|
||||
g_forkLock->acquireRead();
|
||||
if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive
|
||||
{
|
||||
aeReleaseLock();
|
||||
@ -54,9 +52,7 @@ public:
|
||||
else if (!m_fArmed)
|
||||
{
|
||||
m_fArmed = true;
|
||||
g_forkLock->releaseRead();
|
||||
aeAcquireLock();
|
||||
g_forkLock->acquireRead();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1359,9 +1359,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
|
||||
}
|
||||
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
|
||||
|
||||
g_forkLock->releaseRead();
|
||||
aeAcquireLock();
|
||||
g_forkLock->acquireRead();
|
||||
acceptCommonHandler(conn,flags,cip,ielCur);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
@ -5334,9 +5334,7 @@ struct RemoteMasterState
|
||||
|
||||
~RemoteMasterState()
|
||||
{
|
||||
g_forkLock->releaseRead();
|
||||
aeAcquireLock();
|
||||
g_forkLock->acquireRead();
|
||||
freeClient(cFake);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
@ -2973,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
g_pserver->garbageCollector.endEpoch(epoch);
|
||||
}, true /*fHiPri*/);
|
||||
}
|
||||
g_forkLock->releaseRead();
|
||||
aeThreadOffline();
|
||||
/* Determine whether the modules are enabled before sleeping, and use that result
|
||||
both here, and after wakeup to avoid double acquire or release of the GIL */
|
||||
serverTL->modulesEnabledThisAeLoop = !!moduleCount();
|
||||
@ -2994,7 +2994,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
||||
Otherwise you may double acquire the GIL and cause deadlocks in the module */
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
|
||||
g_forkLock->acquireRead();
|
||||
aeThreadOnline();
|
||||
wakeTimeThread();
|
||||
|
||||
serverAssert(serverTL->gcEpoch.isReset());
|
||||
@ -6851,7 +6851,7 @@ int redisFork(int purpose) {
|
||||
openChildInfoPipe();
|
||||
}
|
||||
long long startWriteLock = ustime();
|
||||
g_forkLock->upgradeWrite();
|
||||
aeAcquireForkLock();
|
||||
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
|
||||
if ((childpid = fork()) == 0) {
|
||||
/* Child */
|
||||
@ -6861,7 +6861,7 @@ int redisFork(int purpose) {
|
||||
closeChildUnusedResourceAfterFork();
|
||||
} else {
|
||||
/* Parent */
|
||||
g_forkLock->downgradeWrite();
|
||||
aeReleaseForkLock();
|
||||
g_pserver->stat_total_forks++;
|
||||
g_pserver->stat_fork_time = ustime()-start;
|
||||
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
|
||||
@ -7218,21 +7218,21 @@ void *timeThreadMain(void*) {
|
||||
delay.tv_sec = 0;
|
||||
delay.tv_nsec = 100;
|
||||
int cycle_count = 0;
|
||||
g_forkLock->acquireRead();
|
||||
aeThreadOnline();
|
||||
while (true) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(time_thread_mutex);
|
||||
if (sleeping_threads >= cserver.cthreads) {
|
||||
g_forkLock->releaseRead();
|
||||
aeThreadOffline();
|
||||
time_thread_cv.wait(lock);
|
||||
g_forkLock->acquireRead();
|
||||
aeThreadOnline();
|
||||
cycle_count = 0;
|
||||
}
|
||||
}
|
||||
updateCachedTime();
|
||||
if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
|
||||
g_forkLock->releaseRead();
|
||||
g_forkLock->acquireRead();
|
||||
aeThreadOffline();
|
||||
aeThreadOnline();
|
||||
cycle_count = 0;
|
||||
}
|
||||
#if defined(__APPLE__)
|
||||
@ -7242,7 +7242,7 @@ void *timeThreadMain(void*) {
|
||||
#endif
|
||||
cycle_count++;
|
||||
}
|
||||
g_forkLock->releaseRead();
|
||||
aeThreadOffline();
|
||||
}
|
||||
|
||||
void *workerThreadMain(void *parg)
|
||||
@ -7260,7 +7260,7 @@ void *workerThreadMain(void *parg)
|
||||
}
|
||||
|
||||
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
|
||||
g_forkLock->acquireRead();
|
||||
aeThreadOnline();
|
||||
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
|
||||
try
|
||||
{
|
||||
@ -7269,7 +7269,7 @@ void *workerThreadMain(void *parg)
|
||||
catch (ShutdownException)
|
||||
{
|
||||
}
|
||||
g_forkLock->releaseRead();
|
||||
aeThreadOffline();
|
||||
moduleReleaseGIL(true);
|
||||
serverAssert(!GlobalLocksAcquired());
|
||||
aeDeleteEventLoop(el);
|
||||
@ -7418,8 +7418,8 @@ int main(int argc, char **argv) {
|
||||
g_pserver->sentinel_mode = checkForSentinelMode(argc,argv);
|
||||
initServerConfig();
|
||||
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN];
|
||||
aeThreadOnline();
|
||||
aeAcquireLock(); // We own the lock on boot
|
||||
g_forkLock->acquireRead();
|
||||
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
|
||||
basic networking code and client creation depends on it. */
|
||||
moduleInitModulesSystem();
|
||||
@ -7653,8 +7653,8 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
redisSetCpuAffinity(g_pserver->server_cpulist);
|
||||
g_forkLock->releaseRead();
|
||||
aeReleaseLock(); //Finally we can dump the lock
|
||||
aeThreadOffline();
|
||||
moduleReleaseGIL(true);
|
||||
|
||||
setOOMScoreAdj(-1);
|
||||
|
@ -2763,7 +2763,6 @@ typedef struct {
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
//extern struct redisServer server;
|
||||
extern readWriteLock *g_forkLock;
|
||||
extern struct redisServerConst cserver;
|
||||
extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
|
||||
extern struct sharedObjectsStruct shared;
|
||||
|
Loading…
x
Reference in New Issue
Block a user