Simplify the handling of CreateTimer

Former-commit-id: 7b4e25008a352bd425582a3e60b26894826af626
This commit is contained in:
John Sully 2021-01-31 06:31:51 +00:00
parent 55659b1569
commit 23495c22a5

View File

@ -5579,10 +5579,6 @@ void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
static rax *Timers; /* The radix tree of all the timers sorted by expire. */ static rax *Timers; /* The radix tree of all the timers sorted by expire. */
long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */ long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */
static bool aeTimerSet = false; /* Checks whether the main event loop timer is set */
static mstime_t aeTimerPeriod; /* The period of the aeTimer */
static bool aeTimerPending = false; /* Keeps track of if there is a aePostFunction in flight
* that would modify the current main event loop timer */
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
@ -5654,6 +5650,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
* (If the time it takes to execute 'callback' is negligible the two * (If the time it takes to execute 'callback' is negligible the two
* statements above mean the same) */ * statements above mean the same) */
RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) { RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
static uint64_t pending_key;
static mstime_t pending_period = -1;
RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL); RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
timer->module = ctx->module; timer->module = ctx->module;
timer->callback = callback; timer->callback = callback;
@ -5671,49 +5670,42 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
expiretime++; expiretime++;
} }
} }
bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
if (pending_period < 0 || period < pending_period) {
pending_period = period;
pending_key = key;
}
/* We need to install the main event loop timer if it's not already /* We need to install the main event loop timer if it's not already
* installed, or we may need to refresh its period if we just installed * installed, or we may need to refresh its period if we just installed
* a timer that will expire sooner than any other else (i.e. the timer * a timer that will expire sooner than any other else (i.e. the timer
* we just installed is the first timer in the Timers rax). */ * we just installed is the first timer in the Timers rax). */
bool isFirstExpiry = false; if (fNeedPost) {
if (aeTimerSet){ aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
raxIterator ri; if (aeTimer != -1) {
raxStart(&ri, Timers); raxIterator ri;
raxSeek(&ri,"^",NULL,0); raxStart(&ri,Timers);
raxNext(&ri); raxSeek(&ri,"^",NULL,0);
if (memcmp(ri.key,&key,sizeof(key)) == 0) raxNext(&ri);
/* This is the first key, we need to re-install the timer according if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
* to the just added event. */ /* This is the first key, we need to re-install the timer according
isFirstExpiry = true; * to the just added event. */
raxStop(&ri);
}
/* Now that either we know that we either need to refresh the period of the
* recently installed timer, or that there is no timer to begin with, we must post
* a function call to install the main event timer. */
if (isFirstExpiry || !aeTimerSet){
/* We set the period for the posted function in a global variable
* That is so if a function has been posted but not executed, and another
* function with a different period were to be posted, we can just update
* the period instead of posting a new function. */
aeTimerPeriod = period;
aeTimerSet = true;
if (!aeTimerPending) {
/* We should only have one aePostFunction in flight at a time, aeTimerPending
* keeps track of that. */
aeTimerPending = true;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [isFirstExpiry]{
/* If we deemed that this timer required a reinstall, delete it before proceeding
* to the install */
if (isFirstExpiry)
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
/* If we have no main timer (the old one was invalidated, or this is the aeTimer = -1;
* first module timer we have), install one. */ }
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimerPeriod,moduleTimerHandler,NULL,NULL); raxStop(&ri);
aeTimerPending = false; }
});
} /* If we have no main timer (the old one was invalidated, or this is the
} * first module timer we have), install one. */
if (aeTimer == -1) {
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
}
pending_period = -1;
});
}
return key; return key;
} }