We need to send keepalives to masters while waiting to prevent disconnects

Former-commit-id: 7cbd6758b1042198c14ca9e8da0f1f7bc05df93d
This commit is contained in:
John Sully 2021-09-01 04:15:59 +00:00
parent 40d5a2db44
commit 29518a1399

View File

@ -2619,6 +2619,7 @@ class rdbAsyncWorkThread
std::atomic<bool> workerThreadDone; std::atomic<bool> workerThreadDone;
std::thread m_thread; std::thread m_thread;
long long now; long long now;
long long lastPing = -1;
static void listFreeMethod(const void *v) { static void listFreeMethod(const void *v) {
delete reinterpret_cast<const rdbInsertJob*>(v); delete reinterpret_cast<const rdbInsertJob*>(v);
@ -2654,7 +2655,7 @@ public:
l.unlock(); l.unlock();
usleep(1); usleep(1);
pauseExecution(); pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); ProcessWhileBlocked();
resumeExecution(); resumeExecution();
l.lock(); l.lock();
} }
@ -2685,6 +2686,23 @@ public:
cv.notify_one(); cv.notify_one();
} }
void ProcessWhileBlocked() {
if ((mstime() - lastPing) > 1000) { // Ping if its been a second or longer
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
lastPing = mstime();
}
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
}
size_t ckeys() { return ckeysLoaded; } size_t ckeys() { return ckeysLoaded; }
size_t endWork() { size_t endWork() {
@ -2698,14 +2716,14 @@ public:
while (!workerThreadDone) { while (!workerThreadDone) {
usleep(10); usleep(10);
pauseExecution(); pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); ProcessWhileBlocked();
resumeExecution(); resumeExecution();
} }
} }
m_thread.join(); m_thread.join();
while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) { while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) {
usleep(10); usleep(10);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); ProcessWhileBlocked();
} }
fLaunched = false; fLaunched = false;
fExit = false; fExit = false;