Fix active replication offset synchronization accounting, and enable the wait command

Former-commit-id: a5784ef09e71a9a45780a8f3dbab875b1f1fe1a5
This commit is contained in:
John Sully 2019-07-11 17:00:23 -04:00
parent 0c611d8347
commit 8ba25371a5
3 changed files with 12 additions and 4 deletions

View File

@ -144,6 +144,7 @@ client *createClient(int fd, int iel) {
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->reploff_skipped = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;

View File

@ -323,6 +323,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
char proto[1024];
int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
cchProto = std::min((int)sizeof(proto), cchProto);
long long master_repl_offset_start = g_pserver->master_repl_offset;
/* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog)
@ -375,8 +376,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) continue;
std::unique_lock<decltype(slave->lock)> lock(slave->lock);
if (serverTL->current_client && FSameHost(serverTL->current_client, slave))
{
slave->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
continue;
}
if (!fSendRaw)
addReplyProtoAsync(slave, proto, cchProto);
@ -987,7 +992,7 @@ void replconfCommand(client *c) {
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_off = offset + c->reploff_skipped;
c->repl_ack_time = g_pserver->unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
@ -1290,6 +1295,7 @@ void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) {
mi->master->flags |= CLIENT_MASTER;
mi->master->authenticated = 1;
mi->master->reploff = mi->master_initial_offset;
mi->master->reploff_skipped = 0;
mi->master->read_reploff = mi->master->reploff;
mi->master->puser = NULL; /* This client can do everything. */
@ -2786,7 +2792,7 @@ void waitCommand(client *c) {
long numreplicas, ackreplicas;
long long offset = c->woff;
if (listLength(g_pserver->masters)) {
if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) {
addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
return;
}

View File

@ -932,6 +932,7 @@ typedef struct client {
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long reploff_skipped; /* Repl backlog we did not send to this client */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
@ -1160,7 +1161,7 @@ struct redisMaster {
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *master;
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into