RREPLAY command now takes a DB argument

Former-commit-id: 6e1e5bd08b59f8ad4653621a6c01fcf3a76f0692
This commit is contained in:
John Sully 2019-09-28 14:59:44 -04:00
parent b6b6bb0488
commit 2c6fdf0f4e
5 changed files with 69 additions and 3 deletions

View File

@ -682,10 +682,12 @@ NULL
changeReplicationId();
clearReplicationId2();
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2)
{
} else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) {
stringmatchlen_fuzz_test();
addReplyStatus(c,"Apparently Redis did not crash: test passed");
} else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) {
c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY;
addReply(c, shared.ok);
} else {
addReplySubcommandSyntaxError(c);
return;

View File

@ -323,9 +323,13 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
char uuid[40] = {'\0'};
uuid_unparse(cserver.uuid, uuid);
char proto[1024];
int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
cchProto = std::min((int)sizeof(proto), cchProto);
long long master_repl_offset_start = g_pserver->master_repl_offset;
serverAssert(dictid >= 0);
char szDbNum[128];
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid);
/* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog)
@ -368,6 +372,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
const char *crlf = "\r\n";
feedReplicationBacklog(crlf, 2);
feedReplicationBacklog(szDbNum, cchDbNum);
}
}
@ -396,7 +401,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
addReplyProtoAsync(slave, reply->buf(), reply->used);
}
if (!fSendRaw)
{
addReplyAsync(slave,shared.crlf);
addReplyProtoAsync(slave, szDbNum, cchDbNum);
}
}
freeClient(fake);
@ -3266,6 +3274,7 @@ void replicaReplayCommand(client *c)
// the replay command contains two arguments:
// 1: The UUID of the source
// 2: The raw command buffer to be replayed
// 3: (OPTIONAL) the database ID the command should apply to
if (!(c->flags & CLIENT_MASTER))
{
@ -3298,6 +3307,17 @@ void replicaReplayCommand(client *c)
return;
}
if (c->argc >= 4)
{
long long db;
if (getLongLongFromObject(c->argv[3], &db) != C_OK || db >= cserver.dbnum || selectDb(c, (int)db) != C_OK)
{
addReplyError(c, "Invalid database ID");
s_pstate->Cancel();
return;
}
}
if (FSameUuidNoNil(uuid, cserver.uuid))
{
addReply(c, shared.ok);

View File

@ -93,5 +93,18 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
assert_equal {0} [$master del testkey1]
assert_equal {0} [$slave del testkey1]
}
test {Active replica different databases} {
$master select 3
$master set testkey abcd
$master select 2
$master del testkey
$slave select 3
wait_for_condition 50 1000 {
[string match abcd [$slave get testkey]]
} else {
fail "Replication failed to propogate DB 3"
}
}
}
}

View File

@ -35,6 +35,7 @@ set ::all_tests {
unit/quit
unit/aofrw
unit/acl
unit/rreplay
integration/block-repl
integration/replication
integration/replication-2

30
tests/unit/rreplay.tcl Normal file
View File

@ -0,0 +1,30 @@
start_server {tags {"rreplay"}} {
test {RREPLAY use current db} {
r debug force-master
r select 4
r set dbnum invalid
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
r get dbnum
} {four}
reconnect
test {RREPLAY db different} {
r debug force-master
r select 4
r set testkey four
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
r select 4
assert { [r get testkey] == "four" }
r select 2
r get testkey
} {bebe}
reconnect
test {RREPLAY not master} {
assert_error "*master*" {r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2}
}
r flushdb
}