Do not send updates to servers overly frequently during load

This commit is contained in:
John Sully 2022-04-12 18:57:01 +00:00 committed by John Sully
parent 55dbb698e7
commit c9d70ca62d
3 changed files with 26 additions and 12 deletions

View File

@ -3030,15 +3030,18 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
bool fUpdateReplication = (g_pserver->mstime - r->last_update) > 1000;
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
if (fUpdateReplication) {
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
}
loadingProgress(r->processed_bytes);
@ -3050,12 +3053,15 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
if (fUpdateReplication) {
robj *ping_argv[1];
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
}
if (fUpdateReplication) r->last_update = g_pserver->mstime;
r->keys_since_last_callback = 0;
}
}

View File

@ -105,6 +105,7 @@ static const rio rioBufferIO = {
0, /* bytes read or written */
0, /* keys since last callback */
0, /* read/write chunk size */
0, /* last update time */
{ { NULL, 0 } } /* union for io-specific vars */
};
@ -120,6 +121,7 @@ static const rio rioConstBufferIO = {
0, /* bytes read or written */
0, /* keys since last callback */
0, /* read/write chunk size */
0, /* last update time */
{ { NULL, 0 } } /* union for io-specific vars */
};
@ -184,6 +186,7 @@ static const rio rioFileIO = {
0, /* bytes read or written */
0, /* keys since last callback */
0, /* read/write chunk size */
0, /* last update time */
{ { NULL, 0 } } /* union for io-specific vars */
};
@ -284,6 +287,7 @@ static const rio rioConnIO = {
0, /* bytes read or written */
0, /* keys since last callback */
0, /* read/write chunk size */
0, /* last update time */
{ { NULL, 0 } } /* union for io-specific vars */
};
@ -404,6 +408,7 @@ static const rio rioFdIO = {
0, /* bytes read or written */
0, /* keys since last callback */
0, /* read/write chunk size */
0, /* last update time */
{ { NULL, 0 } } /* union for io-specific vars */
};

View File

@ -72,6 +72,9 @@ struct _rio {
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* last update time */
long long last_update;
/* Backend-specific vars. */
union {
/* In-memory buffer target. */