Remove the meaningful offset feature.
After a closer look, the Redis core devleopers all believe that this was too fragile, caused many bugs that we didn't expect and that were very hard to track. Better to find an alternative solution that is simpler.
This commit is contained in:
parent
325409a011
commit
22472fe5a1
@ -1037,24 +1037,13 @@ static void freeClientArgv(client *c) {
|
||||
|
||||
/* Close all the slaves connections. This is useful in chained replication
|
||||
* when we resync with our own master and want to force all our slaves to
|
||||
* resync with us as well.
|
||||
*
|
||||
* If 'async' is non-zero we free the clients asynchronously. This is needed
|
||||
* when we call this function from a context where in the chain of the
|
||||
* callers somebody is iterating the list of clients. For instance when
|
||||
* CLIENT KILL TYPE master is called, caching the master client may
|
||||
* adjust the meaningful offset of replication, and in turn call
|
||||
* discionectSlaves(). Since CLIENT KILL iterates the clients this is
|
||||
* not safe. */
|
||||
void disconnectSlaves(int async) {
|
||||
* resync with us as well. */
|
||||
void disconnectSlaves(void) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
listNode *ln = listFirst(server.slaves);
|
||||
if (async)
|
||||
freeClientAsync((client*)ln->value);
|
||||
else
|
||||
freeClient((client*)ln->value);
|
||||
}
|
||||
}
|
||||
@ -1769,7 +1758,6 @@ int processMultibulkBuffer(client *c) {
|
||||
* 2. In the case of master clients, the replication offset is updated.
|
||||
* 3. Propagate commands we got from our master to replicas down the line. */
|
||||
void commandProcessed(client *c) {
|
||||
int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
|
||||
long long prev_offset = c->reploff;
|
||||
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
||||
/* Update the applied replication offset of our master. */
|
||||
@ -1794,16 +1782,11 @@ void commandProcessed(client *c) {
|
||||
* sub-replicas and to the replication backlog. */
|
||||
if (c->flags & CLIENT_MASTER) {
|
||||
long long applied = c->reploff - prev_offset;
|
||||
long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset;
|
||||
if (applied) {
|
||||
replicationFeedSlavesFromMasterStream(server.slaves,
|
||||
c->pending_querybuf, applied);
|
||||
sdsrange(c->pending_querybuf,applied,-1);
|
||||
}
|
||||
/* The server.master_repl_meaningful_offset variable represents
|
||||
* the offset of the replication stream without the pending PINGs. */
|
||||
if (cmd_is_ping)
|
||||
server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,6 @@
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
|
||||
long long adjustMeaningfulReplOffset(int *adjusted);
|
||||
void replicationDiscardCachedMaster(void);
|
||||
void replicationResurrectCachedMaster(connection *conn);
|
||||
void replicationSendAck(void);
|
||||
@ -163,7 +162,6 @@ void feedReplicationBacklog(void *ptr, size_t len) {
|
||||
unsigned char *p = ptr;
|
||||
|
||||
server.master_repl_offset += len;
|
||||
server.master_repl_meaningful_offset = server.master_repl_offset;
|
||||
|
||||
/* This is a circular buffer, so write as much data we can at every
|
||||
* iteration and rewind the "idx" index if we reach the limit. */
|
||||
@ -1831,7 +1829,6 @@ void readSyncBulkPayload(connection *conn) {
|
||||
* we are starting a new history. */
|
||||
memcpy(server.replid,server.master->replid,sizeof(server.replid));
|
||||
server.master_repl_offset = server.master->reploff;
|
||||
server.master_repl_meaningful_offset = server.master->reploff;
|
||||
clearReplicationId2();
|
||||
|
||||
/* Let's create the replication backlog if needed. Slaves need to
|
||||
@ -2086,7 +2083,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
|
||||
memcpy(server.cached_master->replid,new,sizeof(server.replid));
|
||||
|
||||
/* Disconnect all the sub-slaves: they need to be notified. */
|
||||
disconnectSlaves(0);
|
||||
disconnectSlaves();
|
||||
}
|
||||
}
|
||||
|
||||
@ -2359,7 +2356,7 @@ void syncWithMaster(connection *conn) {
|
||||
* as well, if we have any sub-slaves. The master may transfer us an
|
||||
* entirely different data set and we have no way to incrementally feed
|
||||
* our slaves after that. */
|
||||
disconnectSlaves(0); /* Force our slaves to resync with us as well. */
|
||||
disconnectSlaves(); /* Force our slaves to resync with us as well. */
|
||||
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
|
||||
|
||||
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
|
||||
@ -2506,7 +2503,7 @@ void replicationSetMaster(char *ip, int port) {
|
||||
|
||||
/* Force our slaves to resync with us as well. They may hopefully be able
|
||||
* to partially resync with us, but we can notify the replid change. */
|
||||
disconnectSlaves(0);
|
||||
disconnectSlaves();
|
||||
cancelReplicationHandshake();
|
||||
/* Before destroying our master state, create a cached master using
|
||||
* our own parameters, to later PSYNC with the new master. */
|
||||
@ -2557,7 +2554,7 @@ void replicationUnsetMaster(void) {
|
||||
* of the replication ID change (see shiftReplicationId() call). However
|
||||
* the slaves will be able to partially resync with us, so it will be
|
||||
* a very fast reconnection. */
|
||||
disconnectSlaves(0);
|
||||
disconnectSlaves();
|
||||
server.repl_state = REPL_STATE_NONE;
|
||||
|
||||
/* We need to make sure the new master will start the replication stream
|
||||
@ -2760,10 +2757,7 @@ void replicationCacheMaster(client *c) {
|
||||
sdsclear(server.master->querybuf);
|
||||
sdsclear(server.master->pending_querybuf);
|
||||
|
||||
/* Adjust reploff and read_reploff to the last meaningful offset we
|
||||
* executed. This is the offset the replica will use for future PSYNC. */
|
||||
int offset_adjusted;
|
||||
server.master->reploff = adjustMeaningfulReplOffset(&offset_adjusted);
|
||||
server.master->reploff = server.master_repl_offset;
|
||||
server.master->read_reploff = server.master->reploff;
|
||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||
listEmpty(c->reply);
|
||||
@ -2786,53 +2780,6 @@ void replicationCacheMaster(client *c) {
|
||||
* so make sure to adjust the replication state. This function will
|
||||
* also set server.master to NULL. */
|
||||
replicationHandleMasterDisconnection();
|
||||
|
||||
/* If we trimmed this replica backlog, we need to disconnect our chained
|
||||
* replicas (if any), otherwise they may have the PINGs we removed
|
||||
* from the stream and their offset would no longer match: upon
|
||||
* disconnection they will also trim the final PINGs and will be able
|
||||
* to incrementally sync without issues. */
|
||||
if (offset_adjusted) disconnectSlaves(1);
|
||||
}
|
||||
|
||||
/* If the "meaningful" offset, that is the offset without the final PINGs
|
||||
* in the stream, is different than the last offset, use it instead:
|
||||
* often when the master is no longer reachable, replicas will never
|
||||
* receive the PINGs, however the master will end with an incremented
|
||||
* offset because of the PINGs and will not be able to incrementally
|
||||
* PSYNC with the new master.
|
||||
* This function trims the replication backlog when needed, and returns
|
||||
* the offset to be used for future partial sync.
|
||||
*
|
||||
* If the integer 'adjusted' was passed by reference, it is set to 1
|
||||
* if the function call actually modified the offset and the replication
|
||||
* backlog, otherwise it is set to 0. It can be NULL if the caller is
|
||||
* not interested in getting this info. */
|
||||
long long adjustMeaningfulReplOffset(int *adjusted) {
|
||||
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
|
||||
long long delta = server.master_repl_offset -
|
||||
server.master_repl_meaningful_offset;
|
||||
serverLog(LL_NOTICE,
|
||||
"Using the meaningful offset %lld instead of %lld to exclude "
|
||||
"the final PINGs (%lld bytes difference)",
|
||||
server.master_repl_meaningful_offset,
|
||||
server.master_repl_offset,
|
||||
delta);
|
||||
server.master_repl_offset = server.master_repl_meaningful_offset;
|
||||
if (server.repl_backlog_histlen <= delta) {
|
||||
server.repl_backlog_histlen = 0;
|
||||
server.repl_backlog_idx = 0;
|
||||
} else {
|
||||
server.repl_backlog_histlen -= delta;
|
||||
server.repl_backlog_idx =
|
||||
(server.repl_backlog_idx + (server.repl_backlog_size - delta)) %
|
||||
server.repl_backlog_size;
|
||||
}
|
||||
if (adjusted) *adjusted = 1;
|
||||
} else {
|
||||
if (adjusted) *adjusted = 0;
|
||||
}
|
||||
return server.master_repl_offset;
|
||||
}
|
||||
|
||||
/* This function is called when a master is turend into a slave, in order to
|
||||
@ -2854,7 +2801,7 @@ void replicationCacheMasterUsingMyself(void) {
|
||||
* by replicationCreateMasterClient(). We'll later set the created
|
||||
* master as server.cached_master, so the replica will use such
|
||||
* offset for PSYNC. */
|
||||
server.master_initial_offset = adjustMeaningfulReplOffset(NULL);
|
||||
server.master_initial_offset = server.master_repl_offset;
|
||||
|
||||
/* The master client we create can be set to any DBID, because
|
||||
* the new master will start its replication stream with SELECT. */
|
||||
@ -3246,18 +3193,10 @@ void replicationCron(void) {
|
||||
clientsArePaused();
|
||||
|
||||
if (!manual_failover_in_progress) {
|
||||
long long before_ping = server.master_repl_meaningful_offset;
|
||||
ping_argv[0] = createStringObject("PING",4);
|
||||
replicationFeedSlaves(server.slaves, server.slaveseldb,
|
||||
ping_argv, 1);
|
||||
decrRefCount(ping_argv[0]);
|
||||
/* The server.master_repl_meaningful_offset variable represents
|
||||
* the offset of the replication stream without the pending PINGs.
|
||||
* This is useful to set the right replication offset for PSYNC
|
||||
* when the master is turned into a replica. Otherwise pending
|
||||
* PINGs may not allow it to perform an incremental sync with the
|
||||
* new master. */
|
||||
server.master_repl_meaningful_offset = before_ping;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2394,7 +2394,6 @@ void initServerConfig(void) {
|
||||
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
|
||||
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||
server.master_repl_offset = 0;
|
||||
server.master_repl_meaningful_offset = 0;
|
||||
|
||||
/* Replication partial resync backlog */
|
||||
server.repl_backlog = NULL;
|
||||
@ -4471,7 +4470,6 @@ sds genRedisInfoString(const char *section) {
|
||||
"master_replid:%s\r\n"
|
||||
"master_replid2:%s\r\n"
|
||||
"master_repl_offset:%lld\r\n"
|
||||
"master_repl_meaningful_offset:%lld\r\n"
|
||||
"second_repl_offset:%lld\r\n"
|
||||
"repl_backlog_active:%d\r\n"
|
||||
"repl_backlog_size:%lld\r\n"
|
||||
@ -4480,7 +4478,6 @@ sds genRedisInfoString(const char *section) {
|
||||
server.replid,
|
||||
server.replid2,
|
||||
server.master_repl_offset,
|
||||
server.master_repl_meaningful_offset,
|
||||
server.second_replid_offset,
|
||||
server.repl_backlog != NULL,
|
||||
server.repl_backlog_size,
|
||||
@ -4858,7 +4855,6 @@ void loadDataFromDisk(void) {
|
||||
{
|
||||
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
|
||||
server.master_repl_offset = rsi.repl_offset;
|
||||
server.master_repl_meaningful_offset = rsi.repl_offset;
|
||||
/* If we are a slave, create a cached master from this
|
||||
* information, in order to allow partial resynchronizations
|
||||
* with masters. */
|
||||
|
@ -1261,7 +1261,6 @@ struct redisServer {
|
||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
|
||||
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
|
||||
long long master_repl_offset; /* My current replication offset */
|
||||
long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */
|
||||
long long second_replid_offset; /* Accept offsets up to this for replid2. */
|
||||
int slaveseldb; /* Last SELECTed DB in replication output */
|
||||
int repl_ping_slave_period; /* Master pings the slave every N seconds */
|
||||
@ -1660,7 +1659,7 @@ int getClientType(client *c);
|
||||
int getClientTypeByName(char *name);
|
||||
char *getClientTypeName(int class);
|
||||
void flushSlavesOutputBuffers(void);
|
||||
void disconnectSlaves(int async);
|
||||
void disconnectSlaves(void);
|
||||
int listenToPort(int port, int *fds, int *count);
|
||||
void pauseClients(mstime_t duration);
|
||||
int clientsArePaused(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user