diff --git a/src/rdb.cpp b/src/rdb.cpp index 4858f75f1..0afe08267 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2619,6 +2619,7 @@ class rdbAsyncWorkThread std::atomic workerThreadDone; std::thread m_thread; long long now; + long long lastPing = -1; static void listFreeMethod(const void *v) { delete reinterpret_cast(v); @@ -2654,7 +2655,7 @@ public: l.unlock(); usleep(1); pauseExecution(); - processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + ProcessWhileBlocked(); resumeExecution(); l.lock(); } @@ -2685,6 +2686,23 @@ public: cv.notify_one(); } + void ProcessWhileBlocked() { + if ((mstime() - lastPing) > 1000) { // Ping if its been a second or longer + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER) + replicationSendNewlineToMaster(mi); + } + lastPing = mstime(); + } + + processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + } + size_t ckeys() { return ckeysLoaded; } size_t endWork() { @@ -2698,14 +2716,14 @@ public: while (!workerThreadDone) { usleep(10); pauseExecution(); - processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + ProcessWhileBlocked(); resumeExecution(); } } m_thread.join(); while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) { usleep(10); - processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + ProcessWhileBlocked(); } fLaunched = false; fExit = false;