From 28895d72fe2f98452f7be38057c1bbc684feffdd Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 24 Mar 2021 19:27:39 +0000 Subject: [PATCH 1/5] added condition variable to time thread; wake on afterSleep, sleep on beforeSleep Former-commit-id: ff2f2a3aceff2ba4a74951197348d67fc39568b2 --- src/server.cpp | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index fbe0458e3..6c3f740af 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -61,6 +61,7 @@ #include #include #include +#include #include "aelocker.h" #include "keycheck.h" #include "motd.h" @@ -97,6 +98,10 @@ redisServer *g_pserver = &GlobalHidden::server; struct redisServerConst cserver; thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars volatile unsigned long lru_clock; /* Server global current LRU time. */ +std::mutex time_thread_mutex; +std::condition_variable time_thread_cv; +bool time_thread_running = false; +void wakeTimeThread(); /* Our command table. * @@ -2509,6 +2514,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { serverAssert(g_pserver->repl_batch_offStart < 0); runAndPropogateToReplicas(processClients); + time_thread_running = false; + /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2655,6 +2662,8 @@ void afterSleep(struct aeEventLoop *eventLoop) { /* Aquire the modules GIL so that their threads won't touch anything. */ if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); + wakeTimeThread(); + serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); for (int idb = 0; idb < cserver.dbnum; ++idb) @@ -5731,6 +5740,8 @@ static void sigShutdownHandler(int sig) { msg = "Received shutdown signal, scheduling shutdown..."; }; + wakeTimeThread(); + /* SIGINT is often delivered via Ctrl+C in an interactive session. * If we receive the signal the second time, we interpret this as * the user really wanting to quit ASAP without waiting to persist @@ -6076,15 +6087,24 @@ void OnTerminate() serverPanic("std::teminate() called"); } +void wakeTimeThread() { + time_thread_running = true; + time_thread_cv.notify_one(); +} + void *timeThreadMain(void*) { timespec delay; delay.tv_sec = 0; delay.tv_nsec = 100; while (true) { + std::unique_lock lock(time_thread_mutex); + if (!time_thread_running) { + time_thread_cv.wait(lock); + } updateCachedTime(); clock_nanosleep(CLOCK_REALTIME, 0, &delay, NULL); } -} +} void *workerThreadMain(void *parg) { From 22cffc5d5465c46113055ee7079fef7a2b84ea36 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 24 Mar 2021 20:24:07 +0000 Subject: [PATCH 2/5] removed extraneous wakeTimeThread cal Former-commit-id: cfb5c5d7dc1e09c51ca416a2dbb19eab4bbaa77a --- src/server.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 6c3f740af..0f503ece3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5740,8 +5740,6 @@ static void sigShutdownHandler(int sig) { msg = "Received shutdown signal, scheduling shutdown..."; }; - wakeTimeThread(); - /* SIGINT is often delivered via Ctrl+C in an interactive session. * If we receive the signal the second time, we interpret this as * the user really wanting to quit ASAP without waiting to persist From 6d1be2a3f5ee59ae1ceef0a5fb56b83a56d4eeb1 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 24 Mar 2021 20:33:26 +0000 Subject: [PATCH 3/5] counter for time thread sleeping threads Former-commit-id: 6270939d6a2d7820fecac9c5aedf0ff985531f6b --- src/server.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 0f503ece3..7827879a5 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -100,7 +100,7 @@ thread_local struct redisServerThreadVars *serverTL = NULL; // thread local se volatile unsigned long lru_clock; /* Server global current LRU time. */ std::mutex time_thread_mutex; std::condition_variable time_thread_cv; -bool time_thread_running = false; +int sleeping_threads = cserver.cthreads; void wakeTimeThread(); /* Our command table. @@ -2514,7 +2514,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { serverAssert(g_pserver->repl_batch_offStart < 0); runAndPropogateToReplicas(processClients); - time_thread_running = false; + time_thread_mutex.lock(); + sleeping_threads++; + time_thread_mutex.unlock(); /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -6086,7 +6088,9 @@ void OnTerminate() } void wakeTimeThread() { - time_thread_running = true; + time_thread_mutex.lock(); + sleeping_threads--; + time_thread_mutex.unlock(); time_thread_cv.notify_one(); } @@ -6096,7 +6100,7 @@ void *timeThreadMain(void*) { delay.tv_nsec = 100; while (true) { std::unique_lock lock(time_thread_mutex); - if (!time_thread_running) { + if (sleeping_threads >= cserver.cthreads) { time_thread_cv.wait(lock); } updateCachedTime(); From fa44f2650edd7a2a7ca7b9b17b9cb307edbdb709 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 24 Mar 2021 20:44:40 +0000 Subject: [PATCH 4/5] serverAsserts on sleeping_threads, remove bare locks Former-commit-id: 8c64ca2333d8eb2ee92d835907474ec63e127b62 --- src/server.cpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 7827879a5..26bf59b79 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2514,9 +2514,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { serverAssert(g_pserver->repl_batch_offStart < 0); runAndPropogateToReplicas(processClients); - time_thread_mutex.lock(); - sleeping_threads++; - time_thread_mutex.unlock(); + { + std::lock_guard lock(time_thread_mutex); + sleeping_threads++; + serverAssert(sleeping_threads <= cserver.cthreads); + } /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -6088,9 +6090,9 @@ void OnTerminate() } void wakeTimeThread() { - time_thread_mutex.lock(); + std::lock_guard lock(time_thread_mutex); sleeping_threads--; - time_thread_mutex.unlock(); + serverAssert(sleeping_threads >= 0); time_thread_cv.notify_one(); } @@ -6099,9 +6101,11 @@ void *timeThreadMain(void*) { delay.tv_sec = 0; delay.tv_nsec = 100; while (true) { - std::unique_lock lock(time_thread_mutex); - if (sleeping_threads >= cserver.cthreads) { - time_thread_cv.wait(lock); + { + std::unique_lock lock(time_thread_mutex); + if (sleeping_threads >= cserver.cthreads) { + time_thread_cv.wait(lock); + } } updateCachedTime(); clock_nanosleep(CLOCK_REALTIME, 0, &delay, NULL); From 13697db12158d176725dfd7572440ef8277bd9ba Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 24 Mar 2021 21:16:21 +0000 Subject: [PATCH 5/5] moved time thread code to end of beforeSleep Former-commit-id: ac1022c772c7357571829f24c87aa3dc2deade72 --- src/server.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 26bf59b79..97a29f1a3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2514,12 +2514,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { serverAssert(g_pserver->repl_batch_offStart < 0); runAndPropogateToReplicas(processClients); - { - std::lock_guard lock(time_thread_mutex); - sleeping_threads++; - serverAssert(sleeping_threads <= cserver.cthreads); - } - /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2651,6 +2645,13 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.disarm(); if (!fSentReplies) handleClientsWithPendingWrites(iel, aof_state); + + { + std::lock_guard lock(time_thread_mutex); + sleeping_threads++; + serverAssert(sleeping_threads <= cserver.cthreads); + } + if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/); /* Do NOT add anything below moduleReleaseGIL !!! */