Active Replica Support
Former-commit-id: a7aa2b074049a130761bc0a98d47130b6a0ff817
This commit is contained in:
parent
d18905fd26
commit
ccb9cb8b01
@ -1562,3 +1562,8 @@ server-threads 2
|
||||
# Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads.
|
||||
# When enabled threads are bount to cores sequentially starting at core 0.
|
||||
# server-thread-affinity true
|
||||
|
||||
# Uncomment the option below to enable Active Active support. Note that
|
||||
# replicas will still sync in the normal way and incorrect ordering when
|
||||
# bringing up replicas can result in data loss (the first master will win).
|
||||
# active-replica yes
|
||||
|
@ -94,7 +94,7 @@ endif
|
||||
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
|
||||
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS)
|
||||
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
|
||||
FINAL_LIBS=-lm
|
||||
FINAL_LIBS=-lm -luuid
|
||||
DEBUG=-g -ggdb
|
||||
|
||||
ifeq ($(uname_S),SunOS)
|
||||
|
11
src/config.c
11
src/config.c
@ -849,6 +849,16 @@ void loadServerConfigFromString(char *config) {
|
||||
err = "Unknown argument: server-thread-affinity expects either true or false";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0], "active-replica") && argc == 2) {
|
||||
server.fActiveReplica = yesnotoi(argv[1]);
|
||||
if (server.repl_slave_ro) {
|
||||
server.repl_slave_ro = FALSE;
|
||||
serverLog(LL_NOTICE, "Notice: \"active-replica yes\" implies \"replica-read-only no\"");
|
||||
}
|
||||
if (server.fActiveReplica == -1) {
|
||||
server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA;
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else {
|
||||
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
||||
}
|
||||
@ -2350,6 +2360,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL);
|
||||
rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH);
|
||||
rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ);
|
||||
rewriteConfigYesNoOption(state,"active-replica",server.fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA);
|
||||
|
||||
/* Rewrite Sentinel config if in Sentinel mode. */
|
||||
if (server.sentinel_mode) rewriteConfigSentinelOption(state);
|
||||
|
@ -226,6 +226,7 @@ client *createClient(int fd, int iel) {
|
||||
c->bufAsync = NULL;
|
||||
c->buflenAsync = 0;
|
||||
c->bufposAsync = 0;
|
||||
memset(c->uuid, 0, UUID_BINARY_LEN);
|
||||
|
||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
||||
@ -2135,7 +2136,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* corresponding part of the replication stream, will be propagated to
|
||||
* the sub-slaves and to the replication backlog. */
|
||||
processInputBufferAndReplicate(c);
|
||||
aelock.arm(nullptr);
|
||||
aelock.arm(c);
|
||||
ProcessPendingAsyncWrites();
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
#include <mutex>
|
||||
#include <uuid/uuid.h>
|
||||
|
||||
void replicationDiscardCachedMaster(void);
|
||||
void replicationResurrectCachedMaster(int newfd);
|
||||
@ -74,6 +75,21 @@ char *replicationGetSlaveName(client *c) {
|
||||
return buf;
|
||||
}
|
||||
|
||||
static bool FSameHost(client *clientA, client *clientB)
|
||||
{
|
||||
const unsigned char *a = clientA->uuid;
|
||||
const unsigned char *b = clientB->uuid;
|
||||
|
||||
unsigned char zeroCheck = 0;
|
||||
for (int i = 0; i < UUID_BINARY_LEN; ++i)
|
||||
{
|
||||
if (a[i] != b[i])
|
||||
return false;
|
||||
zeroCheck |= a[i];
|
||||
}
|
||||
return (zeroCheck != 0); // if the UUID is nil then it is never equal
|
||||
}
|
||||
|
||||
/* ---------------------------------- MASTER -------------------------------- */
|
||||
|
||||
void createReplicationBacklog(void) {
|
||||
@ -117,7 +133,13 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
|
||||
void freeReplicationBacklog(void) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
serverAssert(listLength(server.slaves) == 0);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.slaves, &li);
|
||||
while ((ln = listNext(&li))) {
|
||||
// server.slaves should be empty, or filled with clients pending close
|
||||
serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP);
|
||||
}
|
||||
zfree(server.repl_backlog);
|
||||
server.repl_backlog = NULL;
|
||||
}
|
||||
@ -186,7 +208,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
* propagate *identical* replication stream. In this way this slave can
|
||||
* advertise the same replication ID as the master (since it shares the
|
||||
* master replication history and has the same backlog and offsets). */
|
||||
if (server.masterhost != NULL) return;
|
||||
if (!server.fActiveReplica && server.masterhost != NULL) return;
|
||||
|
||||
/* If there aren't slaves, and there is no backlog buffer to populate,
|
||||
* we can return ASAP. */
|
||||
@ -226,6 +248,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = (client*)ln->value;
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (server.current_client && FSameHost(server.current_client, slave)) continue;
|
||||
addReplyAsync(slave,selectcmd);
|
||||
}
|
||||
|
||||
@ -268,6 +291,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (server.current_client && FSameHost(server.current_client, slave)) continue;
|
||||
|
||||
/* Feed slaves that are waiting for the initial SYNC (so these commands
|
||||
* are queued in the output buffer until the initial SYNC completes),
|
||||
@ -282,10 +306,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
addReplyBulkAsync(slave,argv[j]);
|
||||
}
|
||||
|
||||
/* Release the lock on all slaves */
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
((client*)ln->value)->lock.unlock();
|
||||
client *slave = (client*)ln->value;
|
||||
slave->lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -311,6 +335,8 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = (client*)ln->value;
|
||||
std::lock_guard<decltype(slave->lock)> ulock(slave->lock);
|
||||
if (FSameHost(slave, server.master))
|
||||
continue; // Active Active case, don't feed back
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
@ -658,10 +684,12 @@ void syncCommand(client *c) {
|
||||
|
||||
/* Refuse SYNC requests if we are a slave but the link with our master
|
||||
* is not ok... */
|
||||
if (!server.fActiveReplica) {
|
||||
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
|
||||
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* SYNC can't be issued when the server has pending data to send to
|
||||
* the client about already issued commands. We need a fresh reply
|
||||
@ -790,6 +818,33 @@ void syncCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
void processReplconfUuid(client *c, robj *arg)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (arg->type != OBJ_STRING)
|
||||
throw "Invalid UUID";
|
||||
|
||||
const char *remoteUUID = (const char*)ptrFromObj(arg);
|
||||
if (strlen(remoteUUID) != 36)
|
||||
throw "Invalid UUID";
|
||||
|
||||
if (uuid_parse(remoteUUID, c->uuid) != 0)
|
||||
throw "Invalid UUID";
|
||||
|
||||
char szServerUUID[36 + 2]; // 1 for the '+', another for '\0'
|
||||
szServerUUID[0] = '+';
|
||||
uuid_unparse(server.uuid, szServerUUID+1);
|
||||
addReplyProto(c, szServerUUID, 37);
|
||||
addReplyProto(c, "\r\n", 2);
|
||||
}
|
||||
catch (const char *szErr)
|
||||
{
|
||||
addReplyError(c, szErr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* REPLCONF <option> <value> <option> <value> ...
|
||||
* This command is used by a slave in order to configure the replication
|
||||
* process before starting it with the SYNC command.
|
||||
@ -860,6 +915,10 @@ void replconfCommand(client *c) {
|
||||
* to the slave. */
|
||||
if (server.masterhost && server.master) replicationSendAck();
|
||||
return;
|
||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"uuid")) {
|
||||
/* REPLCONF uuid is used to set and send the UUID of each host */
|
||||
processReplconfUuid(c, c->argv[j+1]);
|
||||
return; // the process function replies to the client for both error and success
|
||||
} else {
|
||||
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
||||
(char*)ptrFromObj(c->argv[j]));
|
||||
@ -1134,6 +1193,10 @@ void replicationCreateMasterClient(int fd, int dbid) {
|
||||
server.master->reploff = server.master_initial_offset;
|
||||
server.master->read_reploff = server.master->reploff;
|
||||
server.master->puser = NULL; /* This client can do everything. */
|
||||
|
||||
memcpy(server.master->uuid, server.master_uuid, UUID_BINARY_LEN);
|
||||
memset(server.master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer
|
||||
|
||||
memcpy(server.master->replid, server.master_replid,
|
||||
sizeof(server.master_replid));
|
||||
/* If master offset is set to -1, this master is old and is not
|
||||
@ -1738,7 +1801,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
server.repl_state = REPL_STATE_RECEIVE_AUTH;
|
||||
return;
|
||||
} else {
|
||||
server.repl_state = REPL_STATE_SEND_PORT;
|
||||
server.repl_state = REPL_STATE_SEND_UUID;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1751,7 +1814,38 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
goto error;
|
||||
}
|
||||
sdsfree(err);
|
||||
server.repl_state = REPL_STATE_SEND_UUID;
|
||||
}
|
||||
|
||||
/* Send UUID */
|
||||
if (server.repl_state == REPL_STATE_SEND_UUID) {
|
||||
char szUUID[37];
|
||||
memset(server.master_uuid, 0, UUID_BINARY_LEN);
|
||||
uuid_unparse((unsigned char*)server.uuid, szUUID);
|
||||
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","uuid",szUUID);
|
||||
if (err) goto write_error;
|
||||
server.repl_state = REPL_STATE_RECEIVE_UUID;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Receive UUID */
|
||||
if (server.repl_state == REPL_STATE_RECEIVE_UUID) {
|
||||
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
|
||||
if (err[0] == '-') {
|
||||
serverLog(LL_WARNING, "non-fatal: Master doesn't understand REPLCONF uuid");
|
||||
}
|
||||
else {
|
||||
if (strlen(err) != 37 // 36-byte UUID string and the leading '+'
|
||||
|| uuid_parse(err+1, server.master_uuid) != 0)
|
||||
{
|
||||
serverLog(LL_WARNING, "Master replied with a UUID we don't understand");
|
||||
sdsfree(err);
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
sdsfree(err);
|
||||
server.repl_state = REPL_STATE_SEND_PORT;
|
||||
// fallthrough
|
||||
}
|
||||
|
||||
/* Set the slave port, so that Master's INFO command can list the
|
||||
|
@ -58,6 +58,7 @@
|
||||
#include <locale.h>
|
||||
#include <sys/socket.h>
|
||||
#include <algorithm>
|
||||
#include <uuid/uuid.h>
|
||||
|
||||
/* Our shared "common" objects */
|
||||
|
||||
@ -2371,6 +2372,7 @@ void initServerConfig(void) {
|
||||
server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
|
||||
server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
|
||||
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
|
||||
server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA;
|
||||
|
||||
unsigned int lruclock = getLRUClock();
|
||||
atomicSet(server.lruclock,lruclock);
|
||||
@ -2962,6 +2964,10 @@ void initServer(void) {
|
||||
server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
|
||||
}
|
||||
|
||||
/* Generate UUID */
|
||||
static_assert(sizeof(uuid_t) == sizeof(server.uuid), "UUIDs are standardized at 16-bytes");
|
||||
uuid_generate((unsigned char*)server.uuid);
|
||||
|
||||
if (server.cluster_enabled) clusterInit();
|
||||
replicationScriptCacheInit();
|
||||
scriptingInit(1);
|
||||
|
36
src/server.h
36
src/server.h
@ -87,6 +87,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define UUID_BINARY_LEN 16
|
||||
|
||||
/* Error codes */
|
||||
#define C_OK 0
|
||||
#define C_ERR -1
|
||||
@ -186,6 +188,8 @@ extern "C" {
|
||||
#define CONFIG_DEFAULT_THREADS 1
|
||||
#define CONFIG_DEFAULT_THREAD_AFFINITY 0
|
||||
|
||||
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
|
||||
@ -335,17 +339,19 @@ extern "C" {
|
||||
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
|
||||
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
|
||||
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
|
||||
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
|
||||
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
|
||||
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
|
||||
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
|
||||
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
|
||||
#define REPL_STATE_SEND_UUID 6 /* send our UUID */
|
||||
#define REPL_STATE_RECEIVE_UUID 7 /* they should ack with their UUID */
|
||||
#define REPL_STATE_SEND_PORT 8 /* Send REPLCONF listening-port */
|
||||
#define REPL_STATE_RECEIVE_PORT 9 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_IP 10 /* Send REPLCONF ip-address */
|
||||
#define REPL_STATE_RECEIVE_IP 11 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_CAPA 12 /* Send REPLCONF capa */
|
||||
#define REPL_STATE_RECEIVE_CAPA 13 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_PSYNC 14 /* Send PSYNC */
|
||||
#define REPL_STATE_RECEIVE_PSYNC 15 /* Wait for PSYNC reply */
|
||||
/* --- End of handshake states --- */
|
||||
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
|
||||
#define REPL_STATE_CONNECTED 15 /* Connected to master */
|
||||
#define REPL_STATE_TRANSFER 16 /* Receiving .rdb from master */
|
||||
#define REPL_STATE_CONNECTED 17 /* Connected to master */
|
||||
|
||||
/* State of slaves from the POV of the master. Used in client->replstate.
|
||||
* In SEND_BULK and ONLINE state the slave receives new updates
|
||||
@ -871,6 +877,11 @@ typedef struct client {
|
||||
sds peerid; /* Cached peer ID. */
|
||||
listNode *client_list_node; /* list node in client list */
|
||||
|
||||
/* UUID announced by the client (default nil) - used to detect multiple connections to/from the same peer */
|
||||
/* compliant servers will announce their UUIDs when a replica connection is started, and return when asked */
|
||||
/* UUIDs are transient and lost when the server is shut down */
|
||||
unsigned char uuid[UUID_BINARY_LEN];
|
||||
|
||||
/* Response buffer */
|
||||
int bufpos;
|
||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||
@ -1423,6 +1434,11 @@ struct redisServer {
|
||||
pthread_mutex_t next_client_id_mutex;
|
||||
pthread_mutex_t unixtime_mutex;
|
||||
|
||||
int fActiveReplica; /* Can this replica also be a master? */
|
||||
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
||||
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
||||
/* After we've connected with our master use the UUID in server.master */
|
||||
|
||||
struct fastlock flock;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user