diff --git a/src/networking.cpp b/src/networking.cpp index 76d07a117..39282d5c6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -144,6 +144,7 @@ client *createClient(int fd, int iel) { c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; + c->reploff_skipped = 0; c->read_reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; diff --git a/src/replication.cpp b/src/replication.cpp index e9e37f191..c5d9fa6bc 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -323,6 +323,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char proto[1024]; int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); + long long master_repl_offset_start = g_pserver->master_repl_offset; /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -375,8 +376,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) continue; std::unique_locklock)> lock(slave->lock); + if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) + { + slave->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; + continue; + } if (!fSendRaw) addReplyProtoAsync(slave, proto, cchProto); @@ -987,7 +992,7 @@ void replconfCommand(client *c) { if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK)) return; if (offset > c->repl_ack_off) - c->repl_ack_off = offset; + c->repl_ack_off = offset + c->reploff_skipped; c->repl_ack_time = g_pserver->unixtime; /* If this was a diskless replication, we need to really put * the slave online when the first ACK is received (which @@ -1290,6 +1295,7 @@ void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) { mi->master->flags |= CLIENT_MASTER; mi->master->authenticated = 1; mi->master->reploff = mi->master_initial_offset; + mi->master->reploff_skipped = 0; mi->master->read_reploff = mi->master->reploff; mi->master->puser = NULL; /* This client can do everything. */ @@ -2786,7 +2792,7 @@ void waitCommand(client *c) { long numreplicas, ackreplicas; long long offset = c->woff; - if (listLength(g_pserver->masters)) { + if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated."); return; } diff --git a/src/server.h b/src/server.h index 1e854abca..8f45d39ed 100644 --- a/src/server.h +++ b/src/server.h @@ -932,6 +932,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_skipped; /* Repl backlog we did not send to this client */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves @@ -1160,7 +1161,7 @@ struct redisMaster { char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ int masterport; /* Port of master */ - client *cached_master; /* Cached master to be reused for PSYNC. */ + client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into