diff --git a/src/blocked.c b/src/blocked.c index 00cc798d5..045369e93 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -110,7 +110,7 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBufferAndReplicate(c); + processInputBuffer(c); } } } diff --git a/src/networking.c b/src/networking.c index 744979d16..e4d40fdf0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1671,12 +1671,55 @@ int processMultibulkBuffer(client *c) { return C_ERR; } +/* Perform necessary tasks after a command was executed: + * + * 1. The client is reset unless there are reasons to avoid doing it. + * 2. In the case of master clients, the replication offset is updated. + * 3. Propagate commands we got from our master to replicas down the line. */ +void commandProcessed(client *c) { + int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand; + long long prev_offset = c->reploff; + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. + * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } + + /* If the client is a master we need to compute the difference + * between the applied offset before and after processing the buffer, + * to understand how much of the replication stream was actually + * applied to the master state: this quantity, and its corresponding + * part of the replication stream, will be propagated to the + * sub-replicas and to the replication backlog. */ + if (c->flags & CLIENT_MASTER) { + long long applied = c->reploff - prev_offset; + long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset; + if (applied) { + replicationFeedSlavesFromMasterStream(server.slaves, + c->pending_querybuf, applied); + sdsrange(c->pending_querybuf,applied,-1); + } + /* The server.master_repl_meaningful_offset variable represents + * the offset of the replication stream without the pending PINGs. */ + if (cmd_is_ping) + server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset; + } +} + /* This function calls processCommand(), but also performs a few sub tasks - * that are useful in that context: + * for the client that are useful in that context: * * 1. It sets the current client to the client 'c'. - * 2. In the case of master clients, the replication offset is updated. - * 3. The client is reset unless there are reasons to avoid doing it. + * 2. calls commandProcessed() if the command was handled. * * The function returns C_ERR in case the client was freed as a side effect * of processing the command, otherwise C_OK is returned. */ @@ -1684,20 +1727,7 @@ int processCommandAndResetClient(client *c) { int deadclient = 0; server.current_client = c; if (processCommand(c) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) - { - resetClient(c); - } + commandProcessed(c); } if (server.current_client == NULL) deadclient = 1; server.current_client = NULL; @@ -1794,31 +1824,6 @@ void processInputBuffer(client *c) { } } -/* This is a wrapper for processInputBuffer that also cares about handling - * the replication forwarding to the sub-replicas, in case the client 'c' - * is flagged as master. Usually you want to call this instead of the - * raw processInputBuffer(). */ -void processInputBufferAndReplicate(client *c) { - if (!(c->flags & CLIENT_MASTER)) { - processInputBuffer(c); - } else { - /* If the client is a master we need to compute the difference - * between the applied offset before and after processing the buffer, - * to understand how much of the replication stream was actually - * applied to the master state: this quantity, and its corresponding - * part of the replication stream, will be propagated to the - * sub-replicas and to the replication backlog. */ - size_t prev_offset = c->reploff; - processInputBuffer(c); - size_t applied = c->reploff - prev_offset; - if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); - sdsrange(c->pending_querybuf,applied,-1); - } - } -} - void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, readlen; @@ -1886,7 +1891,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ - processInputBufferAndReplicate(c); + processInputBuffer(c); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -3101,7 +3106,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { continue; } } - processInputBufferAndReplicate(c); + processInputBuffer(c); } return processed; } diff --git a/src/replication.c b/src/replication.c index 3e9910374..c59639cd1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -39,6 +39,7 @@ #include #include +long long adjustMeaningfulReplOffset(); void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); @@ -2693,6 +2694,9 @@ void replicationCacheMaster(client *c) { * pending outputs to the master. */ sdsclear(server.master->querybuf); sdsclear(server.master->pending_querybuf); + /* Adjust reploff and read_reploff to the last meaningful offset we executed. + * this is the offset the replica will use for future PSYNC. */ + server.master->reploff = adjustMeaningfulReplOffset(); server.master->read_reploff = server.master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); @@ -2717,6 +2721,38 @@ void replicationCacheMaster(client *c) { replicationHandleMasterDisconnection(); } +/* If the "meaningful" offset, that is the offset without the final PINGs + * in the stream, is different than the last offset, use it instead: + * often when the master is no longer reachable, replicas will never + * receive the PINGs, however the master will end with an incremented + * offset because of the PINGs and will not be able to incrementally + * PSYNC with the new master. + * This function trims the replication backlog when needed, and returns + * the offset to be used for future partial sync. */ +long long adjustMeaningfulReplOffset() { + if (server.master_repl_offset > server.master_repl_meaningful_offset) { + long long delta = server.master_repl_offset - + server.master_repl_meaningful_offset; + serverLog(LL_NOTICE, + "Using the meaningful offset %lld instead of %lld to exclude " + "the final PINGs (%lld bytes difference)", + server.master_repl_meaningful_offset, + server.master_repl_offset, + delta); + server.master_repl_offset = server.master_repl_meaningful_offset; + if (server.repl_backlog_histlen <= delta) { + server.repl_backlog_histlen = 0; + server.repl_backlog_idx = 0; + } else { + server.repl_backlog_histlen -= delta; + server.repl_backlog_idx = + (server.repl_backlog_idx + (server.repl_backlog_size - delta)) % + server.repl_backlog_size; + } + } + return server.master_repl_offset; +} + /* This function is called when a master is turend into a slave, in order to * create from scratch a cached master for the new client, that will allow * to PSYNC with the slave that was promoted as the new master after a @@ -2736,35 +2772,7 @@ void replicationCacheMasterUsingMyself(void) { * by replicationCreateMasterClient(). We'll later set the created * master as server.cached_master, so the replica will use such * offset for PSYNC. */ - server.master_initial_offset = server.master_repl_offset; - - /* However if the "meaningful" offset, that is the offset without - * the final PINGs in the stream, is different, use this instead: - * often when the master is no longer reachable, replicas will never - * receive the PINGs, however the master will end with an incremented - * offset because of the PINGs and will not be able to incrementally - * PSYNC with the new master. */ - if (server.master_repl_offset > server.master_repl_meaningful_offset) { - long long delta = server.master_repl_offset - - server.master_repl_meaningful_offset; - serverLog(LL_NOTICE, - "Using the meaningful offset %lld instead of %lld to exclude " - "the final PINGs (%lld bytes difference)", - server.master_repl_meaningful_offset, - server.master_repl_offset, - delta); - server.master_initial_offset = server.master_repl_meaningful_offset; - server.master_repl_offset = server.master_repl_meaningful_offset; - if (server.repl_backlog_histlen <= delta) { - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - } else { - server.repl_backlog_histlen -= delta; - server.repl_backlog_idx = - (server.repl_backlog_idx + (server.repl_backlog_size - delta)) % - server.repl_backlog_size; - } - } + server.master_initial_offset = adjustMeaningfulReplOffset(); /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ diff --git a/src/server.h b/src/server.h index 9e1e506af..41d767e13 100644 --- a/src/server.h +++ b/src/server.h @@ -1600,7 +1600,6 @@ void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); void processInputBuffer(client *c); -void processInputBufferAndReplicate(client *c); void processGopherRequest(client *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 333736ffa..4e1189e0b 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -44,6 +44,7 @@ start_server {} { set used [list $master_id] test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" { $R($master_id) slaveof no one + $R($master_id) config set repl-ping-replica-period 1 ;# increse the chance that random ping will cause issues if {$counter_value == 0} { $R($master_id) set x $counter_value } @@ -114,23 +115,20 @@ start_server {} { } } - # wait for all the slaves to be in sync with the master - set master_ofs [status $R($master_id) master_repl_offset] + # wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too wait_for_condition 500 100 { - $master_ofs == [status $R(0) master_repl_offset] && - $master_ofs == [status $R(1) master_repl_offset] && - $master_ofs == [status $R(2) master_repl_offset] && - $master_ofs == [status $R(3) master_repl_offset] && - $master_ofs == [status $R(4) master_repl_offset] + [status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset] } else { - if {$debug_msg} { - for {set j 0} {$j < 5} {incr j} { - puts "$j: sync_full: [status $R($j) sync_full]" - puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]" - puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]" - puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]" - puts "---" - } + for {set j 0} {$j < 5} {incr j} { + puts "$j: sync_full: [status $R($j) sync_full]" + puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]" + puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]" + puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]" + puts "---" } fail "Slaves are not in sync with the master after too long time." } @@ -175,9 +173,14 @@ start_server {} { $R($j) slaveof $master_host $master_port } - # Wait for slaves to sync + # Wait for replicas to sync. it is not enough to just wait for connected_slaves==4 + # since we might do the check before the master realized that they're disconnected wait_for_condition 50 1000 { - [status $R($master_id) connected_slaves] == 4 + [status $R($master_id) connected_slaves] == 4 && + [status $R([expr {($master_id+1)%5}]) master_link_status] == "up" && + [status $R([expr {($master_id+2)%5}]) master_link_status] == "up" && + [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" && + [status $R([expr {($master_id+4)%5}]) master_link_status] == "up" } else { fail "Replica not reconnecting" } @@ -188,6 +191,7 @@ start_server {} { set slave_id [expr {($master_id+1)%5}] set sync_count [status $R($master_id) sync_full] set sync_partial [status $R($master_id) sync_partial_ok] + set sync_partial_err [status $R($master_id) sync_partial_err] catch { $R($slave_id) config rewrite $R($slave_id) debug restart @@ -197,7 +201,11 @@ start_server {} { wait_for_condition 50 1000 { [status $R($master_id) sync_partial_ok] == $sync_partial + 1 } else { - fail "Replica not reconnecting" + puts "prev sync_full: $sync_count" + puts "prev sync_partial_ok: $sync_partial" + puts "prev sync_partial_err: $sync_partial_err" + puts [$R($master_id) info stats] + fail "Replica didn't partial sync" } set new_sync_count [status $R($master_id) sync_full] assert {$sync_count == $new_sync_count} @@ -271,3 +279,103 @@ start_server {} { } }}}}} + +start_server {tags {"psync2"}} { +start_server {} { +start_server {} { +start_server {} { +start_server {} { + test {pings at the end of replication stream are ignored for psync} { + set master [srv -4 client] + set master_host [srv -4 host] + set master_port [srv -4 port] + set replica1 [srv -3 client] + set replica2 [srv -2 client] + set replica3 [srv -1 client] + set replica4 [srv -0 client] + + $replica1 replicaof $master_host $master_port + $replica2 replicaof $master_host $master_port + $replica3 replicaof $master_host $master_port + $replica4 replicaof $master_host $master_port + wait_for_condition 50 1000 { + [status $master connected_slaves] == 4 + } else { + fail "replicas didn't connect" + } + + $master incr x + wait_for_condition 50 1000 { + [$replica1 get x] == 1 && [$replica2 get x] == 1 && + [$replica3 get x] == 1 && [$replica4 get x] == 1 + } else { + fail "replicas didn't get incr" + } + + # disconnect replica1 and replica2 + # and wait for the master to send a ping to replica3 and replica4 + $replica1 replicaof no one + $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id + $master config set repl-ping-replica-period 1 + after 1500 + + # make everyone sync from the replica1 that didn't get the last ping from the old master + # replica4 will keep syncing from the old master which now syncs from replica1 + # and replica2 will re-connect to the old master (which went back in time) + set new_master_host [srv -3 host] + set new_master_port [srv -3 port] + $replica3 replicaof $new_master_host $new_master_port + $master replicaof $new_master_host $new_master_port + $replica2 replicaof $master_host $master_port + wait_for_condition 50 1000 { + [status $replica2 master_link_status] == "up" && + [status $replica3 master_link_status] == "up" && + [status $replica4 master_link_status] == "up" && + [status $master master_link_status] == "up" + } else { + fail "replicas didn't connect" + } + + # make sure replication is still alive and kicking + $replica1 incr x + wait_for_condition 50 1000 { + [$replica2 get x] == 2 && + [$replica3 get x] == 2 && + [$replica4 get x] == 2 && + [$master get x] == 2 + } else { + fail "replicas didn't get incr" + } + + # make sure there are full syncs other than the initial ones + assert_equal [status $master sync_full] 4 + assert_equal [status $replica1 sync_full] 0 + assert_equal [status $replica2 sync_full] 0 + assert_equal [status $replica3 sync_full] 0 + assert_equal [status $replica4 sync_full] 0 + + # force psync + $master client kill type master + $replica2 client kill type master + $replica3 client kill type master + $replica4 client kill type master + + # make sure replication is still alive and kicking + $replica1 incr x + wait_for_condition 50 1000 { + [$replica2 get x] == 3 && + [$replica3 get x] == 3 && + [$replica4 get x] == 3 && + [$master get x] == 3 + } else { + fail "replicas didn't get incr" + } + + # make sure there are full syncs other than the initial ones + assert_equal [status $master sync_full] 4 + assert_equal [status $replica1 sync_full] 0 + assert_equal [status $replica2 sync_full] 0 + assert_equal [status $replica3 sync_full] 0 + assert_equal [status $replica4 sync_full] 0 +} +}}}}}