Dual channel replication (#60)
In this PR we introduce the main benefit of dual channel replication: continuously streaming the COB (client output buffers) in parallel to the RDB, which keeps the primary-side COB small and accelerates the overall sync process. By streaming the replication data to the replica during the full sync, we reduce:

1. Memory load on the primary node.
2. CPU load on the primary's main process.

[Latest performance tests](#data)

## Motivation

* Reduce primary memory load. We do that by moving the COB tracking to the replica side. This also decreases the chance of COB overruns. Note that the limits on the primary's input buffer at the replica side are less restrictive than the primary's COB limits, as the replica plays a less critical part in the replication group. While increasing the primary's COB may end up with the primary reaching swap and clients suffering, at the replica side we're more at ease with it. A larger COB means a better chance to sync successfully.
* Reduce primary main process CPU load. By opening a new, dedicated connection for the RDB transfer, child processes can have direct access to the new connection. Due to TLS connection restrictions, this was not possible using one main connection. We eliminate the need for the child process to use the primary's child-proc -> main-proc pipeline, thus freeing up the main process to process client queries.

## Dual Channel Replication high level interface design

- Dual channel replication begins when the replica sends a `REPLCONF CAPA DUALCHANNEL` to the primary during the initial handshake. This is used to state that the replica is capable of dual channel sync and that this is the replica's main channel, which is not used for snapshot transfer.
- When the replica lacks sufficient data for PSYNC, the primary sends a `-FULLSYNCNEEDED` response instead of RDB data. As a next step, the replica creates a new connection (rdb-channel) and configures it against the primary with the appropriate capabilities and requirements. The replica then requests a sync using the RDB channel.
- Prior to forking, the primary sends the replica the snapshot's end repl-offset, and attaches the replica to the replication backlog to keep repl data until the replica requests psync. The replica uses the main channel to request a PSYNC starting at the snapshot end offset.
- The primary's main thread sends incremental changes via the main channel, while the bgsave process sends the RDB directly to the replica via the rdb-channel. On the replica, the incremental changes are stored in a local buffer while the RDB is loaded into memory.
- Once the replica completes loading the RDB, it drops the rdb-connection and streams the accumulated incremental changes into memory. Repl steady state continues normally.

## New replica state machine

[Figure: replica state machine diagram, image not available]

## Data <a name="data"></a>

[Figures: three performance graphs, images not available]

## Explanation

These graphs demonstrate performance improvements during full sync sessions using rdb-channel + streaming the RDB directly from the background process to the replica.

First graph: with at most 50 clients and lightweight commands, we saw a 5%-7.5% improvement in write latency during the sync session.

Two graphs below: full sync was tested during heavy read commands from the primary (such as sdiff, sunion on large sets). In that case, the child process writes to the replica without sharing CPU with the loaded main process. As a result, this not only improves client response time, but may also shorten sync time by about 50%. The shorter sync time results in less memory being used to store replication diffs (>60% in some of the tested cases).

## Test setup

Both primary and replica in the performance tests ran on the same machine. RDB size in all tests is 3.7 GB. I generated write load using valkey-benchmark: `./valkey-benchmark -r 100000 -n 6000000 lpush my_list __rand_int__`.
---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: naglera <58042354+naglera@users.noreply.github.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
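
The replica-side handshake from the high level interface design can be pictured as a small state machine. The following is only a sketch: the state names are copied from the `repl_rdb_channel_state` enum that this commit adds to `src/server.h`, but `next_state` and `steps_to_loaded` are hypothetical helpers, and the strictly linear transition order is an assumption rather than the actual logic of the replication code.

```c
#include <assert.h>

/* State names as added to src/server.h by this commit. */
typedef enum {
    REPL_DUAL_CHANNEL_STATE_NONE = 0,         /* No active rdb channel sync */
    REPL_DUAL_CHANNEL_SEND_HANDSHAKE,         /* Send handshake sequence to primary */
    REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY,     /* Wait for AUTH reply */
    REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */
    REPL_DUAL_CHANNEL_RECEIVE_ENDOFF,         /* Wait for $ENDOFF reply */
    REPL_DUAL_CHANNEL_RDB_LOAD,               /* Loading rdb using rdb channel */
    REPL_DUAL_CHANNEL_RDB_LOADED,
} repl_rdb_channel_state;

/* Hypothetical helper. ASSUMPTION: every successful step moves to the next
 * enum value, any failure resets to NONE, and a completed load also returns
 * to NONE because the rdb connection is dropped. */
static repl_rdb_channel_state next_state(repl_rdb_channel_state s, int ok) {
    if (!ok) return REPL_DUAL_CHANNEL_STATE_NONE;
    if (s == REPL_DUAL_CHANNEL_RDB_LOADED) return REPL_DUAL_CHANNEL_STATE_NONE;
    return (repl_rdb_channel_state)(s + 1);
}

/* Hypothetical helper: number of successful steps from the start of the
 * handshake until the RDB is fully loaded. */
static int steps_to_loaded(void) {
    repl_rdb_channel_state s = REPL_DUAL_CHANNEL_SEND_HANDSHAKE;
    int steps = 0;
    while (s != REPL_DUAL_CHANNEL_RDB_LOADED) {
        s = next_state(s, 1);
        steps++;
        assert(steps <= 16); /* guard against a broken transition table */
    }
    return steps;
}
```

Keeping the rdb-channel state separate from the main `repl_state` lets the main channel continue its own PSYNC flow while the second connection progresses independently.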
parent 66d0f7d9a1
commit ff6b780fe6
@ -3074,6 +3074,7 @@ standardConfig static_configs[] = {
     createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush, 0, NULL, NULL),
     createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
     createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
+    createBoolConfig("dual-channel-replication-enabled", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | PROTECTED_CONFIG, server.dual_channel_replication, 0, NULL, NULL),
     createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
     createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
     createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
15
src/debug.c
@ -495,6 +495,10 @@ void debugCommand(client *c) {
             " In case RESET is provided the peak reset time will be restored to the default value",
             "REPLYBUFFER RESIZING <0|1>",
             " Enable or disable the reply buffer resize cron job",
+            "SLEEP-AFTER-FORK-SECONDS <seconds>",
+            " Stop the server's main process for <seconds> after forking.",
+            "DELAY-RDB-CLIENT-FREE-SECONDS <seconds>",
+            " Grace period in seconds for replica main channel to establish psync.",
             "DICT-RESIZING <0|1>",
             " Enable or disable the main dict and expire dict resizing.",
             NULL};
@ -991,6 +995,17 @@ void debugCommand(client *c) {
             return;
         }
         addReply(c, shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr, "sleep-after-fork-seconds") && c->argc == 3) {
+        double sleep_after_fork_seconds;
+        if (getDoubleFromObjectOrReply(c, c->argv[2], &sleep_after_fork_seconds, NULL) != C_OK) {
+            addReply(c, shared.err);
+            return;
+        }
+        server.debug_sleep_after_fork_us = (int)(sleep_after_fork_seconds * 1e6);
+        addReply(c, shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr, "delay-rdb-client-free-seconds") && c->argc == 3) {
+        server.wait_before_rdb_client_free = atoi(c->argv[2]->ptr);
+        addReply(c, shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
         server.dict_resizing = atoi(c->argv[2]->ptr);
         addReply(c, shared.ok);
@ -119,6 +119,11 @@ int authRequired(client *c) {
     return auth_required;
 }

+static inline int isReplicaReadyForReplData(client *replica) {
+    return (replica->repl_state == REPLICA_STATE_ONLINE || replica->repl_state == REPLICA_STATE_BG_RDB_LOAD) &&
+           !(replica->flag.close_asap);
+}
+
 client *createClient(connection *conn) {
     client *c = zmalloc(sizeof(client));

@ -189,6 +194,8 @@ client *createClient(connection *conn) {
     c->replica_version = 0;
     c->replica_capa = REPLICA_CAPA_NONE;
     c->replica_req = REPLICA_REQ_NONE;
+    c->associated_rdb_client_id = 0;
+    c->rdb_client_disconnect_time = 0;
     c->reply = listCreate();
     c->deferred_reply_errors = NULL;
     c->reply_bytes = 0;
@ -255,8 +262,8 @@ void putClientInPendingWriteQueue(client *c) {
     /* Schedule the client to write the output buffers to the socket only
      * if not already done and, for replicas, if the replica can actually receive
      * writes at this stage. */
-    if (!c->flag.pending_write && (c->repl_state == REPL_STATE_NONE ||
-                                   (c->repl_state == REPLICA_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) {
+    if (!c->flag.pending_write &&
+        (c->repl_state == REPL_STATE_NONE || (isReplicaReadyForReplData(c) && !c->repl_start_cmd_stream_on_ack))) {
         /* Here instead of installing the write handler, we just flag the
          * client and put it into a list of clients that have something
          * to write to the socket. This way before re-entering the event
@ -1598,7 +1605,7 @@ void freeClient(client *c) {

     /* If a client is protected, yet we need to free it right now, make sure
      * to at least use asynchronous freeing. */
-    if (c->flag.protected) {
+    if (c->flag.protected || c->flag.protected_rdb_channel) {
         freeClientAsync(c);
         return;
     }
@ -1644,7 +1651,10 @@ void freeClient(client *c) {

     /* Log link disconnection with replica */
     if (getClientType(c) == CLIENT_TYPE_REPLICA) {
-        serverLog(LL_NOTICE, "Connection with replica %s lost.", replicationGetReplicaName(c));
+        serverLog(LL_NOTICE,
+                  c->flag.repl_rdb_channel ? "Replica %s rdb channel disconnected."
+                                           : "Connection with replica %s lost.",
+                  replicationGetReplicaName(c));
     }

     /* Free the query buffer */
@ -1874,6 +1884,26 @@ int freeClientsInAsyncFreeQueue(void) {
     while ((ln = listNext(&li)) != NULL) {
         client *c = listNodeValue(ln);

+        if (c->flag.protected_rdb_channel) {
+            /* Check if it's safe to remove RDB connection protection during synchronization
+             * The primary gives a grace period before freeing this client because
+             * it serves as a reference to the first required replication data block for
+             * this replica */
+            if (!c->rdb_client_disconnect_time) {
+                c->rdb_client_disconnect_time = server.unixtime;
+                serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id,
+                          replicationGetReplicaName(c), server.wait_before_rdb_client_free);
+                continue;
+            }
+            if (server.unixtime - c->rdb_client_disconnect_time > server.wait_before_rdb_client_free) {
+                serverLog(LL_NOTICE,
+                          "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). "
+                          "Freeing RDB client %llu.",
+                          (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id);
+                c->flag.protected_rdb_channel = 0;
+            }
+        }
+
         if (c->flag.protected) continue;

         c->flag.close_asap = 0;
@ -2984,6 +3014,10 @@ int processInputBuffer(client *c) {
 void readToQueryBuf(client *c) {
     int big_arg = 0;
     size_t qblen, readlen;
+
+    /* If the replica RDB client is marked as closed ASAP, do not try to read from it */
+    if (c->flag.close_asap) return;
+
     int is_primary = c->read_flags & READ_FLAGS_PRIMARY;

     readlen = PROTO_IOBUF_LEN;
@ -4289,9 +4323,13 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
     serverAssert(c->reply_bytes < SIZE_MAX - (1024 * 64));
     /* Note that c->reply_bytes is irrelevant for replica clients
      * (they use the global repl buffers). */
-    if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || c->flag.close_asap) return 0;
+    if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) ||
+        (c->flag.close_asap && !(c->flag.protected_rdb_channel)))
+        return 0;
     if (checkClientOutputBufferLimits(c)) {
         sds client = catClientInfoString(sdsempty(), c);
+        /* Remove RDB connection protection on COB overrun */
+        c->flag.protected_rdb_channel = 0;

         if (async) {
             freeClientAsync(c);
@ -4335,7 +4373,7 @@ void flushReplicasOutputBuffers(void) {
          *
          * 3. Obviously if the replica is not ONLINE.
          */
-        if (replica->repl_state == REPLICA_STATE_ONLINE && !(replica->flag.close_asap) && can_receive_writes &&
+        if (isReplicaReadyForReplData(replica) && !(replica->flag.close_asap) && can_receive_writes &&
             !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) {
             writeToClient(replica);
         }
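
The grace-period handling added to `freeClientsInAsyncFreeQueue` above can be isolated into a small decision function. This is a minimal sketch, not the server code: `rdb_client_sketch` and `may_free_rdb_client` are hypothetical names, the struct keeps only the two fields involved, and the "not yet expired" branch models the net effect of `freeClient` re-queuing a client that is still protected.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical, reduced stand-in for the fields this commit adds to `client`. */
typedef struct {
    int protected_rdb_channel;         /* mirrors c->flag.protected_rdb_channel */
    time_t rdb_client_disconnect_time; /* 0 until the first free attempt */
} rdb_client_sketch;

/* Returns 1 if the client may be freed on this pass, 0 if freeing is postponed.
 * `grace_seconds` plays the role of server.wait_before_rdb_client_free. */
static int may_free_rdb_client(rdb_client_sketch *c, time_t now, int grace_seconds) {
    assert(grace_seconds >= 0);
    if (!c->protected_rdb_channel) return 1;
    if (!c->rdb_client_disconnect_time) {
        /* First attempt: remember when the countdown started and postpone. */
        c->rdb_client_disconnect_time = now;
        return 0;
    }
    if (now - c->rdb_client_disconnect_time > grace_seconds) {
        /* The main channel did not establish PSYNC in time: drop protection. */
        c->protected_rdb_channel = 0;
        return 1;
    }
    return 0; /* still inside the grace period */
}
```

The comparison is strict, so with a 60 second grace period the client survives a check at exactly 60 seconds and is released on the first check after that.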
125
src/rdb.c
@ -3451,12 +3451,15 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
         serverLog(LL_NOTICE, "Background RDB transfer terminated with success");
     } else if (!bysignal && exitcode != 0) {
         serverLog(LL_WARNING, "Background transfer error");
+        server.lastbgsave_status = C_ERR;
     } else {
         serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal);
     }
     if (server.rdb_child_exit_pipe != -1) close(server.rdb_child_exit_pipe);
-    aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
-    close(server.rdb_pipe_read);
+    if (server.rdb_pipe_read > 0) {
+        aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
+        close(server.rdb_pipe_read);
+    }
     server.rdb_child_exit_pipe = -1;
     server.rdb_pipe_read = -1;
     zfree(server.rdb_pipe_conns);
@ -3507,43 +3510,65 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
     listIter li;
     pid_t childpid;
     int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
+    int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL);

     if (hasActiveChildProcess()) return C_ERR;
+    serverAssert(server.rdb_pipe_read == -1 && server.rdb_child_exit_pipe == -1);

     /* Even if the previous fork child exited, don't start a new one until we
      * drained the pipe. */
     if (server.rdb_pipe_conns) return C_ERR;

-    /* Before to fork, create a pipe that is used to transfer the rdb bytes to
-     * the parent, we can't let it write directly to the sockets, since in case
-     * of TLS we must let the parent handle a continuous TLS state when the
-     * child terminates and parent takes over. */
-    if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
-    server.rdb_pipe_read = pipefds[0]; /* read end */
-    rdb_pipe_write = pipefds[1]; /* write end */
+    if (!dual_channel) {
+        /* Before to fork, create a pipe that is used to transfer the rdb bytes to
+         * the parent, we can't let it write directly to the sockets, since in case
+         * of TLS we must let the parent handle a continuous TLS state when the
+         * child terminates and parent takes over. */
+        if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
+        server.rdb_pipe_read = pipefds[0]; /* read end */
+        rdb_pipe_write = pipefds[1]; /* write end */

-    /* create another pipe that is used by the parent to signal to the child
-     * that it can exit. */
-    if (anetPipe(pipefds, 0, 0) == -1) {
-        close(rdb_pipe_write);
-        close(server.rdb_pipe_read);
-        return C_ERR;
+        /* create another pipe that is used by the parent to signal to the child
+         * that it can exit. */
+        if (anetPipe(pipefds, 0, 0) == -1) {
+            close(rdb_pipe_write);
+            close(server.rdb_pipe_read);
+            return C_ERR;
+        }
+        safe_to_exit_pipe = pipefds[0]; /* read end */
+        server.rdb_child_exit_pipe = pipefds[1]; /* write end */
     }
-    safe_to_exit_pipe = pipefds[0]; /* read end */
-    server.rdb_child_exit_pipe = pipefds[1]; /* write end */
-
     /* Collect the connections of the replicas we want to transfer
      * the RDB to, which are i WAIT_BGSAVE_START state. */
-    server.rdb_pipe_conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
-    server.rdb_pipe_numconns = 0;
-    server.rdb_pipe_numconns_writing = 0;
+    int connsnum = 0;
+    connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
+    server.rdb_pipe_conns = NULL;
+    if (!dual_channel) {
+        server.rdb_pipe_conns = conns;
+        server.rdb_pipe_numconns = 0;
+        server.rdb_pipe_numconns_writing = 0;
+    }
+    /* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */
     listRewind(server.replicas, &li);
     while ((ln = listNext(&li))) {
         client *replica = ln->value;
         if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
             /* Check replica has the exact requirements */
             if (replica->replica_req != req) continue;
-            server.rdb_pipe_conns[server.rdb_pipe_numconns++] = replica->conn;
+
+            conns[connsnum++] = replica->conn;
+            if (dual_channel) {
+                /* Put the socket in blocking mode to simplify RDB transfer. */
+                connBlock(replica->conn);
+                connSendTimeout(replica->conn, server.repl_timeout * 1000);
+                /* This replica uses diskless dual channel sync, hence we need
+                 * to inform it with the save end offset.*/
+                sendCurrentOffsetToReplica(replica);
+                /* Make sure repl traffic is appended to the replication backlog */
+                addRdbReplicaToPsyncWait(replica);
+            } else {
+                server.rdb_pipe_numconns++;
+            }
             replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
         }
     }
@ -3553,12 +3578,15 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
         /* Child */
         int retval, dummy;
         rio rdb;
-
-        rioInitWithFd(&rdb, rdb_pipe_write);
+        if (dual_channel) {
+            rioInitWithConnset(&rdb, conns, connsnum);
+        } else {
+            rioInitWithFd(&rdb, rdb_pipe_write);
+        }

         /* Close the reading part, so that if the parent crashes, the child will
          * get a write error and exit. */
-        close(server.rdb_pipe_read);
+        if (!dual_channel) close(server.rdb_pipe_read);
         if (strstr(server.exec_argv[0], "redis-server") != NULL) {
             serverSetProcTitle("redis-rdb-to-slaves");
         } else {
@ -3572,14 +3600,18 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
         if (retval == C_OK) {
             sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
         }
-
-        rioFreeFd(&rdb);
+        if (dual_channel) {
+            rioFreeConnset(&rdb);
+        } else {
+            rioFreeFd(&rdb);
+            close(rdb_pipe_write);
+        }
+        zfree(conns);
         /* wake up the reader, tell it we're done. */
-        close(rdb_pipe_write);
         close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
         /* hold exit until the parent tells us it's safe. we're not expecting
          * to read anything, just get the error when the pipe is closed. */
-        dummy = read(safe_to_exit_pipe, pipefds, 1);
+        if (!dual_channel) dummy = read(safe_to_exit_pipe, pipefds, 1);
         UNUSED(dummy);
         exitFromChild((retval == C_OK) ? 0 : 1);
     } else {
@ -3597,23 +3629,36 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
                     replica->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
                 }
             }
-            close(rdb_pipe_write);
-            close(server.rdb_pipe_read);
+            if (!dual_channel) {
+                close(rdb_pipe_write);
+                close(server.rdb_pipe_read);
+            }
             close(server.rdb_child_exit_pipe);
-            zfree(server.rdb_pipe_conns);
-            server.rdb_pipe_conns = NULL;
-            server.rdb_pipe_numconns = 0;
-            server.rdb_pipe_numconns_writing = 0;
+            zfree(conns);
+            if (dual_channel) {
+                closeChildInfoPipe();
+            } else {
+                server.rdb_pipe_conns = NULL;
+                server.rdb_pipe_numconns = 0;
+                server.rdb_pipe_numconns_writing = 0;
+            }
         } else {
-            serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld", (long)childpid);
+            serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
+                      dual_channel ? "direct socket to replica" : "pipe through parent process");
             server.rdb_save_time_start = time(NULL);
             server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
-            close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
-            if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) == AE_ERR) {
-                serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
+            if (dual_channel) {
+                /* For dual channel sync, the main process no longer requires these RDB connections. */
+                zfree(conns);
+            } else {
+                close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
+                if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) ==
+                    AE_ERR) {
+                    serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
+                }
             }
         }
-        close(safe_to_exit_pipe);
+        if (!dual_channel) close(safe_to_exit_pipe);
         return (childpid == -1) ? C_ERR : C_OK;
     }
     return C_OK; /* Unreached. */
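
The core change in `rdbSaveToReplicasSockets` is that, in dual channel mode, the forked child writes to the replica's connection itself instead of handing bytes to the parent through a pipe. The sketch below illustrates only that process layout with plain POSIX calls: a socket pair stands in for the rdb-channel, the parent reads the peer end purely so the result can be checked, and `child_streams_directly` is a hypothetical name. It does not use the server's `connection` or `rio` abstractions.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Forks a child that writes `payload` straight to one end of a socket pair.
 * The caller's process only drains the peer end; it never relays the bytes.
 * Returns the number of bytes received, or -1 on error. */
static ssize_t child_streams_directly(const char *payload, char *out, size_t outlen) {
    assert(payload != NULL && out != NULL);
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: owns the write side and handles short writes itself. */
        close(sv[0]);
        size_t len = strlen(payload), off = 0;
        while (off < len) {
            ssize_t n = write(sv[1], payload + off, len - off);
            if (n <= 0) _exit(1);
            off += (size_t)n;
        }
        close(sv[1]);
        _exit(0);
    }

    /* Parent: close the child's end so EOF is seen once the child is done. */
    close(sv[1]);
    size_t got = 0;
    while (got < outlen) {
        ssize_t n = read(sv[0], out + got, outlen - got);
        if (n < 0) {
            got = 0;
            break;
        }
        if (n == 0) break; /* child closed its end */
        got += (size_t)n;
    }
    close(sv[0]);

    int status = 0;
    if (waitpid(pid, &status, 0) == -1) return -1;
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) return -1;
    return (ssize_t)got;
}
```

Because no bytes pass through the parent, the parent's event loop stays free for client traffic, which is the CPU saving the commit message describes.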
File diff suppressed because it is too large
126
src/rio.c
@ -55,6 +55,7 @@
 #include "crc64.h"
 #include "config.h"
 #include "server.h"
+#include "connhelpers.h"

 /* ------------------------- Buffer I/O implementation ----------------------- */

@ -496,3 +497,128 @@ size_t rioWriteBulkDouble(rio *r, double d) {
     dbuf[dlen] = '\0';
     return rioWriteBulkString(r, dbuf, dlen);
 }
+
+/* Returns 1 or 0 for success/failure.
+ * The function returns success as long as we are able to correctly write
+ * to at least one file descriptor.
+ *
+ * When buf is NULL and len is 0, the function performs a flush operation
+ * if there is some pending buffer, so this function is also used in order
+ * to implement rioConnsetFlush(). */
+static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) {
+    ssize_t retval;
+    int j;
+    unsigned char *p = (unsigned char *)buf;
+    int doflush = (buf == NULL && len == 0);
+
+    /* To start we always append to our buffer. If it gets larger than
+     * a given size, we actually write to the sockets. */
+    if (len) {
+        r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len);
+        len = 0; /* Prevent entering the while below if we don't flush. */
+        if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1;
+    }
+
+    if (doflush) {
+        p = (unsigned char *)r->io.connset.buf;
+        len = sdslen(r->io.connset.buf);
+    }
+
+    /* Write in little chunks so that when there are big writes we
+     * parallelize while the kernel is sending data in background to
+     * the TCP socket. */
+    while (len) {
+        size_t count = len < 1024 ? len : 1024;
+        int broken = 0;
+        for (j = 0; j < r->io.connset.numconns; j++) {
+            if (r->io.connset.state[j] != 0) {
+                /* Skip FDs already in error. */
+                broken++;
+                continue;
+            }
+
+            /* Make sure to write 'count' bytes to the socket regardless
+             * of short writes. */
+            size_t nwritten = 0;
+            while (nwritten != count) {
+                retval = connWrite(r->io.connset.conns[j], p + nwritten, count - nwritten);
+                if (retval <= 0) {
+                    /* With blocking sockets, which is the sole user of this
+                     * rio target, EWOULDBLOCK is returned only because of
+                     * the SO_SNDTIMEO socket option, so we translate the error
+                     * into one more recognizable by the user. */
+                    if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
+                    break;
+                }
+                nwritten += retval;
+            }
+
+            if (nwritten != count) {
+                /* Mark this FD as broken. */
+                r->io.connset.state[j] = errno;
+                if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO;
+            }
+        }
+        if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */
+        p += count;
+        len -= count;
+        r->io.connset.pos += count;
+    }
+
+    if (doflush) sdsclear(r->io.connset.buf);
+    return 1;
+}
+
+/* Returns 1 or 0 for success/failure. */
+static size_t rioConnsetRead(rio *r, void *buf, size_t len) {
+    UNUSED(r);
+    UNUSED(buf);
+    UNUSED(len);
+    return 0; /* Error, this target does not support reading. */
+}
+
+/* Returns read/write position in file. */
+static off_t rioConnsetTell(rio *r) {
+    return r->io.connset.pos;
+}
+
+/* Flushes any buffer to target device if applicable. Returns 1 on success
+ * and 0 on failures. */
+static int rioConnsetFlush(rio *r) {
+    /* Our flush is implemented by the write method, that recognizes a
+     * buffer set to NULL with a count of zero as a flush request. */
+    return rioConnsetWrite(r, NULL, 0);
+}
+
+static const rio rioConnsetIO = {
+    rioConnsetRead,
+    rioConnsetWrite,
+    rioConnsetTell,
+    rioConnsetFlush,
+    NULL, /* update_checksum */
+    0, /* current checksum */
+    0, /* flags */
+    0, /* bytes read or written */
+    0, /* read/write chunk size */
+    {{NULL, 0}} /* union for io-specific vars */
+};
+
+void rioInitWithConnset(rio *r, connection **conns, int numconns) {
+    *r = rioConnsetIO;
+    r->io.connset.conns = zmalloc(sizeof(connection *) * numconns);
+    r->io.connset.state = zmalloc(sizeof(int) * numconns);
+    for (int i = 0; i < numconns; i++) {
+        r->io.connset.conns[i] = conns[i];
+        r->io.connset.state[i] = 0;
+    }
+    r->io.connset.numconns = numconns;
+    r->io.connset.pos = 0;
+    r->io.connset.buf = sdsempty();
+}
+
+/* release the rio stream. */
+void rioFreeConnset(rio *r) {
+    zfree(r->io.connset.conns);
+    zfree(r->io.connset.state);
+    sdsfree(r->io.connset.buf);
+}
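
The write loop in `rioConnsetWrite` combines three ideas: fixed-size chunks, retrying short writes, and a per-target error slot so one failed replica does not abort the others. The following is a reduced sketch of that pattern over raw file descriptors. `fanout_write` and `CHUNK` are hypothetical names, plain `write(2)` replaces `connWrite`, and `errno` is cleared before each write (an addition of this sketch, so a zero-byte write cannot pick up a stale error code).

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

#define CHUNK 1024 /* same chunk size as the loop in the diff */

/* Writes buf to every descriptor in fds, chunk by chunk.
 * state[j] is 0 while fds[j] is healthy, otherwise the errno that broke it.
 * Returns 0 once every descriptor was already broken at the start of a chunk,
 * 1 otherwise. */
static int fanout_write(const int *fds, int *state, int numfds, const unsigned char *buf, size_t len) {
    assert(numfds > 0);
    while (len) {
        size_t count = len < CHUNK ? len : CHUNK;
        int broken = 0;
        for (int j = 0; j < numfds; j++) {
            if (state[j] != 0) {
                broken++; /* skip descriptors already in error */
                continue;
            }
            /* Keep writing until the whole chunk is out, despite short writes. */
            size_t nwritten = 0;
            while (nwritten != count) {
                errno = 0;
                ssize_t r = write(fds[j], buf + nwritten, count - nwritten);
                if (r <= 0) break;
                nwritten += (size_t)r;
            }
            if (nwritten != count) state[j] = errno ? errno : EIO;
        }
        if (broken == numfds) return 0; /* nothing left to write to */
        buf += count;
        len -= count;
    }
    return 1;
}
```

As in the diff, a descriptor that fails during a chunk is only counted as broken from the next chunk on, so a total failure is reported one chunk late and never for data that fits in a single chunk.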
10
src/rio.h
@ -97,6 +97,14 @@ struct _rio {
             off_t pos;
             sds buf;
         } fd;
+        /* Multiple connections target (used to write to N sockets). */
+        struct {
+            connection **conns; /* Connections */
+            int *state; /* Error state of each fd. 0 (if ok) or errno. */
+            int numconns;
+            off_t pos;
+            sds buf;
+        } connset;
     } io;
 };

@ -184,4 +192,6 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
 void rioSetAutoSync(rio *r, off_t bytes);
 void rioSetReclaimCache(rio *r, int enabled);
 uint8_t rioCheckType(rio *r);
+void rioInitWithConnset(rio *r, connection **conns, int numconns);
+void rioFreeConnset(rio *r);
 #endif
20
src/server.c
@ -2045,6 +2045,7 @@ void initServerConfig(void) {
     server.cached_primary = NULL;
     server.primary_initial_offset = -1;
     server.repl_state = REPL_STATE_NONE;
+    server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_STATE_NONE;
     server.repl_transfer_tmpfile = NULL;
     server.repl_transfer_fd = -1;
     server.repl_transfer_s = NULL;
@ -2052,6 +2053,8 @@ void initServerConfig(void) {
     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
     server.primary_repl_offset = 0;
     server.fsynced_reploff_pending = 0;
+    server.rdb_client_id = -1;
+    server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;

     /* Replication partial resync backlog */
     server.repl_backlog = NULL;
@ -2545,6 +2548,8 @@ void initServer(void) {
     server.hz = server.config_hz;
     server.pid = getpid();
     server.in_fork_child = CHILD_TYPE_NONE;
+    server.rdb_pipe_read = -1;
+    server.rdb_child_exit_pipe = -1;
     server.main_thread_id = pthread_self();
     server.current_client = NULL;
     server.errors = raxNew();
@ -2554,6 +2559,8 @@ void initServer(void) {
     server.clients_to_close = listCreate();
     server.replicas = listCreate();
     server.monitors = listCreate();
+    server.replicas_waiting_psync = raxNew();
+    server.wait_before_rdb_client_free = DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE;
     server.clients_pending_write = listCreate();
     server.clients_pending_io_write = listCreate();
     server.clients_pending_io_read = listCreate();
@ -5140,6 +5147,7 @@ const char *replstateToString(int replstate) {
     switch (replstate) {
     case REPLICA_STATE_WAIT_BGSAVE_START:
     case REPLICA_STATE_WAIT_BGSAVE_END: return "wait_bgsave";
+    case REPLICA_STATE_BG_RDB_LOAD: return "bg_transfer";
     case REPLICA_STATE_SEND_BULK: return "send_bulk";
     case REPLICA_STATE_ONLINE: return "online";
     default: return "";
@ -5699,7 +5707,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
             "master_last_io_seconds_ago:%d\r\n", server.primary ? ((int)(server.unixtime-server.primary->last_interaction)) : -1,
             "master_sync_in_progress:%d\r\n", server.repl_state == REPL_STATE_TRANSFER,
             "slave_read_repl_offset:%lld\r\n", replica_read_repl_offset,
-            "slave_repl_offset:%lld\r\n", replica_repl_offset));
+            "slave_repl_offset:%lld\r\n", replica_repl_offset,
+            "replicas_repl_buffer_size:%zu\r\n", server.pending_repl_data.len,
+            "replicas_repl_buffer_peak:%zu\r\n", server.pending_repl_data.peak));
         /* clang-format on */

         if (server.repl_state == REPL_STATE_TRANSFER) {
@ -5759,14 +5769,18 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {

                 info = sdscatprintf(info,
                                     "slave%d:ip=%s,port=%d,state=%s,"
-                                    "offset=%lld,lag=%ld\r\n",
+                                    "offset=%lld,lag=%ld,type=%s\r\n",
                                     replica_id, replica_ip, replica->replica_listening_port, state,
-                                    replica->repl_ack_off, lag);
+                                    replica->repl_ack_off, lag,
+                                    replica->flag.repl_rdb_channel ? "rdb-channel"
+                                    : replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-channel"
+                                    : "replica");
                 replica_id++;
             }
         }
         /* clang-format off */
         info = sdscatprintf(info, FMTARGS(
+            "replicas_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.replicas_waiting_psync),
             "master_failover_state:%s\r\n", getFailoverStateString(),
             "master_replid:%s\r\n", server.replid,
             "master_replid2:%s\r\n", server.replid2,
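
The new `replicas_repl_buffer_size` and `replicas_repl_buffer_peak` INFO fields report `server.pending_repl_data.len` and `.peak`. How those counters are maintained is not visible in this part of the diff, so the sketch below is an assumption: it treats `len` as the number of bytes currently buffered on the replica and `peak` as its high-water mark. `repl_buf_stats`, `repl_buf_on_append` and `repl_buf_on_consume` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical counters modelled on the `len` and `peak` fields of replDataBuf. */
typedef struct {
    size_t len;  /* bytes currently held in the local replication buffer */
    size_t peak; /* ASSUMPTION: largest value `len` has reached so far */
} repl_buf_stats;

/* Called when incremental changes arrive on the main channel during RDB load. */
static void repl_buf_on_append(repl_buf_stats *s, size_t nbytes) {
    s->len += nbytes;
    if (s->len > s->peak) s->peak = s->len;
}

/* Called when buffered changes are streamed into memory after the RDB load. */
static void repl_buf_on_consume(repl_buf_stats *s, size_t nbytes) {
    assert(nbytes <= s->len); /* cannot consume more than is buffered */
    s->len -= nbytes;
}
```

Under this reading, the peak stays at its maximum after the buffer drains, which makes it useful for sizing replica-side limits after a sync has finished.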
91
src/server.h
@ -139,7 +139,11 @@ struct hdr_histogram;
 #define CONFIG_BINDADDR_MAX 16
 #define CONFIG_MIN_RESERVED_FDS 32
 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
+#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE \
+    60 /* Grace period in seconds for replica main \
+          channel to establish psync. */
 #define INCREMENTAL_REHASHING_THRESHOLD_US 1000
+#define LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT 100 /* Default: 0.1 seconds */

 /* Bucket sizes for client eviction pools. Each bucket stores clients with
  * memory usage of up to twice the size of the bucket below it. */
@ -398,6 +402,18 @@ typedef enum {
     REPL_STATE_CONNECTED, /* Connected to primary */
 } repl_state;

+/* Replica rdb-channel replication state. Used in server.repl_rdb_channel_state for
+ * replicas to remember what to do next. */
+typedef enum {
+    REPL_DUAL_CHANNEL_STATE_NONE = 0, /* No active rdb channel sync */
+    REPL_DUAL_CHANNEL_SEND_HANDSHAKE, /* Send handshake sequence to primary */
+    REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
+    REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */
+    REPL_DUAL_CHANNEL_RECEIVE_ENDOFF, /* Wait for $ENDOFF reply */
+    REPL_DUAL_CHANNEL_RDB_LOAD, /* Loading rdb using rdb channel */
+    REPL_DUAL_CHANNEL_RDB_LOADED,
+} repl_rdb_channel_state;
+
 /* The state of an in progress coordinated failover */
 typedef enum {
     NO_FAILOVER = 0, /* No failover in progress */
@ -415,18 +431,21 @@ typedef enum {
|
||||
#define REPLICA_STATE_SEND_BULK 8 /* Sending RDB file to replica. */
|
||||
#define REPLICA_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
|
||||
#define REPLICA_STATE_RDB_TRANSMITTED \
|
||||
10 /* RDB file transmitted - This state is used only for \
|
||||
* a replica that only wants RDB without replication buffer */
|
||||
10 /* RDB file transmitted - This state is used only for \
|
||||
* a replica that only wants RDB without replication buffer */
|
||||
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */
|
||||
|
||||
/* Replica capabilities. */
|
||||
#define REPLICA_CAPA_NONE 0
|
||||
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
|
||||
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
|
||||
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
|
||||
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
|
||||
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
|
||||
|
||||
/* Replica requirements */
|
||||
#define REPLICA_REQ_NONE 0
|
||||
#define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
|
||||
#define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */
|
||||
#define REPLICA_REQ_RDB_CHANNEL (1 << 2) /* Use dual-channel-replication */
|
||||
/* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */
|
||||
#define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)
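Note that `REPLICA_REQ_RDB_CHANNEL` is not part of `REPLICA_REQ_RDB_MASK`, so asking for the rdb-channel alone does not count as a filtered RDB request. A small sketch of that bit arithmetic, with the macros copied from the hunk above and two hypothetical helpers (`wantsFilteredRdb`, `isRdbChannel`) that are illustration only:

```c
#include <stdbool.h>

/* Values copied from the hunk above. */
#define REPLICA_REQ_NONE 0
#define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0)
#define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1)
#define REPLICA_REQ_RDB_CHANNEL (1 << 2)
#define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)

/* Hypothetical helper: true when the replica asked for a non-standard
 * (filtered) RDB. */
static bool wantsFilteredRdb(int replica_req) {
    return (replica_req & REPLICA_REQ_RDB_MASK) != 0;
}

/* Hypothetical helper: true when the connection declared itself as the
 * rdb-channel of a dual channel sync. */
static bool isRdbChannel(int replica_req) {
    return (replica_req & REPLICA_REQ_RDB_CHANNEL) != 0;
}
```

With these definitions, a requirement value of `REPLICA_REQ_RDB_CHANNEL` is an rdb-channel with an unfiltered RDB, while `REPLICA_REQ_RDB_CHANNEL | REPLICA_REQ_RDB_EXCLUDE_DATA` is an rdb-channel with a filtered one.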
|
||||
|
||||
@ -1087,6 +1106,12 @@ typedef struct replBacklog {
|
||||
* byte in the replication backlog buffer.*/
|
||||
} replBacklog;
|
||||
|
||||
typedef struct replDataBuf {
|
||||
list *blocks; /* List of replDataBufBlock */
|
||||
size_t len; /* Number of bytes stored in all blocks */
|
||||
size_t peak;
|
||||
} replDataBuf;
|
||||
|
||||
typedef struct {
|
||||
list *clients;
|
||||
size_t mem_usage_sum;
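The `peak` field of `replDataBuf` has no comment in this hunk. The sketch below shows one plausible reading, assuming `peak` records the largest value `len` has reached. The `bufStats` type and both functions are hypothetical and leave out the block list entirely.

```c
#include <stddef.h>

/* Hypothetical stand-in for the len/peak pair of replDataBuf. */
typedef struct {
    size_t len;  /* Number of bytes currently buffered */
    size_t peak; /* Assumed meaning: highest value len has reached */
} bufStats;

/* Account for n bytes appended to the buffer. */
static void bufStatsAdd(bufStats *b, size_t n) {
    b->len += n;
    if (b->len > b->peak) b->peak = b->len;
}

/* Account for n bytes consumed from the buffer. Clamps at zero; peak is
 * deliberately left unchanged. */
static void bufStatsConsume(bufStats *b, size_t n) {
    if (n > b->len) n = b->len;
    b->len -= n;
}
```

Under this reading, `peak` gives a high-water mark for the replica-side buffer even after the buffered data has been streamed into the database.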
|
||||
@ -1180,7 +1205,23 @@ typedef struct ClientFlags {
|
||||
uint64_t reprocessing_command : 1; /* The client is re-processing the command. */
|
||||
uint64_t replication_done : 1; /* Indicate that replication has been done on the client */
|
||||
uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */
|
||||
uint64_t reserved : 9; /* Reserved for future use */
|
||||
uint64_t protected_rdb_channel : 1; /* Dual channel replication sync: Protects the RDB client from premature
 * release during full sync. This flag is used to ensure that the RDB client, which
 * references the first replication data block required by the replica, is not
 * released prematurely. Protecting the client is crucial for prevention of
 * synchronization failures:
 * If the RDB client is released before the replica initiates PSYNC, the primary
 * will reduce the reference count (o->refcount) of the block needed by the replica.
 * This could potentially lead to the removal of the required data block, resulting
 * in synchronization failures. Such failures could occur even in scenarios where
 * the replica only needs an additional 4KB beyond the minimum size of the
 * repl_backlog.
 * By using this flag, we ensure that the RDB client remains intact until the replica
 * has successfully initiated PSYNC. */
|
||||
uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */
|
||||
uint64_t reserved : 7; /* Reserved for future use */
|
||||
} ClientFlags;
|
||||
|
||||
typedef struct client {
|
||||
@ -1257,6 +1298,8 @@ typedef struct client {
|
||||
int replica_version; /* Version on the form 0xMMmmpp. */
|
||||
short replica_capa; /* Replica capabilities: REPLICA_CAPA_* bitwise OR. */
|
||||
short replica_req; /* Replica requirements: REPLICA_REQ_* */
|
||||
uint64_t associated_rdb_client_id; /* The client id of this replica's rdb connection */
|
||||
time_t rdb_client_disconnect_time; /* Time of the first freeClient call on this client. Used for delaying free. */
|
||||
multiState mstate; /* MULTI/EXEC state */
|
||||
blockingState bstate; /* blocking state */
|
||||
long long woff; /* Last write global replication offset. */
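The two new client fields work together with the grace period: `rdb_client_disconnect_time` records the first `freeClient` call, and the client is kept for up to `wait-before-rdb-client-free` seconds so the main channel can establish psync. A minimal sketch of that timing check, assuming a zero timestamp means "not yet scheduled for free". `rdbClientGraceExpired` is a hypothetical helper, not a function from this PR.

```c
#include <stdbool.h>
#include <time.h>

/* Default copied from the server.h hunk above. */
#define DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE 60

/* Hypothetical helper: decide whether a protected RDB client may finally
 * be released. disconnect_time is the time of the first freeClient call;
 * 0 is assumed to mean that no free was requested yet. */
static bool rdbClientGraceExpired(time_t disconnect_time, time_t now, int grace_seconds) {
    if (disconnect_time == 0) return false;
    return difftime(now, disconnect_time) >= (double)grace_seconds;
}
```

With the default of 60 seconds, a client whose first free request came 30 seconds ago is still kept, and one that has waited the full 60 seconds is released.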
|
||||
@ -1656,6 +1699,11 @@ struct valkeyServer {
|
||||
list *clients_pending_io_read; /* List of clients with pending read to be processed by I/O threads. */
|
||||
list *clients_pending_io_write; /* List of clients with pending write to be processed by I/O threads. */
|
||||
list *replicas, *monitors; /* List of replicas and MONITORs */
|
||||
rax *replicas_waiting_psync; /* Radix tree for tracking replicas awaiting partial synchronization.
|
||||
* Key: RDB client ID
|
||||
* Value: RDB client object
|
||||
* This structure holds dual-channel sync replicas from the start of their
|
||||
* RDB transfer until their main channel establishes partial synchronization. */
|
||||
client *current_client; /* The client that triggered the command execution (External or AOF). */
|
||||
client *executing_client; /* The client executing the current command (possibly script or module). */
|
||||
|
||||
@ -1698,6 +1746,7 @@ struct valkeyServer {
|
||||
off_t loading_loaded_bytes;
|
||||
time_t loading_start_time;
|
||||
off_t loading_process_events_interval_bytes;
|
||||
time_t loading_process_events_interval_ms;
|
||||
/* Fields used only for stats */
|
||||
time_t stat_starttime; /* Server start time */
|
||||
long long stat_numcommands; /* Number of processed commands */
|
||||
@ -1930,6 +1979,7 @@ struct valkeyServer {
|
||||
int repl_ping_replica_period; /* Primary pings the replica every N seconds */
|
||||
replBacklog *repl_backlog; /* Replication backlog for partial syncs */
|
||||
long long repl_backlog_size; /* Backlog circular buffer size */
|
||||
replDataBuf pending_repl_data; /* Replication data buffer for dual-channel-replication */
|
||||
time_t repl_backlog_time_limit; /* Time without replicas after the backlog
|
||||
gets released. */
|
||||
time_t repl_no_replicas_since; /* We have no replicas since that time.
|
||||
@ -1943,23 +1993,39 @@ struct valkeyServer {
|
||||
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
|
||||
int repl_diskless_sync_max_replicas; /* Max replicas for diskless repl BGSAVE
|
||||
* delay (start sooner if they all connect). */
|
||||
int dual_channel_replication; /* Config used to determine if the replica should
|
||||
* use dual channel replication for full syncs. */
|
||||
int wait_before_rdb_client_free; /* Grace period in seconds for replica main channel
|
||||
* to establish psync. */
|
||||
int debug_sleep_after_fork_us; /* Debug param that force the main process to
|
||||
* sleep for N microseconds after fork() in repl. */
|
||||
size_t repl_buffer_mem; /* The memory of replication buffer. */
|
||||
list *repl_buffer_blocks; /* Replication buffers blocks list
|
||||
* (serving replica clients and repl backlog) */
|
||||
/* Replication (replica) */
|
||||
char *primary_user; /* AUTH with this user and primary_auth with primary */
|
||||
sds primary_auth; /* AUTH with this password with primary */
|
||||
char *primary_host; /* Hostname of primary */
|
||||
int primary_port; /* Port of primary */
|
||||
int repl_timeout; /* Timeout after N seconds of primary idle */
|
||||
client *primary; /* Client that is primary for this replica */
|
||||
char *primary_user; /* AUTH with this user and primary_auth with primary */
|
||||
sds primary_auth; /* AUTH with this password with primary */
|
||||
char *primary_host; /* Hostname of primary */
|
||||
int primary_port; /* Port of primary */
|
||||
int repl_timeout; /* Timeout after N seconds of primary idle */
|
||||
client *primary; /* Client that is primary for this replica */
|
||||
uint64_t rdb_client_id; /* RDB client ID as defined on the primary side */
|
||||
struct {
|
||||
connection *conn;
|
||||
char replid[CONFIG_RUN_ID_SIZE + 1];
|
||||
long long reploff;
|
||||
long long read_reploff;
|
||||
int dbid;
|
||||
} repl_provisional_primary;
|
||||
client *cached_primary; /* Cached primary to be reused for PSYNC. */
|
||||
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
|
||||
int repl_state; /* Replication status if the instance is a replica */
|
||||
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
|
||||
off_t repl_transfer_size; /* Size of RDB to read from primary during sync. */
|
||||
off_t repl_transfer_read; /* Amount of RDB read from primary during sync. */
|
||||
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
|
||||
connection *repl_transfer_s; /* Replica -> Primary SYNC connection */
|
||||
connection *repl_rdb_transfer_s; /* Primary FULL SYNC connection (RDB download) */
|
||||
int repl_transfer_fd; /* Replica -> Primary SYNC temp file descriptor */
|
||||
char *repl_transfer_tmpfile; /* Replica-> Primary SYNC temp file name */
|
||||
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
|
||||
@ -2764,6 +2830,7 @@ int clientHasPendingReplies(client *c);
|
||||
int updateClientMemUsageAndBucket(client *c);
|
||||
void removeClientFromMemUsageBucket(client *c, int allow_eviction);
|
||||
void unlinkClient(client *c);
|
||||
void removeFromServerClientList(client *c);
|
||||
int writeToClient(client *c);
|
||||
void linkClient(client *c);
|
||||
void protectClient(client *c);
|
||||
@ -2965,6 +3032,8 @@ void clearFailoverState(void);
|
||||
void updateFailoverStatus(void);
|
||||
void abortFailover(const char *err);
|
||||
const char *getFailoverStateString(void);
|
||||
int sendCurrentOffsetToReplica(client *replica);
|
||||
void addRdbReplicaToPsyncWait(client *replica);
|
||||
|
||||
/* Generic persistence functions */
|
||||
void startLoadingFile(size_t size, char *filename, int rdbflags);
|
||||
|
10
tests/helpers/bg_server_sleep.tcl
Normal file
@ -0,0 +1,10 @@
|
||||
source tests/support/valkey.tcl
|
||||
source tests/support/util.tcl
|
||||
|
||||
proc bg_server_sleep {host port sec} {
|
||||
set r [valkey $host $port 0]
|
||||
$r client setname SLEEP_HANDLER
|
||||
$r debug sleep $sec
|
||||
}
|
||||
|
||||
bg_server_sleep [lindex $argv 0] [lindex $argv 1] [lindex $argv 2]
|
@ -2,17 +2,23 @@ source tests/support/valkey.tcl
|
||||
|
||||
set ::tlsdir "tests/tls"
|
||||
|
||||
proc gen_write_load {host port seconds tls} {
|
||||
# Continuously sends SET commands to the node. If key is omitted, a random key is
|
||||
# used for every SET command. The value is always random.
|
||||
proc gen_write_load {host port seconds tls {key ""}} {
|
||||
set start_time [clock seconds]
|
||||
set r [valkey $host $port 1 $tls]
|
||||
$r client setname LOAD_HANDLER
|
||||
$r select 9
|
||||
while 1 {
|
||||
$r set [expr rand()] [expr rand()]
|
||||
if {$key == ""} {
|
||||
$r set [expr rand()] [expr rand()]
|
||||
} else {
|
||||
$r set $key [expr rand()]
|
||||
}
|
||||
if {[clock seconds]-$start_time > $seconds} {
|
||||
exit 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3]
|
||||
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4]
|
||||
|
1116
tests/integration/dual-channel-replication.tcl
Normal file
File diff suppressed because it is too large
@ -203,10 +203,16 @@ start_server {} {
|
||||
} else {
|
||||
fail "Replicas didn't sync after master restart"
|
||||
}
|
||||
set dualchannel [lindex [r config get dual-channel-replication-enabled] 1]
|
||||
set psync_count 0
|
||||
if {$dualchannel == "yes"} {
|
||||
# Expect one fake psync
|
||||
set psync_count 1
|
||||
}
|
||||
|
||||
# Replication backlog is full
|
||||
assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
|
||||
assert {[status $master sync_partial_ok] == 0}
|
||||
assert {[status $master sync_partial_ok] == $psync_count}
|
||||
assert {[status $master sync_full] == 1}
|
||||
assert {[status $master rdb_last_load_keys_expired] == 2048}
|
||||
assert {[status $replica sync_full] == 1}
|
||||
|
@ -1,6 +1,7 @@
|
||||
# This test group aims to test that all replicas share one global replication buffer,
|
||||
# two replicas don't make replication buffer size double, and when there is no replica,
|
||||
# replica buffer will shrink.
|
||||
foreach dualchannel {"yes" "no"} {
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
start_server {} {
|
||||
start_server {} {
|
||||
@ -8,6 +9,9 @@ start_server {} {
|
||||
set replica1 [srv -3 client]
|
||||
set replica2 [srv -2 client]
|
||||
set replica3 [srv -1 client]
|
||||
$replica1 config set dual-channel-replication-enabled $dualchannel
|
||||
$replica2 config set dual-channel-replication-enabled $dualchannel
|
||||
$replica3 config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
@ -18,6 +22,7 @@ start_server {} {
|
||||
$master config set repl-diskless-sync-delay 5
|
||||
$master config set repl-diskless-sync-max-replicas 1
|
||||
$master config set client-output-buffer-limit "replica 0 0 0"
|
||||
$master config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
# Make sure replica3 is synchronized with master
|
||||
$replica3 replicaof $master_host $master_port
|
||||
@ -39,7 +44,7 @@ start_server {} {
|
||||
fail "fail to sync with replicas"
|
||||
}
|
||||
|
||||
test {All replicas share one global replication buffer} {
|
||||
test "All replicas share one global replication buffer dualchannel $dualchannel" {
|
||||
set before_used [s used_memory]
|
||||
populate 1024 "" 1024 ; # Write extra 1M data
|
||||
# New data uses 1M memory, but all replicas use only one
|
||||
@ -47,19 +52,29 @@ start_server {} {
|
||||
# more than double of replication buffer.
|
||||
set repl_buf_mem [s mem_total_replication_buffers]
|
||||
set extra_mem [expr {[s used_memory]-$before_used-1024*1024}]
|
||||
assert {$extra_mem < 2*$repl_buf_mem}
|
||||
|
||||
if {$dualchannel == "yes"} {
|
||||
# master's replication buffers should not grow during dual channel replication
|
||||
assert {$extra_mem < 1024*1024}
|
||||
assert {$repl_buf_mem < 1024*1024}
|
||||
} else {
|
||||
assert {$extra_mem < 2*$repl_buf_mem}
|
||||
}
|
||||
# Kill replica1, replication_buffer will not become smaller
|
||||
catch {$replica1 shutdown nosave}
|
||||
wait_for_condition 50 100 {
|
||||
[s connected_slaves] eq {2}
|
||||
set cur_slave_count 2
|
||||
if {$dualchannel == "yes"} {
|
||||
# slave3 is connected, slave2 is syncing (has two connections)
|
||||
set cur_slave_count 3
|
||||
}
|
||||
wait_for_condition 500 100 {
|
||||
[s connected_slaves] eq $cur_slave_count
|
||||
} else {
|
||||
fail "replica doesn't disconnect with master"
|
||||
}
|
||||
assert_equal $repl_buf_mem [s mem_total_replication_buffers]
|
||||
}
|
||||
|
||||
test {Replication buffer will become smaller when no replica uses} {
|
||||
test "Replication buffer will become smaller when no replica uses dualchannel $dualchannel" {
|
||||
# Make sure replica3 catch up with the master
|
||||
wait_for_ofs_sync $master $replica3
|
||||
|
||||
@ -71,12 +86,18 @@ start_server {} {
|
||||
} else {
|
||||
fail "replica2 doesn't disconnect with master"
|
||||
}
|
||||
assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
|
||||
if {$dualchannel == "yes"} {
|
||||
# master's replication buffers should not grow during dual channel replication
|
||||
assert {1024*512 > [s mem_total_replication_buffers]}
|
||||
} else {
|
||||
assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
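The `connected_slaves` expectations in the tests above differ between the two modes because a replica that is still in a dual channel full sync holds two connections to the primary (main channel and rdb-channel). A small sketch of that arithmetic, written in C for consistency with the rest of the PR. `expectedConnectedSlaves` is a hypothetical helper that only mirrors the comments in the tests.

```c
/* Hypothetical helper mirroring the test comments: replicas that finished
 * syncing count once; replicas still in full sync count twice when dual
 * channel replication is enabled, once otherwise. */
static int expectedConnectedSlaves(int online_replicas, int syncing_replicas, int dualchannel) {
    int per_syncing = dualchannel ? 2 : 1;
    return online_replicas + syncing_replicas * per_syncing;
}
```

For the first test group, one connected replica plus one syncing replica gives 3 with dual channel enabled and 2 without, which matches `cur_slave_count`.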
|
||||
|
||||
# This test group aims to test replication backlog size can outgrow the backlog
|
||||
# limit config if there is a slow replica which keep massive replication buffers,
|
||||
@ -84,6 +105,7 @@ start_server {} {
|
||||
# partial re-synchronization. Of course, replication backlog memory also can
|
||||
# become smaller when master disconnects with slow replicas since output buffer
|
||||
# limit is reached.
|
||||
foreach dualchannel {yes no} {
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
start_server {} {
|
||||
start_server {} {
|
||||
@ -91,6 +113,7 @@ start_server {} {
|
||||
set replica1_pid [s -2 process_id]
|
||||
set replica2 [srv -1 client]
|
||||
set replica2_pid [s -1 process_id]
|
||||
$replica1 config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
@ -99,6 +122,7 @@ start_server {} {
|
||||
$master config set save ""
|
||||
$master config set repl-backlog-size 16384
|
||||
$master config set client-output-buffer-limit "replica 0 0 0"
|
||||
$master config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
# Executing 'debug digest' on master which has many keys costs much time
|
||||
# (especially in valgrind), this causes that replica1 and replica2 disconnect
|
||||
@ -106,11 +130,13 @@ start_server {} {
|
||||
$master config set repl-timeout 1000
|
||||
$replica1 config set repl-timeout 1000
|
||||
$replica2 config set repl-timeout 1000
|
||||
$replica2 config set client-output-buffer-limit "replica 0 0 0"
|
||||
$replica2 config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
$replica1 replicaof $master_host $master_port
|
||||
wait_for_sync $replica1
|
||||
|
||||
test {Replication backlog size can outgrow the backlog limit config} {
|
||||
test "Replication backlog size can outgrow the backlog limit config dualchannel $dualchannel" {
|
||||
# Generating RDB will take 1000 seconds
|
||||
$master config set rdb-key-save-delay 1000000
|
||||
populate 1000 master 10000
|
||||
@ -124,7 +150,7 @@ start_server {} {
|
||||
}
|
||||
# Replication actual backlog grow more than backlog setting since
|
||||
# the slow replica2 kept replication buffer.
|
||||
populate 10000 master 10000
|
||||
populate 20000 master 10000
|
||||
assert {[s repl_backlog_histlen] > [expr 10000*10000]}
|
||||
}
|
||||
|
||||
@ -135,7 +161,7 @@ start_server {} {
|
||||
fail "Replica offset didn't catch up with the master after too long time"
|
||||
}
|
||||
|
||||
test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} {
|
||||
test "Replica could use replication buffer (beyond backlog config) for partial resynchronization dualchannel $dualchannel" {
|
||||
# replica1 disconnects with master
|
||||
$replica1 replicaof [srv -1 host] [srv -1 port]
|
||||
# Write a mass of data that exceeds repl-backlog-size
|
||||
@ -151,22 +177,36 @@ start_server {} {
|
||||
# replica2 still waits for bgsave ending
|
||||
assert {[s rdb_bgsave_in_progress] eq {1} && [lindex [$replica2 role] 3] eq {sync}}
|
||||
# master accepted replica1 partial resync
|
||||
assert_equal [s sync_partial_ok] {1}
|
||||
if { $dualchannel == "yes" } {
|
||||
# 2 psync using main channel
|
||||
# +1 "real" psync
|
||||
assert_equal [s sync_partial_ok] {3}
|
||||
} else {
|
||||
assert_equal [s sync_partial_ok] {1}
|
||||
}
|
||||
assert_equal [$master debug digest] [$replica1 debug digest]
|
||||
}
|
||||
|
||||
test {Replication backlog memory will become smaller if disconnecting with replica} {
|
||||
test "Replication backlog memory will become smaller if disconnecting with replica dualchannel $dualchannel" {
|
||||
assert {[s repl_backlog_histlen] > [expr 2*10000*10000]}
|
||||
assert_equal [s connected_slaves] {2}
|
||||
if {$dualchannel == "yes"} {
|
||||
# 1 connection of replica1
|
||||
# +2 connections during sync of replica2
|
||||
assert_equal [s connected_slaves] {3}
|
||||
} else {
|
||||
assert_equal [s connected_slaves] {2}
|
||||
}
|
||||
|
||||
pause_process $replica2_pid
|
||||
r config set client-output-buffer-limit "replica 128k 0 0"
|
||||
# trigger output buffer limit check
|
||||
r set key [string repeat A [expr 64*1024]]
|
||||
r set key [string repeat A [expr 64*2048]]
|
||||
# master will close replica2's connection since replica2's output
|
||||
# buffer limit is reached, so there only is replica1.
|
||||
wait_for_condition 100 100 {
|
||||
[s connected_slaves] eq {1}
|
||||
[s connected_slaves] eq {1} ||
|
||||
([s connected_slaves] eq {2} &&
|
||||
[string match {*slave*state=wait_bgsave*type=rdb-channel*} [$master info]])
|
||||
} else {
|
||||
fail "master didn't disconnect with replica2"
|
||||
}
|
||||
@ -185,15 +225,19 @@ start_server {} {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test {Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size} {
|
||||
foreach dualchannel {"yes" "no"} {
|
||||
test "Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size. dualchannel $dualchannel" {
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
start_server {} {
|
||||
r config set save ""
|
||||
r config set repl-backlog-size 100mb
|
||||
r config set client-output-buffer-limit "replica 512k 0 0"
|
||||
r config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
set replica [srv -1 client]
|
||||
$replica config set dual-channel-replication-enabled $dualchannel
|
||||
$replica replicaof [srv 0 host] [srv 0 port]
|
||||
wait_for_sync $replica
|
||||
|
||||
@ -210,8 +254,13 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
|
||||
r set key $big_str ;# trigger output buffer limit check
|
||||
wait_for_ofs_sync r $replica
|
||||
# master accepted replica partial resync
|
||||
set psync_count 1
|
||||
if {$dualchannel == "yes"} {
|
||||
# One fake and one real psync
|
||||
set psync_count 2
|
||||
}
|
||||
assert_equal [s sync_full] {1}
|
||||
assert_equal [s sync_partial_ok] {1}
|
||||
assert_equal [s sync_partial_ok] $psync_count
|
||||
|
||||
r multi
|
||||
r set key $big_str
|
||||
@ -225,13 +274,13 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
|
||||
fail "Replica offset didn't catch up with the master after too long time"
|
||||
}
|
||||
assert_equal [s sync_full] {1}
|
||||
assert_equal [s sync_partial_ok] {1}
|
||||
assert_equal [s sync_partial_ok] $psync_count
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# This test was added to make sure big keys added to the backlog do not trigger psync loop.
|
||||
test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} {
|
||||
test "Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending. dualchannel $dualchannel" {
|
||||
proc client_field {r type f} {
|
||||
set client [$r client list type $type]
|
||||
if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} {
|
||||
@ -252,6 +301,8 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r
|
||||
|
||||
$master config set repl-backlog-size 16384
|
||||
$master config set client-output-buffer-limit "replica 32768 32768 60"
|
||||
$master config set dual-channel-replication-enabled $dualchannel
|
||||
$replica config set dual-channel-replication-enabled $dualchannel
|
||||
# Key has to be larger than replica client-output-buffer limit.
|
||||
set keysize [expr 256*1024]
|
||||
|
||||
@ -290,7 +341,11 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r
|
||||
|
||||
# now we expect the replica to re-connect but fail partial sync (it doesn't have large
|
||||
# enough COB limit and must result in a full-sync)
|
||||
assert {[status $master sync_partial_ok] == 0}
|
||||
if {$dualchannel == "yes"} {
|
||||
assert {[status $master sync_partial_ok] == [status $master sync_full]}
|
||||
} else {
|
||||
assert {[status $master sync_partial_ok] == 0}
|
||||
}
|
||||
|
||||
# Before this fix (#11905), the test would trigger an assertion in 'o->used >= c->ref_block_pos'
|
||||
test {The update of replBufBlock's repl_offset is ok - Regression test for #11666} {
|
||||
@ -304,4 +359,5 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r
|
||||
}
|
||||
}
|
||||
}
|
||||
}
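Several assertions in this file change `sync_partial_ok` because the main channel of a dual channel full sync finishes with a psync (the "fake" psync in the test comments). The sketch below captures the expected counts, assuming each dual channel full sync contributes exactly one such psync. `expectedPartialOk` is a hypothetical helper written in C for illustration; it is not part of the test suite.

```c
/* Hypothetical helper mirroring the test expectations for sync_partial_ok.
 * full_syncs:  number of full syncs served by the primary
 * real_psyncs: partial resyncs that were not part of a full sync */
static long expectedPartialOk(int dualchannel, long full_syncs, long real_psyncs) {
    long main_channel_psyncs = dualchannel ? full_syncs : 0;
    return real_psyncs + main_channel_psyncs;
}
```

This reproduces the values asserted above: one full sync and one real psync gives 2 with dual channel and 1 without, and when no real psync succeeds the counter equals `sync_full`.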
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
# If reconnect is > 0, the test actually tries to break the connection and
|
||||
# reconnect with the master, otherwise just the initial synchronization is
|
||||
# checked for consistency.
|
||||
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
|
||||
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl dualchannel reconnect} {
|
||||
start_server {tags {"repl"} overrides {save {}}} {
|
||||
start_server {overrides {save {}}} {
|
||||
|
||||
@ -21,7 +21,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
|
||||
$master config set repl-backlog-ttl $backlog_ttl
|
||||
$master config set repl-diskless-sync $mdl
|
||||
$master config set repl-diskless-sync-delay 1
|
||||
$master config set dual-channel-replication-enabled $dualchannel
|
||||
$slave config set repl-diskless-load $sdl
|
||||
$slave config set dual-channel-replication-enabled $dualchannel
|
||||
|
||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
|
||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
|
||||
@ -46,8 +48,8 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
|
||||
}
|
||||
}
|
||||
|
||||
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
|
||||
# Now while the clients are writing data, break the maste-slave
|
||||
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, dual-channel: $dualchannel, reconnect: $reconnect)" {
|
||||
# Now while the clients are writing data, break the master-slave
|
||||
# link multiple times.
|
||||
if ($reconnect) {
|
||||
for {set j 0} {$j < $duration*10} {incr j} {
|
||||
@ -74,19 +76,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
|
||||
|
||||
# Wait for the slave to reach the "online"
|
||||
# state from the POV of the master.
|
||||
set retry 5000
|
||||
while {$retry} {
|
||||
set info [$master info]
|
||||
if {[string match {*slave0:*state=online*} $info]} {
|
||||
break
|
||||
} else {
|
||||
incr retry -1
|
||||
after 100
|
||||
}
|
||||
}
|
||||
if {$retry == 0} {
|
||||
error "assertion:Slave not correctly synchronized"
|
||||
}
|
||||
verify_replica_online $master 0 5000
|
||||
|
||||
# Wait for the slave to acknowledge it is online so
|
||||
# we are sure that DBSIZE and DEBUG DIGEST will not
|
||||
@ -111,6 +101,10 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
|
||||
fail "Master - Replica inconsistency, Run diff -u against /tmp/repldump*.txt for more info"
|
||||
}
|
||||
assert {[$master dbsize] > 0}
|
||||
# if {$descr == "no backlog" && $mdl == "yes" && $sdl == "disabled"} {
|
||||
# puts "Master port: $master_port"
|
||||
# after 100000000
|
||||
# }
|
||||
eval $cond
|
||||
}
|
||||
}
|
||||
@ -120,24 +114,26 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
|
||||
tags {"external:skip"} {
|
||||
foreach mdl {no yes} {
|
||||
foreach sdl {disabled swapdb} {
|
||||
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
|
||||
} $mdl $sdl 0
|
||||
foreach dualchannel {yes no} {
|
||||
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
|
||||
} $mdl $sdl $dualchannel 0
|
||||
|
||||
test_psync {ok psync} 6 100000000 3600 0 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl 1
|
||||
test_psync {ok psync} 6 100000000 3600 0 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl $dualchannel 1
|
||||
|
||||
test_psync {no backlog} 6 100 3600 0.5 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl 1
|
||||
test_psync {no backlog} 6 100 3600 0.5 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl $dualchannel 1
|
||||
|
||||
test_psync {ok after delay} 3 100000000 3600 3 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl 1
|
||||
test_psync {ok after delay} 3 100000000 3600 3 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl $dualchannel 1
|
||||
|
||||
test_psync {backlog expired} 3 100000000 1 3 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl 1
|
||||
test_psync {backlog expired} 3 100000000 1 3 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl $dualchannel 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -301,7 +301,7 @@ start_server {tags {"repl external:skip"}} {
|
||||
}
|
||||
}
|
||||
|
||||
foreach mdl {no yes} {
|
||||
foreach mdl {no yes} dualchannel {no yes} {
|
||||
foreach sdl {disabled swapdb} {
|
||||
start_server {tags {"repl external:skip"} overrides {save {}}} {
|
||||
set master [srv 0 client]
|
||||
@ -317,7 +317,7 @@ foreach mdl {no yes} {
|
||||
lappend slaves [srv 0 client]
|
||||
start_server {overrides {save {}}} {
|
||||
lappend slaves [srv 0 client]
|
||||
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
|
||||
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl dual-channel-replication-enabled=$dualchannel" {
|
||||
# start load handles only inside the test, so that the test can be skipped
|
||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
|
||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
|
||||
@ -326,7 +326,11 @@ foreach mdl {no yes} {
|
||||
set load_handle4 [start_write_load $master_host $master_port 4]
|
||||
after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork
|
||||
|
||||
$master config set dual-channel-replication-enabled $dualchannel
|
||||
# Send SLAVEOF commands to slaves
|
||||
[lindex $slaves 0] config set dual-channel-replication-enabled $dualchannel
|
||||
[lindex $slaves 1] config set dual-channel-replication-enabled $dualchannel
|
||||
[lindex $slaves 2] config set dual-channel-replication-enabled $dualchannel
|
||||
[lindex $slaves 0] config set repl-diskless-load $sdl
|
||||
[lindex $slaves 1] config set repl-diskless-load $sdl
|
||||
[lindex $slaves 2] config set repl-diskless-load $sdl
|
||||
@ -336,7 +340,7 @@ foreach mdl {no yes} {
|
||||
|
||||
# Wait for all the three slaves to reach the "online"
|
||||
# state from the POV of the master.
|
||||
set retry 500
|
||||
set retry 1000
|
||||
while {$retry} {
|
||||
set info [r -3 info]
|
||||
if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
|
||||
@ -375,6 +379,8 @@ foreach mdl {no yes} {
|
||||
wait_for_ofs_sync $master [lindex $slaves 1]
|
||||
wait_for_ofs_sync $master [lindex $slaves 2]
|
||||
|
||||
assert [string match *replicas_waiting_psync:0* [$master info replication]]
|
||||
|
||||
# Check digests
|
||||
set digest [$master debug digest]
|
||||
set digest0 [[lindex $slaves 0] debug digest]
|
||||
@ -436,7 +442,7 @@ start_server {tags {"repl external:skip"} overrides {save {}}} {
|
||||
}
|
||||
|
||||
# Diskless load swapdb when NOT async_loading (different master replid)
|
||||
foreach testType {Successful Aborted} {
|
||||
foreach testType {Successful Aborted} dualchannel {yes no} {
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
set replica [srv 0 client]
|
||||
set replica_host [srv 0 host]
|
||||
@@ -451,8 +457,10 @@ foreach testType {Successful Aborted} {
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$master config set save ""
$master config set dual-channel-replication-enabled $dualchannel
$replica config set repl-diskless-load swapdb
$replica config set save ""
$replica config set dual-channel-replication-enabled $dualchannel

# Put different data sets on the master and replica
# We need to put large keys on the master since the replica replies to info only once in 2mb
@@ -472,7 +480,7 @@ foreach testType {Successful Aborted} {
# Start the replication process
$replica replicaof $master_host $master_port

test {Diskless load swapdb (different replid): replica enter loading} {
test "Diskless load swapdb (different replid): replica enter loading dual-channel-replication-enabled=$dualchannel" {
# Wait for the replica to start reading the rdb
wait_for_condition 100 100 {
[s -1 loading] eq 1
@@ -496,7 +504,7 @@ foreach testType {Successful Aborted} {
fail "Replica didn't disconnect"
}

test {Diskless load swapdb (different replid): old database is exposed after replication fails} {
test "Diskless load swapdb (different replid): old database is exposed after replication fails dual-channel=$dualchannel" {
# Ensure we see old values from replica
assert_equal [$replica get mykey] "myvalue"

@@ -518,7 +526,7 @@ foreach testType {Successful Aborted} {
fail "Master <-> Replica didn't finish sync"
}

test {Diskless load swapdb (different replid): new database is exposed after swapping} {
test "Diskless load swapdb (different replid): new database is exposed after swapping dual-channel=$dualchannel" {
# Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
assert_equal [$replica GET mykey] ""

@@ -549,6 +557,7 @@ foreach testType {Successful Aborted} {
$master config set save ""
$replica config set repl-diskless-load swapdb
$replica config set save ""
$replica config set dual-channel-replication-enabled no; # Doesn't work with swapdb

# Set replica writable so we can check that a key we manually added is served
# during replication and after failure, but disappears on success
@@ -853,6 +862,7 @@ start_server {tags {"repl external:skip"} overrides {save ""}} {
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 5
$master config set repl-diskless-sync-max-replicas 2
$master config set dual-channel-replication-enabled "no"; # dual-channel-replication doesn't use pipe
set master_host [srv 0 host]
set master_port [srv 0 port]
set master_pid [srv 0 pid]
@@ -1042,8 +1052,8 @@ test "diskless replication child being killed is collected" {
}
} {} {external:skip}

foreach mdl {yes no} {
test "replication child dies when parent is killed - diskless: $mdl" {
foreach mdl {yes no} dualchannel {yes no} {
test "replication child dies when parent is killed - diskless: $mdl dual-channel-replication-enabled: $dualchannel" {
# when master is killed, make sure the fork child can detect that and exit
start_server {tags {"repl"} overrides {save ""}} {
set master [srv 0 client]
@@ -1057,6 +1067,7 @@ foreach mdl {yes no} {
$master debug populate 10000
start_server {overrides {save ""}} {
set replica [srv 0 client]
$replica config set dual-channel-replication-enabled $dualchannel
$replica replicaof $master_host $master_port

# wait for rdb child to start
@@ -1236,69 +1247,80 @@ test {Kill rdb child process if its dumping RDB is not useful} {
}
}
} {} {external:skip}

start_server {tags {"repl external:skip"}} {
set master1_host [srv 0 host]
set master1_port [srv 0 port]
r set a b

start_server {} {
set master2 [srv 0 client]
set master2_host [srv 0 host]
set master2_port [srv 0 port]
# Take 10s for dumping RDB
$master2 debug populate 10 master2 10
$master2 config set rdb-key-save-delay 1000000
foreach dualchannel {yes no} {
start_server {tags {"repl external:skip"}} {
set master1 [srv 0 client]
set master1_host [srv 0 host]
set master1_port [srv 0 port]
$master1 config set dual-channel-replication-enabled $dualchannel
r set a b

start_server {} {
set sub_replica [srv 0 client]
set master2 [srv 0 client]
set master2_host [srv 0 host]
set master2_port [srv 0 port]
# Take 10s for dumping RDB
$master2 debug populate 10 master2 10
$master2 config set rdb-key-save-delay 1000000
$master2 config set dual-channel-replication-enabled $dualchannel

start_server {} {
# Full sync with master1
r slaveof $master1_host $master1_port
wait_for_sync r
assert_equal "b" [r get a]
set sub_replica [srv 0 client]
$sub_replica config set dual-channel-replication-enabled $dualchannel

# Let sub replicas sync with me
$sub_replica slaveof [srv 0 host] [srv 0 port]
wait_for_sync $sub_replica
assert_equal "b" [$sub_replica get a]

# Full sync with master2, and then kill master2 before finishing dumping RDB
r slaveof $master2_host $master2_port
wait_for_condition 50 100 {
([s -2 rdb_bgsave_in_progress] == 1) &&
([string match "*wait_bgsave*" [s -2 slave0]])
} else {
fail "full sync didn't start"
}
catch {$master2 shutdown nosave}

test {Don't disconnect with replicas before loading transferred RDB when full sync} {
assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"]
# The replication id is not changed in entire replication chain
assert_equal [s master_replid] [s -3 master_replid]
assert_equal [s master_replid] [s -1 master_replid]
}

test {Discard cache master before loading transferred RDB when full sync} {
set full_sync [s -3 sync_full]
set partial_sync [s -3 sync_partial_ok]
# Partial sync with master1
start_server {} {
# Full sync with master1
set replica [srv 0 client]
$replica config set dual-channel-replication-enabled $dualchannel
r slaveof $master1_host $master1_port
wait_for_sync r
# master1 accepts partial sync instead of full sync
assert_equal $full_sync [s -3 sync_full]
assert_equal [expr $partial_sync+1] [s -3 sync_partial_ok]
assert_equal "b" [r get a]

# Since master only partially sync replica, and repl id is not changed,
# the replica doesn't disconnect with its sub-replicas
assert_equal [s master_replid] [s -3 master_replid]
assert_equal [s master_replid] [s -1 master_replid]
assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"]
# Sub replica just has one full sync, no partial resync.
assert_equal 1 [s sync_full]
assert_equal 0 [s sync_partial_ok]
# Let sub replicas sync with me
$sub_replica slaveof [srv 0 host] [srv 0 port]
wait_for_sync $sub_replica
assert_equal "b" [$sub_replica get a]

# Full sync with master2, and then kill master2 before finishing dumping RDB
r slaveof $master2_host $master2_port
wait_for_condition 50 100 {
([s -2 rdb_bgsave_in_progress] == 1) &&
([string match "*wait_bgsave*" [s -2 slave0]])
} else {
fail "full sync didn't start"
}
catch {$master2 shutdown nosave}

test "Don't disconnect with replicas before loading transferred RDB when full sync with dual-channel-replication $dualchannel" {
assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"]
# The replication id is not changed in entire replication chain
assert_equal [s master_replid] [s -3 master_replid]
assert_equal [s master_replid] [s -1 master_replid]
}

test "Discard cache master before loading transferred RDB when full sync with dual-channel-replication $dualchannel" {
set full_sync [s -3 sync_full]
set partial_sync [s -3 sync_partial_ok]
# Partial sync with master1
r slaveof $master1_host $master1_port
wait_for_sync r
# master1 accepts partial sync instead of full sync
assert_equal $full_sync [s -3 sync_full]
assert_equal [expr $partial_sync+1] [s -3 sync_partial_ok]

# Since master only partially sync replica, and repl id is not changed,
# the replica doesn't disconnect with its sub-replicas
assert_equal [s master_replid] [s -3 master_replid]
assert_equal [s master_replid] [s -1 master_replid]
assert ![log_file_matches [srv -1 stdout] "*Connection with master lost*"]
# Sub replica just has one full sync, no partial resync.
assert_equal 1 [s sync_full]
if {$dualchannel == "yes"} {
assert_equal 1 [s sync_partial_ok]
} else {
assert_equal 0 [s sync_partial_ok]
}
}
}
}
}
@@ -141,6 +141,34 @@ proc wait_for_condition {maxtries delay e _else_ elsescript} {
}
}

proc verify_replica_online {master replica_idx max_retry} {
set pause 100
set count_down $max_retry
while {$count_down} {
set info [$master info]
set pattern *slave$replica_idx:*state=online*
if {[string match $pattern $info]} {
break
} else {
incr count_down -1
after $pause
}
}
if {$count_down == 0} {
set threshold [expr {$max_retry*$pause/1000}]
error "assertion:Replica is not in sync after $threshold seconds"
}
}

proc wait_for_value_to_propegate_to_replica {master replica key} {
set val [$master get $key]
wait_for_condition 50 500 {
([$replica get $key] eq $val)
} else {
error "Key $key did not propegate. Expected $val but got [$replica get $key]"
}
}

# try to match a value to a list of patterns that are either regex (starts with "/") or plain string.
# The caller can specify to use only glob-pattern match
proc search_pattern_list {value pattern_list {glob_pattern false}} {
@@ -555,7 +555,15 @@ proc find_valgrind_errors {stderr on_termination} {
# of seconds to the specified the server instance.
proc start_write_load {host port seconds} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls &
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls "" &
}

# Execute a background process writing only one key for the specified number
# of seconds to the specified Redis instance. This load handler is useful for
# tests which requires heavy replication stream but no memory load.
proc start_one_key_write_load {host port seconds key} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls $key &
}

# Stop a process generating write load executed with start_write_load.
@@ -47,6 +47,7 @@ start_server {tags {"auth external:skip"} overrides {requirepass foobar}} {
}
}

foreach dualchannel {yes no} {
start_server {tags {"auth_binary_password external:skip"}} {
test {AUTH fails when binary password is wrong} {
r config set requirepass "abc\x00def"
@@ -65,12 +66,13 @@ start_server {tags {"auth_binary_password external:skip"}} {
set master_port [srv -1 port]
set slave [srv 0 client]

test {primaryauth test with binary password} {
test "primaryauth test with binary password dualchannel = $dualchannel" {
$master config set requirepass "abc\x00def"

$master config set dual-channel-replication-enabled $dualchannel
# Configure the replica with primaryauth
set loglines [count_log_lines 0]
$slave config set primaryauth "abc"
$slave config set dual-channel-replication-enabled $dualchannel
$slave slaveof $master_host $master_port

# Verify replica is not able to sync with master
@@ -87,3 +89,4 @@ start_server {tags {"auth_binary_password external:skip"}} {
}
}
}
}
@@ -558,6 +558,7 @@ start_server {tags {"introspection"}} {
socket-mark-id
req-res-logfile
client-default-resp
dual-channel-replication-enabled
}

if {!$::tls} {
18
valkey.conf
@@ -668,6 +668,24 @@ repl-diskless-sync-max-replicas 0
# during replication.
repl-diskless-load disabled

# This dual channel replication sync feature optimizes the full synchronization process
# between a primary and its replicas. When enabled, it reduces both memory and CPU load
# on the primary server.
#
# How it works:
# 1. During full sync, instead of accumulating replication data on the primary server,
# the data is sent directly to the syncing replica.
# 2. The primary's background save (bgsave) process streams the RDB snapshot directly
# to the replica over a separate connection.
#
# Tradeoff:
# While this approach reduces load on the primary, it shifts the burden of storing
# the replication buffer to the replica. This means the replica must have sufficient
# memory to accommodate the buffer during synchronization. However, this tradeoff is
# generally beneficial as it prevents potential performance degradation on the primary
# server, which is typically handling more critical operations.
dual-channel-replication-enabled no

# Master send PINGs to its replicas in a predefined interval. It's possible to
# change this interval with the repl_ping_replica_period option. The default
# value is 10 seconds.
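
The tradeoff described in the `valkey.conf` comment for `dual-channel-replication-enabled` can be illustrated with a toy model. This is only a sketch, not code from the PR: the names `SyncMemory` and `full_sync_buffer_placement` are hypothetical, the write rate and sync duration are made-up inputs, and the model idealizes the primary's buffer as empty when dual channel is on (the PR only claims it stays small).

```python
from dataclasses import dataclass


@dataclass
class SyncMemory:
    """Bytes of replication stream held on each side during a full sync."""
    primary_buffer_bytes: int
    replica_buffer_bytes: int


def full_sync_buffer_placement(write_rate_bytes_per_s: int,
                               sync_duration_s: int,
                               dual_channel: bool) -> SyncMemory:
    """Toy model (hypothetical): where do writes arriving during a full sync pile up?"""
    # Writes that arrive while the RDB snapshot is being transferred
    # must be buffered somewhere until the replica has loaded the snapshot.
    pending = write_rate_bytes_per_s * sync_duration_s
    if dual_channel:
        # Idealized assumption: the stream is forwarded to the replica
        # in parallel with the RDB, so the replica holds the buffer.
        return SyncMemory(primary_buffer_bytes=0, replica_buffer_bytes=pending)
    # Without dual channel the primary accumulates the stream itself.
    return SyncMemory(primary_buffer_bytes=pending, replica_buffer_bytes=0)
```

Under these assumptions, a primary receiving 1,000,000 bytes per second of writes during a 30 second full sync would leave 30,000,000 bytes to buffer; the option only changes which node has to hold them.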