Merge branch 'unstable' into redis_6_merge
Former-commit-id: cc9924ffa606200f331b3bf5e1e1a4aa3f2702fa
This commit is contained in:
commit
c001ea5b41
89
src/aof.cpp
89
src/aof.cpp
@ -599,21 +599,59 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r
|
||||
return buf;
|
||||
}
|
||||
|
||||
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
|
||||
sds buf = sdsempty();
|
||||
robj *tmpargv[3];
|
||||
|
||||
/* The DB this command was targeting is not the same as the last command
|
||||
* we appended. To issue a SELECT command is needed. */
|
||||
if (dictid != g_pserver->aof_selected_db) {
|
||||
char seldb[64];
|
||||
|
||||
snprintf(seldb,sizeof(seldb),"%d",dictid);
|
||||
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
|
||||
(unsigned long)strlen(seldb),seldb);
|
||||
g_pserver->aof_selected_db = dictid;
|
||||
sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj **argv, const size_t argc) {
|
||||
long long when = 0;
|
||||
int unit = UNIT_SECONDS;
|
||||
bool fAbsolute = false;
|
||||
|
||||
if (cmd->proc == expireMemberCommand) {
|
||||
if (getLongLongFromObject(argv[3], &when) != C_OK)
|
||||
serverPanic("propogating invalid EXPIREMEMBER command");
|
||||
|
||||
if (argc == 5) {
|
||||
unit = parseUnitString(szFromObj(argv[4]));
|
||||
}
|
||||
} else if (cmd->proc == expireMemberAtCommand) {
|
||||
if (getLongLongFromObject(argv[3], &when) != C_OK)
|
||||
serverPanic("propogating invalid EXPIREMEMBERAT command");
|
||||
fAbsolute = true;
|
||||
} else if (cmd->proc == pexpireMemberAtCommand) {
|
||||
if (getLongLongFromObject(argv[3], &when) != C_OK)
|
||||
serverPanic("propogating invalid PEXPIREMEMBERAT command");
|
||||
fAbsolute = true;
|
||||
unit = UNIT_MILLISECONDS;
|
||||
} else {
|
||||
serverPanic("Unknown expiremember command");
|
||||
}
|
||||
|
||||
switch (unit)
|
||||
{
|
||||
case UNIT_SECONDS:
|
||||
when *= 1000;
|
||||
break;
|
||||
|
||||
case UNIT_MILLISECONDS:
|
||||
break;
|
||||
}
|
||||
|
||||
if (!fAbsolute)
|
||||
when += mstime();
|
||||
|
||||
robj *argvNew[4];
|
||||
argvNew[0] = createStringObject("PEXPIREMEMBERAT",15);
|
||||
argvNew[1] = argv[1];
|
||||
argvNew[2] = argv[2];
|
||||
argvNew[3] = createStringObjectFromLongLong(when);
|
||||
buf = catAppendOnlyGenericCommand(buf, 4, argvNew);
|
||||
decrRefCount(argvNew[0]);
|
||||
decrRefCount(argvNew[3]);
|
||||
return buf;
|
||||
}
|
||||
|
||||
sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc)
|
||||
{
|
||||
robj *tmpargv[3];
|
||||
|
||||
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
|
||||
cmd->proc == expireatCommand) {
|
||||
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
|
||||
@ -642,6 +680,10 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
|
||||
if (pxarg)
|
||||
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1],
|
||||
pxarg);
|
||||
} else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand ||
|
||||
cmd->proc == pexpireMemberAtCommand) {
|
||||
/* Translate subkey expire commands to PEXPIREMEMBERAT */
|
||||
buf = catAppendOnlyExpireMemberAtCommand(buf, cmd, argv, argc);
|
||||
} else {
|
||||
/* All the other commands don't need translation or need the
|
||||
* same translation already operated in the command vector
|
||||
@ -649,6 +691,25 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
|
||||
buf = catAppendOnlyGenericCommand(buf,argc,argv);
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
|
||||
sds buf = sdsempty();
|
||||
|
||||
/* The DB this command was targeting is not the same as the last command
|
||||
* we appended. To issue a SELECT command is needed. */
|
||||
if (dictid != g_pserver->aof_selected_db) {
|
||||
char seldb[64];
|
||||
|
||||
snprintf(seldb,sizeof(seldb),"%d",dictid);
|
||||
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
|
||||
(unsigned long)strlen(seldb),seldb);
|
||||
g_pserver->aof_selected_db = dictid;
|
||||
}
|
||||
|
||||
buf = catCommandForAofAndActiveReplication(buf, cmd, argv, argc);
|
||||
|
||||
/* Append to the AOF buffer. This will be flushed on disk just before
|
||||
* of re-entering the event loop, so before the client will get a
|
||||
* positive reply about the operation performed. */
|
||||
@ -1398,7 +1459,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
||||
}
|
||||
else
|
||||
{
|
||||
char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n";
|
||||
char cmd[]="*4\r\n$12\r\nPEXPIREMEMBERAT\r\n";
|
||||
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
|
||||
if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr;
|
||||
|
@ -530,8 +530,14 @@ void loadServerConfigFromString(char *config) {
|
||||
} else if (strcasecmp(argv[1], "false") == 0) {
|
||||
cserver.fThreadAffinity = FALSE;
|
||||
} else {
|
||||
err = "Unknown argument: server-thread-affinity expects either true or false";
|
||||
goto loaderr;
|
||||
int offset = atoi(argv[1]);
|
||||
if (offset > 0) {
|
||||
cserver.fThreadAffinity = TRUE;
|
||||
cserver.threadAffinityOffset = offset-1;
|
||||
} else {
|
||||
err = "Unknown argument: server-thread-affinity expects either true or false";
|
||||
goto loaderr;
|
||||
}
|
||||
}
|
||||
} else if (!strcasecmp(argv[0], "active-replica") && argc == 2) {
|
||||
g_pserver->fActiveReplica = yesnotoi(argv[1]);
|
||||
|
12
src/cron.cpp
12
src/cron.cpp
@ -70,6 +70,7 @@ void cronCommand(client *c)
|
||||
decrRefCount(o);
|
||||
// use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it.
|
||||
setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval);
|
||||
++g_pserver->dirty;
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
@ -86,6 +87,7 @@ void executeCronJobExpireHook(const char *key, robj *o)
|
||||
serverAssert(cFake->argc == 0);
|
||||
|
||||
// Setup the args for the EVAL command
|
||||
cFake->cmd = lookupCommandByCString("EVAL");
|
||||
cFake->argc = 3 + job->veckeys.size() + job->vecargs.size();
|
||||
cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL);
|
||||
cFake->argv[0] = createStringObject("EVAL", 4);
|
||||
@ -96,7 +98,17 @@ void executeCronJobExpireHook(const char *key, robj *o)
|
||||
for (size_t i = 0; i < job->vecargs.size(); ++i)
|
||||
cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size());
|
||||
|
||||
int lua_replicate_backup = g_pserver->lua_always_replicate_commands;
|
||||
g_pserver->lua_always_replicate_commands = 0;
|
||||
evalCommand(cFake);
|
||||
g_pserver->lua_always_replicate_commands = lua_replicate_backup;
|
||||
|
||||
if (g_pserver->aof_state != AOF_OFF)
|
||||
feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc);
|
||||
// Active replicas do their own expiries, do not propogate
|
||||
if (!g_pserver->fActiveReplica)
|
||||
replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc);
|
||||
|
||||
resetClient(cFake);
|
||||
|
||||
robj *keyobj = createStringObject(key,sdslen(key));
|
||||
|
43
src/db.cpp
43
src/db.cpp
@ -1446,6 +1446,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) {
|
||||
* keys. */
|
||||
void propagateExpire(redisDb *db, robj *key, int lazy) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
|
||||
robj *argv[2];
|
||||
|
||||
argv[0] = lazy ? shared.unlink : shared.del;
|
||||
@ -1463,6 +1464,48 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
|
||||
decrRefCount(argv[1]);
|
||||
}
|
||||
|
||||
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
|
||||
{
|
||||
robj *argv[3];
|
||||
robj objT;
|
||||
redisCommand *cmd = nullptr;
|
||||
switch (type)
|
||||
{
|
||||
case OBJ_SET:
|
||||
argv[0] = shared.srem;
|
||||
argv[1] = key;
|
||||
argv[2] = subkey;
|
||||
cmd = cserver.sremCommand;
|
||||
break;
|
||||
|
||||
case OBJ_HASH:
|
||||
argv[0] = shared.hdel;
|
||||
argv[1] = key;
|
||||
argv[2] = subkey;
|
||||
cmd = cserver.hdelCommand;
|
||||
break;
|
||||
|
||||
case OBJ_ZSET:
|
||||
argv[0] = shared.zrem;
|
||||
argv[1] = key;
|
||||
argv[2] = subkey;
|
||||
cmd = cserver.zremCommand;
|
||||
break;
|
||||
|
||||
case OBJ_CRON:
|
||||
return; // CRON jobs replicate in their own handler
|
||||
|
||||
default:
|
||||
serverPanic("Unknown subkey type");
|
||||
}
|
||||
|
||||
if (g_pserver->aof_state != AOF_OFF)
|
||||
feedAppendOnlyFile(cmd,db->id,argv,3);
|
||||
// Active replicas do their own expiries, do not propogate
|
||||
if (!g_pserver->fActiveReplica)
|
||||
replicationFeedSlaves(g_pserver->slaves,db->id,argv,3);
|
||||
}
|
||||
|
||||
/* Check if the key is expired. Note, this does not check subexpires */
|
||||
int keyIsExpired(redisDb *db, robj *key) {
|
||||
expireEntry *pexpire = getExpire(db,key);
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "server.h"
|
||||
#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */
|
||||
#include "crc64.h"
|
||||
#include "cron.h"
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <signal.h>
|
||||
@ -251,6 +252,10 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj
|
||||
mt->digest(&md,mv->value);
|
||||
xorDigest(digest,md.x,sizeof(md.x));
|
||||
}
|
||||
} else if (o->type == OBJ_CRON) {
|
||||
cronjob *job = (cronjob*)ptrFromObj(o);
|
||||
mixDigest(digest, &job->interval, sizeof(job->interval));
|
||||
mixDigest(digest, job->script.get(), job->script.size());
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ int freeMemoryIfNeeded(void) {
|
||||
int keys_freed = 0;
|
||||
/* By default replicas should ignore maxmemory
|
||||
* and just be masters exact copies. */
|
||||
if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory) return C_OK;
|
||||
if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK;
|
||||
|
||||
size_t mem_reported, mem_tofree, mem_freed;
|
||||
mstime_t latency, eviction_latency;
|
||||
|
@ -78,6 +78,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
dictEntry *de = dictFind(db->pdict, e.key());
|
||||
robj *val = (robj*)dictGetVal(de);
|
||||
int deleted = 0;
|
||||
|
||||
robj objKey;
|
||||
initStaticStringObject(objKey, (char*)e.key());
|
||||
|
||||
while (!pfat->FEmpty())
|
||||
{
|
||||
if (pfat->nextExpireEntry().when > now)
|
||||
@ -130,6 +134,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
default:
|
||||
serverAssert(false);
|
||||
}
|
||||
|
||||
robj objSubkey;
|
||||
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
|
||||
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
|
||||
|
||||
pfat->popfrontExpireEntry();
|
||||
}
|
||||
|
||||
@ -144,22 +153,18 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
db->setexpire->insert(eT);
|
||||
}
|
||||
|
||||
robj objT;
|
||||
switch (val->type)
|
||||
{
|
||||
case OBJ_SET:
|
||||
initStaticStringObject(objT, (char*)e.key());
|
||||
signalModifiedKey(db,&objT);
|
||||
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id);
|
||||
signalModifiedKey(db,&objKey);
|
||||
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objKey,db->id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pfat->FEmpty())
|
||||
{
|
||||
robj *keyobj = createStringObject(e.key(),sdslen(e.key()));
|
||||
removeExpire(db, keyobj);
|
||||
decrRefCount(keyobj);
|
||||
removeExpire(db, &objKey);
|
||||
}
|
||||
}
|
||||
|
||||
@ -224,7 +229,8 @@ void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, lo
|
||||
}
|
||||
|
||||
setExpire(c, c->db, key, subkey, when);
|
||||
|
||||
signalModifiedKey(c->db, key);
|
||||
g_pserver->dirty++;
|
||||
addReply(c, shared.cone);
|
||||
}
|
||||
|
||||
@ -256,6 +262,14 @@ void expireMemberAtCommand(client *c)
|
||||
expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_SECONDS);
|
||||
}
|
||||
|
||||
void pexpireMemberAtCommand(client *c)
|
||||
{
|
||||
long long when;
|
||||
if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK)
|
||||
return;
|
||||
|
||||
expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_MILLISECONDS);
|
||||
}
|
||||
|
||||
/* Try to expire a few timed out keys. The algorithm used is adaptive and
|
||||
* will use few CPU cycles if there are few expiring keys, otherwise
|
||||
|
@ -286,6 +286,8 @@ void feedReplicationBacklogWithObject(robj *o) {
|
||||
feedReplicationBacklog(p,len);
|
||||
}
|
||||
|
||||
sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc);
|
||||
|
||||
void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw)
|
||||
{
|
||||
char llstr[LONG_STR_SIZE];
|
||||
@ -324,13 +326,43 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
|
||||
* are queued in the output buffer until the initial SYNC completes),
|
||||
* or are already in sync with the master. */
|
||||
|
||||
/* Add the multi bulk length. */
|
||||
addReplyArrayLenAsync(replica,argc);
|
||||
if (fSendRaw)
|
||||
{
|
||||
/* Add the multi bulk length. */
|
||||
addReplyArrayLenAsync(replica,argc);
|
||||
|
||||
/* Finally any additional argument that was not stored inside the
|
||||
* static buffer if any (from j to argc). */
|
||||
for (int j = 0; j < argc; j++)
|
||||
addReplyBulkAsync(replica,argv[j]);
|
||||
/* Finally any additional argument that was not stored inside the
|
||||
* static buffer if any (from j to argc). */
|
||||
for (int j = 0; j < argc; j++)
|
||||
addReplyBulkAsync(replica,argv[j]);
|
||||
}
|
||||
else
|
||||
{
|
||||
struct redisCommand *cmd = lookupCommand(szFromObj(argv[0]));
|
||||
sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc);
|
||||
addReplyProtoAsync(replica, buf, sdslen(buf));
|
||||
sdsfree(buf);
|
||||
}
|
||||
}
|
||||
|
||||
static int writeProtoNum(char *dst, const size_t cchdst, long long num)
|
||||
{
|
||||
if (cchdst < 1)
|
||||
return 0;
|
||||
dst[0] = '$';
|
||||
int cch = 1;
|
||||
cch += ll2string(dst + cch, cchdst - cch, digits10(num));
|
||||
int chCpyT = std::min<int>(cchdst - cch, 2);
|
||||
memcpy(dst + cch, "\r\n", chCpyT);
|
||||
cch += chCpyT;
|
||||
cch += ll2string(dst + cch, cchdst-cch, num);
|
||||
chCpyT = std::min<int>(cchdst - cch, 3);
|
||||
memcpy(dst + cch, "\r\n", chCpyT);
|
||||
if (chCpyT == 3)
|
||||
cch += 2;
|
||||
else
|
||||
cch += chCpyT;
|
||||
return cch;
|
||||
}
|
||||
|
||||
/* Propagate write commands to slaves, and populate the replication backlog
|
||||
@ -343,6 +375,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
listIter li, liReply;
|
||||
int j, len;
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
static client *fake = nullptr;
|
||||
|
||||
if (dictid < 0)
|
||||
dictid = 0; // this can happen if we send a PING before any real operation
|
||||
|
||||
@ -360,8 +394,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
/* We can't have slaves attached and no backlog. */
|
||||
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
|
||||
|
||||
client *fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar);
|
||||
fake->flags |= CLIENT_FORCE_REPLY;
|
||||
if (fake == nullptr)
|
||||
{
|
||||
fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar);
|
||||
fake->flags |= CLIENT_FORCE_REPLY;
|
||||
}
|
||||
|
||||
bool fSendRaw = !g_pserver->fActiveReplica;
|
||||
replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below
|
||||
|
||||
@ -377,24 +415,37 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
serverAssert(argc > 0);
|
||||
serverAssert(cchbuf > 0);
|
||||
|
||||
char uuid[40] = {'\0'};
|
||||
uuid_unparse(cserver.uuid, uuid);
|
||||
// The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||
// but that was much too slow
|
||||
static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$";
|
||||
char proto[1024];
|
||||
int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||
cchProto = std::min((int)sizeof(proto), cchProto);
|
||||
int cchProto = 0;
|
||||
if (!fSendRaw)
|
||||
{
|
||||
char uuid[37];
|
||||
uuid_unparse(cserver.uuid, uuid);
|
||||
|
||||
cchProto = strlen(protoRREPLAY);
|
||||
memcpy(proto, protoRREPLAY, strlen(protoRREPLAY));
|
||||
memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want
|
||||
cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf);
|
||||
memcpy(proto + cchProto, "\r\n", 3);
|
||||
cchProto += 2;
|
||||
}
|
||||
|
||||
long long master_repl_offset_start = g_pserver->master_repl_offset;
|
||||
|
||||
char szDbNum[128];
|
||||
int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid);
|
||||
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid);
|
||||
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
||||
int cchDbNum = 0;
|
||||
if (!fSendRaw)
|
||||
cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid);
|
||||
|
||||
|
||||
char szMvcc[128];
|
||||
incrementMvccTstamp();
|
||||
uint64_t mvccTstamp = getMvccTstamp();
|
||||
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%" PRIu64, mvccTstamp);
|
||||
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%" PRIu64 "\r\n", cchMvccNum, mvccTstamp);
|
||||
cchMvcc = std::min<int>(cchMvcc, sizeof(szMvcc)); // tricky snprintf
|
||||
int cchMvcc = 0;
|
||||
incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication
|
||||
if (!fSendRaw)
|
||||
cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp());
|
||||
|
||||
/* Write the command to the replication backlog if any. */
|
||||
if (g_pserver->repl_backlog)
|
||||
@ -483,7 +534,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
}
|
||||
}
|
||||
|
||||
freeClient(fake);
|
||||
// Cleanup cached fake client output buffers
|
||||
fake->bufpos = 0;
|
||||
fake->sentlen = 0;
|
||||
fake->reply_bytes = 0;
|
||||
listEmpty(fake->reply);
|
||||
}
|
||||
|
||||
/* This function is used in order to proxy what we receive from our master
|
||||
@ -3107,6 +3162,8 @@ void roleCommand(client *c) {
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->masters, &li);
|
||||
|
||||
if (listLength(g_pserver->masters) > 1)
|
||||
addReplyArrayLen(c,listLength(g_pserver->masters));
|
||||
while ((ln = listNext(&li)))
|
||||
{
|
||||
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||
|
@ -640,6 +640,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"expirememberat", expireMemberAtCommand, 4,
|
||||
"write fast @keyspace",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"pexpirememberat", pexpireMemberAtCommand, 4,
|
||||
"write fast @keyspace",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"pexpire",pexpireCommand,3,
|
||||
"write fast @keyspace",
|
||||
@ -2343,6 +2347,9 @@ void createSharedObjects(void) {
|
||||
shared.zpopmax = makeObjectShared("ZPOPMAX",7);
|
||||
shared.multi = makeObjectShared("MULTI",5);
|
||||
shared.exec = makeObjectShared("EXEC",4);
|
||||
shared.hdel = makeObjectShared(createStringObject("HDEL", 4));
|
||||
shared.zrem = makeObjectShared(createStringObject("ZREM", 4));
|
||||
shared.srem = makeObjectShared(createStringObject("SREM", 4));
|
||||
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
||||
shared.integers[j] =
|
||||
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
|
||||
@ -2496,6 +2503,8 @@ void initServerConfig(void) {
|
||||
cserver.xgroupCommand = lookupCommandByCString("xgroup");
|
||||
cserver.rreplayCommand = lookupCommandByCString("rreplay");
|
||||
cserver.rpoplpushCommand = lookupCommandByCString("rpoplpush");
|
||||
cserver.hdelCommand = lookupCommandByCString("hdel");
|
||||
cserver.zremCommand = lookupCommandByCString("zrem");
|
||||
|
||||
/* Debugging */
|
||||
g_pserver->assert_failed = "<no assertion failed>";
|
||||
@ -2513,6 +2522,7 @@ void initServerConfig(void) {
|
||||
/* Multithreading */
|
||||
cserver.cthreads = CONFIG_DEFAULT_THREADS;
|
||||
cserver.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY;
|
||||
cserver.threadAffinityOffset = 0;
|
||||
initConfigValues();
|
||||
}
|
||||
|
||||
@ -5515,10 +5525,10 @@ int main(int argc, char **argv) {
|
||||
#ifdef __linux__
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(iel, &cpuset);
|
||||
CPU_SET(iel + cserver.threadAffinityOffset, &cpuset);
|
||||
if (pthread_setaffinity_np(rgthread[iel], sizeof(cpu_set_t), &cpuset) == 0)
|
||||
{
|
||||
serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel);
|
||||
serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel + cserver.threadAffinityOffset + 1);
|
||||
}
|
||||
#else
|
||||
serverLog(LL_WARNING, "CPU pinning not available on this platform");
|
||||
|
10
src/server.h
10
src/server.h
@ -1343,7 +1343,7 @@ struct sharedObjectsStruct {
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
|
||||
*multi, *exec,
|
||||
*multi, *exec, *srem, *hdel, *zrem,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
*integers[OBJ_SHARED_INTEGERS],
|
||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||
@ -1577,6 +1577,7 @@ struct redisServerConst {
|
||||
|
||||
int cthreads; /* Number of main worker threads */
|
||||
int fThreadAffinity; /* Should we pin threads to cores? */
|
||||
int threadAffinityOffset = 0; /* Where should we start pinning them? */
|
||||
char *pidfile; /* PID file path */
|
||||
|
||||
/* Fast pointers to often looked up command */
|
||||
@ -1584,7 +1585,8 @@ struct redisServerConst {
|
||||
*lpopCommand, *rpopCommand, *zpopminCommand,
|
||||
*zpopmaxCommand, *sremCommand, *execCommand,
|
||||
*expireCommand, *pexpireCommand, *xclaimCommand,
|
||||
*xgroupCommand, *rreplayCommand, *rpoplpushCommand;
|
||||
*xgroupCommand, *rreplayCommand, *rpoplpushCommand,
|
||||
*hdelCommand, *zremCommand;
|
||||
|
||||
/* Configuration */
|
||||
char *default_masteruser; /* AUTH with this user and masterauth with master */
|
||||
@ -2622,6 +2624,7 @@ int removeExpire(redisDb *db, robj *key);
|
||||
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
|
||||
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey);
|
||||
void propagateExpire(redisDb *db, robj *key, int lazy);
|
||||
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey);
|
||||
int expireIfNeeded(redisDb *db, robj *key);
|
||||
expireEntry *getExpire(redisDb *db, robj_roptr key);
|
||||
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
|
||||
@ -2759,6 +2762,8 @@ extern "C" char *redisGitDirty(void);
|
||||
extern "C" uint64_t redisBuildId(void);
|
||||
extern "C" char *redisBuildIdString(void);
|
||||
|
||||
int parseUnitString(const char *sz);
|
||||
|
||||
/* Commands prototypes */
|
||||
void authCommand(client *c);
|
||||
void pingCommand(client *c);
|
||||
@ -2837,6 +2842,7 @@ void expireCommand(client *c);
|
||||
void expireatCommand(client *c);
|
||||
void expireMemberCommand(client *c);
|
||||
void expireMemberAtCommand(client *c);
|
||||
void pexpireMemberAtCommand(client *c);
|
||||
void pexpireCommand(client *c);
|
||||
void pexpireatCommand(client *c);
|
||||
void getsetCommand(client *c);
|
||||
|
@ -290,4 +290,26 @@ tags {"aof"} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
## Test that PEXPIREMEMBERAT is loaded correctly
|
||||
create_aof {
|
||||
append_to_aof [formatCommand sadd testkey a b c d]
|
||||
append_to_aof [formatCommand pexpirememberat testkey a 1000]
|
||||
}
|
||||
|
||||
start_server_aof [list dir $server_path aof-load-truncated no] {
|
||||
test "AOF+EXPIREMEMBER: Server shuold have been started" {
|
||||
assert_equal 1 [is_alive $srv]
|
||||
}
|
||||
|
||||
test "AOF+PEXPIREMEMBERAT: set should have 3 values" {
|
||||
set client [redis [dict get $srv host] [dict get $srv port]]
|
||||
wait_for_condition 50 100 {
|
||||
[catch {$client ping} e] == 0
|
||||
} else {
|
||||
fail "Loading DB is taking too much time."
|
||||
}
|
||||
assert_equal 3 [$client scard testkey]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,5 +142,93 @@ start_server {tags {"repl"}} {
|
||||
fail "SPOP replication inconsistency"
|
||||
}
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (set) command} {
|
||||
$master sadd testkey a b c d
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master scard testkey] eq 3
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave scard testkey] eq 3
|
||||
} else {
|
||||
assert_equal [$slave scard testkey] 3
|
||||
}
|
||||
$master del testkey
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (hash) command} {
|
||||
$master hset testkey a value
|
||||
$master hset testkey b value
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master hlen testkey] eq 1
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave hlen testkey] eq 1
|
||||
} else {
|
||||
assert_equal [$slave hlen testkey] 1
|
||||
}
|
||||
$master del testkey
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (zset) command} {
|
||||
$master zadd testkey 1 a
|
||||
$master zadd testkey 2 b
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master zcard testkey] eq 1
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave zcard testkey] eq 1
|
||||
} else {
|
||||
assert_equal [$slave zcard testkey] 1
|
||||
}
|
||||
}
|
||||
|
||||
test {keydb.cron replicates} {
|
||||
$master del testkey
|
||||
$master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
|
||||
after 300
|
||||
assert_equal 1 [$master get testkey]
|
||||
assert_equal 1 [$master exists testjob]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "KEYDB.CRON failed to replicate"
|
||||
}
|
||||
$master del testjob
|
||||
$master del testkey
|
||||
wait_for_condition 50 1000 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "cron delete failed to propogate"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +77,94 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||
$master flushall
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (set) command (Active)} {
|
||||
$master sadd testkey a b c d
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master scard testkey] eq 3
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave scard testkey] eq 3
|
||||
} else {
|
||||
assert_equal [$slave scard testkey] 3
|
||||
}
|
||||
$master del testkey
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (hash) command (Active)} {
|
||||
$master hset testkey a value
|
||||
$master hset testkey b value
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master hlen testkey] eq 1
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave hlen testkey] eq 1
|
||||
} else {
|
||||
assert_equal [$slave hlen testkey] 1
|
||||
}
|
||||
$master del testkey
|
||||
}
|
||||
|
||||
test {Replication of EXPIREMEMBER (zset) command (Active)} {
|
||||
$master zadd testkey 1 a
|
||||
$master zadd testkey 2 b
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "Failed to replicate set"
|
||||
}
|
||||
$master expiremember testkey a 1
|
||||
after 1000
|
||||
wait_for_condition 50 100 {
|
||||
[$master zcard testkey] eq 1
|
||||
} else {
|
||||
fail "expiremember failed to work on master"
|
||||
}
|
||||
wait_for_condition 50 100 {
|
||||
[$slave zcard testkey] eq 1
|
||||
} else {
|
||||
assert_equal [$slave zcard testkey] 1
|
||||
}
|
||||
}
|
||||
|
||||
test {keydb.cron replicates (Active) } {
|
||||
$master del testkey
|
||||
$master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
|
||||
after 300
|
||||
assert_equal 1 [$master get testkey]
|
||||
assert_equal 1 [$master exists testjob]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "KEYDB.CRON failed to replicate"
|
||||
}
|
||||
$master del testjob
|
||||
$master del testkey
|
||||
wait_for_condition 50 1000 {
|
||||
[$master debug digest] eq [$slave debug digest]
|
||||
} else {
|
||||
fail "cron delete failed to propogate"
|
||||
}
|
||||
}
|
||||
|
||||
test {Active replicas WAIT} {
|
||||
# Test that wait succeeds since replicas should be syncronized
|
||||
$master set testkey foo
|
||||
@ -113,37 +201,38 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||
assert_equal {0} [$slave del testkey1]
|
||||
}
|
||||
|
||||
test {Active replica expire propogates when source is down} {
|
||||
$slave flushall
|
||||
$slave set testkey2 foo
|
||||
$slave set testkey1 foo
|
||||
wait_for_condition 50 1000 {
|
||||
[string match *foo* [$master get testkey1]]
|
||||
} else {
|
||||
fail "Replication failed to propogate"
|
||||
}
|
||||
$slave expire testkey1 2
|
||||
assert_equal {1} [$slave wait 1 500] { "value should propogate
|
||||
within 0.5 seconds" }
|
||||
exec kill -SIGSTOP $slave_pid
|
||||
test {Active replica expire propogates when source is down} {
|
||||
$slave flushall
|
||||
$slave set testkey2 foo
|
||||
$slave set testkey1 foo
|
||||
wait_for_condition 50 1000 {
|
||||
[string match *foo* [$master get testkey1]]
|
||||
} else {
|
||||
fail "Replication failed to propogate"
|
||||
}
|
||||
$slave expire testkey1 2
|
||||
assert_equal {1} [$slave wait 1 500] { "value should propogate
|
||||
within 0.5 seconds" }
|
||||
exec kill -SIGSTOP $slave_pid
|
||||
after 3000
|
||||
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
|
||||
# about what is actually in the dict. The only way to know is with a count from info
|
||||
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
|
||||
# about what is actually in the dict. The only way to know is with a count from info
|
||||
assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"}
|
||||
}
|
||||
exec kill -SIGCONT $slave_pid
|
||||
}
|
||||
|
||||
test {Active replica different databases} {
|
||||
$master select 3
|
||||
$master set testkey abcd
|
||||
$master select 2
|
||||
$master del testkey
|
||||
$slave select 3
|
||||
wait_for_condition 50 1000 {
|
||||
[string match abcd [$slave get testkey]]
|
||||
} else {
|
||||
fail "Replication failed to propogate DB 3"
|
||||
}
|
||||
exec kill -SIGCONT $slave_pid
|
||||
|
||||
test {Active replica different databases} {
|
||||
$master select 3
|
||||
$master set testkey abcd
|
||||
$master select 2
|
||||
$master del testkey
|
||||
$slave select 3
|
||||
wait_for_condition 50 1000 {
|
||||
[string match abcd [$slave get testkey]]
|
||||
} else {
|
||||
fail "Replication failed to propogate DB 3"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user