Allow reads during an active replica load, optionally allow writes as an experimental feature

Former-commit-id: 655bb6e21c5e814980a7b8b3ac1481ef142c1845
This commit is contained in:
John Sully 2020-09-17 00:13:03 +00:00
parent 4eecb1825f
commit 2c6fbac6ea
6 changed files with 25 additions and 8 deletions

View File

@ -417,6 +417,13 @@ dir ./
# #
replica-serve-stale-data yes replica-serve-stale-data yes
# Active Replicas will allow read only data access while loading remote RDBs
# provided they are permitted to serve stale data. As an option you may also
# permit them to accept write commands. This is an EXPERIMENTAL feature and
# may result in commands not being fully synchronized
#
# allow-write-during-load no
# You can modify the number of masters necessary to form a replica quorum when # You can modify the number of masters necessary to form a replica quorum when
# multi-master is enabled and replica-serve-stale-data is "no". By default # multi-master is enabled and replica-serve-stale-data is "no". By default
# this is set to -1 which implies the number of known masters (e.g. those # this is set to -1 which implies the number of known masters (e.g. those

View File

@ -2236,6 +2236,7 @@ standardConfig configs[] = {
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL),
createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL),
/* String Configs */ /* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),

View File

@ -1264,9 +1264,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
void acceptOnThread(connection *conn, int flags, char *cip) void acceptOnThread(connection *conn, int flags, char *cip)
{ {
int ielCur = ielFromEventLoop(serverTL->el); int ielCur = ielFromEventLoop(serverTL->el);
bool fBootLoad = (g_pserver->loading == LOADING_BOOT);
int ielTarget = 0; int ielTarget = 0;
if (g_pserver->loading) if (fBootLoad)
{ {
ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active
} }
@ -1290,10 +1291,10 @@ void acceptOnThread(connection *conn, int flags, char *cip)
szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN); memcpy(szT, cip, NET_IP_STR_LEN);
} }
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] {
connMarshalThread(conn); connMarshalThread(conn);
acceptCommonHandler(conn,flags,szT,ielTarget); acceptCommonHandler(conn,flags,szT,ielTarget);
if (!g_fTestMode && !g_pserver->loading) if (!g_fTestMode && !fBootLoad)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT); zfree(szT);
}); });
@ -1302,7 +1303,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
return; return;
// If res != AE_OK we can still try to accept on the local thread // If res != AE_OK we can still try to accept on the local thread
} }
if (!g_fTestMode && !g_pserver->loading) if (!g_fTestMode && !fBootLoad)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
aeAcquireLock(); aeAcquireLock();

View File

@ -2059,7 +2059,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
* needed to provide loading stats. */ * needed to provide loading stats. */
void startLoading(size_t size, int rdbflags) { void startLoading(size_t size, int rdbflags) {
/* Load the DB */ /* Load the DB */
g_pserver->loading = 1; g_pserver->loading = (rdbflags & RDBFLAGS_REPLICATION) ? LOADING_REPLICATION : LOADING_BOOT;
g_pserver->loading_start_time = time(NULL); g_pserver->loading_start_time = time(NULL);
g_pserver->loading_loaded_bytes = 0; g_pserver->loading_loaded_bytes = 0;
g_pserver->loading_total_bytes = size; g_pserver->loading_total_bytes = size;

View File

@ -3849,9 +3849,13 @@ int processCommand(client *c, int callFlags) {
/* Loading DB? Return an error if the command has not the /* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */ * CMD_LOADING flag. */
if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) { if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) {
/* Active Replicas can execute read only commands, and optionally write commands */
if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad)))
{
addReply(c, shared.loadingerr); addReply(c, shared.loadingerr);
return C_OK; return C_OK;
} }
}
/* Lua script too slow? Only allow a limited number of commands. /* Lua script too slow? Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients * Note that we need to allow the transactions commands, otherwise clients
@ -4490,7 +4494,7 @@ sds genRedisInfoString(const char *section) {
"aof_last_cow_size:%zu\r\n" "aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n" "module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n", "module_fork_last_cow_size:%zu\r\n",
g_pserver->loading.load(std::memory_order_relaxed), !!g_pserver->loading.load(std::memory_order_relaxed), /* Note: libraries expect 1 or 0 here so coerce our enum */
g_pserver->dirty, g_pserver->dirty,
g_pserver->rdb_child_pid != -1, g_pserver->rdb_child_pid != -1,
(intmax_t)g_pserver->lastsave, (intmax_t)g_pserver->lastsave,

View File

@ -104,6 +104,9 @@ typedef long long ustime_t; /* microsecond time type. */
#include "endianconv.h" #include "endianconv.h"
#include "crc64.h" #include "crc64.h"
#define LOADING_BOOT 1
#define LOADING_REPLICATION 2
extern int g_fTestMode; extern int g_fTestMode;
struct redisObject; struct redisObject;
@ -2002,6 +2005,7 @@ struct redisServer {
int watchdog_period; /* Software watchdog period in ms. 0 = off */ int watchdog_period; /* Software watchdog period in ms. 0 = off */
int fActiveReplica; /* Can this replica also be a master? */ int fActiveReplica; /* Can this replica also be a master? */
int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */
// Format: // Format:
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Lower 20 bits: a counter incrementing for each command executed in the same millisecond