From 2c6fbac6ead427abd8b4543b358ca249281f17d0 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 17 Sep 2020 00:13:03 +0000 Subject: [PATCH] Allow reads during an active replica load, optionally allow writes as an experimental feature Former-commit-id: 655bb6e21c5e814980a7b8b3ac1481ef142c1845 --- keydb.conf | 7 +++++++ src/config.cpp | 1 + src/networking.cpp | 9 +++++---- src/rdb.cpp | 2 +- src/server.cpp | 10 +++++++--- src/server.h | 4 ++++ 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/keydb.conf b/keydb.conf index 4bdd96ac8..e8ca634b7 100644 --- a/keydb.conf +++ b/keydb.conf @@ -417,6 +417,13 @@ dir ./ # replica-serve-stale-data yes +# Active Replicas will allow read-only data access while loading remote RDBs +# provided they are permitted to serve stale data. As an option you may also +# permit them to accept write commands. This is an EXPERIMENTAL feature and +# may result in commands not being fully synchronized. +# +# allow-write-during-load no + # You can modify the number of masters necessary to form a replica quorum when # multi-master is enabled and replica-serve-stale-data is "no". By default # this is set to -1 which implies the number of known masters (e.g. 
those diff --git a/src/config.cpp b/src/config.cpp index 8392bfd52..030762daf 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2236,6 +2236,7 @@ standardConfig configs[] = { createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), + createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), diff --git a/src/networking.cpp b/src/networking.cpp index 666d2fdae..b5610763c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1264,9 +1264,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) void acceptOnThread(connection *conn, int flags, char *cip) { int ielCur = ielFromEventLoop(serverTL->el); + bool fBootLoad = (g_pserver->loading == LOADING_BOOT); int ielTarget = 0; - if (g_pserver->loading) + if (fBootLoad) { ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active } @@ -1290,10 +1291,10 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] { connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); @@ -1302,7 +1303,7 
@@ void acceptOnThread(connection *conn, int flags, char *cip) return; // If res != AE_OK we can still try to accept on the local thread } - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); aeAcquireLock(); diff --git a/src/rdb.cpp b/src/rdb.cpp index c6d39a010..7876ab00f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2059,7 +2059,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { * needed to provide loading stats. */ void startLoading(size_t size, int rdbflags) { /* Load the DB */ - g_pserver->loading = 1; + g_pserver->loading = (rdbflags & RDBFLAGS_REPLICATION) ? LOADING_REPLICATION : LOADING_BOOT; g_pserver->loading_start_time = time(NULL); g_pserver->loading_loaded_bytes = 0; g_pserver->loading_total_bytes = size; diff --git a/src/server.cpp b/src/server.cpp index faab9341d..c11ede661 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3849,8 +3849,12 @@ int processCommand(client *c, int callFlags) { /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) { - addReply(c, shared.loadingerr); - return C_OK; + /* Active Replicas can execute read only commands, and optionally write commands */ + if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad))) + { + addReply(c, shared.loadingerr); + return C_OK; + } } /* Lua script too slow? Only allow a limited number of commands. 
@@ -4490,7 +4494,7 @@ sds genRedisInfoString(const char *section) { "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", - g_pserver->loading.load(std::memory_order_relaxed), + !!g_pserver->loading.load(std::memory_order_relaxed), /* Note: libraries expect 1 or 0 here so coerce our enum */ g_pserver->dirty, g_pserver->rdb_child_pid != -1, (intmax_t)g_pserver->lastsave, diff --git a/src/server.h b/src/server.h index 7f228c478..9527bab1c 100644 --- a/src/server.h +++ b/src/server.h @@ -104,6 +104,9 @@ typedef long long ustime_t; /* microsecond time type. */ #include "endianconv.h" #include "crc64.h" +#define LOADING_BOOT 1 +#define LOADING_REPLICATION 2 + extern int g_fTestMode; struct redisObject; @@ -2002,6 +2005,7 @@ struct redisServer { int watchdog_period; /* Software watchdog period in ms. 0 = off */ int fActiveReplica; /* Can this replica also be a master? */ + int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */ // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond