Update more slave instances to use replica (Issue #75)

Former-commit-id: 252725d50fc9d4ff2b6e9246a36c38176d61beae
This commit is contained in:
John Sully 2019-10-13 12:29:20 -04:00
parent d4e4b4f4d1
commit c61b6cc8fd
20 changed files with 337 additions and 337 deletions

View File

@ -666,7 +666,7 @@ client *createFakeClient(void) {
c->flags = 0; c->flags = 0;
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
c->btype = BLOCKED_NONE; c->btype = BLOCKED_NONE;
/* We set the fake client as a slave waiting for the synchronization /* We set the fake client as a replica waiting for the synchronization
* so that Redis will not try to send replies to this client. */ * so that Redis will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->reply = listCreate(); c->reply = listCreate();

View File

@ -221,8 +221,8 @@ void replyToBlockedClientTimedOut(client *c) {
/* Mass-unblock clients because something changed in the instance that makes /* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations * blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function * in an instance which turns from master to replica is unsafe, so this function
* is called when a master turns into a slave. * is called when a master turns into a replica.
* *
* The semantics is to send an -UNBLOCKED error to the client, disconnecting * The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */ * it at the same time. */

View File

@ -96,7 +96,7 @@ configEnum aof_fsync_enum[] = {
/* Output buffer limits presets. */ /* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */ {0, 0, 0}, /* normal */
{1024*1024*256, 1024*1024*64, 60}, /* slave */ {1024*1024*256, 1024*1024*64, 60}, /* replica */
{1024*1024*32, 1024*1024*8, 60} /* pubsub */ {1024*1024*32, 1024*1024*8, 60} /* pubsub */
}; };

View File

@ -115,7 +115,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
* LOOKUP_NOTOUCH: don't alter the last access time of the key. * LOOKUP_NOTOUCH: don't alter the last access time of the key.
* *
* Note: this function also returns NULL if the key is logically expired * Note: this function also returns NULL if the key is logically expired
* but still existing, in case this is a slave, since this API is called only * but still existing, in case this is a replica, since this API is called only
* for read operations. Even if the key expiry is master-driven, we can * for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging * correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */ * expiring our key via DELs in the replication link. */
@ -133,7 +133,7 @@ robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
return NULL; return NULL;
} }
/* However if we are in the context of a slave, expireIfNeeded() will /* However if we are in the context of a replica, expireIfNeeded() will
* not really try to expire the key, it only returns information * not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the * about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set. * master in order to have a consistent view of master's data set.
@ -344,7 +344,7 @@ robj *dbRandomKey(redisDb *db) {
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set, /* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically * it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because * expired in the replica, so the function cannot stop because
* expireIfNeeded() is false, nor it can stop because * expireIfNeeded() is false, nor it can stop because
* dictGetRandomKey() returns NULL (there are keys to return). * dictGetRandomKey() returns NULL (there are keys to return).
* To prevent the infinite loop we do some tries, but if there * To prevent the infinite loop we do some tries, but if there
@ -1368,7 +1368,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) {
* to all the slaves and the AOF file if enabled. * to all the slaves and the AOF file if enabled.
* *
* This way the key expiry is centralized in one place, and since both * This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything * AOF and the master->replica link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring * will be consistent even if we allow write operations against expiring
* keys. */ * keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) { void propagateExpire(redisDb *db, robj *key, int lazy) {
@ -1426,10 +1426,10 @@ int keyIsExpired(redisDb *db, robj *key) {
* is via lookupKey*() family of functions. * is via lookupKey*() family of functions.
* *
* The behavior of the function depends on the replication role of the * The behavior of the function depends on the replication role of the
* instance, because slave instances do not expire keys, they wait * instance, because replica instances do not expire keys, they wait
* for DELs from the master for consistency matters. However even * for DELs from the master for consistency matters. However even
* slaves will try to have a coherent return value for the function, * slaves will try to have a coherent return value for the function,
* so that read commands executed in the slave side will be able to * so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the * behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL). * master has yet to propagate the DEL).
* *
@ -1442,9 +1442,9 @@ int keyIsExpired(redisDb *db, robj *key) {
int expireIfNeeded(redisDb *db, robj *key) { int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0; if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave, instead of /* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP: * evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will * the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys. * send us synthesized DEL operations for expired keys.
* *
* Still we try to return the right information to the caller, * Still we try to return the right information to the caller,

View File

@ -374,8 +374,8 @@ size_t freeMemoryGetNotCountedMemory(void) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln); client *replica = (client*)listNodeValue(ln);
overhead += getClientOutputBufferMemoryUsage(slave); overhead += getClientOutputBufferMemoryUsage(replica);
} }
} }
if (g_pserver->aof_state != AOF_OFF) { if (g_pserver->aof_state != AOF_OFF) {

View File

@ -307,17 +307,17 @@ void activeExpireCycle(int type) {
* *
* Normally slaves do not process expires: they wait the masters to synthesize * Normally slaves do not process expires: they wait the masters to synthesize
* DEL operations in order to retain consistency. However writable slaves are * DEL operations in order to retain consistency. However writable slaves are
* an exception: if a key is created in the slave and an expire is assigned * an exception: if a key is created in the replica and an expire is assigned
* to it, we need a way to expire such a key, since the master does not know * to it, we need a way to expire such a key, since the master does not know
* anything about such a key. * anything about such a key.
* *
* In order to do so, we track keys created in the slave side with an expire * In order to do so, we track keys created in the replica side with an expire
* set, and call the expireSlaveKeys() function from time to time in order to * set, and call the expireSlaveKeys() function from time to time in order to
* reclaim the keys if they already expired. * reclaim the keys if they already expired.
* *
* Note that the use case we are trying to cover here, is a popular one where * Note that the use case we are trying to cover here, is a popular one where
* slaves are put in writable mode in order to compute slow operations in * slaves are put in writable mode in order to compute slow operations in
* the slave side that are mostly useful to actually read data in a more * the replica side that are mostly useful to actually read data in a more
* processed way. Think at sets intersections in a tmp key, with an expire so * processed way. Think at sets intersections in a tmp key, with an expire so
* that it is also used as a cache to avoid intersecting every time. * that it is also used as a cache to avoid intersecting every time.
* *
@ -326,7 +326,7 @@ void activeExpireCycle(int type) {
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
/* The dictionary where we remember key names and database ID of keys we may /* The dictionary where we remember key names and database ID of keys we may
* want to expire from the slave. Since this function is not often used we * want to expire from the replica. Since this function is not often used we
* don't even care to initialize the database at startup. We'll do it once * don't even care to initialize the database at startup. We'll do it once
* the feature is used the first time, that is, when rememberSlaveKeyWithExpire() * the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
* is called. * is called.
@ -389,7 +389,7 @@ void expireSlaveKeys(void) {
} }
/* Set the new bitmap as value of the key, in the dictionary /* Set the new bitmap as value of the key, in the dictionary
* of keys with an expire set directly in the writable slave. Otherwise * of keys with an expire set directly in the writable replica. Otherwise
* if the bitmap is zero, we no longer need to keep track of it. */ * if the bitmap is zero, we no longer need to keep track of it. */
if (new_dbids) if (new_dbids)
dictSetUnsignedIntegerVal(de,new_dbids); dictSetUnsignedIntegerVal(de,new_dbids);
@ -406,7 +406,7 @@ void expireSlaveKeys(void) {
} }
/* Track keys that received an EXPIRE or similar command in the context /* Track keys that received an EXPIRE or similar command in the context
* of a writable slave. */ * of a writable replica. */
void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
if (slaveKeysWithExpire == NULL) { if (slaveKeysWithExpire == NULL) {
static dictType dt = { static dictType dt = {
@ -448,7 +448,7 @@ size_t getSlaveKeyWithExpireCount(void) {
* *
* Note: technically we should handle the case of a single DB being flushed * Note: technically we should handle the case of a single DB being flushed
* but it is not worth it since anyway race conditions using the same set * but it is not worth it since anyway race conditions using the same set
* of key names in a wriatable slave and in its master will lead to * of key names in a wriatable replica and in its master will lead to
* inconsistencies. This is just a best-effort thing we do. */ * inconsistencies. This is just a best-effort thing we do. */
void flushSlaveKeysWithExpireList(void) { void flushSlaveKeysWithExpireList(void) {
if (slaveKeysWithExpire) { if (slaveKeysWithExpire) {
@ -486,7 +486,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context * should never be executed as a DEL when load the AOF or in the context
* of a slave instance. * of a replica instance.
* *
* Instead we take the other branch of the IF statement setting an expire * Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */ * (possibly in the past) and wait for an explicit DEL from the master. */

View File

@ -1439,7 +1439,7 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
* *
* * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
* *
* * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a replica
* *
* * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only * * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
* *
@ -4289,7 +4289,7 @@ size_t RM_GetClusterSize(void) {
* *
* The arguments ip, master_id, port and flags can be NULL in case we don't * The arguments ip, master_id, port and flags can be NULL in case we don't
* need to populate back certain info. If an ip and master_id (only populated * need to populate back certain info. If an ip and master_id (only populated
* if the instance is a slave) are specified, they point to buffers holding * if the instance is a replica) are specified, they point to buffers holding
* at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
* and master_id are not null terminated. * and master_id are not null terminated.
* *
@ -4300,7 +4300,7 @@ size_t RM_GetClusterSize(void) {
* * REDISMODULE_NODE_SLAVE The node is a replica * * REDISMODULE_NODE_SLAVE The node is a replica
* * REDISMODULE_NODE_PFAIL We see the node as failing * * REDISMODULE_NODE_PFAIL We see the node as failing
* * REDISMODULE_NODE_FAIL The cluster agrees the node is failing * * REDISMODULE_NODE_FAIL The cluster agrees the node is failing
* * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover * * REDISMODULE_NODE_NOFAILOVER The replica is configured to never failover
*/ */
clusterNode *clusterLookupNode(const char *name); /* We need access to internals */ clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
@ -5181,7 +5181,7 @@ void moduleInitModulesSystem(void) {
* The function aborts the server on errors, since to start with missing * The function aborts the server on errors, since to start with missing
* modules is not considered sane: clients may rely on the existence of * modules is not considered sane: clients may rely on the existence of
* given commands, loading AOF also may need some modules to exist, and * given commands, loading AOF also may need some modules to exist, and
* if this instance is a slave, it must understand commands from master. */ * if this instance is a replica, it must understand commands from master. */
void moduleLoadFromQueue(void) { void moduleLoadFromQueue(void) {
listIter li; listIter li;
listNode *ln; listNode *ln;

View File

@ -142,7 +142,7 @@ void execCommand(client *c) {
} }
/* If there are write commands inside the transaction, and this is a read /* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction * only replica, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and * was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into * then the configuration changed (for example instance was turned into
* a replica). */ * a replica). */
@ -195,7 +195,7 @@ void execCommand(client *c) {
int is_master = listLength(g_pserver->masters) == 0; int is_master = listLength(g_pserver->masters) == 0;
g_pserver->dirty++; g_pserver->dirty++;
/* If inside the MULTI/EXEC block this instance was suddenly /* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the * switched from master to replica (using the SLAVEOF command), the
* initial MULTI was propagated into the replication backlog, but the * initial MULTI was propagated into the replication backlog, but the
* rest was not. We need to make sure to at least terminate the * rest was not. We need to make sure to at least terminate the
* backlog with the final EXEC. */ * backlog with the final EXEC. */

View File

@ -195,7 +195,7 @@ client *createClient(int fd, int iel) {
* buffers can hold, then we'll really install the handler. */ * buffers can hold, then we'll really install the handler. */
void clientInstallWriteHandler(client *c) { void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only /* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive * if not already done and, for slaves, if the replica can actually receive
* writes at this stage. */ * writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) && if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE || (c->replstate == REPL_STATE_NONE ||
@ -239,7 +239,7 @@ void clientInstallAsyncWriteHandler(client *c) {
* *
* 1) The event handler should already be installed since the output buffer * 1) The event handler should already be installed since the output buffer
* already contains something. * already contains something.
* 2) The client is a slave but not yet online, so we want to just accumulate * 2) The client is a replica but not yet online, so we want to just accumulate
* writes in the buffer but not actually sending them yet. * writes in the buffer but not actually sending them yet.
* *
* Typically gets called every time a reply is built, before adding more * Typically gets called every time a reply is built, before adding more
@ -442,7 +442,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
addReplyProtoCore(c,s,len,fAsync); addReplyProtoCore(c,s,len,fAsync);
addReplyProtoCore(c,"\r\n",2,fAsync); addReplyProtoCore(c,"\r\n",2,fAsync);
/* Sometimes it could be normal that a slave replies to a master with /* Sometimes it could be normal that a replica replies to a master with
* an error and this function gets called. Actually the error will never * an error and this function gets called. Actually the error will never
* be sent because addReply*() against master clients has no effect... * be sent because addReply*() against master clients has no effect...
* A notable example is: * A notable example is:
@ -1214,7 +1214,7 @@ void unlinkClient(client *c) {
/* In the case of diskless replication the fork is writing to the /* In the case of diskless replication the fork is writing to the
* sockets and just closing the fd isn't enough, if we don't also * sockets and just closing the fd isn't enough, if we don't also
* shutdown the socket the fork will continue to write to the slave * shutdown the socket the fork will continue to write to the replica
* and the salve will only find out that it was disconnected when * and the salve will only find out that it was disconnected when
* it will finish reading the rdb. */ * it will finish reading the rdb. */
if ((c->flags & CLIENT_SLAVE) && if ((c->flags & CLIENT_SLAVE) &&
@ -1299,7 +1299,7 @@ bool freeClient(client *c) {
} }
} }
/* Log link disconnection with slave */ /* Log link disconnection with replica */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
serverLog(LL_WARNING,"Connection with replica %s lost.", serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c)); replicationGetSlaveName(c));
@ -1333,8 +1333,8 @@ bool freeClient(client *c) {
* places where active clients may be referenced. */ * places where active clients may be referenced. */
unlinkClient(c); unlinkClient(c);
/* Master/slave cleanup Case 1: /* Master/replica cleanup Case 1:
* we lost the connection with a slave. */ * we lost the connection with a replica. */
if (c->flags & CLIENT_SLAVE) { if (c->flags & CLIENT_SLAVE) {
if (c->replstate == SLAVE_STATE_SEND_BULK) { if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd); if (c->repldbfd != -1) close(c->repldbfd);
@ -1352,7 +1352,7 @@ bool freeClient(client *c) {
refreshGoodSlavesCount(); refreshGoodSlavesCount();
} }
/* Master/slave cleanup Case 2: /* Master/replica cleanup Case 2:
* we lost the connection with the master. */ * we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(MasterInfoFromClient(c)); if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(MasterInfoFromClient(c));
@ -1494,7 +1494,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
* just deliver as much data as it is possible to deliver. * just deliver as much data as it is possible to deliver.
* *
* Moreover, we also send as much as possible if the client is * Moreover, we also send as much as possible if the client is
* a slave (otherwise, on high-speed traffic, the replication * a replica (otherwise, on high-speed traffic, the replication
* buffer will grow indefinitely) */ * buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT && if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 || (g_pserver->maxmemory == 0 ||
@ -1805,7 +1805,7 @@ int processInlineBuffer(client *c) {
} }
/* Newline from slaves can be used to refresh the last ACK time. /* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big * This is useful for a replica to ping back while loading a big
* RDB file. */ * RDB file. */
if (querylen == 0 && c->flags & CLIENT_SLAVE) if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = g_pserver->unixtime; c->repl_ack_time = g_pserver->unixtime;
@ -2038,8 +2038,8 @@ int processCommandAndResetClient(client *c, int flags) {
} }
if (serverTL->current_client == NULL) deadclient = 1; if (serverTL->current_client == NULL) deadclient = 1;
serverTL->current_client = NULL; serverTL->current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may /* freeMemoryIfNeeded may flush replica output buffers. This may
* result into a slave, that may be the active client, to be * result into a replica, that may be the active client, to be
* freed. */ * freed. */
return deadclient ? C_ERR : C_OK; return deadclient ? C_ERR : C_OK;
} }
@ -2060,7 +2060,7 @@ void processInputBuffer(client *c, int callFlags) {
if (c->flags & CLIENT_BLOCKED) break; if (c->flags & CLIENT_BLOCKED) break;
/* Don't process input from the master while there is a busy script /* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication * condition on the replica. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and * stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */ * later resume the processing. */
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break; if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break;
@ -2910,10 +2910,10 @@ void flushSlavesOutputBuffers(void) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln); client *replica = (client*)listNodeValue(ln);
int events; int events;
if (!FCorrectThread(slave)) if (!FCorrectThread(replica))
continue; // we cannot synchronously flush other thread's clients continue; // we cannot synchronously flush other thread's clients
/* Note that the following will not flush output buffers of slaves /* Note that the following will not flush output buffers of slaves
@ -2922,12 +2922,12 @@ void flushSlavesOutputBuffers(void) {
* of put_online_on_ack is to postpone the moment it is installed. * of put_online_on_ack is to postpone the moment it is installed.
* This is what we want since slaves in this state should not receive * This is what we want since slaves in this state should not receive
* writes before the first ACK. */ * writes before the first ACK. */
events = aeGetFileEvents(g_pserver->rgthreadvar[slave->iel].el,slave->fd); events = aeGetFileEvents(g_pserver->rgthreadvar[replica->iel].el,replica->fd);
if (events & AE_WRITABLE && if (events & AE_WRITABLE &&
slave->replstate == SLAVE_STATE_ONLINE && replica->replstate == SLAVE_STATE_ONLINE &&
clientHasPendingReplies(slave)) clientHasPendingReplies(replica))
{ {
writeToClient(slave->fd,slave,0); writeToClient(replica->fd,replica,0);
} }
} }
} }

View File

@ -2119,7 +2119,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is * received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the * responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */ * snapshot taken by the master may not be reflected on the replica. */
if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) { if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
decrRefCount(key); decrRefCount(key);
key = nullptr; key = nullptr;
@ -2273,7 +2273,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE;
g_pserver->rdb_save_time_start = -1; g_pserver->rdb_save_time_start = -1;
/* If the child returns an OK exit code, read the set of slave client /* If the child returns an OK exit code, read the set of replica client
* IDs and the associated status code. We'll terminate all the slaves * IDs and the associated status code. We'll terminate all the slaves
* in error state. * in error state.
* *
@ -2312,35 +2312,35 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
uint64_t j; uint64_t j;
int errorcode = 0; int errorcode = 0;
/* Search for the slave ID in the reply. In order for a slave to /* Search for the replica ID in the reply. In order for a replica to
* continue the replication process, we need to find it in the list, * continue the replication process, we need to find it in the list,
* and it must have an error code set to 0 (which means success). */ * and it must have an error code set to 0 (which means success). */
for (j = 0; j < ok_slaves[0]; j++) { for (j = 0; j < ok_slaves[0]; j++) {
if (slave->id == ok_slaves[2*j+1]) { if (replica->id == ok_slaves[2*j+1]) {
errorcode = ok_slaves[2*j+2]; errorcode = ok_slaves[2*j+2];
break; /* Found in slaves list. */ break; /* Found in slaves list. */
} }
} }
if (j == ok_slaves[0] || errorcode != 0) { if (j == ok_slaves[0] || errorcode != 0) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Closing slave %s: child->slave RDB transfer failed: %s", "Closing replica %s: child->replica RDB transfer failed: %s",
replicationGetSlaveName(slave), replicationGetSlaveName(replica),
(errorcode == 0) ? "RDB transfer child aborted" (errorcode == 0) ? "RDB transfer child aborted"
: strerror(errorcode)); : strerror(errorcode));
freeClient(slave); freeClient(replica);
} else { } else {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Slave %s correctly received the streamed RDB file.", "Slave %s correctly received the streamed RDB file.",
replicationGetSlaveName(slave)); replicationGetSlaveName(replica));
/* Restore the socket as non-blocking. */ /* Restore the socket as non-blocking. */
anetNonBlock(NULL,slave->fd); anetNonBlock(NULL,replica->fd);
anetSendTimeout(NULL,slave->fd,0); anetSendTimeout(NULL,replica->fd,0);
} }
} }
} }
@ -2407,17 +2407,17 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numfds] = slave->id; clientids[numfds] = replica->id;
fds[numfds++] = slave->fd; fds[numfds++] = replica->fd;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
/* Put the socket in blocking mode to simplify RDB transfer. /* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket * We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */ * will share the O_NONBLOCK attribute with the parent). */
anetBlock(NULL,slave->fd); anetBlock(NULL,replica->fd);
anetSendTimeout(NULL,slave->fd,g_pserver->repl_timeout*1000); anetSendTimeout(NULL,replica->fd,g_pserver->repl_timeout*1000);
} }
} }
@ -2451,19 +2451,19 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->child_info_data.cow_size = private_dirty; g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB); sendChildInfo(CHILD_INFO_TYPE_RDB);
/* If we are returning OK, at least one slave was served /* If we are returning OK, at least one replica was served
* with the RDB file as expected, so we need to send a report * with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is: * to the parent via the pipe. The format of the message is:
* *
* <len> <slave[0].id> <slave[0].error> ... * <len> <replica[0].id> <replica[0].error> ...
* *
* len, slave IDs, and slave errors, are all uint64_t integers, * len, replica IDs, and replica errors, are all uint64_t integers,
* so basically the reply is composed of 64 bits for the len field * so basically the reply is composed of 64 bits for the len field
* plus 2 additional 64 bit integers for each entry, for a total * plus 2 additional 64 bit integers for each entry, for a total
* of 'len' entries. * of 'len' entries.
* *
* The 'id' represents the slave's client ID, so that the master * The 'id' represents the replica's client ID, so that the master
* can match the report with a specific slave, and 'error' is * can match the report with a specific replica, and 'error' is
* set to 0 if the replication process terminated with a success * set to 0 if the replication process terminated with a success
* or the error code if an error occurred. */ * or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds), MALLOC_LOCAL); void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds), MALLOC_LOCAL);
@ -2504,12 +2504,12 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
int j; int j;
for (j = 0; j < numfds; j++) { for (j = 0; j < numfds; j++) {
if (slave->id == clientids[j]) { if (replica->id == clientids[j]) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break; break;
} }
} }
@ -2603,14 +2603,14 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
/* If the instance is a master, we can populate the replication info /* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL, * only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this * it means that the instance isn't in any replication chains. In this
* scenario the replication info is useless, because when a slave * scenario the replication info is useless, because when a replica
* connects to us, the NULL repl_backlog will trigger a full * connects to us, the NULL repl_backlog will trigger a full
* synchronization, at the same time we will use a new replid and clear * synchronization, at the same time we will use a new replid and clear
* replid2. */ * replid2. */
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
/* Note that when g_pserver->replicaseldb is -1, it means that this master /* Note that when g_pserver->replicaseldb is -1, it means that this master
* didn't apply any write commands after a full synchronization. * didn't apply any write commands after a full synchronization.
* So we can let repl_stream_db be 0, this allows a restarted slave * So we can let repl_stream_db be 0, this allows a restarted replica
* to reload replication ID/offset, it's safe because the next write * to reload replication ID/offset, it's safe because the next write
* command must generate a SELECT statement. */ * command must generate a SELECT statement. */
rsi->repl_stream_db = g_pserver->replicaseldb == -1 ? 0 : g_pserver->replicaseldb; rsi->repl_stream_db = g_pserver->replicaseldb == -1 ? 0 : g_pserver->replicaseldb;
@ -2624,7 +2624,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
} }
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
/* If the instance is a slave we need a connected master /* If the instance is a replica we need a connected master
* in order to fetch the currently selected DB. */ * in order to fetch the currently selected DB. */
if (miFirst && miFirst->master) { if (miFirst && miFirst->master) {
rsi->repl_stream_db = miFirst->master->db->id; rsi->repl_stream_db = miFirst->master->db->id;
@ -2632,7 +2632,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
} }
/* If we have a cached master we can use it in order to populate the /* If we have a cached master we can use it in order to populate the
* replication selected DB info inside the RDB file: the slave can * replication selected DB info inside the RDB file: the replica can
* increment the master_repl_offset only from data arriving from the * increment the master_repl_offset only from data arriving from the
* master, so if we are disconnected the offset in the cached master * master, so if we are disconnected the offset in the cached master
* is valid. */ * is valid. */

View File

@ -152,7 +152,7 @@ typedef struct clusterNode {
int port; int port;
sds name; sds name;
int flags; int flags;
sds replicate; /* Master ID if node is a slave */ sds replicate; /* Master ID if node is a replica */
int *slots; int *slots;
int slots_count; int slots_count;
int current_slot_index; int current_slot_index;

View File

@ -124,8 +124,8 @@ extern "C" void freeClusterManager(void) {
* *
* The score is calculated as follows: * The score is calculated as follows:
* *
* SAME_AS_MASTER = 10000 * each slave in the same IP of its master. * SAME_AS_MASTER = 10000 * each replica in the same IP of its master.
* SAME_AS_SLAVE = 1 * each slave having the same IP as another slave * SAME_AS_SLAVE = 1 * each replica having the same IP as another replica
of the same master. of the same master.
* FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE * FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
* *

View File

@ -187,7 +187,7 @@ typedef struct clusterManagerNode {
time_t ping_recv; time_t ping_recv;
int flags; int flags;
list *flags_str; /* Flags string representations */ list *flags_str; /* Flags string representations */
sds replicate; /* Master ID if node is a slave */ sds replicate; /* Master ID if node is a replica */
int dirty; /* Node has changes that can be flushed */ int dirty; /* Node has changes that can be flushed */
uint8_t slots[CLUSTER_MANAGER_SLOTS]; uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count; int slots_count;

View File

@ -71,9 +71,9 @@ extern "C" {
#define REDISMODULE_CTX_FLAGS_MULTI (1<<1) #define REDISMODULE_CTX_FLAGS_MULTI (1<<1)
/* The instance is a master */ /* The instance is a master */
#define REDISMODULE_CTX_FLAGS_MASTER (1<<2) #define REDISMODULE_CTX_FLAGS_MASTER (1<<2)
/* The instance is a slave */ /* The instance is a replica */
#define REDISMODULE_CTX_FLAGS_SLAVE (1<<3) #define REDISMODULE_CTX_FLAGS_SLAVE (1<<3)
/* The instance is read-only (usually meaning it's a slave as well) */ /* The instance is read-only (usually meaning it's a replica as well) */
#define REDISMODULE_CTX_FLAGS_READONLY (1<<4) #define REDISMODULE_CTX_FLAGS_READONLY (1<<4)
/* The instance is running in cluster mode */ /* The instance is running in cluster mode */
#define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5) #define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5)

View File

@ -46,13 +46,13 @@
void replicationDiscardCachedMaster(redisMaster *mi); void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, int newfd); void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
void replicationSendAck(redisMaster *mi); void replicationSendAck(redisMaster *mi);
void putSlaveOnline(client *slave); void putSlaveOnline(client *replica);
int cancelReplicationHandshake(redisMaster *mi); int cancelReplicationHandshake(redisMaster *mi);
/* --------------------------- Utility functions ---------------------------- */ /* --------------------------- Utility functions ---------------------------- */
/* Return the pointer to a string representing the slave ip:listening_port /* Return the pointer to a string representing the replica ip:listening_port
* pair. Mostly useful for logging, since we want to log a slave using its * pair. Mostly useful for logging, since we want to log a replica using its
* IP address and its listening port which is more clear for the user, for * IP address and its listening port which is more clear for the user, for
* example: "Closing connection with replica 10.1.2.3:6380". */ * example: "Closing connection with replica 10.1.2.3:6380". */
char *replicationGetSlaveName(client *c) { char *replicationGetSlaveName(client *c) {
@ -231,12 +231,12 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len); feedReplicationBacklog(p,len);
} }
void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc, bool fSendRaw) void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw)
{ {
char llstr[LONG_STR_SIZE]; char llstr[LONG_STR_SIZE];
std::unique_lock<decltype(slave->lock)> lock(slave->lock); std::unique_lock<decltype(replica->lock)> lock(replica->lock);
/* Send SELECT command to every slave if needed. */ /* Send SELECT command to every replica if needed. */
if (g_pserver->replicaseldb != dictid) { if (g_pserver->replicaseldb != dictid) {
robj *selectcmd; robj *selectcmd;
@ -258,7 +258,7 @@ void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc, bool
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves */ /* Send it to slaves */
addReply(slave,selectcmd); addReply(replica,selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd); decrRefCount(selectcmd);
@ -270,18 +270,18 @@ void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc, bool
* or are already in sync with the master. */ * or are already in sync with the master. */
/* Add the multi bulk length. */ /* Add the multi bulk length. */
addReplyArrayLen(slave,argc); addReplyArrayLen(replica,argc);
/* Finally any additional argument that was not stored inside the /* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */ * static buffer if any (from j to argc). */
for (int j = 0; j < argc; j++) for (int j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]); addReplyBulk(replica,argv[j]);
} }
/* Propagate write commands to slaves, and populate the replication backlog /* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use * as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication * the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached, * stream. Instead if the instance is a replica and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */ * we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln, *lnReply; listNode *ln, *lnReply;
@ -293,7 +293,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* If the instance is not a top level master, return ASAP: we'll just proxy /* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to * the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can * propagate *identical* replication stream. In this way this replica can
* advertise the same replication ID as the master (since it shares the * advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */ * master replication history and has the same backlog and offsets). */
if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) return; if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) return;
@ -377,34 +377,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
} }
} }
/* Write the command to every slave. */ /* Write the command to every replica. */
listRewind(slaves,&li); listRewind(slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */ /* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
std::unique_lock<decltype(slave->lock)> lock(slave->lock); std::unique_lock<decltype(replica->lock)> lock(replica->lock);
if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
{ {
slave->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
continue; continue;
} }
if (!fSendRaw) if (!fSendRaw)
addReplyProtoAsync(slave, proto, cchProto); addReplyProtoAsync(replica, proto, cchProto);
addReplyProtoAsync(slave,fake->buf,fake->bufpos); addReplyProtoAsync(replica,fake->buf,fake->bufpos);
listRewind(fake->reply, &liReply); listRewind(fake->reply, &liReply);
while ((lnReply = listNext(&liReply))) while ((lnReply = listNext(&liReply)))
{ {
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
addReplyProtoAsync(slave, reply->buf(), reply->used); addReplyProtoAsync(replica, reply->buf(), reply->used);
} }
if (!fSendRaw) if (!fSendRaw)
{ {
addReplyAsync(slave,shared.crlf); addReplyAsync(replica,shared.crlf);
addReplyProtoAsync(slave, szDbNum, cchDbNum); addReplyProtoAsync(replica, szDbNum, cchDbNum);
} }
} }
@ -432,15 +432,15 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
listRewind(slaves,&li); listRewind(slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
std::lock_guard<decltype(slave->lock)> ulock(slave->lock); std::lock_guard<decltype(replica->lock)> ulock(replica->lock);
if (FMasterHost(slave)) if (FMasterHost(replica))
continue; // Active Active case, don't feed back continue; // Active Active case, don't feed back
/* Don't feed slaves that are still waiting for BGSAVE to start */ /* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyProtoAsync(slave,buf,buflen); addReplyProtoAsync(replica,buf,buflen);
} }
if (listLength(slaves)) if (listLength(slaves))
@ -488,7 +488,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
decrRefCount(cmdobj); decrRefCount(cmdobj);
} }
/* Feed the slave 'c' with the replication backlog starting from the /* Feed the replica 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */ * specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) { long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len; long long j, skip, len;
@ -523,7 +523,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
/* Discard the amount of data to seek to the specified 'offset'. */ /* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % g_pserver->repl_backlog_size; j = (j + skip) % g_pserver->repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to /* Feed replica with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */ * split the reply in two parts if we are cross-boundary. */
len = g_pserver->repl_backlog_histlen - skip; len = g_pserver->repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
@ -541,7 +541,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
} }
/* Return the offset to provide as reply to the PSYNC command received /* Return the offset to provide as reply to the PSYNC command received
* from the slave. The returned value is only valid immediately after * from the replica. The returned value is only valid immediately after
* the BGSAVE process started and before executing any other command * the BGSAVE process started and before executing any other command
* from clients. */ * from clients. */
long long getPsyncInitialOffset(void) { long long getPsyncInitialOffset(void) {
@ -549,39 +549,39 @@ long long getPsyncInitialOffset(void) {
} }
/* Send a FULLRESYNC reply in the specific case of a full resynchronization, /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways: * as a side effect setup the replica for a full sync in different ways:
* *
* 1) Remember, into the slave client structure, the replication offset * 1) Remember, into the replica client structure, the replication offset
* we sent here, so that if new slaves will later attach to the same * we sent here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output * background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave. * buffer), we can get the right offset from this replica.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that * 2) Set the replication state of the replica to WAIT_BGSAVE_END so that
* we start accumulating differences from this point. * we start accumulating differences from this point.
* 3) Force the replication stream to re-emit a SELECT statement so * 3) Force the replication stream to re-emit a SELECT statement so
* the new slave incremental differences will start selecting the * the new replica incremental differences will start selecting the
* right database number. * right database number.
* *
* Normally this function should be called immediately after a successful * Normally this function should be called immediately after a successful
* BGSAVE for replication was started, or when there is one already in * BGSAVE for replication was started, or when there is one already in
* progress that we attached our slave to. */ * progress that we attached our replica to. */
int replicationSetupSlaveForFullResync(client *slave, long long offset) { int replicationSetupSlaveForFullResync(client *replica, long long offset) {
char buf[128]; char buf[128];
int buflen; int buflen;
slave->psync_initial_offset = offset; replica->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this /* We are going to accumulate the incremental changes for this
* slave as well. Set replicaseldb to -1 in order to force to re-emit * replica as well. Set replicaseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */ * a SELECT statement in the replication stream. */
g_pserver->replicaseldb = -1; g_pserver->replicaseldb = -1;
/* Don't send this reply to slaves that approached us with /* Don't send this reply to slaves that approached us with
* the old SYNC command. */ * the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) { if (!(replica->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
g_pserver->replid,offset); g_pserver->replid,offset);
if (write(slave->fd,buf,buflen) != buflen) { if (write(replica->fd,buf,buflen) != buflen) {
freeClientAsync(slave); freeClientAsync(replica);
return C_ERR; return C_ERR;
} }
} }
@ -600,14 +600,14 @@ int masterTryPartialResynchronization(client *c) {
char buf[128]; char buf[128];
int buflen; int buflen;
/* Parse the replication offset asked by the slave. Go to full sync /* Parse the replication offset asked by the replica. Go to full sync
* on parse error: this should never happen but we try to handle * on parse error: this should never happen but we try to handle
* it in a robust way compared to aborting. */ * it in a robust way compared to aborting. */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync; C_OK) goto need_full_resync;
/* Is the replication ID of this master the same advertised by the wannabe /* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a * replica via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue. * different replication history, and there is no way to continue.
* *
* Note that there are two potentially valid replication IDs: the ID1 * Note that there are two potentially valid replication IDs: the ID1
@ -637,7 +637,7 @@ int masterTryPartialResynchronization(client *c) {
goto need_full_resync; goto need_full_resync;
} }
/* We still have the data our slave is asking for? */ /* We still have the data our replica is asking for? */
if (!g_pserver->repl_backlog || if (!g_pserver->repl_backlog ||
psync_offset < g_pserver->repl_backlog_off || psync_offset < g_pserver->repl_backlog_off ||
psync_offset > (g_pserver->repl_backlog_off + g_pserver->repl_backlog_histlen)) psync_offset > (g_pserver->repl_backlog_off + g_pserver->repl_backlog_histlen))
@ -652,9 +652,9 @@ int masterTryPartialResynchronization(client *c) {
} }
/* If we reached this point, we are able to perform a partial resync: /* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a slave. * 1) Set client state to make it a replica.
* 2) Inform the client we can continue with +CONTINUE * 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */ * 3) Send the backlog data (from the offset to the end) to the replica. */
c->flags |= CLIENT_SLAVE; c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE; c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = g_pserver->unixtime; c->repl_ack_time = g_pserver->unixtime;
@ -681,7 +681,7 @@ int masterTryPartialResynchronization(client *c) {
replicationGetSlaveName(c), replicationGetSlaveName(c),
psync_len, psync_offset); psync_len, psync_offset);
/* Note that we don't need to set the selected DB at g_pserver->replicaseldb /* Note that we don't need to set the selected DB at g_pserver->replicaseldb
* to -1 to force the master to emit SELECT, since the slave already * to -1 to force the master to emit SELECT, since the replica already
* has this state from the previous connection with the master. */ * has this state from the previous connection with the master. */
refreshGoodSlavesCount(); refreshGoodSlavesCount();
@ -700,7 +700,7 @@ need_full_resync:
* the script cache is flushed before to start. * the script cache is flushed before to start.
* *
* The mincapa argument is the bitwise AND among all the slaves capabilities * The mincapa argument is the bitwise AND among all the slaves capabilities
* of the slaves waiting for this BGSAVE, so represents the slave capabilities * of the slaves waiting for this BGSAVE, so represents the replica capabilities
* all the slaves support. Can be tested via SLAVE_CAPA_* macros. * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
* *
* Side effects, other than starting a BGSAVE: * Side effects, other than starting a BGSAVE:
@ -726,7 +726,7 @@ int startBgsaveForReplication(int mincapa) {
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL, /* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */ * otherwise replica will miss repl-stream-db. */
if (rsiptr) { if (rsiptr) {
if (socket_target) if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr); retval = rdbSaveToSlavesSockets(rsiptr);
@ -744,16 +744,16 @@ int startBgsaveForReplication(int mincapa) {
serverLog(LL_WARNING,"BGSAVE for replication failed"); serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
std::unique_lock<decltype(slave->lock)> lock(slave->lock); std::unique_lock<decltype(replica->lock)> lock(replica->lock);
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE; replica->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE; replica->flags &= ~CLIENT_SLAVE;
listDelNode(g_pserver->slaves,ln); listDelNode(g_pserver->slaves,ln);
addReplyError(slave, addReplyError(replica,
"BGSAVE failed, replication can't continue"); "BGSAVE failed, replication can't continue");
slave->flags |= CLIENT_CLOSE_AFTER_REPLY; replica->flags |= CLIENT_CLOSE_AFTER_REPLY;
} }
} }
return retval; return retval;
@ -764,17 +764,17 @@ int startBgsaveForReplication(int mincapa) {
if (!socket_target) { if (!socket_target) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
std::unique_lock<decltype(slave->lock)> lock(slave->lock); std::unique_lock<decltype(replica->lock)> lock(replica->lock);
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave, replicationSetupSlaveForFullResync(replica,
getPsyncInitialOffset()); getPsyncInitialOffset());
} }
} }
} }
/* Flush the script cache, since we need that slave differences are /* Flush the script cache, since we need that replica differences are
* accumulated without requiring slaves to match our cached scripts. */ * accumulated without requiring slaves to match our cached scripts. */
if (retval == C_OK) replicationScriptCacheFlush(); if (retval == C_OK) replicationScriptCacheFlush();
return retval; return retval;
@ -782,10 +782,10 @@ int startBgsaveForReplication(int mincapa) {
/* SYNC and PSYNC command implemenation. */ /* SYNC and PSYNC command implemenation. */
void syncCommand(client *c) { void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */ /* ignore SYNC if already replica or in monitor mode */
if (c->flags & CLIENT_SLAVE) return; if (c->flags & CLIENT_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master /* Refuse SYNC requests if we are a replica but the link with our master
* is not ok... */ * is not ok... */
if (!g_pserver->fActiveReplica) { if (!g_pserver->fActiveReplica) {
if (FAnyDisconnectedMasters()) { if (FAnyDisconnectedMasters()) {
@ -813,7 +813,7 @@ void syncCommand(client *c) {
* *
* +FULLRESYNC <replid> <offset> * +FULLRESYNC <replid> <offset>
* *
* So the slave knows the new replid and offset to try a PSYNC later * So the replica knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */ * if the connection with the master is lost. */
if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) { if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) { if (masterTryPartialResynchronization(c) == C_OK) {
@ -829,8 +829,8 @@ void syncCommand(client *c) {
if (master_replid[0] != '?') g_pserver->stat_sync_partial_err++; if (master_replid[0] != '?') g_pserver->stat_sync_partial_err++;
} }
} else { } else {
/* If a slave uses SYNC, we are dealing with an old implementation /* If a replica uses SYNC, we are dealing with an old implementation
* of the replication protocol (like keydb-cli --slave). Flag the client * of the replication protocol (like keydb-cli --replica). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */ * so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC; c->flags |= CLIENT_PRE_PSYNC;
} }
@ -838,8 +838,8 @@ void syncCommand(client *c) {
/* Full resynchronization. */ /* Full resynchronization. */
g_pserver->stat_sync_full++; g_pserver->stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code /* Setup the replica as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */ * paths will change the state if we handle the replica differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (g_pserver->repl_disable_tcp_nodelay) if (g_pserver->repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */ anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
@ -862,25 +862,25 @@ void syncCommand(client *c) {
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
{ {
/* Ok a background save is in progress. Let's check if it is a good /* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is * one for replication, i.e. if there is another replica that is
* registering differences since the server forked to save. */ * registering differences since the server forked to save. */
client *slave; client *replica;
listNode *ln; listNode *ln;
listIter li; listIter li;
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
slave = (client*)ln->value; replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
} }
/* To attach this slave, we check that it has at least all the /* To attach this replica, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */ * capabilities of the replica that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { if (ln && ((c->slave_capa & replica->slave_capa) == replica->slave_capa)) {
/* Perfect, the server is already registering differences for /* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */ * another replica. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave); copyClientOutputBuffer(c,replica);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); replicationSetupSlaveForFullResync(c,replica->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else { } else {
/* No way, we need to wait for the next BGSAVE in order to /* No way, we need to wait for the next BGSAVE in order to
@ -906,7 +906,7 @@ void syncCommand(client *c) {
if (g_pserver->repl_diskless_sync_delay) if (g_pserver->repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else { } else {
/* Target is disk (or the slave is not capable of supporting /* Target is disk (or the replica is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress, * diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */ * let's start one. */
if (g_pserver->aof_child_pid == -1) { if (g_pserver->aof_child_pid == -1) {
@ -948,7 +948,7 @@ LError:
} }
/* REPLCONF <option> <value> <option> <value> ... /* REPLCONF <option> <value> <option> <value> ...
* This command is used by a slave in order to configure the replication * This command is used by a replica in order to configure the replication
* process before starting it with the SYNC command. * process before starting it with the SYNC command.
* *
* Currently the only use of this command is to communicate to the master * Currently the only use of this command is to communicate to the master
@ -994,7 +994,7 @@ void replconfCommand(client *c) {
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2")) else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2; c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount /* REPLCONF ACK is used by replica to inform the master the amount
* of replication stream that it processed so far. It is an * of replication stream that it processed so far. It is an
* internal only command that normal clients should never use. */ * internal only command that normal clients should never use. */
long long offset; long long offset;
@ -1006,15 +1006,15 @@ void replconfCommand(client *c) {
c->repl_ack_off = offset; c->repl_ack_off = offset;
c->repl_ack_time = g_pserver->unixtime; c->repl_ack_time = g_pserver->unixtime;
/* If this was a diskless replication, we need to really put /* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which * the replica online when the first ACK is received (which
* confirms slave is online and ready to get more data). */ * confirms replica is online and ready to get more data). */
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c); putSlaveOnline(c);
/* Note: this command does not reply anything! */ /* Note: this command does not reply anything! */
return; return;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"getack")) { } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"getack")) {
/* REPLCONF GETACK is used in order to request an ACK ASAP /* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */ * to the replica. */
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(g_pserver->masters, &li); listRewind(g_pserver->masters, &li);
@ -1036,59 +1036,59 @@ void replconfCommand(client *c) {
addReply(c,shared.ok); addReply(c,shared.ok);
} }
/* This function puts a slave in the online state, and should be called just /* This function puts a replica in the online state, and should be called just
* after a slave received the RDB file for the initial synchronization, and * after a replica received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands. * we are finally ready to send the incremental stream of commands.
* *
* It does a few things: * It does a few things:
* *
* 1) Put the slave in ONLINE state (useless when the function is called * 1) Put the replica in ONLINE state (useless when the function is called
* because state is already ONLINE but repl_put_online_on_ack is true). * because state is already ONLINE but repl_put_online_on_ack is true).
* 2) Make sure the writable event is re-installed, since calling the SYNC * 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without * command disables it, so that we can accumulate output buffer without
* sending it to the slave. * sending it to the replica.
* 3) Update the count of good slaves. */ * 3) Update the count of good slaves. */
void putSlaveOnline(client *slave) { void putSlaveOnline(client *replica) {
slave->replstate = SLAVE_STATE_ONLINE; replica->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0; replica->repl_put_online_on_ack = 0;
slave->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */ replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */
AssertCorrectThread(slave); AssertCorrectThread(replica);
if (aeCreateFileEvent(g_pserver->rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE|AE_WRITE_THREADSAFE,
sendReplyToClient, slave) == AE_ERR) { sendReplyToClient, replica) == AE_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(slave); freeClient(replica);
return; return;
} }
refreshGoodSlavesCount(); refreshGoodSlavesCount();
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave)); replicationGetSlaveName(replica));
} }
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
client *slave = (client*)privdata; client *replica = (client*)privdata;
UNUSED(el); UNUSED(el);
UNUSED(mask); UNUSED(mask);
serverAssert(ielFromEventLoop(el) == slave->iel); serverAssert(ielFromEventLoop(el) == replica->iel);
char buf[PROTO_IOBUF_LEN]; char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen; ssize_t nwritten, buflen;
/* Before sending the RDB file, we send the preamble as configured by the /* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of * replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */ * the file in the form "$<length>\r\n". */
if (slave->replpreamble) { if (replica->replpreamble) {
serverAssert(slave->replpreamble[0] == '$'); serverAssert(replica->replpreamble[0] == '$');
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); nwritten = write(fd,replica->replpreamble,sdslen(replica->replpreamble));
if (nwritten == -1) { if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s", serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
strerror(errno)); strerror(errno));
freeClient(slave); freeClient(replica);
return; return;
} }
g_pserver->stat_net_output_bytes += nwritten; g_pserver->stat_net_output_bytes += nwritten;
sdsrange(slave->replpreamble,nwritten,-1); sdsrange(replica->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) { if (sdslen(replica->replpreamble) == 0) {
sdsfree(slave->replpreamble); sdsfree(replica->replpreamble);
slave->replpreamble = NULL; replica->replpreamble = NULL;
/* fall through sending data. */ /* fall through sending data. */
} else { } else {
return; return;
@ -1096,29 +1096,29 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
/* If the preamble was already transferred, send the RDB bulk data. */ /* If the preamble was already transferred, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET); lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) { if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s", serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno)); (buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave); freeClient(replica);
return; return;
} }
if ((nwritten = write(fd,buf,buflen)) == -1) { if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) { if (errno != EAGAIN) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s", serverLog(LL_WARNING,"Write error sending DB to replica: %s",
strerror(errno)); strerror(errno));
freeClient(slave); freeClient(replica);
} }
return; return;
} }
slave->repldboff += nwritten; replica->repldboff += nwritten;
g_pserver->stat_net_output_bytes += nwritten; g_pserver->stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) { if (replica->repldboff == replica->repldbsize) {
close(slave->repldbfd); close(replica->repldbfd);
slave->repldbfd = -1; replica->repldbfd = -1;
aeDeleteFileEvent(el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(el,replica->fd,AE_WRITABLE);
putSlaveOnline(slave); putSlaveOnline(replica);
} }
} }
@ -1130,7 +1130,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
* background saving in order to perform non-blocking synchronization, and * background saving in order to perform non-blocking synchronization, and
* to schedule a new BGSAVE if there are slaves that attached while a * to schedule a new BGSAVE if there are slaves that attached while a
* BGSAVE was in progress, but it was not a good one for replication (no * BGSAVE was in progress, but it was not a good one for replication (no
* other slave was accumulating differences). * other replica was accumulating differences).
* *
* The argument bgsaveerr is C_OK if the background saving succeeded * The argument bgsaveerr is C_OK if the background saving succeeded
* otherwise C_ERR is passed to the function. * otherwise C_ERR is passed to the function.
@ -1146,69 +1146,69 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1; startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa : mincapa = (mincapa == -1) ? replica->slave_capa :
(mincapa & slave->slave_capa); (mincapa & replica->slave_capa);
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { } else if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf; struct redis_stat buf;
/* If this was an RDB on disk save, we have to prepare to send /* If this was an RDB on disk save, we have to prepare to send
* the RDB from disk to the slave socket. Otherwise if this was * the RDB from disk to the replica socket. Otherwise if this was
* already an RDB -> Slaves socket transfer, used in the case of * already an RDB -> Slaves socket transfer, used in the case of
* diskless replication, our work is trivial, we can just put * diskless replication, our work is trivial, we can just put
* the slave online. */ * the replica online. */
if (type == RDB_CHILD_TYPE_SOCKET) { if (type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE, serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming", "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from replica to enable streaming",
replicationGetSlaveName(slave)); replicationGetSlaveName(replica));
/* Note: we wait for a REPLCONF ACK message from slave in /* Note: we wait for a REPLCONF ACK message from replica in
* order to really put it online (install the write handler * order to really put it online (install the write handler
* so that the accumulated data can be transferred). However * so that the accumulated data can be transferred). However
* we change the replication state ASAP, since our slave * we change the replication state ASAP, since our replica
* is technically online now. */ * is technically online now. */
slave->replstate = SLAVE_STATE_ONLINE; replica->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1; replica->repl_put_online_on_ack = 1;
slave->repl_ack_time = g_pserver->unixtime; /* Timeout otherwise. */ replica->repl_ack_time = g_pserver->unixtime; /* Timeout otherwise. */
} else { } else {
if (bgsaveerr != C_OK) { if (bgsaveerr != C_OK) {
if (FCorrectThread(slave)) if (FCorrectThread(replica))
freeClient(slave); freeClient(replica);
else else
freeClientAsync(slave); freeClientAsync(replica);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error"); serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue; continue;
} }
if ((slave->repldbfd = open(g_pserver->rdb_filename,O_RDONLY)) == -1 || if ((replica->repldbfd = open(g_pserver->rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) { redis_fstat(replica->repldbfd,&buf) == -1) {
if (FCorrectThread(slave)) if (FCorrectThread(replica))
freeClient(slave); freeClient(replica);
else else
freeClientAsync(slave); freeClientAsync(replica);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue; continue;
} }
slave->repldboff = 0; replica->repldboff = 0;
slave->repldbsize = buf.st_size; replica->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK; replica->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", replica->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize); (unsigned long long) replica->repldbsize);
if (FCorrectThread(slave)) if (FCorrectThread(replica))
{ {
aeDeleteFileEvent(g_pserver->rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
if (aeCreateFileEvent(g_pserver->rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
freeClient(slave); freeClient(replica);
} }
} }
else else
{ {
aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [slave]{ aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{
aeDeleteFileEvent(g_pserver->rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
if (aeCreateFileEvent(g_pserver->rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
freeClient(slave); freeClient(replica);
} }
}); });
} }
@ -1258,17 +1258,17 @@ void clearReplicationId2(void) {
/* Use the current replication ID / offset as secondary replication /* Use the current replication ID / offset as secondary replication
* ID, and change the current one in order to start a new history. * ID, and change the current one in order to start a new history.
* This should be used when an instance is switched from slave to master * This should be used when an instance is switched from replica to master
* so that it can serve PSYNC requests performed using the master * so that it can serve PSYNC requests performed using the master
* replication ID. */ * replication ID. */
void shiftReplicationId(void) { void shiftReplicationId(void) {
memcpy(g_pserver->replid2,g_pserver->replid,sizeof(g_pserver->replid)); memcpy(g_pserver->replid2,g_pserver->replid,sizeof(g_pserver->replid));
/* We set the second replid offset to the master offset + 1, since /* We set the second replid offset to the master offset + 1, since
* the slave will ask for the first byte it has not yet received, so * the replica will ask for the first byte it has not yet received, so
* we need to add one to the offset: for example if, as a slave, we are * we need to add one to the offset: for example if, as a replica, we are
* sure we have the same history as the master for 50 bytes, after we * sure we have the same history as the master for 50 bytes, after we
* are turned into a master, we can accept a PSYNC request with offset * are turned into a master, we can accept a PSYNC request with offset
* 51, since the slave asking has the same history up to the 50th * 51, since the replica asking has the same history up to the 50th
* byte, and is asking for the new bytes starting at offset 51. */ * byte, and is asking for the new bytes starting at offset 51. */
g_pserver->second_replid_offset = g_pserver->master_repl_offset+1; g_pserver->second_replid_offset = g_pserver->master_repl_offset+1;
changeReplicationId(); changeReplicationId();
@ -1284,7 +1284,7 @@ int slaveIsInHandshakeState(redisMaster *mi) {
mi->repl_state <= REPL_STATE_RECEIVE_PSYNC; mi->repl_state <= REPL_STATE_RECEIVE_PSYNC;
} }
/* Avoid the master to detect the slave is timing out while loading the /* Avoid the master to detect the replica is timing out while loading the
* RDB file in initial synchronization. We send a single newline character * RDB file in initial synchronization. We send a single newline character
* that is valid protocol but is guaranteed to either be sent entirely or * that is valid protocol but is guaranteed to either be sent entirely or
* not, since the byte is indivisible. * not, since the byte is indivisible.
@ -1561,7 +1561,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (aof_is_enabled) restartAOFAfterSYNC(); if (aof_is_enabled) restartAOFAfterSYNC();
return; return;
} }
/* Final setup of the connected slave <- master link */ /* Final setup of the connected replica <- master link */
if (fUpdate) if (fUpdate)
unlink(mi->repl_transfer_tmpfile); // if we're not updating this became the backup RDB unlink(mi->repl_transfer_tmpfile); // if we're not updating this became the backup RDB
zfree(mi->repl_transfer_tmpfile); zfree(mi->repl_transfer_tmpfile);
@ -2006,8 +2006,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
// fallthrough // fallthrough
} }
/* Set the slave port, so that Master's INFO command can list the /* Set the replica port, so that Master's INFO command can list the
* slave listening port correctly. */ * replica listening port correctly. */
if (mi->repl_state == REPL_STATE_SEND_PORT) { if (mi->repl_state == REPL_STATE_SEND_PORT) {
sds port = sdsfromlonglong(g_pserver->slave_announce_port ? sds port = sdsfromlonglong(g_pserver->slave_announce_port ?
g_pserver->slave_announce_port : g_pserver->port); g_pserver->slave_announce_port : g_pserver->port);
@ -2033,15 +2033,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
mi->repl_state = REPL_STATE_SEND_IP; mi->repl_state = REPL_STATE_SEND_IP;
} }
/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ /* Skip REPLCONF ip-address if there is no replica-announce-ip option set. */
if (mi->repl_state == REPL_STATE_SEND_IP && if (mi->repl_state == REPL_STATE_SEND_IP &&
g_pserver->slave_announce_ip == NULL) g_pserver->slave_announce_ip == NULL)
{ {
mi->repl_state = REPL_STATE_SEND_CAPA; mi->repl_state = REPL_STATE_SEND_CAPA;
} }
/* Set the slave ip, so that Master's INFO command can list the /* Set the replica ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */ * replica IP address port correctly in case of port forwarding or NAT. */
if (mi->repl_state == REPL_STATE_SEND_IP) { if (mi->repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",g_pserver->slave_announce_ip, NULL); "ip-address",g_pserver->slave_announce_ip, NULL);
@ -2064,7 +2064,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
mi->repl_state = REPL_STATE_SEND_CAPA; mi->repl_state = REPL_STATE_SEND_CAPA;
} }
/* Inform the master of our (slave) capabilities. /* Inform the master of our (replica) capabilities.
* *
* EOF: supports EOF-style RDB transfer for diskless replication. * EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
@ -2322,7 +2322,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
else else
freeClientAsync(mi->master); freeClientAsync(mi->master);
} }
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ disconnectAllBlockedClients(); /* Clients blocked in master, now replica. */
/* Force our slaves to resync with us as well. They may hopefully be able /* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */ * to partially resync with us, but we can notify the replid change. */
@ -2349,7 +2349,7 @@ void replicationUnsetMaster(redisMaster *mi) {
sdsfree(mi->masterhost); sdsfree(mi->masterhost);
mi->masterhost = NULL; mi->masterhost = NULL;
/* When a slave is turned into a master, the current replication ID /* When a replica is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is * (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication * used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history. */ * ID is created to continue with a new replication history. */
@ -2376,7 +2376,7 @@ void replicationUnsetMaster(redisMaster *mi) {
* master switch. */ * master switch. */
g_pserver->replicaseldb = -1; g_pserver->replicaseldb = -1;
/* Once we turn from slave to master, we consider the starting time without /* Once we turn from replica to master, we consider the starting time without
* slaves (that is used to count the replication backlog time to live) as * slaves (that is used to count the replication backlog time to live) as
* starting from now. Otherwise the backlog will be freed after a * starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */ * failover if slaves do not connect immediately. */
@ -2388,7 +2388,7 @@ void replicationUnsetMaster(redisMaster *mi) {
freeMasterInfo(mi); freeMasterInfo(mi);
} }
/* This function is called when the slave lose the connection with the /* This function is called when the replica lose the connection with the
* master into an unexpected way. */ * master into an unexpected way. */
void replicationHandleMasterDisconnection(redisMaster *mi) { void replicationHandleMasterDisconnection(redisMaster *mi) {
if (mi != nullptr) if (mi != nullptr)
@ -2460,7 +2460,7 @@ void replicaofCommand(client *c) {
} }
/* ROLE command: provide information about the role of the instance /* ROLE command: provide information about the role of the instance
* (master or slave) and additional information related to replication * (master or replica) and additional information related to replication
* in an easy to process format. */ * in an easy to process format. */
void roleCommand(client *c) { void roleCommand(client *c) {
if (listLength(g_pserver->masters) == 0) { if (listLength(g_pserver->masters) == 0) {
@ -2475,19 +2475,19 @@ void roleCommand(client *c) {
mbcount = addReplyDeferredLen(c); mbcount = addReplyDeferredLen(c);
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip; char ip[NET_IP_STR_LEN], *slaveip = replica->slave_ip;
if (slaveip[0] == '\0') { if (slaveip[0] == '\0') {
if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) if (anetPeerToString(replica->fd,ip,sizeof(ip),NULL) == -1)
continue; continue;
slaveip = ip; slaveip = ip;
} }
if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (replica->replstate != SLAVE_STATE_ONLINE) continue;
addReplyArrayLen(c,3); addReplyArrayLen(c,3);
addReplyBulkCString(c,slaveip); addReplyBulkCString(c,slaveip);
addReplyBulkLongLong(c,slave->slave_listening_port); addReplyBulkLongLong(c,replica->slave_listening_port);
addReplyBulkLongLong(c,slave->repl_ack_off+slave->reploff_skipped); addReplyBulkLongLong(c,replica->repl_ack_off+replica->reploff_skipped);
slaves++; slaves++;
} }
setDeferredArrayLen(c,mbcount,slaves); setDeferredArrayLen(c,mbcount,slaves);
@ -2602,9 +2602,9 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
replicationHandleMasterDisconnection(mi); replicationHandleMasterDisconnection(mi);
} }
/* This function is called when a master is turend into a slave, in order to /* This function is called when a master is turend into a replica, in order to
* create from scratch a cached master for the new client, that will allow * create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a * to PSYNC with the replica that was promoted as the new master after a
* failover. * failover.
* *
* Assuming this instance was previously the master instance of the new master, * Assuming this instance was previously the master instance of the new master,
@ -2696,10 +2696,10 @@ void refreshGoodSlavesCount(void) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
time_t lag = g_pserver->unixtime - slave->repl_ack_time; time_t lag = g_pserver->unixtime - replica->repl_ack_time;
if (slave->replstate == SLAVE_STATE_ONLINE && if (replica->replstate == SLAVE_STATE_ONLINE &&
lag <= g_pserver->repl_min_slaves_max_lag) good++; lag <= g_pserver->repl_min_slaves_max_lag) good++;
} }
g_pserver->repl_good_slaves_count = good; g_pserver->repl_good_slaves_count = good;
@ -2707,14 +2707,14 @@ void refreshGoodSlavesCount(void) {
/* ----------------------- REPLICATION SCRIPT CACHE -------------------------- /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
* The goal of this code is to keep track of scripts already sent to every * The goal of this code is to keep track of scripts already sent to every
* connected slave, in order to be able to replicate EVALSHA as it is without * connected replica, in order to be able to replicate EVALSHA as it is without
* translating it to EVAL every time it is possible. * translating it to EVAL every time it is possible.
* *
* We use a capped collection implemented by a hash table for fast lookup * We use a capped collection implemented by a hash table for fast lookup
* of scripts we can send as EVALSHA, plus a linked list that is used for * of scripts we can send as EVALSHA, plus a linked list that is used for
* eviction of the oldest entry when the max number of items is reached. * eviction of the oldest entry when the max number of items is reached.
* *
* We don't care about taking a different cache for every different slave * We don't care about taking a different cache for every different replica
* since to fill the cache again is not very costly, the goal of this code * since to fill the cache again is not very costly, the goal of this code
* is to avoid that the same big script is trasmitted a big number of times * is to avoid that the same big script is trasmitted a big number of times
* per second wasting bandwidth and processor speed, but it is not a problem * per second wasting bandwidth and processor speed, but it is not a problem
@ -2723,15 +2723,15 @@ void refreshGoodSlavesCount(void) {
* *
* This is how the system works: * This is how the system works:
* *
* 1) Every time a new slave connects, we flush the whole script cache. * 1) Every time a new replica connects, we flush the whole script cache.
* 2) We only send as EVALSHA what was sent to the master as EVALSHA, without * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
* trying to convert EVAL into EVALSHA specifically for slaves. * trying to convert EVAL into EVALSHA specifically for slaves.
* 3) Every time we trasmit a script as EVAL to the slaves, we also add the * 3) Every time we trasmit a script as EVAL to the slaves, we also add the
* corresponding SHA1 of the script into the cache as we are sure every * corresponding SHA1 of the script into the cache as we are sure every
* slave knows about the script starting from now. * replica knows about the script starting from now.
* 4) On SCRIPT FLUSH command, we replicate the command to all the slaves * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
* and at the same time flush the script cache. * and at the same time flush the script cache.
* 5) When the last slave disconnects, flush the cache. * 5) When the last replica disconnects, flush the cache.
* 6) We handle SCRIPT LOAD as well since that's how scripts are loaded * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
* in the master sometimes. * in the master sometimes.
*/ */
@ -2744,11 +2744,11 @@ void replicationScriptCacheInit(void) {
} }
/* Empty the script cache. Should be called every time we are no longer sure /* Empty the script cache. Should be called every time we are no longer sure
* that every slave knows about all the scripts in our set, or when the * that every replica knows about all the scripts in our set, or when the
* current AOF "context" is no longer aware of the script. In general we * current AOF "context" is no longer aware of the script. In general we
* should flush the cache: * should flush the cache:
* *
* 1) Every time a new slave reconnects to this master and performs a * 1) Every time a new replica reconnects to this master and performs a
* full SYNC (PSYNC does not require flushing). * full SYNC (PSYNC does not require flushing).
* 2) Every time an AOF rewrite is performed. * 2) Every time an AOF rewrite is performed.
* 3) Every time we are left without slaves at all, and AOF is off, in order * 3) Every time we are left without slaves at all, and AOF is off, in order
@ -2832,10 +2832,10 @@ int replicationCountAcksByOffset(long long offset) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (replica->replstate != SLAVE_STATE_ONLINE) continue;
if ((slave->repl_ack_off + slave->reploff_skipped) >= offset) count++; if ((replica->repl_ack_off + replica->reploff_skipped) >= offset) count++;
} }
return count; return count;
} }
@ -2925,7 +2925,7 @@ void processClientsWaitingReplicas(void) {
} }
} }
/* Return the slave replication offset for this instance, that is /* Return the replica replication offset for this instance, that is
* the offset for which we already processed the master replication stream. */ * the offset for which we already processed the master replication stream. */
long long replicationGetSlaveOffset(redisMaster *mi) { long long replicationGetSlaveOffset(redisMaster *mi) {
long long offset = 0; long long offset = 0;
@ -2980,7 +2980,7 @@ void replicationCron(void) {
cancelReplicationHandshake(mi); cancelReplicationHandshake(mi);
} }
/* Timed out master when we are an already connected slave? */ /* Timed out master when we are an already connected replica? */
if (mi->masterhost && mi->repl_state == REPL_STATE_CONNECTED && if (mi->masterhost && mi->repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout) (time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout)
{ {
@ -3022,7 +3022,7 @@ void replicationCron(void) {
{ {
/* Note that we don't send the PING if the clients are paused during /* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise * a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer * alter the replication offsets of master and replica, and will no longer
* match the one stored into 'mf_master_offset' state. */ * match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress = int manual_failover_in_progress =
g_pserver->cluster_enabled && g_pserver->cluster_enabled &&
@ -3047,21 +3047,21 @@ void replicationCron(void) {
* to avoid altering the replication offsets. This special out of band * to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset. * pings (newlines) can be sent, they will have no effect in the offset.
* *
* The newline will be ignored by the slave but will refresh the * The newline will be ignored by the replica but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the * last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain * ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */ * timeouts are set at a few seconds (example: PSYNC response). */
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
int is_presync = int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
g_pserver->rdb_child_type != RDB_CHILD_TYPE_SOCKET)); g_pserver->rdb_child_type != RDB_CHILD_TYPE_SOCKET));
if (is_presync) { if (is_presync) {
if (write(slave->fd, "\n", 1) == -1) { if (write(replica->fd, "\n", 1) == -1) {
/* Don't worry about socket errors, it's just a ping. */ /* Don't worry about socket errors, it's just a ping. */
} }
} }
@ -3074,18 +3074,18 @@ void replicationCron(void) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (replica->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue; if (replica->flags & CLIENT_PRE_PSYNC) continue;
if ((g_pserver->unixtime - slave->repl_ack_time) > g_pserver->repl_timeout) if ((g_pserver->unixtime - replica->repl_ack_time) > g_pserver->repl_timeout)
{ {
serverLog(LL_WARNING, "Disconnecting timedout replica: %s", serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
replicationGetSlaveName(slave)); replicationGetSlaveName(replica));
if (FCorrectThread(slave)) if (FCorrectThread(replica))
freeClient(slave); freeClient(replica);
else else
freeClientAsync(slave); freeClientAsync(replica);
} }
} }
} }
@ -3109,11 +3109,11 @@ void replicationCron(void) {
* ID, leading to the following problem: * ID, leading to the following problem:
* *
* 1. We are a master instance. * 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will * 2. Our replica is promoted to master. It's repl-id-2 will
* be the same as our repl-id. * be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not * 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset. * increment the master_repl_offset.
* 4. Later we are turned into a slave, connect to the new * 4. Later we are turned into a replica, connect to the new
* master that will accept our PSYNC request by second * master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency * replication ID, but there will be data inconsistency
* because we received writes. */ * because we received writes. */
@ -3152,13 +3152,13 @@ void replicationCron(void) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)ln->value; client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = g_pserver->unixtime - slave->lastinteraction; idle = g_pserver->unixtime - replica->lastinteraction;
if (idle > max_idle) max_idle = idle; if (idle > max_idle) max_idle = idle;
slaves_waiting++; slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa : mincapa = (mincapa == -1) ? replica->slave_capa :
(mincapa & slave->slave_capa); (mincapa & replica->slave_capa);
} }
} }

View File

@ -1479,7 +1479,7 @@ void evalGenericCommand(client *c, int evalsha) {
* To do so we use a cache of SHA1s of scripts that we already propagated * To do so we use a cache of SHA1s of scripts that we already propagated
* as full EVAL, that's called the Replication Script Cache. * as full EVAL, that's called the Replication Script Cache.
* *
* For repliation, everytime a new slave attaches to the master, we need to * For repliation, everytime a new replica attaches to the master, we need to
* flush our cache of scripts that can be replicated as EVALSHA, while * flush our cache of scripts that can be replicated as EVALSHA, while
* for AOF we need to do so every time we rewrite the AOF file. */ * for AOF we need to do so every time we rewrite the AOF file. */
if (evalsha && !g_pserver->lua_replicate_commands) { if (evalsha && !g_pserver->lua_replicate_commands) {

View File

@ -154,7 +154,7 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */
* *
* ok-loading: Allow the command while loading the database. * ok-loading: Allow the command while loading the database.
* *
* ok-stale: Allow the command while a slave has stale data but is not * ok-stale: Allow the command while a replica has stale data but is not
* allowed to serve this data. Normally no command is accepted * allowed to serve this data. Normally no command is accepted
* in this condition but just a few. * in this condition but just a few.
* *
@ -2511,7 +2511,7 @@ void initServerConfig(void) {
/* By default we want scripts to be always replicated by effects /* By default we want scripts to be always replicated by effects
* (single commands executed by the script), and not by sending the * (single commands executed by the script), and not by sending the
* script to the slave / AOF. This is the new way starting from * script to the replica / AOF. This is the new way starting from
* Redis 5. However it is possible to revert it via redis.conf. */ * Redis 5. However it is possible to revert it via redis.conf. */
g_pserver->lua_always_replicate_commands = 1; g_pserver->lua_always_replicate_commands = 1;
@ -3588,8 +3588,8 @@ int processCommand(client *c, int callFlags) {
* propagation of DELs due to eviction. */ * propagation of DELs due to eviction. */
if (g_pserver->maxmemory && !g_pserver->lua_timedout) { if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result /* freeMemoryIfNeeded may flush replica output buffers. This may result
* into a slave, that may be the active client, to be freed. */ * into a replica, that may be the active client, to be freed. */
if (serverTL->current_client == NULL) return C_ERR; if (serverTL->current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client /* It was impossible to free enough memory, and the command the client
@ -3636,7 +3636,7 @@ int processCommand(client *c, int callFlags) {
return C_OK; return C_OK;
} }
/* Don't accept write commands if this is a read only slave. But /* Don't accept write commands if this is a read only replica. But
* accept write commands if this is our master. */ * accept write commands if this is our master. */
if (listLength(g_pserver->masters) && g_pserver->repl_slave_ro && if (listLength(g_pserver->masters) && g_pserver->repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_MASTER) &&
@ -3659,7 +3659,7 @@ int processCommand(client *c, int callFlags) {
} }
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken * when replica-serve-stale-data is no and we are a replica with a broken
* link with master. */ * link with master. */
if (FBrokenLinkToMaster() && if (FBrokenLinkToMaster() &&
g_pserver->repl_serve_stale_data == 0 && g_pserver->repl_serve_stale_data == 0 &&
@ -3792,7 +3792,7 @@ int prepareForShutdown(int flags) {
unlink(cserver.pidfile); unlink(cserver.pidfile);
} }
/* Best effort flush of slave output buffers, so that we hopefully /* Best effort flush of replica output buffers, so that we hopefully
* send them pending writes. */ * send them pending writes. */
flushSlavesOutputBuffers(); flushSlavesOutputBuffers();
@ -4470,18 +4470,18 @@ sds genRedisInfoString(const char *section) {
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln); client *replica = (client*)listNodeValue(ln);
const char *state = NULL; const char *state = NULL;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip; char ip[NET_IP_STR_LEN], *slaveip = replica->slave_ip;
int port; int port;
long lag = 0; long lag = 0;
if (slaveip[0] == '\0') { if (slaveip[0] == '\0') {
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) if (anetPeerToString(replica->fd,ip,sizeof(ip),&port) == -1)
continue; continue;
slaveip = ip; slaveip = ip;
} }
switch(slave->replstate) { switch(replica->replstate) {
case SLAVE_STATE_WAIT_BGSAVE_START: case SLAVE_STATE_WAIT_BGSAVE_START:
case SLAVE_STATE_WAIT_BGSAVE_END: case SLAVE_STATE_WAIT_BGSAVE_END:
state = "wait_bgsave"; state = "wait_bgsave";
@ -4494,14 +4494,14 @@ sds genRedisInfoString(const char *section) {
break; break;
} }
if (state == NULL) continue; if (state == NULL) continue;
if (slave->replstate == SLAVE_STATE_ONLINE) if (replica->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - slave->repl_ack_time; lag = time(NULL) - replica->repl_ack_time;
info = sdscatprintf(info, info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s," "slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld\r\n", "offset=%lld,lag=%ld\r\n",
slaveid,slaveip,slave->slave_listening_port,state, slaveid,slaveip,replica->slave_listening_port,state,
(slave->repl_ack_off + slave->reploff_skipped), lag); (replica->repl_ack_off + replica->reploff_skipped), lag);
slaveid++; slaveid++;
} }
} }
@ -4609,7 +4609,7 @@ void infoCommand(client *c) {
} }
void monitorCommand(client *c) { void monitorCommand(client *c) {
/* ignore MONITOR if already slave or in monitor mode */ /* ignore MONITOR if already replica or in monitor mode */
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_SLAVE) return; if (c->flags & CLIENT_SLAVE) return;
@ -4836,7 +4836,7 @@ void loadDataFromDisk(void) {
while ((ln = listNext(&li))) while ((ln = listNext(&li)))
{ {
redisMaster *mi = (redisMaster*)listNodeValue(ln); redisMaster *mi = (redisMaster*)listNodeValue(ln);
/* If we are a slave, create a cached master from this /* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations * information, in order to allow partial resynchronizations
* with masters. */ * with masters. */
replicationCacheMasterUsingMyself(mi); replicationCacheMasterUsingMyself(mi);

View File

@ -331,7 +331,7 @@ public:
/* Client flags */ /* Client flags */
#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */ #define CLIENT_SLAVE (1<<0) /* This client is a repliaca */
#define CLIENT_MASTER (1<<1) /* This client is a master */ #define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */ #define CLIENT_MONITOR (1<<2) /* This client is a replica monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */ #define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */ #define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */ #define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
@ -395,7 +395,7 @@ public:
#define CLIENT_TYPE_MASTER 3 /* Master. */ #define CLIENT_TYPE_MASTER 3 /* Master. */
#define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output #define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output
buffer configuration. Just the first buffer configuration. Just the first
three: normal, slave, pubsub. */ three: normal, replica, pubsub. */
/* Slave replication state. Used in g_pserver->repl_state for slaves to remember /* Slave replication state. Used in g_pserver->repl_state for slaves to remember
* what to do next. */ * what to do next. */
@ -421,12 +421,12 @@ public:
#define REPL_STATE_CONNECTED 17 /* Connected to master */ #define REPL_STATE_CONNECTED 17 /* Connected to master */
/* State of slaves from the POV of the master. Used in client->replstate. /* State of slaves from the POV of the master. Used in client->replstate.
* In SEND_BULK and ONLINE state the slave receives new updates * In SEND_BULK and ONLINE state the replica receives new updates
* in its output queue. In the WAIT_BGSAVE states instead the server is waiting * in its output queue. In the WAIT_BGSAVE states instead the server is waiting
* to start the next background saving in order to send updates to it. */ * to start the next background saving in order to send updates to it. */
#define SLAVE_STATE_WAIT_BGSAVE_START 6 /* We need to produce a new RDB file. */ #define SLAVE_STATE_WAIT_BGSAVE_START 6 /* We need to produce a new RDB file. */
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */ #define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to slave. */ #define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
/* Slave capabilities. */ /* Slave capabilities. */
@ -434,7 +434,7 @@ public:
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Synchronous read timeout - slave side */ /* Synchronous read timeout - replica side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5 #define CONFIG_REPL_SYNCIO_TIMEOUT 5
/* List related stuff */ /* List related stuff */
@ -543,7 +543,7 @@ public:
/* RDB active child save type. */ /* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0 #define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ #define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
#define RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */ #define RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to replica socket. */
/* Keyspace changes notification classes. Every class is associated with a /* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes. */ * character for configuration purposes. */
@ -1192,8 +1192,8 @@ typedef struct client {
int casyncOpsPending; int casyncOpsPending;
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
int authenticated; /* Needed when the default user requires auth. */ int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */ int replstate; /* Replication state if this is a replica. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */ int repl_put_online_on_ack; /* Install replica write handler on ACK. */
int repldbfd; /* Replication DB file descriptor. */ int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */ off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */ off_t repldbsize; /* Replication DB file size. */
@ -1201,10 +1201,10 @@ typedef struct client {
long long read_reploff; /* Read replication offset if this is a master. */ long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */
long long reploff_skipped; /* Repl backlog we did not send to this client */ long long reploff_skipped; /* Repl backlog we did not send to this client */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_off; /* Replication ack offset, if this is a replica. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a replica. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer copying this replica output buffer
should use. */ should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */
@ -1445,7 +1445,7 @@ struct redisMaster {
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */ long long master_initial_offset; /* Master PSYNC offset. */
int repl_state; /* Replication status if the instance is a slave */ int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
@ -1655,7 +1655,7 @@ struct redisServer {
int lastbgsave_status; /* C_OK or C_ERR */ int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ int rdb_pipe_read_result_from_child; /* of each replica in diskless SYNC. */
/* Pipe and data structures for child -> parent info sharing. */ /* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct { struct {
@ -1676,7 +1676,7 @@ struct redisServer {
long long master_repl_offset; /* My current replication offset */ long long master_repl_offset; /* My current replication offset */
long long second_replid_offset; /* Accept offsets up to this for replid2. */ long long second_replid_offset; /* Accept offsets up to this for replid2. */
int replicaseldb; /* Last SELECTed DB in replication output */ int replicaseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */ int repl_ping_slave_period; /* Master pings the replica every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */ char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */ long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */ long long repl_backlog_histlen; /* Backlog actual data length */
@ -1693,7 +1693,7 @@ struct redisServer {
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */ /* Replication (replica) */
list *masters; list *masters;
int enable_multimaster; int enable_multimaster;
int repl_timeout; /* Timeout after N seconds of master idle */ int repl_timeout; /* Timeout after N seconds of master idle */
@ -1764,7 +1764,7 @@ struct redisServer {
int cluster_slave_validity_factor; /* Slave max data age for failover. */ int cluster_slave_validity_factor; /* Slave max data age for failover. */
int cluster_require_full_coverage; /* If true, put the cluster down if int cluster_require_full_coverage; /* If true, put the cluster down if
there is at least an uncovered slot.*/ there is at least an uncovered slot.*/
int cluster_slave_no_failover; /* Prevent slave from starting a failover int cluster_slave_no_failover; /* Prevent replica from starting a failover
if the master is in failure state. */ if the master is in failure state. */
char *cluster_announce_ip; /* IP address to announce on cluster bus. */ char *cluster_announce_ip; /* IP address to announce on cluster bus. */
int cluster_announce_port; /* base port to announce on cluster bus. */ int cluster_announce_port; /* base port to announce on cluster bus. */
@ -2187,7 +2187,7 @@ void replicationSendNewlineToMaster(struct redisMaster *mi);
long long replicationGetSlaveOffset(struct redisMaster *mi); long long replicationGetSlaveOffset(struct redisMaster *mi);
char *replicationGetSlaveName(client *c); char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void); long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset); int replicationSetupSlaveForFullResync(client *replica, long long offset);
void changeReplicationId(void); void changeReplicationId(void);
void clearReplicationId2(void); void clearReplicationId2(void);
void mergeReplicationId(const char *); void mergeReplicationId(const char *);

View File

@ -33,7 +33,7 @@
/* ----------------- Blocking sockets I/O with timeouts --------------------- */ /* ----------------- Blocking sockets I/O with timeouts --------------------- */
/* Redis performs most of the I/O in a nonblocking way, with the exception /* Redis performs most of the I/O in a nonblocking way, with the exception
* of the SYNC command where the slave does it in a blocking way, and * of the SYNC command where the replica does it in a blocking way, and
* the MIGRATE command that must be blocking in order to be atomic from the * the MIGRATE command that must be blocking in order to be atomic from the
* point of view of the two instances (one migrating the key and one receiving * point of view of the two instances (one migrating the key and one receiving
* the key). This is why need the following blocking I/O functions. * the key). This is why need the following blocking I/O functions.

View File

@ -803,7 +803,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* RETRYCOUNT <count> FORCE JUSTID LASTID <id>. * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
* *
* Note that JUSTID is useful in order to avoid that XCLAIM will do * Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */ * useless work in the replica side, trying to fetch the stream item. */
robj *argv[14]; robj *argv[14];
argv[0] = createStringObject("XCLAIM",6); argv[0] = createStringObject("XCLAIM",6);
argv[1] = key; argv[1] = key;