From b7b8eb2ecb7ee6ba305771c5414050453e56a24c Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 21 Jan 2020 19:51:16 -0500 Subject: [PATCH] Missing cron files Former-commit-id: c983d57f739cc9b4b8271a6d069fad1b5a9d3dad --- src/cron.cpp | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/cron.h | 16 +++++++ 2 files changed, 142 insertions(+) create mode 100644 src/cron.cpp create mode 100644 src/cron.h diff --git a/src/cron.cpp b/src/cron.cpp new file mode 100644 index 000000000..72d26f5c3 --- /dev/null +++ b/src/cron.cpp @@ -0,0 +1,126 @@ +#include "server.h" +#include "cron.h" + +void freeCronObject(robj_roptr o) +{ + delete reinterpret_cast(ptrFromObj(o)); +} + +// CRON [name] [single shot] [optional: start] [delay] [script] [numkeys] [key N] [arg N] +void cronCommand(client *c) +{ + int arg_offset = 0; + static const int ARG_NAME = 1; + static const int ARG_SINGLESHOT = 2; + static const int ARG_EXPIRE = 3; + #define ARG_SCRIPT (4+arg_offset) + #define ARG_NUMKEYS (5+arg_offset) + #define ARG_KEYSTART (6+arg_offset) + + bool fSingleShot = false; + if (strcasecmp("single", szFromObj(c->argv[ARG_SINGLESHOT])) == 0) { + fSingleShot = true; + } else { + if (strcasecmp("repeat", szFromObj(c->argv[ARG_SINGLESHOT])) != 0) { + addReply(c, shared.syntaxerr); + return; + } + } + + long long interval; + if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK) + return; + + long long base = g_pserver->mstime; + if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) { + arg_offset++; + std::swap(base, interval); + } + + if (interval <= 0) + { + addReplyError(c, "interval must be positive"); + return; + } + + long numkeys = 0; + if (c->argc > ARG_NUMKEYS) + { + if (getLongFromObjectOrReply(c, c->argv[ARG_NUMKEYS], &numkeys, NULL) != C_OK) + return; + + if (c->argc < (6 + numkeys)) { + addReplyError(c, "Missing arguments or numkeys is too big"); + } + } + + std::unique_ptr spjob = std::make_unique(); + spjob->script = sdsstring(sdsdup(szFromObj(c->argv[ARG_SCRIPT]))); + spjob->interval = (uint64_t)interval; + spjob->startTime = (uint64_t)base; + spjob->fSingleShot = fSingleShot; + spjob->dbNum = c->db - g_pserver->db; + for (long i = 0; i < numkeys; ++i) + spjob->veckeys.emplace_back(sdsdup(szFromObj(c->argv[ARG_KEYSTART+i]))); + for (long i = ARG_KEYSTART + numkeys; i < c->argc; ++i) + spjob->vecargs.emplace_back(sdsdup(szFromObj(c->argv[i]))); + + robj *o = createObject(OBJ_CRON, spjob.release()); + setKey(c->db, c->argv[ARG_NAME], o); + // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. + setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); + addReply(c, shared.ok); +} + +void executeCronJobExpireHook(const char *key, robj *o) +{ + serverAssert(o->type == OBJ_CRON); + cronjob *job = (cronjob*)ptrFromObj(o); + + client *cFake = createClient(-1, IDX_EVENT_LOOP_MAIN); + cFake->lock.lock(); + cFake->authenticated = 1; + cFake->puser = nullptr; + selectDb(cFake, job->dbNum); + serverAssert(cFake->argc == 0); + + // Setup the args for the EVAL command + cFake->argc = 3 + job->veckeys.size() + job->vecargs.size(); + cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL); + cFake->argv[0] = createStringObject("EVAL", 4); + cFake->argv[1] = createStringObject(job->script.get(), job->script.size()); + cFake->argv[2] = createStringObjectFromLongLong(job->veckeys.size()); + for (size_t i = 0; i < job->veckeys.size(); ++i) + cFake->argv[3+i] = createStringObject(job->veckeys[i].get(), job->veckeys[i].size()); + for (size_t i = 0; i < job->vecargs.size(); ++i) + cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size()); + + evalCommand(cFake); + resetClient(cFake); + + robj *keyobj = createStringObject(key,sdslen(key)); + int dbId = job->dbNum; + if (job->fSingleShot) + { + dbSyncDelete(cFake->db, keyobj); + } + else + { + job->startTime += job->interval; + if (job->startTime < (uint64_t)g_pserver->mstime) + { + // If we are more than one interval in the past then fast forward to + // the first interval still in the future + auto delta = g_pserver->mstime - job->startTime; + auto multiple = (delta / job->interval)+1; + job->startTime += job->interval * multiple; + } + setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime + job->interval); + } + + notifyKeyspaceEvent(NOTIFY_KEYEVENT, "CRON Executed", keyobj, dbId); + decrRefCount(keyobj); + + // o is invalid at this point + freeClient(cFake); +} \ No newline at end of file diff --git a/src/cron.h b/src/cron.h new file mode 100644 index 000000000..8bac7f604 --- /dev/null +++ b/src/cron.h @@ -0,0 +1,16 @@ +#pragma once + +struct cronjob +{ + sdsstring script; + uint64_t interval; + uint64_t startTime; + std::vector veckeys; + std::vector vecargs; + int dbNum = 0; + bool fSingleShot = false; +}; + +void freeCronObject(robj_roptr o); +void executeCronJobExpireHook(const char *key, robj *o); +void cronCommand(client *c); \ No newline at end of file