diff --git a/src/config.c b/src/config.c index ffafb14eb..162165c1d 100644 --- a/src/config.c +++ b/src/config.c @@ -857,6 +857,10 @@ void loadServerConfigFromString(char *config) { server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"multi-master") && argc == 2){ + if ((server.enable_multimaster = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1336,6 +1340,8 @@ void configSetCommand(client *c) { "loglevel",server.verbosity,loglevel_enum) { } config_set_enum_field( "maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) { + } config_set_bool_field( + "multi-master", server.enable_multimaster) { } config_set_enum_field( "appendfsync",server.aof_fsync,aof_fsync_enum) { @@ -2378,6 +2384,7 @@ int rewriteConfig(char *path) { rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH); rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ); rewriteConfigYesNoOption(state,"active-replica",server.fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA); + rewriteConfigYesNoOption(state,"multi-master",server.enable_multimaster,CONFIG_DEFAULT_ENABLE_MULTIMASTER); /* Rewrite Sentinel config if in Sentinel mode. */ if (server.sentinel_mode) rewriteConfigSentinelOption(state); diff --git a/src/replication.cpp b/src/replication.cpp index 0e31889f9..915dfefc0 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1231,6 +1231,7 @@ void replicationEmptyDbCallback(void *privdata) { * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. */ void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) { + serverAssert(mi->master == nullptr); mi->master = createClient(fd, serverTL - server.rgthreadvar); mi->master->flags |= CLIENT_MASTER; mi->master->authenticated = 1; @@ -1248,8 +1249,6 @@ void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) { if (mi->master->reploff == -1) mi->master->flags |= CLIENT_PRE_PSYNC; if (dbid != -1) selectDb(mi->master,dbid); - - listAddNodeTail(server.masters, mi); } /* This function will try to re-enable the AOF file after the @@ -2163,9 +2162,29 @@ int cancelReplicationHandshake(redisMaster *mi) { /* Set replication to the specified master address and port. */ struct redisMaster *replicationAddMaster(char *ip, int port) { - redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); - initMasterInfo(mi); - int was_master = mi->masterhost == NULL; + // pre-reqs: We must not already have a replica in the list with the same tuple + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *miCheck = (redisMaster*)listNodeValue(ln); + serverAssert(strcasecmp(miCheck->masterhost, ip) || miCheck->masterport != port); + } + + // Pre-req satisfied, lets continue + int was_master = listLength(server.masters) == 0; + redisMaster *mi = nullptr; + if (!server.enable_multimaster && listLength(server.masters)) { + serverAssert(listLength(server.masters) == 1); + mi = (redisMaster*)listNodeValue(listFirst(server.masters)); + } + else + { + mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + initMasterInfo(mi); + listAddNodeTail(server.masters, mi); + } sdsfree(mi->masterhost); mi->masterhost = sdsnew(ip); @@ -2186,14 +2205,14 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { * our own parameters, to later PSYNC with the new master. */ if (was_master) replicationCacheMasterUsingMyself(mi); mi->repl_state = REPL_STATE_CONNECT; - listAddNodeTail(server.masters, mi); return mi; } /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(redisMaster *mi) { - if (mi->masterhost == NULL) return; /* Nothing to do. */ + serverAssert(mi->masterhost != NULL); sdsfree(mi->masterhost); + mi->masterhost = NULL; /* When a slave is turned into a master, the current replication ID * (that was inherited from the master at synchronization time) is @@ -2226,6 +2245,10 @@ void replicationUnsetMaster(redisMaster *mi) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + listNode *ln = listSearchKey(server.masters, mi); + serverAssert(ln != nullptr); + listDelNode(server.masters, ln); } /* This function is called when the slave lose the connection with the @@ -2252,12 +2275,9 @@ void replicaofCommand(client *c) { if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && !strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) { if (listLength(server.masters)) { - listIter li; - listNode *ln; - listRewind(server.masters, &li); - while ((ln = listNext(&li))) + while (listLength(server.masters)) { - replicationUnsetMaster((redisMaster*)listNodeValue(ln)); + replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(server.masters))); } sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", diff --git a/src/server.cpp b/src/server.cpp index 445039dfe..f038c052d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2279,6 +2279,9 @@ void initServerConfig(void) { server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); clearReplicationId2(); + server.clients = listCreate(); + server.slaves = listCreate(); + server.monitors = listCreate(); server.timezone = getTimeZone(); /* Initialized by tzset(). */ server.configfile = NULL; server.executable = NULL; @@ -2397,6 +2400,7 @@ void initServerConfig(void) { /* Replication related */ server.masters = listCreate(); + server.enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; @@ -2858,11 +2862,8 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; - server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); - server.slaves = listCreate(); - server.monitors = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); diff --git a/src/server.h b/src/server.h index f9101ba5f..8b141f798 100644 --- a/src/server.h +++ b/src/server.h @@ -188,6 +188,7 @@ extern "C" { #define CONFIG_DEFAULT_THREAD_AFFINITY 0 #define CONFIG_DEFAULT_ACTIVE_REPLICA 0 +#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0 #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -1326,6 +1327,7 @@ struct redisServer { int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ list *masters; + int enable_multimaster; int repl_timeout; /* Timeout after N seconds of master idle */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 74f491e48..c181b3431 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -48,7 +48,7 @@ proc warnings_from_file {filename} { # Return value for INFO property proc status {r property} { - if {[regexp "\r\n$property:(.*?)\r\n" [{*}$r info] _ value]} { + if {[regexp "\r\n(\t*?)$property:(.*?)\r\n" [{*}$r info] _ __ value]} { set _ $value } }