Merge branch 'unstable' into RELEASE_5
Former-commit-id: 6d00a1a5cf86f69bc9204973adb67a4e215260bd
commit c27596230d
@@ -213,7 +213,7 @@ endif
 REDIS_SERVER_NAME=keydb-server
 REDIS_SENTINEL_NAME=keydb-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ)
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o $(ASM_OBJ)
 REDIS_CLI_NAME=keydb-cli
 REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
 REDIS_BENCHMARK_NAME=keydb-benchmark
src/ae.cpp  (15 lines changed)

@@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
     cmd.proc = proc;
     cmd.clientData = arg;
     auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
-    AE_ASSERT(size == sizeof(cmd));
+    if (size != sizeof(cmd))
+        return AE_ERR;
     return AE_OK;
 }

@@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
     }

     auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
+    if (size != sizeof(cmd))
+        return AE_ERR;
     AE_ASSERT(size == sizeof(cmd));
     int ret = AE_OK;
     if (fSynchronous)

@@ -387,10 +390,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) {
     aeApiFree(eventLoop);
     zfree(eventLoop->events);
     zfree(eventLoop->fired);
-    zfree(eventLoop);
     fastlock_free(&eventLoop->flock);
     close(eventLoop->fdCmdRead);
     close(eventLoop->fdCmdWrite);

+    auto *te = eventLoop->timeEventHead;
+    while (te)
+    {
+        auto *teNext = te->next;
+        zfree(te);
+        te = teNext;
+    }
+    zfree(eventLoop);
 }

 extern "C" void aeStop(aeEventLoop *eventLoop) {
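The ae.cpp hunks above change the failure mode of aePostFunction from asserting to returning AE_ERR on a short pipe write, which is why later hunks in aof.cpp, cluster.cpp, networking.cpp, and replication.cpp start checking the return value. The following is a minimal sketch of that cross-thread "post a command over a pipe" pattern, not KeyDB's actual implementation; all names are illustrative.

#include <unistd.h>
#include <cstring>

enum { POST_OK = 0, POST_ERR = -1 };

struct PostCmd {
    void (*proc)(void *arg);
    void *arg;
};

// Report failure to the caller instead of asserting, so the caller can decide
// whether to retry, fall back to running locally, or treat it as fatal.
int postFunction(int fdCmdWrite, void (*proc)(void *), void *arg) {
    PostCmd cmd;
    std::memset(&cmd, 0, sizeof(cmd));
    cmd.proc = proc;
    cmd.arg = arg;
    ssize_t size = write(fdCmdWrite, &cmd, sizeof(cmd));
    if (size != sizeof(cmd))
        return POST_ERR;   // pipe full or closed; the caller chooses how to recover
    return POST_OK;
}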
src/aof.cpp  (33 lines changed)

@@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 }

+void installAofRewriteEvent()
+{
+    serverTL->fRetrySetAofEvent = false;
+    if (!g_pserver->aof_rewrite_pending) {
+        g_pserver->aof_rewrite_pending = true;
+        int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
+            g_pserver->aof_rewrite_pending = false;
+            if (g_pserver->aof_pipe_write_data_to_child >= 0)
+                aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
+        });
+        if (res != AE_OK)
+            serverTL->fRetrySetAofEvent = true;
+    }
+}
+
 /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
 void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
     listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks);

@@ -165,14 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {

     /* Install a file event to send data to the rewrite child if there is
      * not one already. */
-    if (!g_pserver->aof_rewrite_pending) {
-        g_pserver->aof_rewrite_pending = true;
-        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
-            g_pserver->aof_rewrite_pending = false;
-            if (g_pserver->aof_pipe_write_data_to_child >= 0)
-                aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
-        });
-    }
+    installAofRewriteEvent();
 }

 /* Write the buffer (possibly composed of multiple blocks) into the specified

@@ -348,6 +356,9 @@ void flushAppendOnlyFile(int force) {
     int sync_in_progress = 0;
     mstime_t latency;

+    if (serverTL->fRetrySetAofEvent)
+        installAofRewriteEvent();
+
     if (sdslen(g_pserver->aof_buf) == 0) {
         /* Check if we need to do fsync even the aof buffer is empty,
          * because previously in AOF_FSYNC_EVERYSEC mode, fsync is

@@ -1563,16 +1574,18 @@ error:

 void aofClosePipes(void) {
     int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
-    aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
+    int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
         aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
         close (fdAofAckPipe);
     });
+    serverAssert(res == AE_OK);

     int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
-    aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
+    res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
         aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
         close(fdAofWritePipe);
     });
+    serverAssert(res == AE_OK);
     g_pserver->aof_pipe_write_data_to_child = -1;

     close(g_pserver->aof_pipe_read_data_from_parent);
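The new installAofRewriteEvent() records a per-thread retry flag when posting to the main event loop fails, and flushAppendOnlyFile retries on the next pass. The sketch below shows that deferred-retry idea in isolation; the helper names and the std::function-based scheduling hook are assumptions for illustration, not KeyDB's API.

#include <functional>

thread_local bool g_fRetrySetEvent = false;

// tryPost stands in for whatever cross-thread scheduling primitive is in use;
// it returns false when the post cannot be delivered (e.g. the pipe is full).
void installRewriteEvent(const std::function<bool()> &tryPost) {
    g_fRetrySetEvent = false;
    if (!tryPost())
        g_fRetrySetEvent = true;        // could not schedule now, remember to retry
}

void onPeriodicFlush(const std::function<bool()> &tryPost) {
    if (g_fRetrySetEvent)
        installRewriteEvent(tryPost);   // retry the earlier failed attempt
    // ... normal flush work continues here ...
}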
@@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) {
     if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
         g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
     }

+    if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100)
+    {
+        // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server)
+        // we can increase the grouping of clients on a single thread within reason
+        cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes);
+        cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200);
+        serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold);
+    }
     return C_OK;

 fmterr:

@@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) {
         if (link->node)
             link->node->link = NULL;
         link->node = nullptr;
-        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
+        int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
             freeClusterLink(link);
         });
+        serverAssert(res == AE_OK);
         return;
     }
     if (link->fd != -1) {
@@ -405,7 +405,7 @@ void loadServerConfigFromString(char *config) {
         } else if ((!strcasecmp(argv[0],"slaveof") ||
                     !strcasecmp(argv[0],"replicaof")) && argc == 3) {
             slaveof_linenum = linenum;
-            replicationAddMaster(sdsnew(argv[1]), atoi(argv[2]));
+            replicationAddMaster(argv[1], atoi(argv[2]));
         } else if ((!strcasecmp(argv[0],"repl-ping-slave-period") ||
                     !strcasecmp(argv[0],"repl-ping-replica-period")) &&
                     argc == 2)

@@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) {
         } else if (!strcasecmp(argv[0],"enable-pro")) {
             cserver.fUsePro = true;
             break;
+        } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) {
+            cserver.thread_min_client_threshold = atoi(argv[1]);
+            if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) {
+                err = "min-thread-client must be between 0 and 400"; goto loaderr;
+            }
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
src/cron.cpp  (new file, 135 lines)

@@ -0,0 +1,135 @@
+#include "server.h"
+#include "cron.h"
+
+void freeCronObject(robj_roptr o)
+{
+    delete reinterpret_cast<const cronjob*>(ptrFromObj(o));
+}
+
+// CRON [name] [single shot] [optional: start] [delay] [script] [numkeys] [key N] [arg N]
+void cronCommand(client *c)
+{
+    int arg_offset = 0;
+    static const int ARG_NAME = 1;
+    static const int ARG_SINGLESHOT = 2;
+    static const int ARG_EXPIRE = 3;
+    #define ARG_SCRIPT (4+arg_offset)
+    #define ARG_NUMKEYS (5+arg_offset)
+    #define ARG_KEYSTART (6+arg_offset)
+
+    bool fSingleShot = false;
+    if (strcasecmp("single", szFromObj(c->argv[ARG_SINGLESHOT])) == 0) {
+        fSingleShot = true;
+    } else {
+        if (strcasecmp("repeat", szFromObj(c->argv[ARG_SINGLESHOT])) != 0) {
+            addReply(c, shared.syntaxerr);
+            return;
+        }
+    }
+
+    long long interval;
+    if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK)
+        return;
+
+    long long base = g_pserver->mstime;
+    if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) {
+        arg_offset++;
+        std::swap(base, interval);
+    }
+
+    if (interval <= 0)
+    {
+        addReplyError(c, "interval must be positive");
+        return;
+    }
+
+    long numkeys = 0;
+    if (c->argc > ARG_NUMKEYS)
+    {
+        if (getLongFromObjectOrReply(c, c->argv[ARG_NUMKEYS], &numkeys, NULL) != C_OK)
+            return;
+
+        if (c->argc < (6 + numkeys)) {
+            addReplyError(c, "Missing arguments or numkeys is too big");
+        }
+    }
+
+    std::unique_ptr<cronjob> spjob = std::make_unique<cronjob>();
+    spjob->script = sdsstring(sdsdup(szFromObj(c->argv[ARG_SCRIPT])));
+    spjob->interval = (uint64_t)interval;
+    spjob->startTime = (uint64_t)base;
+    spjob->fSingleShot = fSingleShot;
+    spjob->dbNum = c->db - g_pserver->db;
+    for (long i = 0; i < numkeys; ++i)
+        spjob->veckeys.emplace_back(sdsdup(szFromObj(c->argv[ARG_KEYSTART+i])));
+    for (long i = ARG_KEYSTART + numkeys; i < c->argc; ++i)
+        spjob->vecargs.emplace_back(sdsdup(szFromObj(c->argv[i])));
+
+    robj *o = createObject(OBJ_CRON, spjob.release());
+    setKey(c->db, c->argv[ARG_NAME], o);
+    decrRefCount(o);
+    // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it.
+    setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval);
+    addReply(c, shared.ok);
+}
+
+void executeCronJobExpireHook(const char *key, robj *o)
+{
+    serverAssert(o->type == OBJ_CRON);
+    cronjob *job = (cronjob*)ptrFromObj(o);
+
+    client *cFake = createClient(-1, IDX_EVENT_LOOP_MAIN);
+    cFake->lock.lock();
+    cFake->authenticated = 1;
+    cFake->puser = nullptr;
+    selectDb(cFake, job->dbNum);
+    serverAssert(cFake->argc == 0);
+
+    // Setup the args for the EVAL command
+    cFake->argc = 3 + job->veckeys.size() + job->vecargs.size();
+    cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL);
+    cFake->argv[0] = createStringObject("EVAL", 4);
+    cFake->argv[1] = createStringObject(job->script.get(), job->script.size());
+    cFake->argv[2] = createStringObjectFromLongLong(job->veckeys.size());
+    for (size_t i = 0; i < job->veckeys.size(); ++i)
+        cFake->argv[3+i] = createStringObject(job->veckeys[i].get(), job->veckeys[i].size());
+    for (size_t i = 0; i < job->vecargs.size(); ++i)
+        cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size());
+
+    evalCommand(cFake);
+    resetClient(cFake);
+
+    robj *keyobj = createStringObject(key,sdslen(key));
+    int dbId = job->dbNum;
+    if (job->fSingleShot)
+    {
+        dbSyncDelete(cFake->db, keyobj);
+    }
+    else
+    {
+        job->startTime += job->interval;
+        if (job->startTime < (uint64_t)g_pserver->mstime)
+        {
+            // If we are more than one interval in the past then fast forward to
+            // the first interval still in the future. If startTime wasn't zero align
+            // this to the original startTime, if it was zero align to now
+            if (job->startTime == job->interval)
+            {   // startTime was 0
+                job->startTime = g_pserver->mstime + job->interval;
+            }
+            else
+            {
+                auto delta = g_pserver->mstime - job->startTime;
+                auto multiple = (delta / job->interval)+1;
+                job->startTime += job->interval * multiple;
+            }
+        }
+        setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime);
+    }
+
+    notifyKeyspaceEvent(NOTIFY_KEYEVENT, "CRON Executed", keyobj, dbId);
+    decrRefCount(keyobj);
+
+    // o is invalid at this point
+    freeClient(cFake);
+}
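Going by the argument comment at the top of cronCommand(), a call looks like CRON name single|repeat [start] delay script numkeys key... arg..., with the delay apparently in milliseconds (it is added to mstime for the trigger expire). The sketch below is a hypothetical client-side invocation using hiredis; the connection details, job name, key, and script are assumptions for illustration and are not part of this commit.

// Hypothetical usage of the new CRON command from a hiredis client.
#include <hiredis/hiredis.h>
#include <cstdio>

int main() {
    redisContext *ctx = redisConnect("127.0.0.1", 6379);
    if (ctx == nullptr || ctx->err) {
        std::fprintf(stderr, "connect failed\n");
        return 1;
    }

    // Schedule a Lua script that increments a counter, repeating every 60000 ms,
    // with 1 key (cron:counter) and no extra args.
    redisReply *reply = (redisReply*)redisCommand(ctx,
        "CRON myjob repeat 60000 %s 1 cron:counter",
        "return redis.call('incr', KEYS[1])");
    if (reply) {
        std::printf("CRON reply: %s\n", reply->str ? reply->str : "(non-string reply)");
        freeReplyObject(reply);
    }
    redisFree(ctx);
    return 0;
}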
src/cron.h  (new file, 16 lines)

@@ -0,0 +1,16 @@
+#pragma once
+
+struct cronjob
+{
+    sdsstring script;
+    uint64_t interval;
+    uint64_t startTime;
+    std::vector<sdsstring> veckeys;
+    std::vector<sdsstring> vecargs;
+    int dbNum = 0;
+    bool fSingleShot = false;
+};
+
+void freeCronObject(robj_roptr o);
+void executeCronJobExpireHook(const char *key, robj *o);
+void cronCommand(client *c);
src/db.cpp  (13 lines changed)

@@ -177,9 +177,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
  * Returns the linked value object if the key exists or NULL if the key
  * does not exist in the specified DB. */
 robj *lookupKeyWrite(redisDb *db, robj *key) {
+    expireIfNeeded(db,key);
     robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC);
-    if (expireIfNeeded(db,key))
-        o = NULL;
     return o;
 }

@@ -978,7 +977,7 @@ void shutdownCommand(client *c) {
      * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
     if (g_pserver->loading || g_pserver->sentinel_mode)
         flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
-    if (prepareForShutdown(flags) == C_OK) exit(0);
+    if (prepareForShutdown(flags) == C_OK) throw ShutdownException();
     addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
 }

@@ -1306,6 +1305,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
         rememberSlaveKeyWithExpire(db,key);
 }

+redisDb::~redisDb()
+{
+    dictRelease(watched_keys);
+    dictRelease(ready_keys);
+    dictRelease(blocking_keys);
+    listRelease(defrag_later);
+}
+
 void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
 {
     dictEntry *kde;
@@ -685,8 +685,26 @@ NULL
     } else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) {
         stringmatchlen_fuzz_test();
         addReplyStatus(c,"Apparently Redis did not crash: test passed");
-    } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) {
+    } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) {
         c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY;
+        if (!strcasecmp(szFromObj(c->argv[2]), "yes"))
+        {
+            redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL);
+            mi->master = c;
+            listAddNodeHead(g_pserver->masters, mi);
+        }
+        else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset
+        {
+            serverAssert(c->flags & CLIENT_MASTER);
+            if (listLength(g_pserver->masters))
+            {
+                redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
+                serverAssert(mi->master == c);
+                listDelNode(g_pserver->masters, listFirst(g_pserver->masters));
+                zfree(mi);
+            }
+            c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY);
+        }
         addReply(c, shared.ok);
     } else {
         addReplySubcommandSyntaxError(c);
@@ -31,6 +31,7 @@
 */

 #include "server.h"
+#include "cron.h"

 void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
     robj *keyobj = createStringObject(key,sdslen(key));

@@ -121,6 +122,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
         }
         break;

+    case OBJ_CRON:
+        executeCronJobExpireHook(e.key(), val);
+        return;
+
     case OBJ_LIST:
     default:
         serverAssert(false);
@@ -74,6 +74,10 @@ extern int g_fInCrash;
 #define __has_feature(x) 0
 #endif

+#ifdef __linux__
+extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex);
+#endif
+
 #if __has_feature(thread_sanitizer)

 /* Report that a lock has been created at address "lock". */

@@ -206,6 +210,11 @@ DeadlockDetector g_dlock;
 static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
 uint64_t g_longwaits = 0;

+extern "C" void fastlock_panic(struct fastlock *lock)
+{
+    _serverPanic(__FILE__, __LINE__, "fastlock lock/unlock mismatch for: %s", lock->szName);
+}
+
 uint64_t fastlock_getlongwaitcount()
 {
     uint64_t rval;

@@ -290,7 +299,7 @@ extern "C" void fastlock_lock(struct fastlock *lock)

 #if defined(__i386__) || defined(__amd64__)
         __asm__ __volatile__ ("pause");
-#elif defined(__arm__)
+#elif defined(__aarch64__)
         __asm__ __volatile__ ("yield");
 #endif
         if ((++cloops % 0x100000) == 0)

@@ -326,7 +335,7 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak)

     struct ticket ticket_expect { { { active, active } } };
     struct ticket ticket_setiflocked { { { active, next } } };
-    if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, fWeak /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+    if (__atomic_compare_exchange(&lock->m_ticket.u, &ticket_expect.u, &ticket_setiflocked.u, fWeak /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
     {
         lock->m_depth = 1;
         tid = gettid();

@@ -337,31 +346,6 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak)
     return false;
 }

-#ifdef __linux__
-#define ROL32(v, shift) ((v << shift) | (v >> (32-shift)))
-void unlock_futex(struct fastlock *lock, uint16_t ifutex)
-{
-    unsigned mask = (1U << (ifutex % 32));
-    unsigned futexT;
-    __atomic_load(&lock->futex, &futexT, __ATOMIC_RELAXED);
-    futexT &= mask;
-
-    if (futexT == 0)
-        return;
-
-    for (;;)
-    {
-        __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE);
-        futexT &= mask;
-        if (!futexT)
-            break;
-
-        if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1)
-            break;
-    }
-}
-#endif
-
 extern "C" void fastlock_unlock(struct fastlock *lock)
 {
     --lock->m_depth;

@@ -384,6 +368,26 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
     }
 #endif

+#ifdef __linux__
+#define ROL32(v, shift) ((v << shift) | (v >> (32-shift)))
+extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex)
+{
+    unsigned mask = (1U << (ifutex % 32));
+    unsigned futexT;
+
+    for (;;)
+    {
+        __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE);
+        futexT &= mask;
+        if (!futexT)
+            break;
+
+        if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1)
+            break;
+    }
+}
+#endif
+
 extern "C" void fastlock_free(struct fastlock *lock)
 {
     // NOP

@@ -413,4 +417,4 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting)
 {
     fastlock_lock(lock);
     lock->m_depth = nesting;
 }
@@ -126,6 +126,7 @@ fastlock_trylock:

 .ALIGN 16
 .global fastlock_unlock
+.type fastlock_unlock,@function
 fastlock_unlock:
         # RDI points to the struct:
         # int32_t m_pidOwner

@@ -133,34 +134,19 @@ fastlock_unlock:
         # [rdi+64] ...
         #     uint16_t active
         #     uint16_t avail
-        push r11
         sub dword ptr [rdi+4], 1   # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
         jnz .LDone                 # if depth is non-zero this is a recursive unlock, and we still hold it
         mov dword ptr [rdi], -1    # pidOwner = -1 (we don't own it anymore)
-        mov ecx, [rdi+64]          # get current active (this one)
-        inc ecx                    # bump it to the next thread
-        mov [rdi+64], cx           # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
+        mov esi, [rdi+64]          # get current active (this one)
+        inc esi                    # bump it to the next thread
+        mov word ptr [rdi+64], si  # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
         mfence                     # sync other threads
         # At this point the lock is removed, however we must wake up any pending futexs
-        mov r9d, 1                 # eax is the bitmask for 2 threads
-        rol r9d, cl                # place the mask in the right spot for the next 2 threads
-        add rdi, 64                # rdi now points to the token
+        mov edx, [rdi+64+4]        # load the futex mask
+        bt edx, esi                # is the next thread waiting on a futex?
+        jc unlock_futex            # unlock the futex if necessary
+        ret                        # if not we're done.
 .ALIGN 16
-.LRetryWake:
-        mov r11d, [rdi+4]          # load the futex mask
-        and r11d, r9d              # are any threads waiting on a futex?
-        jz .LDone                  # if not we're done.
-        # we have to wake the futexs
-        # rdi ARG1 futex (already in rdi)
-        mov esi, (10 | 128)        # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE
-        mov edx, 0x7fffffff        # rdx ARG3 INT_MAX (number of threads to wake)
-        xor r10d, r10d             # r10 ARG4 NULL
-        mov r8, rdi                # r8 ARG5 dup rdi
-        # r9 ARG6 mask (already set above)
-        mov eax, 202               # sys_futex
-        syscall
-        cmp eax, 1                 # did we wake as many as we expected?
-        jnz .LRetryWake
 .LDone:
-        pop r11
+        js fastlock_panic          # panic if we made m_depth negative
        ret
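Both the C path and the assembly above implement a ticket spinlock: "avail" hands out tickets, "active" is the ticket currently allowed to hold the lock, and unlock passes the baton by bumping "active" and, on Linux, waking only the futex waiter whose bit matches the next ticket. The following is an illustrative ticket-spinlock sketch in portable C++; it is not KeyDB's fastlock and omits the futex sleep/wake path entirely.

#include <atomic>
#include <cstdint>

class TicketSpinLock {
    std::atomic<uint16_t> m_active{0};   // ticket currently being served
    std::atomic<uint16_t> m_avail{0};    // next ticket to hand out
public:
    void lock() {
        uint16_t ticket = m_avail.fetch_add(1, std::memory_order_relaxed);
        while (m_active.load(std::memory_order_acquire) != ticket) {
            // spin; the real implementation emits "pause"/"yield" and
            // eventually parks the thread on a futex keyed by the ticket's bit
        }
    }
    void unlock() {
        // hand the lock to the next ticket holder
        m_active.fetch_add(1, std::memory_order_release);
    }
};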
@@ -28,6 +28,7 @@
 */

 #include "server.h"
+bool FInReplicaReplay();

 /* ================================ MULTI/EXEC ============================== */

@@ -172,12 +173,15 @@ void execCommand(client *c) {
              * This way we'll deliver the MULTI/..../EXEC block as a whole and
              * both the AOF and the replication link will have the same consistency
              * and atomicity guarantees. */
-            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
+            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) {
                 execCommandPropagateMulti(c);
                 must_propagate = 1;
             }

-            call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL);
+            int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL;
+            if (FInReplicaReplay())
+                flags &= ~CMD_CALL_PROPAGATE;
+            call(c,flags);

             /* Commands may alter argc/argv, restore mstate. */
             c->mstate.commands[j].argc = c->argc;
@@ -1003,6 +1003,27 @@ int clientHasPendingReplies(client *c) {
     return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
 }

+static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
+int chooseBestThreadForAccept()
+{
+    int ielMinLoad = 0;
+    int cclientsMin = INT_MAX;
+    for (int iel = 0; iel < cserver.cthreads; ++iel)
+    {
+        int cclientsThread;
+        atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
+        cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
+        if (cclientsThread < cserver.thread_min_client_threshold)
+            return iel;
+        if (cclientsThread < cclientsMin)
+        {
+            cclientsMin = cclientsThread;
+            ielMinLoad = iel;
+        }
+    }
+    return ielMinLoad;
+}
+
 #define MAX_ACCEPTS_PER_CALL 1000
 static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
     client *c;
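chooseBestThreadForAccept keeps packing clients onto a thread until it reaches min-clients-per-thread, otherwise it spills to the least-loaded thread, counting accepts that are still in flight. The standalone sketch below restates that selection rule with plain containers; the struct and function names are illustrative, not the server's actual types.

#include <atomic>
#include <climits>
#include <vector>

struct ThreadLoad {
    std::atomic<int> clients{0};          // clients already owned by the thread
    std::atomic<int> acceptsInFlight{0};  // accepts posted but not yet completed
};

int chooseThread(const std::vector<ThreadLoad> &threads, int minClientsPerThread) {
    int best = 0;
    int bestLoad = INT_MAX;
    for (size_t i = 0; i < threads.size(); ++i) {
        int load = threads[i].clients.load(std::memory_order_relaxed)
                 + threads[i].acceptsInFlight.load(std::memory_order_relaxed);
        if (load < minClientsPerThread)
            return (int)i;                // under the threshold: keep grouping here
        if (load < bestLoad) {
            bestLoad = load;
            best = (int)i;
        }
    }
    return best;                          // otherwise spread to the least loaded thread
}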
@@ -1105,7 +1126,25 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

         if (!g_fTestMode)
         {
-            // We always accept on the same thread
+            {
+                int ielTarget = chooseBestThreadForAccept();
+                rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
+                if (ielTarget != ielCur)
+                {
+                    char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
+                    memcpy(szT, cip, NET_IP_STR_LEN);
+                    int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
+                        acceptCommonHandler(cfd,0,szT, ielTarget);
+                        rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
+                        zfree(szT);
+                    });
+
+                    if (res == AE_OK)
+                        continue;
+                }
+                rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
+            }
+
 LLocalThread:
             aeAcquireLock();
             acceptCommonHandler(cfd,0,cip, ielCur);

@@ -1122,10 +1161,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
                 goto LLocalThread;
             char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
             memcpy(szT, cip, NET_IP_STR_LEN);
-            aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
+            int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
                 acceptCommonHandler(cfd,0,szT, iel);
                 zfree(szT);
             });
+            if (res != AE_OK)
+            {
+                zfree(szT);
+                goto LLocalThread;
+            }
         }
     }
 }

@@ -1151,13 +1195,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         int ielTarget = rand() % cserver.cthreads;
         if (ielTarget == ielCur)
         {
+LLocalThread:
             acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur);
         }
         else
         {
-            aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
+            int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
                 acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget);
            });
+            if (res != AE_OK)
+                goto LLocalThread;
         }
         aeReleaseLock();
@@ -2529,7 +2576,7 @@ NULL
         {
             int iel = client->iel;
             freeClientAsync(client);
-            aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
+            aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK
                 freeClientsInAsyncFreeQueue(iel);
             });
         }

@@ -3018,6 +3065,7 @@ void unpauseClientsIfNecessary()
  *
  * The function returns the total number of events processed. */
 int processEventsWhileBlocked(int iel) {
+    serverAssert(GlobalLocksAcquired());
     int iterations = 4; /* See the function top-comment. */
     int count = 0;

@@ -3027,14 +3075,29 @@ int processEventsWhileBlocked(int iel) {
         serverAssert(c->flags & CLIENT_PROTECTED);
         c->lock.unlock();
     }

     aeReleaseLock();
-    while (iterations--) {
-        int events = 0;
-        events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
-        events += handleClientsWithPendingWrites(iel);
-        if (!events) break;
-        count += events;
+    try
+    {
+        while (iterations--) {
+            int events = 0;
+            events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
+            events += handleClientsWithPendingWrites(iel);
+            if (!events) break;
+            count += events;
+        }
     }
+    catch (...)
+    {
+        // Caller expects us to be locked so fix and rethrow
+        AeLocker locker;
+        if (c != nullptr)
+            c->lock.lock();
+        locker.arm(c);
+        locker.release();
+        throw;
+    }

     AeLocker locker;
     if (c != nullptr)
         c->lock.lock();
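Together with the SHUTDOWN change (throwing ShutdownException instead of exit(0)), the try/catch added here follows one rule: if an exception can cross a region where a lock was deliberately released, re-acquire the lock before rethrowing so the caller's locking invariant still holds. A generic sketch of that relock-on-exception pattern is below; the mutex and function names are illustrative and stand in for KeyDB's AeLocker/fastlock types.

#include <mutex>

void processNestedWork(std::mutex &callerLock, void (*work)()) {
    callerLock.unlock();          // caller's invariant is temporarily broken
    try {
        work();                   // may throw (e.g. a shutdown exception)
    } catch (...) {
        callerLock.lock();        // restore the invariant before propagating
        throw;
    }
    callerLock.lock();            // normal path: restore the invariant too
}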
@@ -29,6 +29,7 @@
 */

 #include "server.h"
+#include "cron.h"
 #include <math.h>
 #include <ctype.h>

@@ -369,6 +370,7 @@ void decrRefCount(robj_roptr o) {
         case OBJ_HASH: freeHashObject(o); break;
         case OBJ_MODULE: freeModuleObject(o); break;
         case OBJ_STREAM: freeStreamObject(o); break;
+        case OBJ_CRON: freeCronObject(o); break;
         default: serverPanic("Unknown object type"); break;
         }
         zfree(o.unsafe_robjcast());
src/rdb.cpp  (43 lines changed)

@@ -33,6 +33,7 @@
 #include "endianconv.h"
 #include "stream.h"
 #include "storage.h"
+#include "cron.h"

 #include <math.h>
 #include <sys/types.h>

@@ -523,6 +524,11 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
     }
 }

+sdsstring rdbLoadString(rio *rdb){
+    sds str = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
+    return sdsstring(str);
+}
+
 robj *rdbLoadStringObject(rio *rdb) {
     return (robj*)rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
 }

@@ -657,6 +663,8 @@ int rdbSaveObjectType(rio *rdb, robj_roptr o) {
         return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
     case OBJ_MODULE:
         return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
+    case OBJ_CRON:
+        return rdbSaveType(rdb,RDB_TYPE_CRON);
     default:
         serverPanic("Unknown object type");
     }

@@ -986,6 +994,17 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) {
             zfree(io.ctx);
         }
         return io.error ? -1 : (ssize_t)io.bytes;
+    } else if (o->type == OBJ_CRON) {
+        cronjob *job = (cronjob*)ptrFromObj(o);
+        rdbSaveRawString(rdb, (const unsigned char*)job->script.get(), job->script.size());
+        rdbSaveMillisecondTime(rdb, job->startTime);
+        rdbSaveMillisecondTime(rdb, job->interval);
+        rdbSaveLen(rdb, job->veckeys.size());
+        for (auto &key : job->veckeys)
+            rdbSaveRawString(rdb, (const unsigned char*)key.get(), key.size());
+        rdbSaveLen(rdb, job->vecargs.size());
+        for (auto &arg : job->vecargs)
+            rdbSaveRawString(rdb, (const unsigned char*)arg.get(), arg.size());
     } else {
         serverPanic("Unknown object type");
     }

@@ -1526,9 +1545,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
                 == NULL) return NULL;

             if (rdbtype == RDB_TYPE_ZSET_2) {
-                if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
+                if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
+                    sdsfree(sdsele);
+                    return NULL;
+                }
             } else {
-                if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
+                if (rdbLoadDoubleValue(rdb,&score) == -1) {
+                    sdsfree(sdsele);
+                    return NULL;
+                }
             }

             /* Don't care about integer-encoded strings. */

@@ -1848,6 +1873,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
             exit(1);
         }
         o = createModuleObject(mt,ptr);
+    } else if (rdbtype == RDB_TYPE_CRON) {
+        std::unique_ptr<cronjob> spjob = std::make_unique<cronjob>();
+        spjob->script = rdbLoadString(rdb);
+        spjob->startTime = rdbLoadMillisecondTime(rdb,RDB_VERSION);
+        spjob->interval = rdbLoadMillisecondTime(rdb,RDB_VERSION);
+        auto ckeys = rdbLoadLen(rdb,NULL);
+        for (uint64_t i = 0; i < ckeys; ++i)
+            spjob->veckeys.push_back(rdbLoadString(rdb));
+        auto cargs = rdbLoadLen(rdb,NULL);
+        for (uint64_t i = 0; i < cargs; ++i)
+            spjob->vecargs.push_back(rdbLoadString(rdb));
+        o = createObject(OBJ_CRON, spjob.release());
     } else {
         rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
     }

@@ -2150,6 +2187,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
             decrRefCount(val);
             val = nullptr;
         }
+        decrRefCount(key);
+        key = nullptr;
     }

     /* Reset the state that is key-specified and is populated by
@@ -91,10 +91,13 @@
 #define RDB_TYPE_HASH_ZIPLIST 13
 #define RDB_TYPE_LIST_QUICKLIST 14
 #define RDB_TYPE_STREAM_LISTPACKS 15

+/* KeyDB Specific Object Types */
+#define RDB_TYPE_CRON 64
 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

 /* Test if a type is an object type. */
-#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15))
+#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15) || (t == RDB_TYPE_CRON))

 /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
 #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
@@ -42,6 +42,8 @@
 #include <algorithm>
 #include <uuid/uuid.h>
 #include <chrono>
+#include <unordered_map>
+#include <string>

 void replicationDiscardCachedMaster(redisMaster *mi);
 void replicationResurrectCachedMaster(redisMaster *mi, int newfd);

@@ -276,7 +278,7 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
         if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);

         /* Send it to slaves */
-        addReply(replica,selectcmd);
+        addReplyAsync(replica,selectcmd);

         if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
             decrRefCount(selectcmd);

@@ -288,12 +290,12 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
      * or are already in sync with the master. */

     /* Add the multi bulk length. */
-    addReplyArrayLen(replica,argc);
+    addReplyArrayLenAsync(replica,argc);

     /* Finally any additional argument that was not stored inside the
      * static buffer if any (from j to argc). */
     for (int j = 0; j < argc; j++)
-        addReplyBulk(replica,argv[j]);
+        addReplyBulkAsync(replica,argv[j]);
 }

 /* Propagate write commands to slaves, and populate the replication backlog

@@ -353,6 +355,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that

     char szMvcc[128];
+    incrementMvccTstamp();
     uint64_t mvccTstamp = getMvccTstamp();
     int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
     int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);

@@ -432,6 +435,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
             clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
             addReplyProtoAsync(replica, reply->buf(), reply->used);
         }

         if (!fSendRaw)
         {
             addReplyAsync(replica,shared.crlf);
@@ -1255,19 +1259,19 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
         {
             aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
                 // Because the client could have been closed while the lambda waited to run we need to
                 // verify the replica is still connected
                 listIter li;
                 listNode *ln;
                 listRewind(g_pserver->slaves,&li);
                 bool fFound = false;
                 while ((ln = listNext(&li))) {
                     if (listNodeValue(ln) == replica) {
                         fFound = true;
                         break;
                     }
                 }
                 if (!fFound)
                     return;
                 aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
                 if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
                     freeClient(replica);

@@ -2325,10 +2329,11 @@ int connectWithMaster(redisMaster *mi) {
 void undoConnectWithMaster(redisMaster *mi) {
     int fd = mi->repl_transfer_s;

-    aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{
+    int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{
         aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE);
         close(fd);
     });
+    serverAssert(res == AE_OK);
     mi->repl_transfer_s = -1;
 }

@@ -2420,6 +2425,8 @@ void freeMasterInfo(redisMaster *mi)
 {
     zfree(mi->masterauth);
     zfree(mi->masteruser);
+    if (mi->clientFake)
+        freeClient(mi->clientFake);
     delete mi->staleKeyMap;
     zfree(mi);
 }

@@ -2477,6 +2484,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
     mi->master = NULL;
     mi->repl_state = REPL_STATE_CONNECT;
     mi->repl_down_since = g_pserver->unixtime;
+    if (mi->clientFake) {
+        freeClient(mi->clientFake);
+        mi->clientFake = nullptr;
+    }
     /* We lost connection with our master, don't disconnect slaves yet,
      * maybe we'll be able to PSYNC with our master later. We'll disconnect
      * the slaves only if we'll have to do a full resync with our master. */
|
|||||||
return m_cnesting == 1;
|
return m_cnesting == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
redisMaster *getMi(client *c)
|
||||||
|
{
|
||||||
|
if (m_mi == nullptr)
|
||||||
|
m_mi = MasterInfoFromClient(c);
|
||||||
|
return m_mi;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nesting() const { return m_cnesting; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int m_cnesting = 0;
|
int m_cnesting = 0;
|
||||||
bool m_fCancelled = false;
|
bool m_fCancelled = false;
|
||||||
|
redisMaster *m_mi = nullptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static thread_local std::unique_ptr<ReplicaNestState> s_pstate;
|
||||||
|
|
||||||
|
bool FInReplicaReplay()
|
||||||
|
{
|
||||||
|
return s_pstate != nullptr && s_pstate->nesting() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::unordered_map<std::string, uint64_t> g_mapmvcc;
|
||||||
|
|
||||||
void replicaReplayCommand(client *c)
|
void replicaReplayCommand(client *c)
|
||||||
{
|
{
|
||||||
static thread_local ReplicaNestState *s_pstate = nullptr;
|
|
||||||
if (s_pstate == nullptr)
|
if (s_pstate == nullptr)
|
||||||
s_pstate = new (MALLOC_LOCAL) ReplicaNestState;
|
s_pstate = std::make_unique<ReplicaNestState>();
|
||||||
|
|
||||||
// the replay command contains two arguments:
|
// the replay command contains two arguments:
|
||||||
// 1: The UUID of the source
|
// 1: The UUID of the source
|
||||||
@ -3375,9 +3406,10 @@ void replicaReplayCommand(client *c)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned char uuid[UUID_BINARY_LEN];
|
std::string uuid;
|
||||||
|
uuid.resize(UUID_BINARY_LEN);
|
||||||
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|
||||||
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
|
|| uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0)
|
||||||
{
|
{
|
||||||
addReplyError(c, "Expected UUID arg1");
|
addReplyError(c, "Expected UUID arg1");
|
||||||
s_pstate->Cancel();
|
s_pstate->Cancel();
|
||||||
@ -3413,7 +3445,7 @@ void replicaReplayCommand(client *c)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FSameUuidNoNil(uuid, cserver.uuid))
|
if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid))
|
||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
s_pstate->Cancel();
|
s_pstate->Cancel();
|
||||||
@@ -3423,33 +3455,57 @@ void replicaReplayCommand(client *c)
     if (!s_pstate->FPush())
         return;

+    redisMaster *mi = s_pstate->getMi(c);
+    client *cFake = mi->clientFake;
+    if (mi->clientFakeNesting != s_pstate->nesting())
+        cFake = nullptr;
+    serverAssert(mi != nullptr);
+    if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
+    {
+        s_pstate->Cancel();
+        s_pstate->Pop();
+        return;
+    }
+
     // OK We've recieved a command lets execute
     client *current_clientSave = serverTL->current_client;
-    client *cFake = createClient(-1, c->iel);
+    if (cFake == nullptr)
+        cFake = createClient(-1, c->iel);
     cFake->lock.lock();
     cFake->authenticated = c->authenticated;
     cFake->puser = c->puser;
     cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
     selectDb(cFake, c->db->id);
     auto ccmdPrev = serverTL->commandsExecuted;
+    cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
     processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
+    cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
     bool fExec = ccmdPrev != serverTL->commandsExecuted;
     cFake->lock.unlock();
-    if (fExec)
+    if (fExec || cFake->flags & CLIENT_MULTI)
     {
         addReply(c, shared.ok);
         selectDb(c, cFake->db->id);
-        redisMaster *mi = MasterInfoFromClient(c);
-        if (mi != nullptr) // this should never be null but I'd prefer not to crash
-        {
-            mi->mvccLastSync = mvcc;
-        }
+        if (mvcc > g_mapmvcc[uuid])
+            g_mapmvcc[uuid] = mvcc;
     }
     else
     {
+        serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf);
         addReplyError(c, "command did not execute");
     }
-    freeClient(cFake);
+    serverAssert(sdslen(cFake->querybuf) == 0);
+    if (cFake->flags & CLIENT_MULTI)
+    {
+        mi->clientFake = cFake;
+        mi->clientFakeNesting = s_pstate->nesting();
+    }
+    else
+    {
+        if (mi->clientFake == cFake)
+            mi->clientFake = nullptr;
+        freeClient(cFake);
+    }
     serverTL->current_client = current_clientSave;

     // call() will not propogate this for us, so we do so here
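The g_mapmvcc check above is what keeps a replayed command from being applied twice when it arrives over several replication paths: each source UUID carries a monotonically increasing MVCC stamp, and a node only applies an RREPLAY whose stamp is newer than the last one it recorded for that source. A minimal standalone sketch of that watermark idea (assuming the map is keyed by the source UUID string and the stamp is a 64-bit counter; the names here are illustrative, not KeyDB's):

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

// Highest MVCC stamp applied so far, per source UUID (illustrative analogue of g_mapmvcc).
static std::unordered_map<std::string, uint64_t> mapMvcc;

// Returns true if the command stamped 'mvcc' from 'uuid' should be applied and
// records the new watermark; returns false for stale or duplicate deliveries.
bool shouldApply(const std::string &uuid, uint64_t mvcc)
{
    if (mvcc != 0 && mapMvcc[uuid] >= mvcc)
        return false;           // already saw this (or a newer) stamp from that source
    if (mvcc > mapMvcc[uuid])
        mapMvcc[uuid] = mvcc;   // advance the per-source watermark
    return true;
}

int main()
{
    std::cout << shouldApply("src-A", 5) << '\n';  // 1: first delivery applies
    std::cout << shouldApply("src-A", 5) << '\n';  // 0: duplicate via a second path is dropped
    std::cout << shouldApply("src-A", 6) << '\n';  // 1: newer stamp applies
}

This is the behaviour the new multi-master tests further down rely on when they assert that an INCR reaches every node exactly once, even in a full mesh.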
src/sds.h (14 lines changed)
@@ -396,6 +396,20 @@ public:
         other.m_str = nullptr;
     }

+    sdsstring &operator=(const sdsstring &other)
+    {
+        sdsfree(m_str);
+        m_str = sdsdup(other.m_str);
+        return *this;
+    }
+
+    sdsstring &operator=(sds other)
+    {
+        sdsfree(m_str);
+        m_str = sdsdup(other);
+        return *this;
+    }
+
     ~sdsstring()
     {
         sdsfree(m_str);
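The two assignment operators added here give sdsstring value semantics: assigning frees whatever string the object already owns and takes a fresh sdsdup copy of the source. A self-contained analogue of that free-then-duplicate pattern using plain C strings (illustrative only; sdsstring itself wraps sds, not char*, and the toy adds a self-assignment guard for safety):

#include <cstdlib>
#include <cstring>
#include <iostream>

// Toy owner of a heap string, mirroring the assignment semantics added to sdsstring.
class OwnedString {
    char *m_str;
public:
    OwnedString() : m_str(nullptr) {}
    explicit OwnedString(const char *s) : m_str(strdup(s)) {}
    OwnedString &operator=(const OwnedString &other)
    {
        if (this == &other) return *this;                      // guard for the toy version only
        free(m_str);                                           // release what we currently own
        m_str = other.m_str ? strdup(other.m_str) : nullptr;   // deep copy the source
        return *this;
    }
    ~OwnedString() { free(m_str); }
    const char *get() const { return m_str; }
};

int main()
{
    OwnedString a("hello"), b("world");
    b = a;   // b frees "world" and now owns its own copy of "hello"
    std::cout << a.get() << ' ' << b.get() << '\n';
}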
@@ -35,6 +35,7 @@
 #include "latency.h"
 #include "atomicvar.h"
 #include "storage.h"
+#include "cron.h"
 #include <thread>
 #include <time.h>
 #include <signal.h>
@@ -1023,7 +1024,11 @@ struct redisCommand redisCommandTable[] = {

     {"rreplay",replicaReplayCommand,-3,
      "read-only fast noprop",
-     0,NULL,0,0,0,0,0,0}
+     0,NULL,0,0,0,0,0,0},
+
+    {"cron",cronCommand,-5,
+     "write use-memory",
+     0,NULL,1,1,1,0,0,0},
 };

 /*============================ Utility functions ============================ */
@@ -1685,7 +1690,7 @@ void clientsCron(int iel) {
         /* The following functions do different service checks on the client.
          * The protocol is that they return non-zero if the client was
          * terminated. */
-        if (clientsCronHandleTimeout(c,now)) goto LContinue;
+        if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock
         if (clientsCronResizeQueryBuffer(c)) goto LContinue;
         if (clientsCronTrackExpansiveClients(c)) goto LContinue;
     LContinue:
@@ -1889,7 +1894,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     /* We received a SIGTERM, shutting down here in a safe way, as it is
      * not ok doing so inside the signal handler. */
     if (g_pserver->shutdown_asap) {
-        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
+        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) throw ShutdownException();
         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
         g_pserver->shutdown_asap = 0;
     }
@@ -2879,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
     pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
     pvar->current_client = nullptr;
     pvar->clients_paused = 0;
+    pvar->fRetrySetAofEvent = false;
     if (pvar->el == NULL) {
         serverLog(LL_WARNING,
             "Failed creating the event loop. Error message: '%s'",
@@ -3806,8 +3812,17 @@ int prepareForShutdown(int flags) {

     /* Close the listening sockets. Apparently this allows faster restarts. */
     closeListeningSockets(1);
+
+    for (int iel = 0; iel < cserver.cthreads; ++iel)
+    {
+        aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{
+            g_pserver->rgthreadvar[iel].el->stop = 1;
+        });
+    }
+
     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
         g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");

     return C_OK;
 }

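Shutdown now has to stop one event loop per worker thread rather than a single loop, so prepareForShutdown posts a small function to every thread's loop that flips its stop flag; each thread then notices the flag from inside its own loop and exits on its own schedule. A rough standalone sketch of that "ask each loop to stop itself" shape (using a mutex-protected task queue and an atomic flag in place of aePostFunction and el->stop; every name below is made up for the sketch):

#include <atomic>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal stand-in for one thread's event loop: a task queue plus a stop flag.
struct ToyLoop {
    std::mutex lock;
    std::queue<std::function<void()>> tasks;
    std::atomic<bool> stop{false};

    void post(std::function<void()> fn)      // analogue of aePostFunction
    {
        std::lock_guard<std::mutex> g(lock);
        tasks.push(std::move(fn));
    }

    void run()                               // analogue of aeMain
    {
        while (!stop) {
            std::function<void()> fn;
            {
                std::lock_guard<std::mutex> g(lock);
                if (!tasks.empty()) { fn = std::move(tasks.front()); tasks.pop(); }
            }
            if (fn) fn(); else std::this_thread::yield();
        }
    }
};

int main()
{
    const int cthreads = 4;
    std::vector<ToyLoop> loops(cthreads);
    std::vector<std::thread> threads;
    for (int iel = 0; iel < cthreads; ++iel)
        threads.emplace_back([&, iel] { loops[iel].run(); });

    // "prepareForShutdown": ask every loop to stop itself from inside its own thread.
    for (int iel = 0; iel < cthreads; ++iel)
        loops[iel].post([&, iel] { loops[iel].stop = true; });

    for (auto &t : threads) t.join();        // mirrors main() now joining every worker
    std::cout << "all loops stopped\n";
}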
@@ -4684,7 +4699,7 @@ void daemonize(void) {
 }

 void version(void) {
-    printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
+    printf("KeyDB server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
         KEYDB_REAL_VERSION,
         redisGitSHA1(),
         atoi(redisGitDirty()) > 0,
@@ -5027,8 +5042,16 @@ void *workerThreadMain(void *parg)
     aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
     aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
     aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
-    aeMain(el);
+    try
+    {
+        aeMain(el);
+    }
+    catch (ShutdownException)
+    {
+    }
+    serverAssert(!GlobalLocksAcquired());
     aeDeleteEventLoop(el);

     return NULL;
 }

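Replacing exit(0) with a thrown ShutdownException means the SIGTERM path now unwinds out of serverCron and the event loop back to workerThreadMain, where the catch block lets each thread check its lock state and tear down its own loop before returning. A compact sketch of that throw-deep, catch-at-thread-entry pattern (the exception type and loop below are stand-ins, not KeyDB's aeMain):

#include <iostream>

// Stand-in for the exception added in server.h.
class ShutdownRequest {};

// Deep inside the loop: the periodic handler decides it is time to shut down.
void periodicHandler(int tick)
{
    if (tick == 3) {
        std::cout << "shutdown prepared, unwinding\n";
        throw ShutdownRequest();   // instead of calling exit() mid-loop
    }
}

// Stand-in for aeMain: spins until something throws.
void runLoop()
{
    for (int tick = 0;; ++tick)
        periodicHandler(tick);
}

// Stand-in for workerThreadMain: the one place the shutdown request is caught.
int workerMain()
{
    try {
        runLoop();
    }
    catch (ShutdownRequest) {
        // Stack has unwound cleanly; per-thread cleanup happens here.
    }
    std::cout << "worker exiting after cleanup\n";
    return 0;
}

int main() { return workerMain(); }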
@@ -5098,10 +5121,6 @@ int main(int argc, char **argv) {
     dictSetHashFunctionSeed((uint8_t*)hashseed);
     g_pserver->sentinel_mode = checkForSentinelMode(argc,argv);
     initServerConfig();
-    for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
-    {
-        initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
-    }
     serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN];
     aeAcquireLock(); // We own the lock on boot

@@ -5228,6 +5247,10 @@ int main(int argc, char **argv) {
     int background = cserver.daemonize && !cserver.supervised;
     if (background) daemonize();

+    for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
+    {
+        initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
+    }
     initServer();
     initNetworking(cserver.cthreads > 1 /* fReusePort */);

@@ -5328,7 +5351,9 @@ int main(int argc, char **argv) {
     /* The main thread sleeps until all the workers are done.
     this is so that all worker threads are orthogonal in their startup/shutdown */
     void *pvRet;
-    pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet);
+    for (int iel = 0; iel < cserver.cthreads; ++iel)
+        pthread_join(rgthread[iel], &pvRet);

     return 0;
 }

src/server.h (16 lines changed)
@@ -683,6 +683,8 @@ public:
  * encoding version. */
 #define OBJ_MODULE 5 /* Module object. */
 #define OBJ_STREAM 6 /* Stream object. */
+#define OBJ_CRON 7 /* CRON job */
+

 /* Extract encver / signature from a module type ID. */
 #define REDISMODULE_TYPE_ENCVER_BITS 10
@@ -1125,10 +1127,13 @@ typedef struct clientReplyBlock {
 /* Redis database representation. There are multiple databases identified
  * by integers from 0 (the default database) up to the max configured
  * database. The database number is the 'id' field in the structure. */
-typedef struct redisDb {
+struct redisDb {
     redisDb()
         : expireitr(nullptr)
     {};
+
+    ~redisDb();
+
     dict *pdict; /* The keyspace for this DB */
     expireset *setexpire;
     expireset::setiter expireitr;
@@ -1140,7 +1145,7 @@ typedef struct redisDb {
     long long last_expire_set; /* when the last expire was set */
     double avg_ttl; /* Average TTL, just for stats */
     list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
-} redisDb;
+};

 /* Client MULTI/EXEC state */
 typedef struct multiCmd {
@@ -1524,6 +1529,7 @@ struct redisServerThreadVars {
     struct fastlock lockPendingWrite { "thread pending write" };
     char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
+    bool fRetrySetAofEvent = false;
 };

 struct redisMaster {
@@ -1533,6 +1539,8 @@ struct redisMaster {
     int masterport; /* Port of master */
     client *cached_master; /* Cached master to be reused for PSYNC. */
     client *master;
+    client *clientFake;
+    int clientFakeNesting;
     /* The following two fields is where we store master PSYNC replid/offset
      * while the PSYNC is in progress. At the end we'll copy the fields into
      * the server->master client structure. */
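clientFake and clientFakeNesting exist so the fake client used by replicaReplayCommand can outlive a single RREPLAY: while a replayed MULTI is still open, the client (and its queued commands) is parked on the redisMaster together with the nesting depth it was created at, and it is only freed once the transaction finishes. A small sketch of that keep-while-MULTI / free-otherwise decision (toy types; the real code uses client, redisMaster and freeClient):

#include <iostream>
#include <memory>

// Toy stand-ins for the real client / redisMaster types.
struct FakeClient {
    bool inMulti = false;                     // analogue of the CLIENT_MULTI flag
};
struct MasterState {
    std::unique_ptr<FakeClient> clientFake;   // parked fake client, if any
    int clientFakeNesting = 0;                // nesting depth it belongs to
};

// After replaying one RREPLAY payload: keep the fake client only if a MULTI is
// still open on it, otherwise drop it (mirrors the new replay logic above).
void finishReplay(MasterState &mi, std::unique_ptr<FakeClient> cFake, int nesting)
{
    if (cFake->inMulti) {
        mi.clientFake = std::move(cFake);     // park it for the next nested replay
        mi.clientFakeNesting = nesting;
    } else {
        mi.clientFake.reset();                // freeClient(cFake) in the real code;
    }                                         // cFake itself is released on return
}

int main()
{
    MasterState mi;
    auto c = std::make_unique<FakeClient>();
    c->inMulti = true;                        // a replayed MULTI is still open
    finishReplay(mi, std::move(c), 1);
    std::cout << (mi.clientFake ? "fake client kept" : "fake client freed") << '\n';
}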
@@ -1600,6 +1608,7 @@ struct redisServerConst {

     unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
     bool fUsePro = false;
+    int thread_min_client_threshold = 50;
 };

 struct redisServer {
@@ -2913,6 +2922,9 @@ inline int FCorrectThread(client *c)
 }
 #define AssertCorrectThread(c) serverAssert(FCorrectThread(c))

+class ShutdownException
+{};
+
 #define redisDebug(fmt, ...) \
     printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
 #define redisDebugMark() \
@@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
         }
     }

+    test {Active replicas propogate transaction} {
+        $master set testkey 0
+        $master multi
+        $master incr testkey
+        $master incr testkey
+        after 5000
+        $master get testkey
+        $master exec
+        assert_equal 2 [$master get testkey]
+        after 500
+        wait_for_condition 50 500 {
+            [string match "2" [$slave get testkey]]
+        } else {
+            fail "Transaction failed to replicate"
+        }
+        $master flushall
+    }
+
     test {Active replicas WAIT} {
         # Test that wait succeeds since replicas should be syncronized
         $master set testkey foo
tests/integration/replication-multimaster.tcl (new file, 74 lines)
@@ -0,0 +1,74 @@
+foreach topology {mesh ring} {
+start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
+start_server {overrides {hz 500 active-replica yes multi-master yes}} {
+start_server {overrides {hz 500 active-replica yes multi-master yes}} {
+start_server {overrides {hz 500 active-replica yes multi-master yes}} {
+
+    for {set j 0} {$j < 4} {incr j} {
+        set R($j) [srv [expr 0-$j] client]
+        set R_host($j) [srv [expr 0-$j] host]
+        set R_port($j) [srv [expr 0-$j] port]
+    }
+
+    # Initialize as mesh
+    if [string equal $topology "mesh"] {
+    for {set j 0} {$j < 4} {incr j} {
+        for {set k 0} {$k < 4} {incr k} {
+            if $j!=$k {
+                $R($j) replicaof $R_host($k) $R_port($k)
+                after 100
+            }
+        }
+    }}
+    #Else Ring
+    if [string equal $topology "ring"] {
+        $R(0) replicaof $R_host(3) $R_port(3)
+        after 100
+        $R(1) replicaof $R_host(0) $R_port(0)
+        after 100
+        $R(2) replicaof $R_host(1) $R_port(1)
+        after 100
+        $R(3) replicaof $R_host(2) $R_port(2)
+    }
+
+    after 2000
+
+    test "$topology replicates to all nodes" {
+        $R(0) set testkey foo
+        after 500
+        assert_equal foo [$R(1) get testkey] "replicates to 1"
+        assert_equal foo [$R(2) get testkey] "replicates to 2"
+    }
+
+    test "$topology replicates only once" {
+        $R(0) set testkey 1
+        after 500
+        $R(1) incr testkey
+        after 500
+        $R(2) incr testkey
+        after 500
+        assert_equal 3 [$R(0) get testkey]
+        assert_equal 3 [$R(1) get testkey]
+        assert_equal 3 [$R(2) get testkey]
+        assert_equal 3 [$R(3) get testkey]
+    }
+
+    test "$topology transaction replicates only once" {
+        for {set j 0} {$j < 1000} {incr j} {
+            $R(0) set testkey 1
+            $R(0) multi
+            $R(0) incr testkey
+            $R(0) incr testkey
+            $R(0) exec
+            after 1
+            assert_equal 3 [$R(0) get testkey] "node 0"
+            assert_equal 3 [$R(1) get testkey] "node 1"
+            assert_equal 3 [$R(2) get testkey] "node 2"
+            assert_equal 3 [$R(3) get testkey] "node 3"
+        }
+    }
+}
+}
+}
+}
+}
@@ -309,3 +309,5 @@ start_server {tags {"repl"}} {
         }
     }
 }
+
+
@@ -36,6 +36,8 @@ set ::all_tests {
     unit/aofrw
     unit/acl
     unit/rreplay
+    unit/cron
+    unit/replication
     integration/block-repl
     integration/replication
     integration/replication-2
@@ -43,6 +45,7 @@ set ::all_tests {
     integration/replication-4
     integration/replication-psync
     integration/replication-active
+    integration/replication-multimaster
     integration/aof
     integration/rdb
     integration/convert-zipmap-hash-on-load
tests/unit/cron.tcl (new file, 47 lines)
@@ -0,0 +1,47 @@
+start_server {tags {"CRON"}} {
+    test {cron singleshot past tense} {
+        r flushall
+        r cron testjob single 0 1 {redis.call("incr", "testkey")} 1 testkey
+        after 300
+        assert_equal 1 [r get testkey]
+        assert_equal 0 [r exists testjob]
+    }
+
+    test {cron repeat past tense next exec is in the future} {
+        r flushall
+        r cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
+        after 300
+        assert_equal 1 [r get testkey]
+        assert_equal 1 [r exists testjob]
+        r del testjob
+    }
+
+    test {cron repeat works} {
+        r flushall
+        r cron testjob repeat 0 600 {redis.call("incr","testkey")}
+        after 1000
+        assert_equal 2 [r get testkey]
+    }
+
+    test {cron overwrite works} {
+        r flushall
+        r cron testjob single 500 {redis.call("set","testkey","a")} 1 testkey
+        r cron testjob single 500 {redis.call("set","anotherkey","b")} 1 anotherkey
+        after 1000
+        assert_equal 0 [r exists testkey]
+        assert_equal b [r get anotherkey]
+    }
+
+    test {cron delete key stops job} {
+        r flushall
+        r cron testjob single 500 {redis.call("set","testkey","a")}
+        r del testjob
+        after 1000
+        assert_equal 0 [r exists testkey]
+    }
+
+    test {cron zero interval rejected} {
+        catch {r cron testjob single 0 0 {redis.call("incr","testkey")} 1 testkey} e
+        assert_match {ERR*} $e
+    }
+}
tests/unit/replication.tcl (new file, 12 lines)
@@ -0,0 +1,12 @@
+
+start_server {tags {"repl"}} {
+    test "incr of expired key on replica doesn't cause a crash" {
+        r debug force-master yes
+        r set testkey 1
+        r pexpire testkey 1
+        after 500
+        r incr testkey
+        r incr testkey
+        r debug force-master no
+    }
+}
@@ -1,7 +1,7 @@
-start_server {tags {"rreplay"}} {
+start_server {tags {"rreplay"} overrides {active-replica yes}} {

     test {RREPLAY use current db} {
-        r debug force-master
+        r debug force-master yes
         r select 4
         r set dbnum invalid
         r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
@@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} {
     reconnect

     test {RREPLAY db different} {
-        r debug force-master
+        r debug force-master yes
         r select 4
         r set testkey four
         r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2