Add scoped RDB loading context and immediate abort flag (#1173)
This PR introduces a mechanism for temporarily changing the server's loading rio context during RDB loading operations. The new rdbLoadRioWithLoadingCtxScopedRdb() wrapper performs a scoped change of the `server.loading_rio` value, ensuring that the previous value is automatically restored when the scope ends.

It also introduces a dedicated flag on `rio` to signal immediate abort, preventing potential use-after-free scenarios during replication disconnection in a dual-channel load. This ensures proper termination of `rdbLoadRioWithLoadingCtx` when replication is cancelled due to connection loss on the main connection.

Fixes https://github.com/valkey-io/valkey/issues/1152

---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
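As a rough sketch of the abort flow these two pieces add (illustrative only; it condenses the handler and loader code from the diff below into a few lines):

    /* 1) The main-channel read handler sees the connection drop and flags
     *    the rio that the in-flight load is currently reading from: */
    if (server.loading_rio) rioCloseASAP(server.loading_rio); /* sets RIO_FLAG_CLOSE_ASAP */
    cancelReplicationHandshake(1);

    /* 2) Deep inside rdbLoadRioWithLoadingCtx, the next read observes the
     *    flag and returns 0 bytes, which the loader treats as a failed read
     *    and unwinds with C_ERR before replication state is freed.
     *    Conceptually: */
    unsigned char type;
    if (rioRead(rdb, &type, 1) == 0) return C_ERR; /* 0 because CLOSE_ASAP is set */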
This commit is contained in:
parent
f1b7f3072c
commit
9f4503ca50
15 src/rdb.c
@@ -64,6 +64,7 @@ char *rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
 extern int rdbCheckMode;
 void rdbCheckError(const char *fmt, ...);
 void rdbCheckSetError(const char *fmt, ...);
+int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
 
 #ifdef __GNUC__
 void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__((format(printf, 3, 4)));
@@ -2991,7 +2992,19 @@ done:
 int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
     functionsLibCtx *functions_lib_ctx = functionsLibCtxGetCurrent();
     rdbLoadingCtx loading_ctx = {.dbarray = server.db, .functions_lib_ctx = functions_lib_ctx};
-    int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, &loading_ctx);
+    int retval = rdbLoadRioWithLoadingCtxScopedRdb(rdb, rdbflags, rsi, &loading_ctx);
     return retval;
 }
 
+/* Wrapper for rdbLoadRioWithLoadingCtx that manages a scoped RDB context.
+ * This method wraps the rdbLoadRioWithLoadingCtx function, providing temporary
+ * RDB context management. It sets a new current loading RDB, calls the wrapped
+ * function, and then restores the previous loading RDB context. */
+int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
+    rio *prev_rio = server.loading_rio;
+    server.loading_rio = rdb;
+    int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx);
+    server.loading_rio = prev_rio;
+    return retval;
+}
+
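For readers wondering why the wrapper saves prev_rio rather than simply setting server.loading_rio back to NULL: restoring the previous pointer keeps the field correct even if loads ever nest, so an abort raised while an outer load resumes still reaches a live rio. A hedged illustration of the difference (this naive variant is hypothetical, not in the patch):

    /* Hypothetical naive variant -- would clobber an outer load's context: */
    server.loading_rio = rdb;
    int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx);
    server.loading_rio = NULL; /* BUG if an outer load was active: its abort target is gone */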
src/rdb.h
@@ -172,7 +172,7 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
 int rdbSaveBinaryFloatValue(rio *rdb, float val);
 int rdbLoadBinaryFloatValue(rio *rdb, float *val);
 int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
-int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
+int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
 int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
 int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
 ssize_t rdbSaveFunctions(rio *rdb);
src/replication.c
@@ -2254,7 +2254,7 @@ void readSyncBulkPayload(connection *conn) {
 
     int loadingFailed = 0;
     rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
-    if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
+    if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
         /* RDB loading failed. */
         serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
                               "from socket, check server logs.");
@@ -2831,18 +2831,15 @@ typedef struct replDataBufBlock {
  * Reads replication data from primary into specified repl buffer block */
 int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t read) {
     int nread = connRead(conn, data_block->buf + data_block->used, read);
-    if (nread == -1) {
-        if (connGetState(conn) != CONN_STATE_CONNECTED) {
-            dualChannelServerLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));
+    if (nread <= 0) {
+        if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) {
+            dualChannelServerLog(LL_WARNING, "Provisional primary closed connection");
+            /* Signal ongoing RDB load to terminate gracefully */
+            if (server.loading_rio) rioCloseASAP(server.loading_rio);
             cancelReplicationHandshake(1);
         }
         return C_ERR;
     }
-    if (nread == 0) {
-        dualChannelServerLog(LL_VERBOSE, "Provisional primary closed connection");
-        cancelReplicationHandshake(1);
-        return C_ERR;
-    }
     data_block->used += nread;
     server.stat_total_reads_processed++;
     return read - nread;
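Worth noting in the hunk above: the abort flag is raised on the active loading rio before cancelReplicationHandshake(1) tears the handshake state down. If this handler fires from an event-loop iteration that interrupted rdbLoadRioWithLoadingCtx, the load observes the flag on its next rioRead(), gets 0 bytes back, and unwinds with C_ERR instead of continuing to touch state the cancellation is about to free, which is the use-after-free described in issue #1152.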
16 src/rio.h
@@ -39,6 +39,7 @@
 
 #define RIO_FLAG_READ_ERROR (1 << 0)
 #define RIO_FLAG_WRITE_ERROR (1 << 1)
+#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */
 
 #define RIO_TYPE_FILE (1 << 0)
 #define RIO_TYPE_BUFFER (1 << 1)
@@ -115,7 +116,7 @@ typedef struct _rio rio;
  * if needed. */
 
 static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
-    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
+    if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
     while (len) {
         size_t bytes_to_write =
             (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
@@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
 }
 
 static inline size_t rioRead(rio *r, void *buf, size_t len) {
-    if (r->flags & RIO_FLAG_READ_ERROR) return 0;
+    if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
     while (len) {
         size_t bytes_to_read =
             (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
@@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) {
     return r->flush(r);
 }
 
+static inline void rioCloseASAP(rio *r) {
+    r->flags |= RIO_FLAG_CLOSE_ASAP;
+}
+
 /* This function allows to know if there was a read error in any past
  * operation, since the rio stream was created or since the last call
  * to rioClearError(). */
@@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) {
     return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
 }
 
+/* Like rioGetReadError() but for async close errors. */
+static inline int rioGetAsyncCloseError(rio *r) {
+    return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
+}
+
 static inline void rioClearErrors(rio *r) {
-    r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
+    r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
 }
 
 void rioInitWithFile(rio *r, FILE *fp);
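To make the new flag's semantics concrete, here is a small usage sketch against the rio.h API above (the example() driver is hypothetical, not part of the patch):

    /* Sketch: distinguishing an async close from a genuine I/O error.
     * Assumes a rio already initialized with rioInitWithBuffer() or similar. */
    void example(rio *r) {
        char buf[16];
        rioRead(r, buf, sizeof(buf));      /* normal read path */
        rioCloseASAP(r);                   /* e.g. raised from a connection handler */
        if (rioRead(r, buf, sizeof(buf)) == 0 && rioGetAsyncCloseError(r)) {
            /* stream was closed asynchronously, not a read failure */
        }
        rioClearErrors(r);                 /* now also clears RIO_FLAG_CLOSE_ASAP */
    }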
src/server.c
@@ -2218,6 +2218,7 @@ void initServerConfig(void) {
     server.fsynced_reploff_pending = 0;
     server.rdb_client_id = -1;
     server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;
+    server.loading_rio = NULL;
 
     /* Replication partial resync backlog */
     server.repl_backlog = NULL;
src/server.h
@@ -2088,6 +2088,7 @@ struct valkeyServer {
         int dbid;
     } repl_provisional_primary;
     client *cached_primary;     /* Cached primary to be reused for PSYNC. */
+    rio *loading_rio;           /* Pointer to the rio object currently used for loading data. */
     int repl_syncio_timeout;    /* Timeout for synchronous I/O calls */
     int repl_state;             /* Replication status if the instance is a replica */
     int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
tests/integration/dual-channel-replication.tcl
@@ -1158,8 +1158,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
     $primary config set repl-diskless-sync-delay 0
 
     # Generating RDB will cost 100 sec to generate
-    $primary debug populate 10000 primary 1
-    $primary config set rdb-key-save-delay 10000
+    $primary debug populate 100000 primary 1
+    $primary config set rdb-key-save-delay 1000
 
     start_server {} {
         set replica [srv 0 client]
@@ -1222,7 +1222,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
             fail "replica didn't start sync session in time"
         }
         $primary debug log "killing replica main connection"
-        set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"]
+        set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
         assert {$replica_main_conn_id != ""}
         set loglines [count_log_lines -1]
         $primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry
@@ -1247,3 +1247,59 @@ start_server {tags {"dual-channel-replication external:skip"}} {
         stop_write_load $load_handle
     }
 }
+
+
+start_server {tags {"dual-channel-replication external:skip"}} {
+    set primary [srv 0 client]
+    set primary_host [srv 0 host]
+    set primary_port [srv 0 port]
+
+    $primary config set repl-diskless-sync yes
+    $primary config set dual-channel-replication-enabled yes
+    $primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry
+
+    # Generating RDB will take 100 sec to generate
+    $primary debug populate 1000000 primary 1
+    $primary config set rdb-key-save-delay -10
+
+    start_server {} {
+        set replica [srv 0 client]
+        set replica_host [srv 0 host]
+        set replica_port [srv 0 port]
+        set replica_log [srv 0 stdout]
+
+        $replica config set dual-channel-replication-enabled yes
+        $replica config set loglevel debug
+        $replica config set repl-timeout 10
+        $replica config set repl-diskless-load flush-before-load
+
+        test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152
+            set loglines [count_log_lines 0]
+            $replica replicaof $primary_host $primary_port
+            # Wait for sync session to start
+            wait_for_condition 500 1000 {
+                [string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
+                [string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
+                [s -1 rdb_bgsave_in_progress] eq 1
+            } else {
+                fail "replica didn't start sync session in time"
+            }
+            wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10
+            $primary set key val
+            set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
+            $primary debug log "killing replica main connection $replica_main_conn_id"
+            assert {$replica_main_conn_id != ""}
+            set loglines [count_log_lines 0]
+            $primary config set rdb-key-save-delay 0; # disable delay to allow next sync to succeed
+            $primary client kill id $replica_main_conn_id
+            # Wait for primary to abort the sync
+            wait_for_condition 50 1000 {
+                [string match {*replicas_waiting_psync:0*} [$primary info replication]]
+            } else {
+                fail "Primary did not free repl buf block after sync failure"
+            }
+            wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
+            verify_replica_online $primary 0 500
+        }
+    }
+}