From d0d24e04f0278650cf1c30aa7a0d7ac76fa02eb3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 5 Jun 2020 21:35:47 -0400 Subject: [PATCH] Ensure CRON jobs run in a clean environment Former-commit-id: b0e14683b2e655dc3aeb2f19b9227fc7fa24cc73 --- src/ae.cpp | 4 ++-- src/ae.h | 2 +- src/expire.cpp | 30 ++++++++++++++++++++---------- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index ab5eeb163..7387e2a7f 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -285,9 +285,9 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock, bool fForceQueue) { - if (eventLoop == g_eventLoopThisThread) + if (eventLoop == g_eventLoopThisThread && !fForceQueue) { fn(); return AE_OK; diff --git a/src/ae.h b/src/ae.h index 3f1ddbf06..fdd444d3a 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/expire.cpp b/src/expire.cpp index 123f55817..b7f648117 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -82,6 +82,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { robj objKey; initStaticStringObject(objKey, (char*)e.key()); + bool fTtlChanged = false; while (!pfat->FEmpty()) { @@ -128,7 +129,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { break; case OBJ_CRON: - executeCronJobExpireHook(e.key(), val); + { + sds keyCopy = sdsdup(e.key()); + incrRefCount(val); + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{ + executeCronJobExpireHook(keyCopy, val); + sdsfree(keyCopy); + decrRefCount(val); + }, false, true /*fLock*/, true /*fForceQueue*/); + } return; case OBJ_LIST: @@ -141,19 +150,20 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); pfat->popfrontExpireEntry(); + fTtlChanged = true; + } + + if (!pfat->FEmpty() && fTtlChanged) + { + // We need to resort the expire entry since it may no longer be in the correct position + auto itr = db->setexpire->find(e.key()); + expireEntry eT = std::move(e); + db->setexpire->erase(itr); + db->setexpire->insert(eT); } if (deleted) { - if (!pfat->FEmpty()) - { - // We need to resort the expire entry since it may no longer be in the correct position - auto itr = db->setexpire->find(e.key()); - expireEntry eT = std::move(e); - db->setexpire->erase(itr); - db->setexpire->insert(eT); - } - switch (val->type) { case OBJ_SET: