From 83b23eafd06da5e39506b8c8b3676ad473158fe8 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 5 Nov 2021 00:30:34 +0000 Subject: [PATCH] Backported time thread code from enterprise Former-commit-id: b03eab2a0628df157c1f6b6242bb500ca826ef45 --- src/blocked.cpp | 1 - src/networking.cpp | 4 ---- src/server.cpp | 60 ++++++++++++++++++++++++++++++++++++---------- src/server.h | 3 ++- 4 files changed, 49 insertions(+), 19 deletions(-) diff --git a/src/blocked.cpp b/src/blocked.cpp index 55c165336..7a17eaae9 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -592,7 +592,6 @@ void handleClientsBlockedOnKeys(void) { * lookup, invalidating the first one. * See https://github.com/redis/redis/pull/6554. */ serverTL->fixed_time_expire++; - updateCachedTime(0); /* Serve clients blocked on the key. */ robj *o = lookupKeyWrite(rl->db,rl->key); diff --git a/src/networking.cpp b/src/networking.cpp index 91310f9ab..71c9fc663 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3869,10 +3869,6 @@ void processEventsWhileBlocked(int iel) { listNode *ln; listRewind(g_pserver->clients, &li); - /* Update our cached time since it is used to create and update the last - * interaction time with clients and for other important things. */ - updateCachedTime(0); - // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks // so unlock here, and save them for reacquisition later while ((ln = listNext(&li)) != nullptr) diff --git a/src/server.cpp b/src/server.cpp index 7dc38179a..60454fe58 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -63,6 +63,7 @@ #include #include #include +#include #include "aelocker.h" #include "motd.h" #include "t_nhash.h" @@ -94,6 +95,10 @@ struct redisServer server; /* Server global state */ redisServer *g_pserver = &GlobalHidden::server; struct redisServerConst cserver; __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars +std::mutex time_thread_mutex; +std::condition_variable time_thread_cv; +int sleeping_threads = 0; +void wakeTimeThread(); /* Our command table. * @@ -2007,7 +2012,7 @@ void databasesCron(bool fMainThread) { * info or not using the 'update_daylight_info' argument. Normally we update * such info only when calling this function from serverCron() but not when * calling it from call(). */ -void updateCachedTime(int update_daylight_info) { +void updateCachedTime() { long long t = ustime(); __atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED); t /= 1000; @@ -2020,12 +2025,10 @@ void updateCachedTime(int update_daylight_info) { * context is safe since we will never fork() while here, in the main * thread. The logging function will call a thread safe version of * localtime that has no locks. */ - if (update_daylight_info) { - struct tm tm; - time_t ut = g_pserver->unixtime; - localtime_r(&ut,&tm); - __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED); - } + struct tm tm; + time_t ut = g_pserver->unixtime; + localtime_r(&ut,&tm); + __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED); } void checkChildrenDone(void) { @@ -2152,9 +2155,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * handler if we don't return here fast enough. */ if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period); - /* Update the time cache. */ - updateCachedTime(1); - g_pserver->hz = g_pserver->config_hz; /* Adapt the g_pserver->hz value to the number of configured clients. If we have * many clients, we want to call serverCron() with an higher frequency. */ @@ -2412,7 +2412,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData void blockingOperationStarts() { if(!g_pserver->blocking_op_nesting++){ - updateCachedTime(0); g_pserver->blocked_last_cron = g_pserver->mstime; } } @@ -2623,6 +2622,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.disarm(); if (!fSentReplies) handleClientsWithPendingWrites(iel, aof_state); + + // Scope lock_guard + { + std::lock_guard lock(time_thread_mutex); + sleeping_threads++; + serverAssert(sleeping_threads <= cserver.cthreads); + } + /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -2642,6 +2649,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { Don't check here that modules are enabled, rather use the result from beforeSleep Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { + wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); } } @@ -2839,7 +2847,7 @@ void initMasterInfo(redisMaster *master) void initServerConfig(void) { int j; - updateCachedTime(true); + updateCachedTime(); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); @@ -4006,7 +4014,6 @@ void call(client *c, int flags) { prev_err_count = g_pserver->stat_total_error_replies; g_pserver->fixed_time_expire++; - updateCachedTime(0); incrementMvccTstamp(); elapsedStart(&call_timer); try { @@ -6534,6 +6541,31 @@ void OnTerminate() serverPanic("std::teminate() called"); } +void wakeTimeThread() { + updateCachedTime(); + std::lock_guard lock(time_thread_mutex); + if (sleeping_threads >= cserver.cthreads) + time_thread_cv.notify_one(); + sleeping_threads--; + serverAssert(sleeping_threads >= 0); +} + +void *timeThreadMain(void*) { + timespec delay; + delay.tv_sec = 0; + delay.tv_nsec = 100; + while (true) { + { + std::unique_lock lock(time_thread_mutex); + if (sleeping_threads >= cserver.cthreads) { + time_thread_cv.wait(lock); + } + } + updateCachedTime(); + clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); + } +} + void *workerThreadMain(void *parg) { int iel = (int)((int64_t)parg); @@ -6930,6 +6962,8 @@ int main(int argc, char **argv) { setOOMScoreAdj(-1); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); + pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr); + pthread_attr_t tattr; pthread_attr_init(&tattr); pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB diff --git a/src/server.h b/src/server.h index 22ec2fa07..7d813c4a0 100644 --- a/src/server.h +++ b/src/server.h @@ -1536,6 +1536,7 @@ struct redisServerConst { pid_t pid; /* Main process pid. */ time_t stat_starttime; /* Server start time */ pthread_t main_thread_id; /* Main thread id */ + pthread_t time_thread_id; char *configfile; /* Absolute config file path, or NULL */ char *executable; /* Absolute executable file path. */ char **exec_argv; /* Executable argv vector (copy). */ @@ -2637,7 +2638,7 @@ void resetErrorTableStats(void); void adjustOpenFilesLimit(void); void incrementErrorCount(const char *fullerr, size_t namelen); void closeListeningSockets(int unlink_unix_socket); -void updateCachedTime(int update_daylight_info); +void updateCachedTime(void); void resetServerStats(void); void activeDefragCycle(void); unsigned int getLRUClock(void);