diff --git a/src/rdb.cpp b/src/rdb.cpp index 2666e0e8c..2f5f73cd5 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -3030,15 +3030,18 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) { rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); + bool fUpdateReplication = (g_pserver->mstime - r->last_update) > 1000; - listIter li; - listNode *ln; - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); - if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER) - replicationSendNewlineToMaster(mi); + if (fUpdateReplication) { + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER) + replicationSendNewlineToMaster(mi); + } } loadingProgress(r->processed_bytes); @@ -3050,12 +3053,15 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processModuleLoadingProgressEvent(0); - robj *ping_argv[1]; + if (fUpdateReplication) { + robj *ping_argv[1]; - ping_argv[0] = createStringObject("PING",4); - replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); - decrRefCount(ping_argv[0]); + ping_argv[0] = createStringObject("PING",4); + replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); + decrRefCount(ping_argv[0]); + } + if (fUpdateReplication) r->last_update = g_pserver->mstime; r->keys_since_last_callback = 0; } } diff --git a/src/rio.cpp b/src/rio.cpp index 809d028ef..bdff0c66b 100644 --- a/src/rio.cpp +++ b/src/rio.cpp @@ -105,6 +105,7 @@ static const rio rioBufferIO = { 0, /* bytes read or written */ 0, /* keys since last callback */ 0, /* read/write chunk size */ + 0, /* last update time */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -120,6 +121,7 @@ static const rio rioConstBufferIO = { 0, /* bytes read or written */ 0, /* keys since last callback */ 0, /* read/write chunk size */ + 0, /* last update time */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -184,6 +186,7 @@ static const rio rioFileIO = { 0, /* bytes read or written */ 0, /* keys since last callback */ 0, /* read/write chunk size */ + 0, /* last update time */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -284,6 +287,7 @@ static const rio rioConnIO = { 0, /* bytes read or written */ 0, /* keys since last callback */ 0, /* read/write chunk size */ + 0, /* last update time */ { { NULL, 0 } } /* union for io-specific vars */ }; @@ -404,6 +408,7 @@ static const rio rioFdIO = { 0, /* bytes read or written */ 0, /* keys since last callback */ 0, /* read/write chunk size */ + 0, /* last update time */ { { NULL, 0 } } /* union for io-specific vars */ }; diff --git a/src/rio.h b/src/rio.h index 031dfb36c..4198cc63f 100644 --- a/src/rio.h +++ b/src/rio.h @@ -72,6 +72,9 @@ struct _rio { /* maximum single read or write chunk size */ size_t max_processing_chunk; + /* last update time */ + long long last_update; + /* Backend-specific vars. */ union { /* In-memory buffer target. */