Merge 768440f33a38f8585c9e04e39ac969c24b180549 into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
talxsha 2025-02-02 18:01:31 +02:00 committed by GitHub
commit 13fe90a41d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 138 additions and 11 deletions

View File

@ -121,6 +121,9 @@ typedef struct ConnectionType {
/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);
/* Miscellaneous */
int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks
} ConnectionType;
struct connection {
@ -483,4 +486,8 @@ static inline void connSetPostponeUpdateState(connection *conn, int on) {
}
}
static inline int connIsIntegrityChecked(connection *conn) {
return conn->type->connIntegrityChecked && conn->type->connIntegrityChecked();
}
#endif /* __REDIS_CONNECTION_H */

View File

@ -3377,7 +3377,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0) {
if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: skipped checksum for this transfer");
} else if (cksum == 0) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
@ -3568,8 +3570,14 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports skiping RDB checksum, primary should also skip checksum.
* Otherwise, use checksum for this RDB transfer.
*/
int skip_rdb_checksum = 1;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
@ -3601,6 +3609,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}
// do not skip RDB checksum on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->repl_data->replica_capa & REPLICA_CAPA_SKIP_RDB_CHECKSUM))
skip_rdb_checksum = 0;
}
/* Create the child process. */
@ -3624,6 +3636,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);
if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
@ -3673,8 +3687,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process");
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s%s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process",
skip_rdb_checksum ? " while skipping RDB checksum for this transfer" : "");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
if (dual_channel) {

View File

@ -1846,6 +1846,9 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,
/* Miscellaneous */
.connIntegrityChecked = NULL,
};
ConnectionType *connectionTypeRdma(void) {

View File

@ -1314,11 +1314,13 @@ void freeClientReplicationData(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel>
* - capa <eof|psync2|dual-channel|skip-rdb-checksum>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* skip-rdb-checksum: supports skipping RDB checksum calculations during diskless sync using
* a connection that has integrity checks (such as TLS).
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
@ -1386,7 +1388,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->repl_data->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR))
c->repl_data->replica_capa |= REPLICA_CAPA_SKIP_RDB_CHECKSUM;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
@ -2045,6 +2048,12 @@ static int useDisklessLoad(void) {
return enabled;
}
/* Returns 1 if the node can skip RDB checksum during full sync.
* We can RDB checksum when data is transmitted through a verified stream. */
int replicationSupportSkipRDBChecksum(connection *conn, int is_replica_stream_verified, int is_primary_stream_verified) {
return is_replica_stream_verified && is_primary_stream_verified && connIsIntegrityChecked(conn);
}
/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from primary. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
@ -2324,7 +2333,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (replicationSupportSkipRDBChecksum(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
@ -3583,11 +3592,34 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* skip-rdb-checksum: supports skipping RDB checksum during full sync.
* Inform the primary of this capa only during diskless sync
* using a connection that has integrity checks (such as TLS).
* In non-diskless sync, or non-integrity-checked connection, there is more
* concern for data corruprion so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
int send_skip_rdb_checksum_capa = replicationSupportSkipRDBChecksum(conn, useDisklessLoad(), 1); // we can ignore primary's conditions when sending capa (is_primary_stream_verified=1)
char *argv[9] = {"REPLCONF", "capa", "eof", "capa", "psync2", NULL, NULL, NULL, NULL};
size_t lens[9] = {8, 4, 3, 4, 6, 0, 0, 0, 0};
int argc = 5;
if (send_skip_rdb_checksum_capa) {
argv[argc] = "capa";
lens[argc] = strlen("capa");
argc++;
argv[argc] = REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR;
lens[argc] = strlen(REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR);
argc++;
}
if (server.dual_channel_replication) {
argv[argc] = "capa";
lens[argc] = strlen("capa");
argc++;
argv[argc] = "dual-channel";
lens[argc] = strlen("dual-channel");
argc++;
}
err = sendCommandArgv(conn, argc, argv, lens);
if (err) goto write_error;
/* Inform the primary of our (replica) version. */

View File

@ -425,6 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) != 0) return; // skip RDB checksum
r->cksum = crc64(r->cksum, buf, len);
}

View File

@ -40,6 +40,7 @@
#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */
#define RIO_FLAG_SKIP_RDB_CHECKSUM (1 << 3)
#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)

View File

@ -451,11 +451,15 @@ typedef enum {
* a replica that only wants RDB without replication buffer */
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */
/* Replica capabilities. */
/* Replica capability flags */
#define REPLICA_CAPA_NONE 0
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_SKIP_RDB_CHECKSUM (1 << 3) /* Supports skipping RDB checksum for sync requests. */
/* Replica capability strings */
#define REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR "skip-rdb-checksum" /* Supports skipping RDB checksum for sync requests. */
/* Replica requirements */
#define REPLICA_REQ_NONE 0
@ -1977,6 +1981,7 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */
/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->primary client structure. */

View File

@ -447,6 +447,9 @@ static ConnectionType CT_Socket = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,
/* Miscellaneous */
.connIntegrityChecked = NULL,
};
int connBlock(connection *conn) {

View File

@ -814,6 +814,10 @@ static int connTLSListen(connListener *listener) {
return listenToPort(listener);
}
static int connTLSIsIntegrityChecked(void) {
return 1;
}
static void connTLSCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
@ -1186,6 +1190,9 @@ static ConnectionType CT_TLS = {
/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,
/* Miscellaneous */
.connIntegrityChecked = connTLSIsIntegrityChecked,
};
int RedisRegisterConnectionTypeTLS(void) {

View File

@ -207,6 +207,9 @@ static ConnectionType CT_Unix = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,
/* Miscellaneous */
.connIntegrityChecked = NULL,
};
int RedisRegisterConnectionTypeUnix(void) {

View File

@ -0,0 +1,49 @@
proc test_skip_rdb_checksum {primary primary_host primary_port primary_skipped_rdb_checksum_counter primary_diskless_sync replica_diskless_load} {
upvar primary_skipped_rdb_checksum_counter counter
$primary config set repl-diskless-sync $primary_diskless_sync
start_server {overrides {save {}}} {
set replica [srv 0 client]
$replica config set repl-diskless-load $replica_diskless_load
$replica replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication not started"
}
set replica_skipping_rdb_checksum_count [count_log_message 0 "RDB file was saved with checksum disabled: skipped checksum for this transfer"]
set primary_skipping_rdb_checksum_count [count_log_message -1 "while skipping RDB checksum for this transfer"]
if {$replica_diskless_load eq "disabled" || $primary_diskless_sync eq "no" || !$::tls} {
assert_equal $counter $primary_skipping_rdb_checksum_count "Primary should not skip RDB checksum in this scenario"
assert_equal 0 $replica_skipping_rdb_checksum_count "Replica should not skip RDB checksum in this scenario"
} else {
incr counter
assert_equal $counter $primary_skipping_rdb_checksum_count "Primary should skip RDB checksum in this scenario"
assert_equal 1 $replica_skipping_rdb_checksum_count "Replica should skip RDB checksum in this scenario"
}
}
}
start_server {tags {"repl tls"} overrides {save {}}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set primary_skipped_rdb_checksum_counter 0
if {$::tls} {
foreach primary_diskless_sync {no yes} {
foreach replica_diskless_load {disabled on-empty-db swapdb flush-before-load} {
test "Skip RDB checksum sync - tls:$::tls, repl_diskless_sync:$primary_diskless_sync, repl_diskless_load:$replica_diskless_load" {
test_skip_rdb_checksum $primary $primary_host $primary_port $primary_skipped_rdb_checksum_counter $primary_diskless_sync $replica_diskless_load
}
}
}
} else {
set primary_diskless_sync yes
set replica_diskless_load on-empty-db
test "Skip RDB checksum sync - tls:$::tls, repl_diskless_sync:$primary_diskless_sync, repl_diskless_load:$replica_diskless_load" {
test_skip_rdb_checksum $primary $primary_host $primary_port $primary_skipped_rdb_checksum_counter $primary_diskless_sync $replica_diskless_load
}
}
}