diff --git a/src/ae.cpp b/src/ae.cpp index b7b33e057..a23fb9dc8 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) diff --git a/src/aof.cpp b/src/aof.cpp index 8db2d4af2..291685e5c 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,14 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -346,6 +354,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is @@ -1584,16 +1595,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index 307ca100c..1e7da2c4b 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -296,6 +296,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason + cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -624,9 +633,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->conn) { diff --git a/src/config.cpp b/src/config.cpp index 0fb2ea60a..442309a82 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -540,6 +540,11 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"enable-pro")) { cserver.fUsePro = true; break; + } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) { + cserver.thread_min_client_threshold = atoi(argv[1]); + if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) { + err = "min-thread-client must be between 0 and 400"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/multi.cpp b/src/multi.cpp index b4ef7c538..5c38f194f 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -174,7 +175,7 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } @@ -190,7 +191,10 @@ void execCommand(client *c) { "no permission to execute the command or subcommand" : "no permission to touch the specified keys"); } else { - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); } /* Commands may alter argc/argv, restore mstate. */ diff --git a/src/networking.cpp b/src/networking.cpp index 746e48628..32bebbeb7 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1035,6 +1035,33 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +int chooseBestThreadForAccept() +{ + listIter li; + listNode *ln; + int rgcclients[MAX_EVENT_LOOPS] = {0}; + + listRewind(g_pserver->clients, &li); + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->iel < 0) + continue; + + rgcclients[c->iel]++; + } + + int ielMinLoad = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (rgcclients[iel] < cserver.thread_min_client_threshold) + return iel; + if (rgcclients[iel] < rgcclients[ielMinLoad]) + ielMinLoad = iel; + } + return ielMinLoad; +} + void clientAcceptHandler(connection *conn) { client *c = (client*)connGetPrivateData(conn); @@ -1176,7 +1203,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur); @@ -1193,10 +1235,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1240,7 +1287,24 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); + + aeAcquireLock(); + int ielTarget = rand() % cserver.cthreads; + if (ielTarget == iel) + { + LLocalThread: + acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); + } + else + { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget); + }); + if (res != AE_OK) + goto LLocalThread; + } + aeReleaseLock(); + } } @@ -2616,7 +2680,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } diff --git a/src/replication.cpp b/src/replication.cpp index b6d988dae..3fe5d9e9e 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -43,6 +43,8 @@ #include #include #include +#include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); @@ -354,6 +356,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -437,6 +440,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -1448,7 +1452,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) while ((ln = listNext(&li))) { if (listNodeValue(ln) == replica) { fFound = true; - break; + break; } } if (!fFound) @@ -2808,6 +2812,8 @@ void freeMasterInfo(redisMaster *mi) zfree(mi->masteruser); if (mi->repl_transfer_tmpfile) zfree(mi->repl_transfer_tmpfile); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; if (mi->cached_master != nullptr) freeClientAsync(mi->cached_master); @@ -2886,6 +2892,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3760,14 +3771,33 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local ReplicaNestState *s_pstate = nullptr; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) s_pstate = new (MALLOC_LOCAL) ReplicaNestState; @@ -3791,9 +3821,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3829,7 +3860,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3839,33 +3870,57 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(nullptr, c->iel); + if (cFake == nullptr) + cFake = createClient(nullptr, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + if (mvcc > g_mapmvcc[uuid]) + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index 722ef467d..93cccd363 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1698,7 +1698,7 @@ void clientsCron(int iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock if (clientsCronResizeQueryBuffer(c)) goto LContinue; if (clientsCronTrackExpansiveClients(c)) goto LContinue; LContinue: @@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", diff --git a/src/server.h b/src/server.h index 8d976253d..c106bba8a 100644 --- a/src/server.h +++ b/src/server.h @@ -1519,6 +1519,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; + bool fRetrySetAofEvent = false; }; struct redisMaster { @@ -1528,6 +1529,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ @@ -1598,6 +1601,7 @@ struct redisServerConst { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ bool fUsePro = false; + int thread_min_client_threshold = 50; }; struct redisServer { diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 4f37f2adf..0c28eb85d 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl new file mode 100644 index 000000000..e5e77fdad --- /dev/null +++ b/tests/integration/replication-multimaster.tcl @@ -0,0 +1,74 @@ +foreach topology {mesh ring} { +start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { + + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # Initialize as mesh + if [string equal $topology "mesh"] { + for {set j 0} {$j < 4} {incr j} { + for {set k 0} {$k < 4} {incr k} { + if $j!=$k { + $R($j) replicaof $R_host($k) $R_port($k) + after 100 + } + } + }} + #Else Ring + if [string equal $topology "ring"] { + $R(0) replicaof $R_host(3) $R_port(3) + after 100 + $R(1) replicaof $R_host(0) $R_port(0) + after 100 + $R(2) replicaof $R_host(1) $R_port(1) + after 100 + $R(3) replicaof $R_host(2) $R_port(2) + } + + after 2000 + + test "$topology replicates to all nodes" { + $R(0) set testkey foo + after 500 + assert_equal foo [$R(1) get testkey] "replicates to 1" + assert_equal foo [$R(2) get testkey] "replicates to 2" + } + + test "$topology replicates only once" { + $R(0) set testkey 1 + after 500 + $R(1) incr testkey + after 500 + $R(2) incr testkey + after 500 + assert_equal 3 [$R(0) get testkey] + assert_equal 3 [$R(1) get testkey] + assert_equal 3 [$R(2) get testkey] + assert_equal 3 [$R(3) get testkey] + } + + test "$topology transaction replicates only once" { + for {set j 0} {$j < 1000} {incr j} { + $R(0) set testkey 1 + $R(0) multi + $R(0) incr testkey + $R(0) incr testkey + $R(0) exec + after 1 + assert_equal 3 [$R(0) get testkey] "node 0" + assert_equal 3 [$R(1) get testkey] "node 1" + assert_equal 3 [$R(2) get testkey] "node 2" + assert_equal 3 [$R(3) get testkey] "node 3" + } + } +} +} +} +} +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index bb8a694f1..e83f3657a 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -45,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2fd1d3714..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master flagonly + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { reconnect test {RREPLAY db different} { - r debug force-master flagonly + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2