Handle the case where querybuf data is read by the fastsync read handler
Former-commit-id: c4a5b904e941e09132413abc3b4d86c59c342051
This commit is contained in:
parent
30bba7f7de
commit
1a597c78bd
@ -2157,14 +2157,13 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
* Returns if no storage provider is used. */
|
* Returns if no storage provider is used. */
|
||||||
void saveMasterStatusToStorage(bool fShutdown)
|
void saveMasterStatusToStorage(bool fShutdown)
|
||||||
{
|
{
|
||||||
long long tmp = LONG_LONG_MAX;
|
|
||||||
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
|
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
|
||||||
|
|
||||||
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
|
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
|
||||||
if (fShutdown)
|
if (fShutdown)
|
||||||
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
|
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
|
||||||
else
|
else
|
||||||
g_pserver->metadataDb->insert("repl-offset", 11, &tmp, sizeof(g_pserver->master_repl_offset), true);
|
g_pserver->metadataDb->erase("repl-offset", 11);
|
||||||
|
|
||||||
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
|
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
|
||||||
int zero = 0;
|
int zero = 0;
|
||||||
@ -2527,6 +2526,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
|
|||||||
while (sdslen(mi->bulkreadBuffer) > offset) {
|
while (sdslen(mi->bulkreadBuffer) > offset) {
|
||||||
// Pop completed items
|
// Pop completed items
|
||||||
mi->parseState->trimState();
|
mi->parseState->trimState();
|
||||||
|
if (mi->parseState->depth() == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
if (mi->bulkreadBuffer[offset] == '*') {
|
if (mi->bulkreadBuffer[offset] == '*') {
|
||||||
// Starting an array
|
// Starting an array
|
||||||
@ -2609,8 +2610,6 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "Fast sync complete");
|
serverLog(LL_NOTICE, "Fast sync complete");
|
||||||
sdsfree(mi->bulkreadBuffer);
|
|
||||||
mi->bulkreadBuffer = nullptr;
|
|
||||||
delete mi->parseState;
|
delete mi->parseState;
|
||||||
mi->parseState = nullptr;
|
mi->parseState = nullptr;
|
||||||
return true;
|
return true;
|
||||||
@ -2996,6 +2995,18 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
|
|
||||||
/* Final setup of the connected slave <- master link */
|
/* Final setup of the connected slave <- master link */
|
||||||
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
|
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
|
||||||
|
if (mi->isRocksdbSnapshotRepl) {
|
||||||
|
/* We need to handle the case where the initial querybuf data was read by fast sync */
|
||||||
|
/* This should match the work readQueryFromClient would do for a master client */
|
||||||
|
mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer);
|
||||||
|
sdsfree(mi->bulkreadBuffer);
|
||||||
|
mi->bulkreadBuffer = nullptr;
|
||||||
|
|
||||||
|
mi->master->pending_querybuf = sdscatlen(mi->master->pending_querybuf,
|
||||||
|
mi->master->querybuf,sdslen(mi->master->querybuf));
|
||||||
|
|
||||||
|
mi->master->read_reploff += sdslen(mi->master->querybuf);
|
||||||
|
}
|
||||||
mi->repl_transfer_s = nullptr;
|
mi->repl_transfer_s = nullptr;
|
||||||
mi->repl_state = REPL_STATE_CONNECTED;
|
mi->repl_state = REPL_STATE_CONNECTED;
|
||||||
mi->repl_down_since = 0;
|
mi->repl_down_since = 0;
|
||||||
@ -3043,6 +3054,8 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
* will trigger an AOF rewrite, and when done will start appending
|
* will trigger an AOF rewrite, and when done will start appending
|
||||||
* to the new file. */
|
* to the new file. */
|
||||||
if (g_pserver->aof_enabled) restartAOFAfterSYNC();
|
if (g_pserver->aof_enabled) restartAOFAfterSYNC();
|
||||||
|
if (mi->isRocksdbSnapshotRepl)
|
||||||
|
readQueryFromClient(conn); // There may be querybuf data we just appeneded
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,11 +6,11 @@ proc prepare_value {size} {
|
|||||||
return $_v
|
return $_v
|
||||||
}
|
}
|
||||||
|
|
||||||
start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} {
|
start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master} databases 256}} {
|
||||||
set slave [srv 0 client]
|
set slave [srv 0 client]
|
||||||
set slave_host [srv 0 host]
|
set slave_host [srv 0 host]
|
||||||
set slave_port [srv 0 port]
|
set slave_port [srv 0 port]
|
||||||
start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} {
|
start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica} databases 256}} {
|
||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
set master_port [srv 0 port]
|
set master_port [srv 0 port]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user