Initial implementation of the CRON command

Former-commit-id: 3204a39ada15ec33ac7926dc8b8f0e1875b99acb
This commit is contained in:
John Sully 2020-01-21 19:50:28 -05:00
parent ac55fe6dac
commit 262b9a5d92
10 changed files with 113 additions and 3 deletions

View File

@ -213,7 +213,7 @@ endif
REDIS_SERVER_NAME=keydb-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark

View File

@ -31,6 +31,7 @@
*/
#include "server.h"
#include "cron.h"
void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
robj *keyobj = createStringObject(key,sdslen(key));
@ -121,6 +122,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
}
break;
case OBJ_CRON:
executeCronJobExpireHook(e.key(), val);
return;
case OBJ_LIST:
default:
serverAssert(false);

View File

@ -29,6 +29,7 @@
*/
#include "server.h"
#include "cron.h"
#include <math.h>
#include <ctype.h>
@ -369,6 +370,7 @@ void decrRefCount(robj_roptr o) {
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
case OBJ_CRON: freeCronObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o.unsafe_robjcast());

View File

@ -33,6 +33,7 @@
#include "endianconv.h"
#include "stream.h"
#include "storage.h"
#include "cron.h"
#include <math.h>
#include <sys/types.h>
@ -523,6 +524,11 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
}
}
sdsstring rdbLoadString(rio *rdb){
sds str = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
return sdsstring(str);
}
robj *rdbLoadStringObject(rio *rdb) {
return (robj*)rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
}
@ -657,6 +663,8 @@ int rdbSaveObjectType(rio *rdb, robj_roptr o) {
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
case OBJ_CRON:
return rdbSaveType(rdb,RDB_TYPE_CRON);
default:
serverPanic("Unknown object type");
}
@ -986,6 +994,17 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) {
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
} else if (o->type == OBJ_CRON) {
cronjob *job = (cronjob*)ptrFromObj(o);
rdbSaveRawString(rdb, (const unsigned char*)job->script.get(), job->script.size());
rdbSaveMillisecondTime(rdb, job->startTime);
rdbSaveMillisecondTime(rdb, job->interval);
rdbSaveLen(rdb, job->veckeys.size());
for (auto &key : job->veckeys)
rdbSaveRawString(rdb, (const unsigned char*)key.get(), key.size());
rdbSaveLen(rdb, job->vecargs.size());
for (auto &arg : job->vecargs)
rdbSaveRawString(rdb, (const unsigned char*)arg.get(), arg.size());
} else {
serverPanic("Unknown object type");
}
@ -1848,6 +1867,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
exit(1);
}
o = createModuleObject(mt,ptr);
} else if (rdbtype == RDB_TYPE_CRON) {
std::unique_ptr<cronjob> spjob = std::make_unique<cronjob>();
spjob->script = rdbLoadString(rdb);
spjob->startTime = rdbLoadMillisecondTime(rdb,RDB_VERSION);
spjob->interval = rdbLoadMillisecondTime(rdb,RDB_VERSION);
auto ckeys = rdbLoadLen(rdb,NULL);
for (uint64_t i = 0; i < ckeys; ++i)
spjob->veckeys.push_back(rdbLoadString(rdb));
auto cargs = rdbLoadLen(rdb,NULL);
for (uint64_t i = 0; i < cargs; ++i)
spjob->vecargs.push_back(rdbLoadString(rdb));
o = createObject(OBJ_CRON, spjob.release());
} else {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
}

View File

@ -91,10 +91,13 @@
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
#define RDB_TYPE_STREAM_LISTPACKS 15
/* KeyDB Specific Object Types */
#define RDB_TYPE_CRON 64
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15))
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15) || (t == RDB_TYPE_CRON))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */

View File

@ -396,6 +396,20 @@ public:
other.m_str = nullptr;
}
sdsstring &operator=(const sdsstring &other)
{
sdsfree(m_str);
m_str = sdsdup(other.m_str);
return *this;
}
sdsstring &operator=(sds other)
{
sdsfree(m_str);
m_str = sdsdup(other);
return *this;
}
~sdsstring()
{
sdsfree(m_str);

View File

@ -35,6 +35,7 @@
#include "latency.h"
#include "atomicvar.h"
#include "storage.h"
#include "cron.h"
#include <thread>
#include <time.h>
#include <signal.h>
@ -1023,7 +1024,11 @@ struct redisCommand redisCommandTable[] = {
{"rreplay",replicaReplayCommand,-3,
"read-only fast noprop",
0,NULL,0,0,0,0,0,0}
0,NULL,0,0,0,0,0,0},
{"cron",cronCommand,-5,
"write use-memory",
0,NULL,1,1,1,0,0,0},
};
/*============================ Utility functions ============================ */

View File

@ -683,6 +683,8 @@ public:
* encoding version. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
#define OBJ_CRON 7 /* CRON job */
/* Extract encver / signature from a module type ID. */
#define REDISMODULE_TYPE_ENCVER_BITS 10

View File

@ -36,6 +36,7 @@ set ::all_tests {
unit/aofrw
unit/acl
unit/rreplay
unit/cron
integration/block-repl
integration/replication
integration/replication-2

47
tests/unit/cron.tcl Normal file
View File

@ -0,0 +1,47 @@
start_server {tags {"CRON"}} {
test {cron singleshot past tense} {
r flushall
r cron testjob single 0 1 {redis.call("incr", "testkey")} 1 testkey
after 300
assert_equal 1 [r get testkey]
assert_equal 0 [r exists testjob]
}
test {cron repeat past tense next exec is in the future} {
r flushall
r cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
after 300
assert_equal 1 [r get testkey]
assert_equal 1 [r exists testjob]
r del testjob
}
test {cron repeat works} {
r flushall
r cron testjob repeat 0 600 {redis.call("incr","testkey")}
after 1000
assert_equal 2 [r get testkey]
}
test {cron overwrite works} {
r flushall
r cron testjob single 500 {redis.call("set","testkey","a")} 1 testkey
r cron testjob single 500 {redis.call("set","anotherkey","b")} 1 anotherkey
after 1000
assert_equal 0 [r exists testkey]
assert_equal b [r get anotherkey]
}
test {cron delete key stops job} {
r flushall
r cron testjob single 500 {redis.call("set","testkey","a")}
r del testjob
after 1000
assert_equal 0 [r exists testkey]
}
test {cron zero interval rejected} {
catch {r cron testjob single 0 0 {redis.call("incr","testkey")} 1 testkey} e
assert_match {ERR*} $e
}
}