Merge branch 'unstable' into cluster_module_support_light_msg
Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
This commit is contained in:
commit
df720b2a29
2
.github/workflows/daily.yml
vendored
2
.github/workflows/daily.yml
vendored
@ -1262,7 +1262,7 @@ jobs:
|
||||
notify-about-job-results:
|
||||
runs-on: ubuntu-latest
|
||||
if: always() && github.event_name == 'schedule' && github.repository == 'valkey-io/valkey'
|
||||
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-rpm-distros-jemalloc, test-rpm-distros-tls-module, test-rpm-distros-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
|
||||
needs: [test-ubuntu-jemalloc, test-ubuntu-jemalloc-fortify, test-ubuntu-libc-malloc, test-ubuntu-no-malloc-usable-size, test-ubuntu-32bit, test-ubuntu-tls, test-ubuntu-tls-no-tls, test-ubuntu-io-threads, test-ubuntu-tls-io-threads, test-ubuntu-reclaim-cache, test-valgrind-test, test-valgrind-misc, test-valgrind-no-malloc-usable-size-test, test-valgrind-no-malloc-usable-size-misc, test-sanitizer-address, test-sanitizer-undefined, test-sanitizer-force-defrag, test-rpm-distros-jemalloc, test-rpm-distros-tls-module, test-rpm-distros-tls-module-no-tls, test-macos-latest, test-macos-latest-sentinel, test-macos-latest-cluster, build-macos, test-freebsd, test-alpine-jemalloc, test-alpine-libc-malloc, reply-schemas-validator]
|
||||
steps:
|
||||
- name: Collect job status
|
||||
run: |
|
||||
|
10
src/aof.c
10
src/aof.c
@ -1532,10 +1532,11 @@ int loadSingleAppendOnlyFile(char *filename) {
|
||||
}
|
||||
|
||||
/* Command lookup */
|
||||
cmd = lookupCommand(argv, argc);
|
||||
if (!cmd) {
|
||||
serverLog(LL_WARNING, "Unknown command '%s' reading the append only file %s", (char *)argv[0]->ptr,
|
||||
filename);
|
||||
sds err = NULL;
|
||||
fakeClient->cmd = fakeClient->lastcmd = cmd = lookupCommand(argv, argc);
|
||||
if ((!cmd && !commandCheckExistence(fakeClient, &err)) || (cmd && !commandCheckArity(cmd, argc, &err))) {
|
||||
serverLog(LL_WARNING, "Error reading the append only file %s, error: %s", filename, err);
|
||||
sdsfree(err);
|
||||
freeClientArgv(fakeClient);
|
||||
ret = AOF_FAILED;
|
||||
goto cleanup;
|
||||
@ -1544,7 +1545,6 @@ int loadSingleAppendOnlyFile(char *filename) {
|
||||
if (cmd->proc == multiCommand) valid_before_multi = valid_up_to;
|
||||
|
||||
/* Run the command in the context of a fake client */
|
||||
fakeClient->cmd = fakeClient->lastcmd = cmd;
|
||||
if (fakeClient->flag.multi && fakeClient->cmd->proc != execCommand) {
|
||||
/* Note: we don't have to attempt calling evalGetCommandFlags,
|
||||
* since this is AOF, the checks in processCommand are not made
|
||||
|
@ -162,7 +162,10 @@ int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
|
||||
if (rdbver_ptr) {
|
||||
*rdbver_ptr = rdbver;
|
||||
}
|
||||
if (rdbver > RDB_VERSION) return C_ERR;
|
||||
if ((rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX) ||
|
||||
(rdbver > RDB_VERSION && server.rdb_version_check == RDB_VERSION_CHECK_STRICT)) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (server.skip_checksum_validation) return C_OK;
|
||||
|
||||
|
@ -1148,6 +1148,7 @@ void clusterInit(void) {
|
||||
server.cluster->failover_auth_time = 0;
|
||||
server.cluster->failover_auth_count = 0;
|
||||
server.cluster->failover_auth_rank = 0;
|
||||
server.cluster->failover_auth_sent = 0;
|
||||
server.cluster->failover_failed_primary_rank = 0;
|
||||
server.cluster->failover_auth_epoch = 0;
|
||||
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
|
||||
@ -1738,14 +1739,16 @@ void freeClusterNode(clusterNode *n) {
|
||||
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
|
||||
serverAssert(dictDelete(server.cluster->nodes, nodename) == DICT_OK);
|
||||
sdsfree(nodename);
|
||||
sdsfree(n->hostname);
|
||||
sdsfree(n->human_nodename);
|
||||
sdsfree(n->announce_client_ipv4);
|
||||
sdsfree(n->announce_client_ipv6);
|
||||
|
||||
/* Release links and associated data structures. */
|
||||
if (n->link) freeClusterLink(n->link);
|
||||
if (n->inbound_link) freeClusterLink(n->inbound_link);
|
||||
|
||||
/* Free these members after links are freed, as freeClusterLink may access them. */
|
||||
sdsfree(n->hostname);
|
||||
sdsfree(n->human_nodename);
|
||||
sdsfree(n->announce_client_ipv4);
|
||||
sdsfree(n->announce_client_ipv6);
|
||||
listRelease(n->fail_reports);
|
||||
zfree(n->replicas);
|
||||
zfree(n);
|
||||
|
@ -160,6 +160,10 @@ configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY},
|
||||
{"milliseconds", LOG_TIMESTAMP_MILLISECONDS},
|
||||
{NULL, 0}};
|
||||
|
||||
configEnum rdb_version_check_enum[] = {{"strict", RDB_VERSION_CHECK_STRICT},
|
||||
{"relaxed", RDB_VERSION_CHECK_RELAXED},
|
||||
{NULL, 0}};
|
||||
|
||||
/* Output buffer limits presets. */
|
||||
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
|
||||
{0, 0, 0}, /* normal */
|
||||
@ -3244,6 +3248,7 @@ standardConfig static_configs[] = {
|
||||
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
|
||||
createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL),
|
||||
createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL),
|
||||
createEnumConfig("rdb-version-check", NULL, MODIFIABLE_CONFIG, rdb_version_check_enum, server.rdb_version_check, RDB_VERSION_CHECK_STRICT, NULL, NULL),
|
||||
|
||||
/* Integer configs */
|
||||
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
|
||||
|
15
src/rdb.c
15
src/rdb.c
@ -42,6 +42,7 @@
|
||||
#include <math.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdatomic.h>
|
||||
#include <stdbool.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
@ -1418,6 +1419,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
|
||||
int j;
|
||||
|
||||
if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
|
||||
/* TODO: Change this to "VALKEY%03d" next time we bump the RDB version. */
|
||||
snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
|
||||
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
|
||||
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
|
||||
@ -3023,17 +3025,24 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||
char buf[1024];
|
||||
int error;
|
||||
long long empty_keys_skipped = 0;
|
||||
bool is_valkey_magic;
|
||||
|
||||
rdb->update_cksum = rdbLoadProgressCallback;
|
||||
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
|
||||
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
|
||||
buf[9] = '\0';
|
||||
if (memcmp(buf, "REDIS", 5) != 0) {
|
||||
if (memcmp(buf, "REDIS0", 6) == 0) {
|
||||
is_valkey_magic = false;
|
||||
} else if (memcmp(buf, "VALKEY", 6) == 0) {
|
||||
is_valkey_magic = true;
|
||||
} else {
|
||||
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
|
||||
return C_ERR;
|
||||
}
|
||||
rdbver = atoi(buf + 5);
|
||||
if (rdbver < 1 || rdbver > RDB_VERSION) {
|
||||
rdbver = atoi(buf + 6);
|
||||
if (rdbver < 1 ||
|
||||
(rdbver >= RDB_FOREIGN_VERSION_MIN && !is_valkey_magic) ||
|
||||
(rdbver > RDB_VERSION && server.rdb_version_check == RDB_VERSION_CHECK_STRICT)) {
|
||||
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
|
||||
return C_ERR;
|
||||
}
|
||||
|
20
src/rdb.h
20
src/rdb.h
@ -37,9 +37,27 @@
|
||||
#include "server.h"
|
||||
|
||||
/* The current RDB version. When the format changes in a way that is no longer
|
||||
* backward compatible this number gets incremented. */
|
||||
* backward compatible this number gets incremented.
|
||||
*
|
||||
* RDB 11 is the last open-source Redis RDB version, used by Valkey 7.x and 8.x.
|
||||
*
|
||||
* RDB 12+ are non-open-source Redis formats.
|
||||
*
|
||||
* Next time we bump the Valkey RDB version, use much higher version to avoid
|
||||
* collisions with non-OSS Redis RDB versions. For example, we could use RDB
|
||||
* version 90 for Valkey 9.0.
|
||||
*
|
||||
* In an RDB file/stream, we also check the magic string REDIS or VALKEY but in
|
||||
* the DUMP/RESTORE format, there is only the RDB version number and no magic
|
||||
* string. */
|
||||
#define RDB_VERSION 11
|
||||
|
||||
/* Reserved range for foreign (unsupported, non-OSS) RDB format. */
|
||||
#define RDB_FOREIGN_VERSION_MIN 12
|
||||
#define RDB_FOREIGN_VERSION_MAX 79
|
||||
static_assert(RDB_VERSION < RDB_FOREIGN_VERSION_MIN || RDB_VERSION > RDB_FOREIGN_VERSION_MAX,
|
||||
"RDB version in foreign version range");
|
||||
|
||||
/* Defines related to the dump file format. To store 32 bits lengths for short
|
||||
* keys requires a lot of space, so we check the most significant 2 bits of
|
||||
* the first byte to interpreter the length:
|
||||
|
73
src/rdma.c
73
src/rdma.c
@ -23,6 +23,7 @@
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/mman.h>
|
||||
|
||||
#define CONN_TYPE_RDMA "rdma"
|
||||
|
||||
@ -134,6 +135,8 @@ static list *pending_list;
|
||||
static rdma_listener *rdma_listeners;
|
||||
static serverRdmaContextConfig *rdma_config;
|
||||
|
||||
static size_t page_size;
|
||||
|
||||
static ConnectionType CT_RDMA;
|
||||
|
||||
static void serverRdmaError(char *err, const char *fmt, ...) {
|
||||
@ -191,31 +194,56 @@ static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCm
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* To make Valkey forkable, buffer which is registered as RDMA
|
||||
* memory region should be aligned to page size. And the length
|
||||
* also need be aligned to page size.
|
||||
/* To make Valkey forkable, buffer which is registered as RDMA memory region should be
|
||||
* aligned to page size. And the length also need be aligned to page size.
|
||||
* Random segment-fault case like this:
|
||||
* 0x7f2764ac5000 - 0x7f2764ac7000
|
||||
* |ptr0 128| ... |ptr1 4096| ... |ptr2 512|
|
||||
*
|
||||
* After ibv_reg_mr(pd, ptr1, 4096, access), the full range of 8K
|
||||
* becomes DONTFORK. And the child process will hit a segment fault
|
||||
* during access ptr0/ptr2.
|
||||
* Note that the memory can be freed by libc free only.
|
||||
* TODO: move it to zmalloc.c if necessary
|
||||
* After ibv_reg_mr(pd, ptr1, 4096, access), the full range of 8K becomes DONTFORK. And
|
||||
* the child process will hit a segment fault during access ptr0/ptr2.
|
||||
*
|
||||
* The portable posix_memalign(&tmp, page_size, aligned_size) would be fine too. However,
|
||||
* RDMA is supported by Linux only, so it would not break anything. Using raw mmap syscall
|
||||
* to allocate a separate virtual memory area(VMA), also make it protected by the 2 guard
|
||||
* pages (a top one and a bottom one).
|
||||
*/
|
||||
static void *page_aligned_zalloc(size_t size) {
|
||||
void *tmp;
|
||||
size_t aligned_size, page_size = sysconf(_SC_PAGESIZE);
|
||||
static void *rdmaMemoryAlloc(size_t size) {
|
||||
size_t real_size, aligned_size = (size + page_size - 1) & (~(page_size - 1));
|
||||
uint8_t *ptr;
|
||||
|
||||
aligned_size = (size + page_size - 1) & (~(page_size - 1));
|
||||
if (posix_memalign(&tmp, page_size, aligned_size)) {
|
||||
serverPanic("posix_memalign failed");
|
||||
real_size = aligned_size + 2 * page_size;
|
||||
ptr = mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||
if (ptr == MAP_FAILED) {
|
||||
serverPanic("failed to allocate memory for RDMA region");
|
||||
}
|
||||
|
||||
memset(tmp, 0x00, aligned_size);
|
||||
madvise(ptr, real_size, MADV_DONTDUMP); /* no need to dump this VMA on coredump */
|
||||
mprotect(ptr, page_size, PROT_NONE); /* top page of this VMA */
|
||||
mprotect(ptr + size + page_size, page_size, PROT_NONE); /* bottom page of this VMA */
|
||||
|
||||
return tmp;
|
||||
return ptr + page_size;
|
||||
}
|
||||
|
||||
static void rdmaMemoryFree(void *ptr, size_t size) {
|
||||
uint8_t *real_ptr;
|
||||
size_t real_size, aligned_size;
|
||||
|
||||
if (!ptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ((unsigned long)ptr & (page_size - 1)) {
|
||||
serverPanic("unaligned memory in use for RDMA region");
|
||||
}
|
||||
|
||||
aligned_size = (size + page_size - 1) & (~(page_size - 1));
|
||||
real_size = aligned_size + 2 * page_size;
|
||||
real_ptr = (uint8_t *)ptr - page_size;
|
||||
|
||||
if (munmap(real_ptr, real_size)) {
|
||||
serverPanic("failed to free memory for RDMA region");
|
||||
}
|
||||
}
|
||||
|
||||
static void rdmaDestroyIoBuf(RdmaContext *ctx) {
|
||||
@ -224,7 +252,7 @@ static void rdmaDestroyIoBuf(RdmaContext *ctx) {
|
||||
ctx->rx.mr = NULL;
|
||||
}
|
||||
|
||||
zlibc_free(ctx->rx.addr);
|
||||
rdmaMemoryFree(ctx->rx.addr, ctx->rx.length);
|
||||
ctx->rx.addr = NULL;
|
||||
|
||||
if (ctx->tx.mr) {
|
||||
@ -232,7 +260,7 @@ static void rdmaDestroyIoBuf(RdmaContext *ctx) {
|
||||
ctx->tx.mr = NULL;
|
||||
}
|
||||
|
||||
zlibc_free(ctx->tx.addr);
|
||||
rdmaMemoryFree(ctx->tx.addr, ctx->tx.length);
|
||||
ctx->tx.addr = NULL;
|
||||
|
||||
if (ctx->cmd_mr) {
|
||||
@ -240,7 +268,7 @@ static void rdmaDestroyIoBuf(RdmaContext *ctx) {
|
||||
ctx->cmd_mr = NULL;
|
||||
}
|
||||
|
||||
zlibc_free(ctx->cmd_buf);
|
||||
rdmaMemoryFree(ctx->cmd_buf, sizeof(ValkeyRdmaCmd) * VALKEY_RDMA_MAX_WQE * 2);
|
||||
ctx->cmd_buf = NULL;
|
||||
}
|
||||
|
||||
@ -251,7 +279,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
|
||||
int i;
|
||||
|
||||
/* setup CMD buf & MR */
|
||||
ctx->cmd_buf = page_aligned_zalloc(length);
|
||||
ctx->cmd_buf = rdmaMemoryAlloc(length);
|
||||
ctx->cmd_mr = ibv_reg_mr(ctx->pd, ctx->cmd_buf, length, access);
|
||||
if (!ctx->cmd_mr) {
|
||||
serverLog(LL_WARNING, "RDMA: reg mr for CMD failed");
|
||||
@ -275,7 +303,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
|
||||
/* setup recv buf & MR */
|
||||
access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
|
||||
length = rdma_config->rx_size;
|
||||
ctx->rx.addr = page_aligned_zalloc(length);
|
||||
ctx->rx.addr = rdmaMemoryAlloc(length);
|
||||
ctx->rx.length = length;
|
||||
ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access);
|
||||
if (!ctx->rx.mr) {
|
||||
@ -387,7 +415,7 @@ static int rdmaAdjustSendbuf(RdmaContext *ctx, unsigned int length) {
|
||||
}
|
||||
|
||||
/* create a new buffer & MR */
|
||||
ctx->tx.addr = page_aligned_zalloc(length);
|
||||
ctx->tx.addr = rdmaMemoryAlloc(length);
|
||||
ctx->tx_length = length;
|
||||
ctx->tx.mr = ibv_reg_mr(ctx->pd, ctx->tx.addr, length, access);
|
||||
if (!ctx->tx.mr) {
|
||||
@ -1705,6 +1733,7 @@ error:
|
||||
|
||||
static void rdmaInit(void) {
|
||||
pending_list = listCreate();
|
||||
page_size = sysconf(_SC_PAGESIZE);
|
||||
|
||||
VALKEY_BUILD_BUG_ON(sizeof(ValkeyRdmaFeature) != 32);
|
||||
VALKEY_BUILD_BUG_ON(sizeof(ValkeyRdmaKeepalive) != 32);
|
||||
|
@ -3911,7 +3911,7 @@ void afterCommand(client *c) {
|
||||
int commandCheckExistence(client *c, sds *err) {
|
||||
if (c->cmd) return 1;
|
||||
if (!err) return 0;
|
||||
if (isContainerCommandBySds(c->argv[0]->ptr)) {
|
||||
if (isContainerCommandBySds(c->argv[0]->ptr) && c->argc >= 2) {
|
||||
/* If we can't find the command but argv[0] by itself is a command
|
||||
* it means we're dealing with an invalid subcommand. Print Help. */
|
||||
sds cmd = sdsnew((char *)c->argv[0]->ptr);
|
||||
@ -4025,7 +4025,6 @@ int processCommand(client *c) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
||||
/* Check if the command is marked as protected and the relevant configuration allows it */
|
||||
if (c->cmd->flags & CMD_PROTECTED) {
|
||||
if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) ||
|
||||
|
@ -600,6 +600,9 @@ typedef enum { LOG_TIMESTAMP_LEGACY = 0,
|
||||
LOG_TIMESTAMP_ISO8601,
|
||||
LOG_TIMESTAMP_MILLISECONDS } log_timestamp_type;
|
||||
|
||||
typedef enum { RDB_VERSION_CHECK_STRICT = 0,
|
||||
RDB_VERSION_CHECK_RELAXED } rdb_version_check_type;
|
||||
|
||||
/* common sets of actions to pause/unpause */
|
||||
#define PAUSE_ACTIONS_CLIENT_WRITE_SET \
|
||||
(PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA)
|
||||
@ -1768,6 +1771,7 @@ struct valkeyServer {
|
||||
int active_defrag_enabled;
|
||||
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
|
||||
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
|
||||
int rdb_version_check; /* Try to load RDB produced by a future version. */
|
||||
int jemalloc_bg_thread; /* Enable jemalloc background thread */
|
||||
int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
|
||||
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
|
||||
|
BIN
tests/assets/encodings-rdb987.rdb
Normal file
BIN
tests/assets/encodings-rdb987.rdb
Normal file
Binary file not shown.
@ -259,7 +259,7 @@ tags {"aof external:skip"} {
|
||||
|
||||
start_server_aof_ex [list dir $server_path aof-load-truncated yes] [list wait_ready false] {
|
||||
test "Unknown command: Server should have logged an error" {
|
||||
wait_for_log_messages 0 {"*Unknown command 'bla' reading the append only file*"} 0 10 1000
|
||||
wait_for_log_messages 0 {"*unknown command 'bla'*"} 0 10 1000
|
||||
}
|
||||
}
|
||||
|
||||
@ -693,6 +693,47 @@ tags {"aof cluster external:skip"} {
|
||||
assert_equal [r ping] {PONG}
|
||||
}
|
||||
}
|
||||
|
||||
test {Test command check in aof won't crash} {
|
||||
# cluster, wrong number of arguments for 'cluster' command
|
||||
create_aof $aof_dirpath $aof_file { append_to_aof [formatCommand cluster] }
|
||||
create_aof_manifest $aof_dirpath $aof_manifest_file { append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n" }
|
||||
start_server_aof_ex [list dir $server_path] [list wait_ready false] {
|
||||
wait_for_condition 100 50 {
|
||||
! [is_alive [srv pid]]
|
||||
} else {
|
||||
fail "AOF loading didn't fail"
|
||||
}
|
||||
assert_equal 1 [count_message_lines $server_path/stdout "wrong number of arguments for 'cluster' command"]
|
||||
}
|
||||
clean_aof_persistence $aof_dirpath
|
||||
|
||||
# cluster slots-xxx, unknown subcommand 'slots-xxx'
|
||||
create_aof $aof_dirpath $aof_file { append_to_aof [formatCommand cluster slots-xxx] }
|
||||
create_aof_manifest $aof_dirpath $aof_manifest_file { append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n" }
|
||||
start_server_aof_ex [list dir $server_path] [list wait_ready false] {
|
||||
wait_for_condition 100 50 {
|
||||
! [is_alive [srv pid]]
|
||||
} else {
|
||||
fail "AOF loading didn't fail"
|
||||
}
|
||||
assert_equal 1 [count_message_lines $server_path/stdout "unknown subcommand 'slots-xxx'"]
|
||||
}
|
||||
clean_aof_persistence $aof_dirpath
|
||||
|
||||
# cluster slots xxx, wrong number of arguments for 'cluster|slots' command
|
||||
create_aof $aof_dirpath $aof_file { append_to_aof [formatCommand cluster slots xxx] }
|
||||
create_aof_manifest $aof_dirpath $aof_manifest_file { append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n" }
|
||||
start_server_aof_ex [list dir $server_path] [list wait_ready false] {
|
||||
wait_for_condition 100 50 {
|
||||
![is_alive [srv pid]]
|
||||
} else {
|
||||
fail "AOF loading didn't fail"
|
||||
}
|
||||
assert_equal 1 [count_message_lines $server_path/stdout "wrong number of arguments for 'cluster|slots' command"]
|
||||
}
|
||||
clean_aof_persistence $aof_dirpath
|
||||
}
|
||||
}
|
||||
|
||||
set ::singledb $old_singledb
|
||||
|
@ -18,7 +18,7 @@ start_server {tags {"repl needs:other-server external:skip"} start-other-server
|
||||
start_server {} {
|
||||
test "Start replication from $primary_name_and_version" {
|
||||
r replicaof [srv -1 host] [srv -1 port]
|
||||
wait_for_sync r
|
||||
wait_for_sync r 500 100
|
||||
# The key has been transferred.
|
||||
assert_equal bar [r get foo]
|
||||
assert_equal up [s master_link_status]
|
||||
|
@ -1,9 +1,21 @@
|
||||
tags {"rdb external:skip"} {
|
||||
|
||||
# Helper function to start a server and kill it, just to check the error
|
||||
# logged.
|
||||
set defaults {}
|
||||
proc start_server_and_kill_it {overrides code} {
|
||||
upvar defaults defaults srv srv server_path server_path
|
||||
set config [concat $defaults $overrides]
|
||||
set srv [start_server [list overrides $config keep_persistence true]]
|
||||
uplevel 1 $code
|
||||
kill_server $srv
|
||||
}
|
||||
|
||||
set server_path [tmpdir "server.rdb-encoding-test"]
|
||||
|
||||
# Copy RDB with different encodings in server path
|
||||
exec cp tests/assets/encodings.rdb $server_path
|
||||
exec cp tests/assets/encodings-rdb987.rdb $server_path
|
||||
exec cp tests/assets/list-quicklist.rdb $server_path
|
||||
|
||||
start_server [list overrides [list "dir" $server_path "dbfilename" "list-quicklist.rdb" save ""]] {
|
||||
@ -15,11 +27,7 @@ start_server [list overrides [list "dir" $server_path "dbfilename" "list-quickli
|
||||
} {7}
|
||||
}
|
||||
|
||||
start_server [list overrides [list "dir" $server_path "dbfilename" "encodings.rdb"]] {
|
||||
test "RDB encoding loading test" {
|
||||
r select 0
|
||||
csvdump r
|
||||
} {"0","compressible","string","aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
set csv_dump {"0","compressible","string","aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
"0","hash","hash","a","1","aa","10","aaa","100","b","2","bb","20","bbb","200","c","3","cc","30","ccc","300","ddd","400","eee","5000000000",
|
||||
"0","hash_zipped","hash","a","1","b","2","c","3",
|
||||
"0","list","list","1","2","3","a","b","c","100000","6000000000","1","2","3","a","b","c","100000","6000000000","1","2","3","a","b","c","100000","6000000000",
|
||||
@ -33,6 +41,32 @@ start_server [list overrides [list "dir" $server_path "dbfilename" "encodings.rd
|
||||
"0","zset","zset","a","1","b","2","c","3","aa","10","bb","20","cc","30","aaa","100","bbb","200","ccc","300","aaaa","1000","cccc","123456789","bbbb","5000000000",
|
||||
"0","zset_zipped","zset","a","1","b","2","c","3",
|
||||
}
|
||||
|
||||
start_server [list overrides [list "dir" $server_path "dbfilename" "encodings.rdb"]] {
|
||||
test "RDB encoding loading test" {
|
||||
r select 0
|
||||
csvdump r
|
||||
} $csv_dump
|
||||
}
|
||||
|
||||
start_server_and_kill_it [list "dir" $server_path "dbfilename" "encodings-rdb987.rdb"] {
|
||||
test "RDB future version loading, strict version check" {
|
||||
wait_for_condition 50 100 {
|
||||
[string match {*Fatal error loading*} \
|
||||
[exec tail -1 < [dict get $srv stdout]]]
|
||||
} else {
|
||||
fail "Server started even if RDB version check failed"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server [list overrides [list "dir" $server_path \
|
||||
"dbfilename" "encodings-rdb987.rdb" \
|
||||
"rdb-version-check" "relaxed"]] {
|
||||
test "RDB future version loading, relaxed version check" {
|
||||
r select 0
|
||||
csvdump r
|
||||
} $csv_dump
|
||||
}
|
||||
|
||||
set server_path [tmpdir "server.rdb-startup-test"]
|
||||
@ -80,17 +114,6 @@ start_server [list overrides [list "dir" $server_path] keep_persistence true] {
|
||||
r del stream
|
||||
}
|
||||
|
||||
# Helper function to start a server and kill it, just to check the error
|
||||
# logged.
|
||||
set defaults {}
|
||||
proc start_server_and_kill_it {overrides code} {
|
||||
upvar defaults defaults srv srv server_path server_path
|
||||
set config [concat $defaults $overrides]
|
||||
set srv [start_server [list overrides $config keep_persistence true]]
|
||||
uplevel 1 $code
|
||||
kill_server $srv
|
||||
}
|
||||
|
||||
# Make the RDB file unreadable
|
||||
file attributes [file join $server_path dump.rdb] -permissions 0222
|
||||
|
||||
|
@ -110,8 +110,8 @@ proc waitForBgrewriteaof r {
|
||||
}
|
||||
}
|
||||
|
||||
proc wait_for_sync r {
|
||||
wait_for_condition 50 100 {
|
||||
proc wait_for_sync {r {maxtries 50} {delay 100}} {
|
||||
wait_for_condition $maxtries $delay {
|
||||
[status $r master_link_status] eq "up"
|
||||
} else {
|
||||
fail "replica didn't sync in time"
|
||||
|
@ -124,6 +124,22 @@ start_server {tags {"dump"}} {
|
||||
close_replication_stream $repl
|
||||
} {} {needs:repl}
|
||||
|
||||
test {RESTORE key with future RDB version, strict version check} {
|
||||
# str len "bar" RDB 222 CRC64 checksum
|
||||
# | | | | |
|
||||
set bar_dump "\x00\x03\x62\x61\x72\xde\x00\x0fYUza\xd3\xec\xe0"
|
||||
assert_error {ERR DUMP payload version or checksum are wrong} {r restore foo 0 $bar_dump replace}
|
||||
}
|
||||
|
||||
test {RESTORE key with future RDB version, relaxed version check} {
|
||||
r config set rdb-version-check relaxed
|
||||
# |type|len| | RDB | CRC64 |
|
||||
# |str | 3 | "bar" | 222 | checksum |
|
||||
r restore foo 0 "\x00\x03\x62\x61\x72\xde\x00\x0fYUza\xd3\xec\xe0" replace
|
||||
r config set rdb-version-check strict
|
||||
assert_equal {bar} [r get foo]
|
||||
}
|
||||
|
||||
test {DUMP of non existing key returns nil} {
|
||||
r dump nonexisting_key
|
||||
} {}
|
||||
|
@ -7,7 +7,7 @@ start_server {tags {"pause network"}} {
|
||||
assert_equal [s paused_actions] "write"
|
||||
after 1000
|
||||
set timeout [s paused_timeout_milliseconds]
|
||||
assert {$timeout > 0 && $timeout < 9000}
|
||||
assert {$timeout > 0 && $timeout <= 9000}
|
||||
r client unpause
|
||||
|
||||
r multi
|
||||
|
@ -236,20 +236,14 @@ start_server {tags {"tracking network logreqres:skip"}} {
|
||||
}
|
||||
|
||||
test {RESP3 Client gets tracking-redir-broken push message after cached key changed when rediretion client is terminated} {
|
||||
# make sure r is working resp 3
|
||||
r HELLO 3
|
||||
r CLIENT TRACKING on REDIRECT $redir_id
|
||||
$rd_sg SET key1 1
|
||||
r GET key1
|
||||
$rd_redirection QUIT
|
||||
assert_equal OK [$rd_redirection read]
|
||||
$rd_sg SET key1 2
|
||||
set MAX_TRIES 100
|
||||
set res -1
|
||||
for {set i 0} {$i <= $MAX_TRIES && $res < 0} {incr i} {
|
||||
set res [lsearch -exact [r PING] "tracking-redir-broken"]
|
||||
}
|
||||
assert {$res >= 0}
|
||||
# Consume PING reply
|
||||
assert_equal PONG [r read]
|
||||
|
||||
# Reinstantiating after QUIT
|
||||
set rd_redirection [valkey_deferring_client]
|
||||
@ -257,6 +251,15 @@ start_server {tags {"tracking network logreqres:skip"}} {
|
||||
set redir_id [$rd_redirection read]
|
||||
$rd_redirection SUBSCRIBE __redis__:invalidate
|
||||
$rd_redirection read ; # Consume the SUBSCRIBE reply
|
||||
|
||||
# Wait to read the tracking-redir-broken
|
||||
wait_for_condition 1000 50 {
|
||||
[lsearch -exact [r PING] "tracking-redir-broken"]
|
||||
} else {
|
||||
fail "Failed to get redirect broken indication"
|
||||
}
|
||||
# Consume PING reply
|
||||
assert_equal PONG [r read]
|
||||
}
|
||||
|
||||
test {Different clients can redirect to the same connection} {
|
||||
|
@ -549,6 +549,15 @@ rdbcompression yes
|
||||
# tell the loading code to skip the check.
|
||||
rdbchecksum yes
|
||||
|
||||
# Valkey can try to load an RDB dump produced by a future version of Valkey.
|
||||
# This can only work on a best-effort basis, because future RDB versions may
|
||||
# contain information that's not known to the current version. If no new features
|
||||
# are used, it may be possible to import the data produced by a later version,
|
||||
# but loading is aborted if unknown information is encountered. Possible values
|
||||
# are 'strict' and 'relaxed'. This also applies to replication and the RESTORE
|
||||
# command.
|
||||
rdb-version-check strict
|
||||
|
||||
# Enables or disables full sanitization checks for ziplist and listpack etc when
|
||||
# loading an RDB or RESTORE payload. This reduces the chances of a assertion or
|
||||
# crash later on while processing commands.
|
||||
|
Loading…
x
Reference in New Issue
Block a user