Merge pull request #386 from EQ-Alpha/fix_rdb_hang

add readwrite lock for forking

Former-commit-id: dabb81960f6ccc7f62c53648127924fc2fef7cdc
MalavanEQAlpha 2022-01-14 16:17:32 -05:00 committed by Malavan Sotheeswaran
parent ec255fea11
commit 4e2e461acc
5 changed files with 231 additions and 123 deletions
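For context on the hang this PR fixes: fork() clones only the calling thread, so if any other thread holds a lock or is midway through mutating shared state at the moment of the fork, the RDB child inherits that torn state and can block forever. The cure is to gate fork() behind a read-write lock: worker threads hold the lock for read around such critical sections, and the fork path takes it for write. A minimal sketch of the idea (illustration only, using std::shared_mutex rather than the readWriteLock this PR adds):

#include <shared_mutex>
#include <unistd.h>

static std::shared_mutex forkGate; // stand-in for this PR's g_forkLock

void workerCriticalSection() {
    // Shared hold: any number of worker threads may be in here at once.
    std::shared_lock<std::shared_mutex> guard(forkGate);
    /* ... mutate state that must not be torn across a fork() ... */
}

pid_t gatedFork() {
    // Exclusive hold: waits until no worker is mid-critical-section, so the
    // child's memory snapshot never contains a half-taken lock.
    std::unique_lock<std::shared_mutex> guard(forkGate);
    return fork();
}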

src/module.cpp

@@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter {
 static list *moduleCommandFilters;
 /* Module GIL Variables */
-static int s_cAcquisitionsServer = 0;
-static int s_cAcquisitionsModule = 0;
-static std::mutex s_mutex;
-static std::condition_variable s_cv;
-static std::recursive_mutex s_mutexModule;
+static readWriteLock s_moduleGIL;
 thread_local bool g_fModuleThread = false;
 typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
@@ -5969,95 +5965,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
 // as the server thread acquisition is sufficient. If we did try to lock we would deadlock
 static bool FModuleCallBackLock(bool fServerThread)
 {
-    return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
+    return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader();
 }

 void moduleAcquireGIL(int fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex);
-    int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
     if (FModuleCallBackLock(fServerThread)) {
         return;
     }
-    while (*pcheck > 0)
-        s_cv.wait(lock);
     if (fServerThread)
     {
-        ++s_cAcquisitionsServer;
+        s_moduleGIL.acquireRead();
     }
     else
     {
-        // only try to acquire the mutexModule in exclusive mode
-        if (fExclusive){
-            // It is possible that another module thread holds the GIL (and s_mutexModule as a result).
-            // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
-            // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
-            // As a result, a deadlock has occurred.
-            // We release the lock on s_mutex and wait until we are able to safely acquire the GIL
-            // in order to prevent this deadlock from occurring.
-            while (!s_mutexModule.try_lock())
-                s_cv.wait(lock);
-        }
-        ++s_cAcquisitionsModule;
-        fModuleGILWlocked++;
+        s_moduleGIL.acquireWrite(fExclusive);
     }
 }

 int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
-    if (!lock.try_lock())
-        return 1;
-    int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
     if (FModuleCallBackLock(fServerThread)) {
         return 0;
     }
-    if (*pcheck > 0)
-        return 1;
     if (fServerThread)
     {
-        ++s_cAcquisitionsServer;
+        if (!s_moduleGIL.tryAcquireRead())
+            return 1;
     }
     else
     {
-        // only try to acquire the mutexModule in exclusive mode
-        if (fExclusive){
-            if (!s_mutexModule.try_lock())
-                return 1;
-        }
-        ++s_cAcquisitionsModule;
-        fModuleGILWlocked++;
+        if (!s_moduleGIL.tryAcquireWrite(fExclusive))
+            return 1;
     }
     return 0;
 }

 void moduleReleaseGIL(int fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex);
     if (FModuleCallBackLock(fServerThread)) {
         return;
     }
     if (fServerThread)
     {
-        --s_cAcquisitionsServer;
+        s_moduleGIL.releaseRead();
     }
     else
     {
-        // only try to release the mutexModule in exclusive mode
-        if (fExclusive)
-            s_mutexModule.unlock();
-        --s_cAcquisitionsModule;
-        fModuleGILWlocked--;
+        s_moduleGIL.releaseWrite(fExclusive);
     }
-    s_cv.notify_all();
 }

 int moduleGILAcquiredByModule(void) {
-    return fModuleGILWlocked > 0;
+    return s_moduleGIL.hasWriter();
 }
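The refactor above replaces the hand-rolled counters, condition variable, and recursive mutex with the single readWriteLock added by this PR (src/readwritelock.h below): server threads share the GIL as readers, and a module thread takes it as the writer. A hedged sketch of those semantics, assuming the PR's header plus an assert shim for serverAssert:

#include <cassert>
#define serverAssert assert   // readwritelock.h calls serverAssert()
#include "readwritelock.h"

static readWriteLock gil;     // illustration only, not the real s_moduleGIL

void serverThreadSlice() {
    gil.acquireRead();        // any number of server threads at once
    /* ... execute commands ... */
    gil.releaseRead();
}

void moduleThreadSlice() {
    gil.acquireWrite(true /*exclusive*/);   // waits until no reader remains
    /* ... module code touches server state ... */
    gil.releaseWrite(true /*exclusive*/);
}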

src/networking.cpp

@@ -3863,82 +3863,37 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
 *
 * The function returns the total number of events processed. */
 void processEventsWhileBlocked(int iel) {
     serverAssert(GlobalLocksAcquired());
-    int iterations = 4; /* See the function top-comment. */
-    std::vector<client*> vecclients;
-    listIter li;
-    listNode *ln;
-    listRewind(g_pserver->clients, &li);
-    // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
-    // so unlock here, and save them for reacquisition later
-    while ((ln = listNext(&li)) != nullptr)
-    {
-        client *c = (client*)listNodeValue(ln);
-        if (c->lock.fOwnLock()) {
-            serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no guarantee they won't be free'd in the event loop
-            c->lock.unlock();
-            vecclients.push_back(c);
-        }
-    }
-    /* Since we're about to release our lock we need to flush the repl backlog queue */
-    bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
-    if (fReplBacklog) {
-        flushReplBacklogToClients();
-        g_pserver->repl_batch_idxStart = -1;
-        g_pserver->repl_batch_offStart = -1;
-    }
-    long long eventsCount = 0;
-    aeReleaseLock();
-    serverAssert(!GlobalLocksAcquired());
-    try
-    {
-        ProcessingEventsWhileBlocked = 1;
-        while (iterations--) {
-            long long startval = g_pserver->events_processed_while_blocked;
-            long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
-                AE_FILE_EVENTS|AE_DONT_WAIT|
-                AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
-            /* Note that g_pserver->events_processed_while_blocked will also get
-             * incremented by callbacks called by the event loop handlers. */
-            eventsCount += ae_events;
-            long long events = eventsCount - startval;
-            if (!events) break;
-        }
-        ProcessingEventsWhileBlocked = 0;
-    }
-    catch (...)
-    {
-        // Caller expects us to be locked so fix and rethrow
-        ProcessingEventsWhileBlocked = 0;
-        AeLocker locker;
-        locker.arm(nullptr);
-        locker.release();
-        for (client *c : vecclients)
-            c->lock.lock();
-        throw;
-    }
-    AeLocker locker;
-    locker.arm(nullptr);
-    locker.release();
+    int eventsCount = 0;
+    executeWithoutGlobalLock([&](){
+        int iterations = 4; /* See the function top-comment. */
+        try
+        {
+            ProcessingEventsWhileBlocked = 1;
+            while (iterations--) {
+                long long startval = g_pserver->events_processed_while_blocked;
+                long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
+                    AE_FILE_EVENTS|AE_DONT_WAIT|
+                    AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
+                /* Note that g_pserver->events_processed_while_blocked will also get
+                 * incremented by callbacks called by the event loop handlers. */
+                eventsCount += ae_events;
+                long long events = eventsCount - startval;
+                if (!events) break;
+            }
+            ProcessingEventsWhileBlocked = 0;
+        }
+        catch (...)
+        {
+            ProcessingEventsWhileBlocked = 0;
+            throw;
+        }
+    });
     g_pserver->events_processed_while_blocked += eventsCount;
     whileBlockedCron();
-    // Restore it so the calling code is not confused
-    if (fReplBacklog) {
-        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
-        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
-    }
-    for (client *c : vecclients)
-        c->lock.lock();
     // If a different thread processed the shutdown we need to abort the lua command or we will hang
     if (serverTL->el->stop)
         throw ShutdownException();
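The net effect of this hunk: the unlock-clients/flush-backlog/relock choreography that used to live inline moves into the new executeWithoutGlobalLock() helper (added in src/server.cpp below), and processEventsWhileBlocked() now only pumps the event loop inside the lambda. What remains is the set-run-clear flag discipline, which must also clear the flag on the throwing path; reduced to its shape:

// Illustration only: the flag must be cleared on every exit path, because
// callers (e.g. afterSleep below) test it to detect reentrancy.
template <class Fn>
void pumpWithFlag(int &flag, Fn fn) {
    try {
        flag = 1;
        fn();        // may throw out of the event handlers
        flag = 0;
    }
    catch (...) {
        flag = 0;    // never leave the flag set on the error path
        throw;
    }
}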

src/readwritelock.h (new file, 113 lines)

@@ -0,0 +1,113 @@
#pragma once
#include <condition_variable>
#include <mutex>

class readWriteLock {
    std::mutex m_readLock;
    std::recursive_mutex m_writeLock;
    std::condition_variable m_cv;
    int m_readCount = 0;
    int m_writeCount = 0;
    bool m_writeWaiting = false;
public:
    void acquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    bool tryAcquireRead() {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_writeCount > 0 || m_writeWaiting)
            return false;
        m_readCount++;
        return true;
    }

    void acquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;
        while (m_readCount > 0)
            m_cv.wait(rm);
        if (exclusive) {
            /* Another thread might hold m_writeLock while waiting to take
               m_readLock (e.g. inside releaseWrite), so blocking on m_writeLock
               here while we hold m_readLock would deadlock. Instead, try_lock in
               a loop and wait on the condition variable (which releases
               m_readLock) between attempts. */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_writeWaiting = false;
    }

    void upgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        m_writeWaiting = true;
        while (m_readCount > 1)
            m_cv.wait(rm);
        if (exclusive) {
            /* Same try_lock/wait loop as acquireWrite, for the same reason:
               blocking on m_writeLock while holding m_readLock could deadlock
               against a thread releasing the write lock. */
            while (!m_writeLock.try_lock())
                m_cv.wait(rm);
        }
        m_writeCount++;
        m_readCount--;
        m_writeWaiting = false;
    }

    bool tryAcquireWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
        if (!rm.try_lock())
            return false;
        if (m_readCount > 0)
            return false;
        if (exclusive)
            if (!m_writeLock.try_lock())
                return false;
        m_writeCount++;
        return true;
    }

    void releaseRead() {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_readCount > 0);
        m_readCount--;
        m_cv.notify_all();
    }

    void releaseWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        m_cv.notify_all();
    }

    void downgradeWrite(bool exclusive = true) {
        std::unique_lock<std::mutex> rm(m_readLock);
        serverAssert(m_writeCount > 0);
        if (exclusive)
            m_writeLock.unlock();
        m_writeCount--;
        while (m_writeCount > 0 || m_writeWaiting)
            m_cv.wait(rm);
        m_readCount++;
    }

    bool hasReader() {
        return m_readCount > 0;
    }

    bool hasWriter() {
        return m_writeCount > 0;
    }

    bool writeWaiting() {
        return m_writeWaiting;
    }
};
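A minimal usage sketch of the class above, exercising the shared and exclusive paths plus the downgrade. It assumes only that serverAssert resolves to something; here it is shimmed with <cassert>:

#include <cassert>
#include <thread>
#define serverAssert assert
#include "readwritelock.h"

int main() {
    readWriteLock rwl;

    std::thread reader([&]{
        rwl.acquireRead();    // shared: concurrent readers are fine
        /* ... read state ... */
        rwl.releaseRead();    // notify_all() wakes any waiting writer
    });

    rwl.acquireWrite();       // exclusive: waits until m_readCount == 0
    /* ... mutate state ... */
    rwl.downgradeWrite();     // keep access, but as a reader again
    /* ... keep reading without excluding other readers ... */
    rwl.releaseRead();

    reader.join();
    return 0;
}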

src/server.cpp

@@ -67,6 +67,7 @@
 #include "aelocker.h"
 #include "motd.h"
 #include "t_nhash.h"
+#include "readwritelock.h"
 #ifdef __linux__
 #include <sys/prctl.h>
 #include <sys/mman.h>
@@ -91,8 +92,10 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
 /* Global vars */
 namespace GlobalHidden {
 struct redisServer server; /* Server global state */
+readWriteLock forkLock;
 }
 redisServer *g_pserver = &GlobalHidden::server;
+readWriteLock *g_forkLock = &GlobalHidden::forkLock;
 struct redisServerConst cserver;
 __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
 std::mutex time_thread_mutex;
@@ -2649,8 +2652,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
        Don't check here that modules are enabled, rather use the result from beforeSleep
        Otherwise you may double acquire the GIL and cause deadlocks in the module */
     if (!ProcessingEventsWhileBlocked) {
-        wakeTimeThread();
         if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
+        wakeTimeThread();
     }
 }
@@ -6218,6 +6221,63 @@ void closeChildUnusedResourceAfterFork() {
     cserver.pidfile = NULL;
 }

+void executeWithoutGlobalLock(std::function<void()> func) {
+    serverAssert(GlobalLocksAcquired());
+    std::vector<client*> vecclients;
+    listIter li;
+    listNode *ln;
+    listRewind(g_pserver->clients, &li);
+
+    // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
+    // so unlock here, and save them for reacquisition later
+    while ((ln = listNext(&li)) != nullptr)
+    {
+        client *c = (client*)listNodeValue(ln);
+        if (c->lock.fOwnLock()) {
+            serverAssert(c->flags & CLIENT_PROTECTED || c->flags & CLIENT_EXECUTING_COMMAND); // If the client is not protected we have no guarantee they won't be free'd in the event loop
+            c->lock.unlock();
+            vecclients.push_back(c);
+        }
+    }
+
+    /* Since we're about to release our lock we need to flush the repl backlog queue */
+    bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
+    if (fReplBacklog) {
+        flushReplBacklogToClients();
+        g_pserver->repl_batch_idxStart = -1;
+        g_pserver->repl_batch_offStart = -1;
+    }
+
+    aeReleaseLock();
+    serverAssert(!GlobalLocksAcquired());
+    try {
+        func();
+    }
+    catch (...) {
+        // Caller expects us to be locked so fix and rethrow
+        AeLocker locker;
+        locker.arm(nullptr);
+        locker.release();
+        for (client *c : vecclients)
+            c->lock.lock();
+        throw;
+    }
+    AeLocker locker;
+    locker.arm(nullptr);
+    locker.release();
+
+    // Restore it so the calling code is not confused
+    if (fReplBacklog) {
+        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
+        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
+    }
+
+    for (client *c : vecclients)
+        c->lock.lock();
+}
+
 /* purpose is one of CHILD_TYPE_ types */
 int redisFork(int purpose) {
     int childpid;
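Usage of the new helper (the processEventsWhileBlocked() rewrite above is the real call site): a thread that owns the global lock hands its blocking section to executeWithoutGlobalLock(), which drops the global lock and the thread's client locks, runs the callback, and guarantees both are re-held before returning, normally or by rethrow. A hypothetical caller (doBlockingWork is a stand-in name, not a KeyDB function):

void doBlockingWork();   // hypothetical; may block or throw

void exampleCaller() {
    serverAssert(GlobalLocksAcquired());   // precondition, asserted inside too
    executeWithoutGlobalLock([&]() {
        doBlockingWork();                  // runs with the global lock released
    });
    // The global lock and this thread's client locks are held again here,
    // whether the lambda returned normally or threw.
}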
@@ -6229,7 +6289,9 @@ int redisFork(int purpose) {
         openChildInfoPipe();
     }
+    long long startWriteLock = ustime();
+    g_forkLock->acquireWrite();
+    latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
     if ((childpid = fork()) == 0) {
         /* Child */
         g_pserver->in_fork_child = purpose;
@@ -6238,6 +6300,7 @@ int redisFork(int purpose) {
         closeChildUnusedResourceAfterFork();
     } else {
         /* Parent */
+        g_forkLock->releaseWrite();
         g_pserver->stat_total_forks++;
         g_pserver->stat_fork_time = ustime()-start;
         g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
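The write acquisition above brackets the fork() itself: once acquireWrite() returns, no reader is inside a critical section, so the child's snapshot cannot contain a half-updated clock or a half-taken lock. Note the asymmetry: only the parent releases, since the child is a fresh single-threaded copy that never contends on the lock. The latency sample records how long the fork waited on readers (ustime() is microseconds, hence the /1000 for milliseconds). Reduced to its shape (names as in the diff; error handling elided):

g_forkLock->acquireWrite();        // quiesce all readers first
pid_t childpid = fork();
if (childpid == 0) {
    /* child: inherited lock state is irrelevant; go do the RDB work */
} else {
    g_forkLock->releaseWrite();    // parent: let reader threads resume
}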
@@ -6554,20 +6617,32 @@ void *timeThreadMain(void*) {
     timespec delay;
     delay.tv_sec = 0;
     delay.tv_nsec = 100;
+    int cycle_count = 0;
+    g_forkLock->acquireRead();
     while (true) {
         {
             std::unique_lock<std::mutex> lock(time_thread_mutex);
             if (sleeping_threads >= cserver.cthreads) {
+                g_forkLock->releaseRead();
                 time_thread_cv.wait(lock);
+                g_forkLock->acquireRead();
+                cycle_count = 0;
             }
         }
         updateCachedTime();
+        if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
+            g_forkLock->releaseRead();
+            g_forkLock->acquireRead();
+            cycle_count = 0;
+        }
 #if defined(__APPLE__)
         nanosleep(&delay, nullptr);
 #else
         clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL);
 #endif
+        cycle_count++;
     }
+    g_forkLock->releaseRead();
 }

 void *workerThreadMain(void *parg)
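The time thread ticks every 100 ns, so naively it would hold the read side essentially forever and starve redisFork()'s acquireWrite(). Hence the two release points: when all worker threads are asleep, and every MAX_CYCLES_TO_HOLD_FORK_LOCK iterations (10, per src/server.h below). The back-to-back release/reacquire is enough because a waiting writer sets m_writeWaiting, which parks new readers until the writer has had its turn. The pattern in isolation:

// Illustration of the bounded read hold used by timeThreadMain above.
g_forkLock->acquireRead();
for (int cycle_count = 0; /* runs until thread exit */ ; ++cycle_count) {
    if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
        g_forkLock->releaseRead();   // a waiting fork gets in here...
        g_forkLock->acquireRead();   // ...since m_writeWaiting parks us briefly
        cycle_count = 0;
    }
    /* ... one ~100 ns tick of clock bookkeeping ... */
}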

src/server.h

@@ -96,6 +96,7 @@ typedef long long ustime_t; /* microsecond time type. */
 #include "connection.h" /* Connection abstraction */
 #include "serverassert.h"
 #include "expire.h"
+#include "readwritelock.h"
 #define REDISMODULE_CORE 1
 #include "redismodule.h" /* Redis modules API defines. */
@@ -733,6 +734,9 @@ typedef enum {
 #define REDISMODULE_AUX_BEFORE_RDB (1<<0)
 #define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
+/* Number of cycles before time thread gives up fork lock */
+#define MAX_CYCLES_TO_HOLD_FORK_LOCK 10
 struct RedisModule;
 struct RedisModuleIO;
 struct RedisModuleDigest;
@@ -2113,6 +2117,7 @@ typedef struct {
 //extern struct redisServer server;
 extern redisServer *g_pserver;
+extern readWriteLock *g_forkLock;
 extern struct redisServerConst cserver;
 extern __thread struct redisServerThreadVars *serverTL; // thread local server vars
 extern struct sharedObjectsStruct shared;
@@ -2496,6 +2501,7 @@ void sendChildInfo(childInfoType info_type, size_t keys, const char *pname);
 void receiveChildInfo(void);

 /* Fork helpers */
+void executeWithoutGlobalLock(std::function<void()> func);
 int redisFork(int type);
 int hasActiveChildProcess();
 void resetChildState();