Merge branch 'unstable' into RELEASE_6

Former-commit-id: 0b7b5332f2a3bb5a6f4310643c343f53f546bd81
This commit is contained in:
John Sully 2021-02-07 23:55:58 +00:00
commit f4c7da7284
8 changed files with 61 additions and 99 deletions

View File

@ -48,8 +48,8 @@ endif
USEASM?=true USEASM?=true
ifneq ($(strip $(SANITIZE)),) ifneq ($(strip $(SANITIZE)),)
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer
LDFLAGS+= -fsanitize=$(SANITIZE) LDFLAGS+= -fsanitize=$(SANITIZE)
MALLOC=libc MALLOC=libc
USEASM=false USEASM=false

View File

@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
CreateFileEvent, CreateFileEvent,
}; };
struct aeCommandControl
{
std::condition_variable cv;
std::atomic<int> rval;
std::mutex mutexcv;
};
struct aeCommand struct aeCommand
{ {
AE_ASYNC_OP op; AE_ASYNC_OP op;
@ -128,7 +121,6 @@ struct aeCommand
std::function<void()> *pfn; std::function<void()> *pfn;
}; };
void *clientData; void *clientData;
aeCommandControl *pctl;
}; };
void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
@ -149,19 +141,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
break; break;
case AE_ASYNC_OP::CreateFileEvent: case AE_ASYNC_OP::CreateFileEvent:
{
if (cmd.pctl != nullptr)
{
cmd.pctl->mutexcv.lock();
std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
cmd.pctl->cv.notify_all();
cmd.pctl->mutexcv.unlock();
}
else
{
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
}
}
break; break;
case AE_ASYNC_OP::PostFunction: case AE_ASYNC_OP::PostFunction:
@ -175,19 +155,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostCppFunction: case AE_ASYNC_OP::PostCppFunction:
{ {
if (cmd.pctl != nullptr)
cmd.pctl->mutexcv.lock();
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock) if (cmd.fLock)
ulock.lock(); ulock.lock();
(*cmd.pfn)(); (*cmd.pfn)();
if (cmd.pctl != nullptr)
{
cmd.pctl->cv.notify_all();
cmd.pctl->mutexcv.unlock();
}
delete cmd.pfn; delete cmd.pfn;
} }
break; break;
@ -226,7 +198,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
} }
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData, int fSynchronous) aeFileProc *proc, void *clientData)
{ {
if (eventLoop == g_eventLoopThisThread) if (eventLoop == g_eventLoopThisThread)
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData); return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
@ -239,13 +211,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
cmd.mask = mask; cmd.mask = mask;
cmd.fproc = proc; cmd.fproc = proc;
cmd.clientData = clientData; cmd.clientData = clientData;
cmd.pctl = nullptr;
cmd.fLock = true; cmd.fLock = true;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
cmd.pctl->mutexcv.lock();
}
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd)) if (size != sizeof(cmd))
@ -255,16 +221,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
ret = AE_ERR; ret = AE_ERR;
} }
if (fSynchronous)
{
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
}
delete cmd.pctl;
}
return ret; return ret;
} }
@ -286,7 +242,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK; return AE_OK;
} }
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue) int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
{ {
if (eventLoop == g_eventLoopThisThread && !fForceQueue) if (eventLoop == g_eventLoopThisThread && !fForceQueue)
{ {
@ -297,13 +253,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
aeCommand cmd = {}; aeCommand cmd = {};
cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.op = AE_ASYNC_OP::PostCppFunction;
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn); cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr;
cmd.fLock = fLock; cmd.fLock = fLock;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl;
cmd.pctl->mutexcv.lock();
}
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (!(!size || size == sizeof(cmd))) { if (!(!size || size == sizeof(cmd))) {
@ -314,17 +264,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
if (size == 0) if (size == 0)
return AE_ERR; return AE_ERR;
int ret = AE_OK; return AE_OK;
if (fSynchronous)
{
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
}
delete cmd.pctl;
}
return ret;
} }
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *aeCreateEventLoop(int setsize) {

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} // EXTERN C } // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
extern "C" { extern "C" {
#endif #endif
void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeDeleteEventLoop(aeEventLoop *eventLoop);
@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData); aeFileProc *proc, void *clientData);
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData, int fSynchronous); aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask); void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);

View File

@ -5066,12 +5066,15 @@ void mvccrestoreCommand(client *c) {
setMvccTstamp(obj, mvcc); setMvccTstamp(obj, mvcc);
/* Create the key and set the TTL if any */ /* Create the key and set the TTL if any */
dbMerge(c->db,key,obj,true); if (dbMerge(c->db,key,obj,true)) {
if (expire >= 0) { if (expire >= 0) {
setExpire(c,c->db,key,nullptr,expire); setExpire(c,c->db,key,nullptr,expire);
} }
signalModifiedKey(c,c->db,key); signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
} else {
decrRefCount(obj);
}
addReply(c,shared.ok); addReply(c,shared.ok);
g_pserver->dirty++; g_pserver->dirty++;
} }

View File

@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
executeCronJobExpireHook(keyCopy, val); executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy); sdsfree(keyCopy);
decrRefCount(val); decrRefCount(val);
}, false, true /*fLock*/, true /*fForceQueue*/); }, true /*fLock*/, true /*fForceQueue*/);
} }
return; return;

View File

@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx); zfree(ctx);
} }
static bool g_fModuleThread = false; thread_local bool g_fModuleThread = false;
/* Acquire the server lock before executing a thread safe API call. /* Acquire the server lock before executing a thread safe API call.
* This is not needed for `RedisModule_Reply*` calls when there is * This is not needed for `RedisModule_Reply*` calls when there is
* a blocked client connected to the thread safe context. */ * a blocked client connected to the thread safe context. */
@ -5104,7 +5104,14 @@ void moduleAcquireGIL(int fServerThread) {
} }
else else
{ {
s_mutexModule.lock(); // It is possible that another module thread holds the GIL (and s_mutexModule as a result).
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
// As a result, a deadlock has occured.
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
// in order to prevent this deadlock from occuring.
while (!s_mutexModule.try_lock())
s_cv.wait(lock);
++s_cAcquisitionsModule; ++s_cAcquisitionsModule;
fModuleGILWlocked++; fModuleGILWlocked++;
} }
@ -5643,6 +5650,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
* (If the time it takes to execute 'callback' is negligible the two * (If the time it takes to execute 'callback' is negligible the two
* statements above mean the same) */ * statements above mean the same) */
RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) { RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
static uint64_t pending_key;
static mstime_t pending_period = -1;
RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL); RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
timer->module = ctx->module; timer->module = ctx->module;
timer->callback = callback; timer->callback = callback;
@ -5661,21 +5671,27 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
} }
} }
bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
if (pending_period < 0 || period < pending_period) {
pending_period = period;
pending_key = key;
}
/* We need to install the main event loop timer if it's not already /* We need to install the main event loop timer if it's not already
* installed, or we may need to refresh its period if we just installed * installed, or we may need to refresh its period if we just installed
* a timer that will expire sooner than any other else (i.e. the timer * a timer that will expire sooner than any other else (i.e. the timer
* we just installed is the first timer in the Timers rax). */ * we just installed is the first timer in the Timers rax). */
if (fNeedPost) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeTimer != -1) { if (aeTimer != -1) {
raxIterator ri; raxIterator ri;
raxStart(&ri,Timers); raxStart(&ri,Timers);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
raxNext(&ri); raxNext(&ri);
if (memcmp(ri.key,&key,sizeof(key)) == 0) { if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according /* This is the first key, we need to re-install the timer according
* to the just added event. */ * to the just added event. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
}, true /* synchronous */, false /* fLock */);
aeTimer = -1; aeTimer = -1;
} }
raxStop(&ri); raxStop(&ri);
@ -5684,9 +5700,11 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
/* If we have no main timer (the old one was invalidated, or this is the /* If we have no main timer (the old one was invalidated, or this is the
* first module timer we have), install one. */ * first module timer we have), install one. */
if (aeTimer == -1) { if (aeTimer == -1) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); }
}, true /* synchronous */, false /* fLock */);
pending_period = -1;
});
} }
return key; return key;

View File

@ -4087,7 +4087,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
std::lock_guard<decltype(this->lock)> lock(this->lock); std::lock_guard<decltype(this->lock)> lock(this->lock);
fn(this); fn(this);
--casyncOpsPending; --casyncOpsPending;
}, false, fLock) == AE_OK; }, fLock) == AE_OK;
} }
/*================================== Shutdown =============================== */ /*================================== Shutdown =============================== */

View File

@ -132,6 +132,7 @@ tags "modules" {
} }
$replica replicaof no one $replica replicaof no one
after 300
test {Test role-master hook} { test {Test role-master hook} {
assert_equal [r hooks.event_count role-replica] 1 assert_equal [r hooks.event_count role-replica] 1