Ensure CRON jobs run in a clean environment

Former-commit-id: b0e14683b2e655dc3aeb2f19b9227fc7fa24cc73
This commit is contained in:
John Sully 2020-06-05 21:35:47 -04:00
parent e2cd106d2d
commit d0d24e04f0
3 changed files with 23 additions and 13 deletions

View File

@ -285,9 +285,9 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK; return AE_OK;
} }
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock) int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
{ {
if (eventLoop == g_eventLoopThisThread) if (eventLoop == g_eventLoopThisThread && !fForceQueue)
{ {
fn(); fn();
return AE_OK; return AE_OK;

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} // EXTERN C } // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true); int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
extern "C" { extern "C" {
#endif #endif
void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -82,6 +82,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
robj objKey; robj objKey;
initStaticStringObject(objKey, (char*)e.key()); initStaticStringObject(objKey, (char*)e.key());
bool fTtlChanged = false;
while (!pfat->FEmpty()) while (!pfat->FEmpty())
{ {
@ -128,7 +129,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
break; break;
case OBJ_CRON: case OBJ_CRON:
executeCronJobExpireHook(e.key(), val); {
sds keyCopy = sdsdup(e.key());
incrRefCount(val);
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{
executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy);
decrRefCount(val);
}, false, true /*fLock*/, true /*fForceQueue*/);
}
return; return;
case OBJ_LIST: case OBJ_LIST:
@ -141,11 +150,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
pfat->popfrontExpireEntry(); pfat->popfrontExpireEntry();
fTtlChanged = true;
} }
if (deleted) if (!pfat->FEmpty() && fTtlChanged)
{
if (!pfat->FEmpty())
{ {
// We need to resort the expire entry since it may no longer be in the correct position // We need to resort the expire entry since it may no longer be in the correct position
auto itr = db->setexpire->find(e.key()); auto itr = db->setexpire->find(e.key());
@ -154,6 +162,8 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
db->setexpire->insert(eT); db->setexpire->insert(eT);
} }
if (deleted)
{
switch (val->type) switch (val->type)
{ {
case OBJ_SET: case OBJ_SET: