fork lock for all threads, use fastlock for readwritelock

This commit is contained in:
malavan 2022-01-14 18:50:20 +00:00 committed by John Sully
parent 84b57684b0
commit 52402c4786
7 changed files with 67 additions and 48 deletions

View File

@ -53,6 +53,7 @@
#include "zmalloc.h" #include "zmalloc.h"
#include "config.h" #include "config.h"
#include "serverassert.h" #include "serverassert.h"
#include "server.h"
#ifdef USE_MUTEX #ifdef USE_MUTEX
thread_local int cOwnLock = 0; thread_local int cOwnLock = 0;
@ -87,6 +88,8 @@ mutex_wrapper g_lock;
#else #else
fastlock g_lock("AE (global)"); fastlock g_lock("AE (global)");
#endif #endif
readWriteLock forkLock("Fork (global)");
readWriteLock *g_forkLock = &forkLock;
thread_local aeEventLoop *g_eventLoopThisThread = NULL; thread_local aeEventLoop *g_eventLoopThisThread = NULL;
/* Include the best multiplexing layer supported by this system. /* Include the best multiplexing layer supported by this system.
@ -154,16 +157,22 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostFunction: case AE_ASYNC_OP::PostFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock->releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead();
}
((aePostFunctionProc*)cmd.proc)(cmd.clientData); ((aePostFunctionProc*)cmd.proc)(cmd.clientData);
break; break;
} }
case AE_ASYNC_OP::PostCppFunction: case AE_ASYNC_OP::PostCppFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock->releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead();
}
(*cmd.pfn)(); (*cmd.pfn)();
delete cmd.pfn; delete cmd.pfn;
@ -547,7 +556,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
if (te->next) if (te->next)
te->next->prev = te->prev; te->next->prev = te->prev;
if (te->finalizerProc) { if (te->finalizerProc) {
if (!ulock.owns_lock()) ulock.lock(); if (!ulock.owns_lock()) {
g_forkLock->releaseRead();
ulock.lock();
g_forkLock->acquireRead();
}
te->finalizerProc(eventLoop, te->clientData); te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs(); now = getMonotonicUs();
} }
@ -567,7 +580,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
} }
if (te->when <= now) { if (te->when <= now) {
if (!ulock.owns_lock()) ulock.lock(); if (!ulock.owns_lock()) {
g_forkLock->releaseRead();
ulock.lock();
g_forkLock->acquireRead();
}
int retval; int retval;
id = te->id; id = te->id;
@ -591,8 +608,11 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
{ {
#define LOCK_IF_NECESSARY(fe, tsmask) \ #define LOCK_IF_NECESSARY(fe, tsmask) \
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \ std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
if (!(fe->mask & tsmask)) \ if (!(fe->mask & tsmask)) { \
ulock.lock() g_forkLock->releaseRead(); \
ulock.lock(); \
g_forkLock->acquireRead(); \
}
int fired = 0; /* Number of events fired for current fd. */ int fired = 0; /* Number of events fired for current fd. */
@ -704,8 +724,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock->releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead();
}
eventLoop->beforesleep(eventLoop); eventLoop->beforesleep(eventLoop);
} }
@ -716,8 +739,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* After sleep callback. */ /* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock->releaseRead();
ulock.lock(); ulock.lock();
g_forkLock->acquireRead();
}
eventLoop->aftersleep(eventLoop); eventLoop->aftersleep(eventLoop);
} }

View File

@ -34,7 +34,9 @@ public:
clientNesting = c->lock.unlock_recursive(); clientNesting = c->lock.unlock_recursive();
fOwnClientLock = false; fOwnClientLock = false;
} }
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive
{ {
aeReleaseLock(); aeReleaseLock();
@ -52,7 +54,9 @@ public:
else if (!m_fArmed) else if (!m_fArmed)
{ {
m_fArmed = true; m_fArmed = true;
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
} }
} }

View File

@ -365,7 +365,7 @@ typedef struct RedisModuleCommandFilter {
static list *moduleCommandFilters; static list *moduleCommandFilters;
/* Module GIL Variables */ /* Module GIL Variables */
static readWriteLock s_moduleGIL; static readWriteLock s_moduleGIL("Module GIL");
thread_local bool g_fModuleThread = false; thread_local bool g_fModuleThread = false;
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);

View File

@ -1359,7 +1359,9 @@ void acceptOnThread(connection *conn, int flags, char *cip)
} }
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
acceptCommonHandler(conn,flags,cip,ielCur); acceptCommonHandler(conn,flags,cip,ielCur);
aeReleaseLock(); aeReleaseLock();
} }

View File

@ -2,22 +2,24 @@
#include <condition_variable> #include <condition_variable>
class readWriteLock { class readWriteLock {
std::mutex m_readLock; fastlock m_readLock;
std::recursive_mutex m_writeLock; fastlock m_writeLock;
std::condition_variable m_cv; std::condition_variable_any m_cv;
int m_readCount = 0; int m_readCount = 0;
int m_writeCount = 0; int m_writeCount = 0;
bool m_writeWaiting = false; bool m_writeWaiting = false;
public: public:
readWriteLock(const char *name) : m_readLock(name), m_writeLock(name) {}
void acquireRead() { void acquireRead() {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
while (m_writeCount > 0 || m_writeWaiting) while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm); m_cv.wait(rm);
m_readCount++; m_readCount++;
} }
bool tryAcquireRead() { bool tryAcquireRead() {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock); std::unique_lock<fastlock> rm(m_readLock, std::defer_lock);
if (!rm.try_lock()) if (!rm.try_lock())
return false; return false;
if (m_writeCount > 0 || m_writeWaiting) if (m_writeCount > 0 || m_writeWaiting)
@ -27,7 +29,7 @@ public:
} }
void acquireWrite(bool exclusive = true) { void acquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
m_writeWaiting = true; m_writeWaiting = true;
while (m_readCount > 0) while (m_readCount > 0)
m_cv.wait(rm); m_cv.wait(rm);
@ -43,24 +45,12 @@ public:
} }
void upgradeWrite(bool exclusive = true) { void upgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); releaseRead();
m_writeWaiting = true; acquireWrite(exclusive);
while (m_readCount > 1)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_readCount--;
m_writeWaiting = false;
} }
bool tryAcquireWrite(bool exclusive = true) { bool tryAcquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock); std::unique_lock<fastlock> rm(m_readLock, std::defer_lock);
if (!rm.try_lock()) if (!rm.try_lock())
return false; return false;
if (m_readCount > 0) if (m_readCount > 0)
@ -73,14 +63,13 @@ public:
} }
void releaseRead() { void releaseRead() {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
serverAssert(m_readCount > 0);
m_readCount--; m_readCount--;
m_cv.notify_all(); m_cv.notify_all();
} }
void releaseWrite(bool exclusive = true) { void releaseWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
serverAssert(m_writeCount > 0); serverAssert(m_writeCount > 0);
if (exclusive) if (exclusive)
m_writeLock.unlock(); m_writeLock.unlock();
@ -89,14 +78,8 @@ public:
} }
void downgradeWrite(bool exclusive = true) { void downgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); releaseWrite(exclusive);
serverAssert(m_writeCount > 0); acquireRead();
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
} }
bool hasReader() { bool hasReader() {

View File

@ -5334,7 +5334,9 @@ struct RemoteMasterState
~RemoteMasterState() ~RemoteMasterState()
{ {
g_forkLock->releaseRead();
aeAcquireLock(); aeAcquireLock();
g_forkLock->acquireRead();
freeClient(cFake); freeClient(cFake);
aeReleaseLock(); aeReleaseLock();
} }

View File

@ -92,10 +92,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/* Global vars */ /* Global vars */
namespace GlobalHidden { namespace GlobalHidden {
struct redisServer server; /* Server global state */ struct redisServer server; /* Server global state */
readWriteLock forkLock;
} }
redisServer *g_pserver = &GlobalHidden::server; redisServer *g_pserver = &GlobalHidden::server;
readWriteLock *g_forkLock = &GlobalHidden::forkLock;
struct redisServerConst cserver; struct redisServerConst cserver;
thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars
std::mutex time_thread_mutex; std::mutex time_thread_mutex;
@ -2975,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
g_pserver->garbageCollector.endEpoch(epoch); g_pserver->garbageCollector.endEpoch(epoch);
}, true /*fHiPri*/); }, true /*fHiPri*/);
} }
g_forkLock->releaseRead();
/* Determine whether the modules are enabled before sleeping, and use that result /* Determine whether the modules are enabled before sleeping, and use that result
both here, and after wakeup to avoid double acquire or release of the GIL */ both here, and after wakeup to avoid double acquire or release of the GIL */
serverTL->modulesEnabledThisAeLoop = !!moduleCount(); serverTL->modulesEnabledThisAeLoop = !!moduleCount();
@ -2995,8 +2993,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
Don't check here that modules are enabled, rather use the result from beforeSleep Don't check here that modules are enabled, rather use the result from beforeSleep
Otherwise you may double acquire the GIL and cause deadlocks in the module */ Otherwise you may double acquire the GIL and cause deadlocks in the module */
if (!ProcessingEventsWhileBlocked) { if (!ProcessingEventsWhileBlocked) {
wakeTimeThread();
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
g_forkLock->acquireRead();
wakeTimeThread();
serverAssert(serverTL->gcEpoch.isReset()); serverAssert(serverTL->gcEpoch.isReset());
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
@ -6852,7 +6851,7 @@ int redisFork(int purpose) {
openChildInfoPipe(); openChildInfoPipe();
} }
long long startWriteLock = ustime(); long long startWriteLock = ustime();
g_forkLock->acquireWrite(); g_forkLock->upgradeWrite();
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
if ((childpid = fork()) == 0) { if ((childpid = fork()) == 0) {
/* Child */ /* Child */
@ -6862,7 +6861,7 @@ int redisFork(int purpose) {
closeChildUnusedResourceAfterFork(); closeChildUnusedResourceAfterFork();
} else { } else {
/* Parent */ /* Parent */
g_forkLock->releaseWrite(); g_forkLock->downgradeWrite();
g_pserver->stat_total_forks++; g_pserver->stat_total_forks++;
g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
@ -7261,6 +7260,7 @@ void *workerThreadMain(void *parg)
} }
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
g_forkLock->acquireRead();
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try try
{ {
@ -7269,6 +7269,7 @@ void *workerThreadMain(void *parg)
catch (ShutdownException) catch (ShutdownException)
{ {
} }
g_forkLock->releaseRead();
moduleReleaseGIL(true); moduleReleaseGIL(true);
serverAssert(!GlobalLocksAcquired()); serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el); aeDeleteEventLoop(el);
@ -7418,7 +7419,7 @@ int main(int argc, char **argv) {
initServerConfig(); initServerConfig();
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN];
aeAcquireLock(); // We own the lock on boot aeAcquireLock(); // We own the lock on boot
g_forkLock->acquireRead();
ACLInit(); /* The ACL subsystem must be initialized ASAP because the ACLInit(); /* The ACL subsystem must be initialized ASAP because the
basic networking code and client creation depends on it. */ basic networking code and client creation depends on it. */
moduleInitModulesSystem(); moduleInitModulesSystem();
@ -7652,6 +7653,7 @@ int main(int argc, char **argv) {
} }
redisSetCpuAffinity(g_pserver->server_cpulist); redisSetCpuAffinity(g_pserver->server_cpulist);
g_forkLock->releaseRead();
aeReleaseLock(); //Finally we can dump the lock aeReleaseLock(); //Finally we can dump the lock
moduleReleaseGIL(true); moduleReleaseGIL(true);