From 262b9a5d92c93856909eb6625ab1dedbcc32adf5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 21 Jan 2020 19:50:28 -0500 Subject: [PATCH] Initial implementation of the CRON command Former-commit-id: 3204a39ada15ec33ac7926dc8b8f0e1875b99acb --- src/Makefile | 2 +- src/expire.cpp | 5 +++++ src/object.cpp | 2 ++ src/rdb.cpp | 31 ++++++++++++++++++++++++++++ src/rdb.h | 5 ++++- src/sds.h | 14 +++++++++++++ src/server.cpp | 7 ++++++- src/server.h | 2 ++ tests/test_helper.tcl | 1 + tests/unit/cron.tcl | 47 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 tests/unit/cron.tcl diff --git a/src/Makefile b/src/Makefile index cd411f9cd..a0e48831c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -213,7 +213,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/expire.cpp b/src/expire.cpp index b311a5673..66a05e4ef 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -31,6 +31,7 @@ */ #include "server.h" +#include "cron.h" void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { robj *keyobj = createStringObject(key,sdslen(key)); @@ -121,6 +122,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { } break; + case OBJ_CRON: + executeCronJobExpireHook(e.key(), val); + return; + case OBJ_LIST: default: serverAssert(false); diff --git a/src/object.cpp b/src/object.cpp index 2a9c3f215..14abfcfbc 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -29,6 +29,7 @@ */ #include "server.h" +#include "cron.h" #include #include @@ -369,6 +370,7 @@ void decrRefCount(robj_roptr o) { case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; + case OBJ_CRON: freeCronObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o.unsafe_robjcast()); diff --git a/src/rdb.cpp b/src/rdb.cpp index b76afdfd7..48205f430 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -33,6 +33,7 @@ #include "endianconv.h" #include "stream.h" #include "storage.h" +#include "cron.h" #include #include @@ -523,6 +524,11 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { } } +sdsstring rdbLoadString(rio *rdb){ + sds str = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + return sdsstring(str); +} + robj *rdbLoadStringObject(rio *rdb) { return (robj*)rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); } @@ -657,6 +663,8 @@ int rdbSaveObjectType(rio *rdb, robj_roptr o) { return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); + case OBJ_CRON: + return rdbSaveType(rdb,RDB_TYPE_CRON); default: serverPanic("Unknown object type"); } @@ -986,6 +994,17 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) { zfree(io.ctx); } return io.error ? -1 : (ssize_t)io.bytes; + } else if (o->type == OBJ_CRON) { + cronjob *job = (cronjob*)ptrFromObj(o); + rdbSaveRawString(rdb, (const unsigned char*)job->script.get(), job->script.size()); + rdbSaveMillisecondTime(rdb, job->startTime); + rdbSaveMillisecondTime(rdb, job->interval); + rdbSaveLen(rdb, job->veckeys.size()); + for (auto &key : job->veckeys) + rdbSaveRawString(rdb, (const unsigned char*)key.get(), key.size()); + rdbSaveLen(rdb, job->vecargs.size()); + for (auto &arg : job->vecargs) + rdbSaveRawString(rdb, (const unsigned char*)arg.get(), arg.size()); } else { serverPanic("Unknown object type"); } @@ -1848,6 +1867,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { exit(1); } o = createModuleObject(mt,ptr); + } else if (rdbtype == RDB_TYPE_CRON) { + std::unique_ptr spjob = std::make_unique(); + spjob->script = rdbLoadString(rdb); + spjob->startTime = rdbLoadMillisecondTime(rdb,RDB_VERSION); + spjob->interval = rdbLoadMillisecondTime(rdb,RDB_VERSION); + auto ckeys = rdbLoadLen(rdb,NULL); + for (uint64_t i = 0; i < ckeys; ++i) + spjob->veckeys.push_back(rdbLoadString(rdb)); + auto cargs = rdbLoadLen(rdb,NULL); + for (uint64_t i = 0; i < cargs; ++i) + spjob->vecargs.push_back(rdbLoadString(rdb)); + o = createObject(OBJ_CRON, spjob.release()); } else { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); } diff --git a/src/rdb.h b/src/rdb.h index edf43d422..b7fc0b71d 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -91,10 +91,13 @@ #define RDB_TYPE_HASH_ZIPLIST 13 #define RDB_TYPE_LIST_QUICKLIST 14 #define RDB_TYPE_STREAM_LISTPACKS 15 + +/* KeyDB Specific Object Types */ +#define RDB_TYPE_CRON 64 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15)) +#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15) || (t == RDB_TYPE_CRON)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/sds.h b/src/sds.h index 23a11afa4..3315ed9b1 100644 --- a/src/sds.h +++ b/src/sds.h @@ -396,6 +396,20 @@ public: other.m_str = nullptr; } + sdsstring &operator=(const sdsstring &other) + { + sdsfree(m_str); + m_str = sdsdup(other.m_str); + return *this; + } + + sdsstring &operator=(sds other) + { + sdsfree(m_str); + m_str = sdsdup(other); + return *this; + } + ~sdsstring() { sdsfree(m_str); diff --git a/src/server.cpp b/src/server.cpp index 12dff6a85..c6f360905 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -35,6 +35,7 @@ #include "latency.h" #include "atomicvar.h" #include "storage.h" +#include "cron.h" #include #include #include @@ -1023,7 +1024,11 @@ struct redisCommand redisCommandTable[] = { {"rreplay",replicaReplayCommand,-3, "read-only fast noprop", - 0,NULL,0,0,0,0,0,0} + 0,NULL,0,0,0,0,0,0}, + + {"cron",cronCommand,-5, + "write use-memory", + 0,NULL,1,1,1,0,0,0}, }; /*============================ Utility functions ============================ */ diff --git a/src/server.h b/src/server.h index 6b8903092..781f043f0 100644 --- a/src/server.h +++ b/src/server.h @@ -683,6 +683,8 @@ public: * encoding version. */ #define OBJ_MODULE 5 /* Module object. */ #define OBJ_STREAM 6 /* Stream object. */ +#define OBJ_CRON 7 /* CRON job */ + /* Extract encver / signature from a module type ID. */ #define REDISMODULE_TYPE_ENCVER_BITS 10 diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index a06afca3e..ec1d7cd31 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -36,6 +36,7 @@ set ::all_tests { unit/aofrw unit/acl unit/rreplay + unit/cron integration/block-repl integration/replication integration/replication-2 diff --git a/tests/unit/cron.tcl b/tests/unit/cron.tcl new file mode 100644 index 000000000..554df9328 --- /dev/null +++ b/tests/unit/cron.tcl @@ -0,0 +1,47 @@ +start_server {tags {"CRON"}} { + test {cron singleshot past tense} { + r flushall + r cron testjob single 0 1 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [r get testkey] + assert_equal 0 [r exists testjob] + } + + test {cron repeat past tense next exec is in the future} { + r flushall + r cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [r get testkey] + assert_equal 1 [r exists testjob] + r del testjob + } + + test {cron repeat works} { + r flushall + r cron testjob repeat 0 600 {redis.call("incr","testkey")} + after 1000 + assert_equal 2 [r get testkey] + } + + test {cron overwrite works} { + r flushall + r cron testjob single 500 {redis.call("set","testkey","a")} 1 testkey + r cron testjob single 500 {redis.call("set","anotherkey","b")} 1 anotherkey + after 1000 + assert_equal 0 [r exists testkey] + assert_equal b [r get anotherkey] + } + + test {cron delete key stops job} { + r flushall + r cron testjob single 500 {redis.call("set","testkey","a")} + r del testjob + after 1000 + assert_equal 0 [r exists testkey] + } + + test {cron zero interval rejected} { + catch {r cron testjob single 0 0 {redis.call("incr","testkey")} 1 testkey} e + assert_match {ERR*} $e + } +}