From 1a597c78bd189bd5b05bdf2e431b68f4fb8e0c22 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 27 Dec 2021 00:15:09 -0500 Subject: [PATCH] Handle the case where querybuf data is read by the fastsync read handler Former-commit-id: c4a5b904e941e09132413abc3b4d86c59c342051 --- src/replication.cpp | 21 +++++++++++++++++---- tests/integration/replication-fast.tcl | 4 ++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index b1ad41b14..1b095f031 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2157,14 +2157,13 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) * Returns if no storage provider is used. */ void saveMasterStatusToStorage(bool fShutdown) { - long long tmp = LONG_LONG_MAX; if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); if (fShutdown) g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); else - g_pserver->metadataDb->insert("repl-offset", 11, &tmp, sizeof(g_pserver->master_repl_offset), true); + g_pserver->metadataDb->erase("repl-offset", 11); if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { int zero = 0; @@ -2527,6 +2526,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi while (sdslen(mi->bulkreadBuffer) > offset) { // Pop completed items mi->parseState->trimState(); + if (mi->parseState->depth() == 0) + break; if (mi->bulkreadBuffer[offset] == '*') { // Starting an array @@ -2609,8 +2610,6 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi return false; serverLog(LL_NOTICE, "Fast sync complete"); - sdsfree(mi->bulkreadBuffer); - mi->bulkreadBuffer = nullptr; delete mi->parseState; mi->parseState = nullptr; return true; @@ -2996,6 +2995,18 @@ void readSyncBulkPayload(connection *conn) { /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); + if (mi->isRocksdbSnapshotRepl) { + /* We need to handle the case where the initial querybuf data was read by fast sync */ + /* This should match the work readQueryFromClient would do for a master client */ + mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer); + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + + mi->master->pending_querybuf = sdscatlen(mi->master->pending_querybuf, + mi->master->querybuf,sdslen(mi->master->querybuf)); + + mi->master->read_reploff += sdslen(mi->master->querybuf); + } mi->repl_transfer_s = nullptr; mi->repl_state = REPL_STATE_CONNECTED; mi->repl_down_since = 0; @@ -3043,6 +3054,8 @@ void readSyncBulkPayload(connection *conn) { * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (g_pserver->aof_enabled) restartAOFAfterSYNC(); + if (mi->isRocksdbSnapshotRepl) + readQueryFromClient(conn); // There may be querybuf data we just appeneded return; } diff --git a/tests/integration/replication-fast.tcl b/tests/integration/replication-fast.tcl index f190524c1..a7368ff2c 100644 --- a/tests/integration/replication-fast.tcl +++ b/tests/integration/replication-fast.tcl @@ -6,11 +6,11 @@ proc prepare_value {size} { return $_v } -start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} { +start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master} databases 256}} { set slave [srv 0 client] set slave_host [srv 0 host] set slave_port [srv 0 port] - start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} { + start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica} databases 256}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port]