Backported time thread code from enterprise

Former-commit-id: b03eab2a0628df157c1f6b6242bb500ca826ef45
This commit is contained in:
VivekSainiEQ 2021-11-05 00:30:34 +00:00
parent 591f0c77c4
commit 83b23eafd0
4 changed files with 49 additions and 19 deletions

View File

@ -592,7 +592,6 @@ void handleClientsBlockedOnKeys(void) {
* lookup, invalidating the first one. * lookup, invalidating the first one.
* See https://github.com/redis/redis/pull/6554. */ * See https://github.com/redis/redis/pull/6554. */
serverTL->fixed_time_expire++; serverTL->fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on the key. */ /* Serve clients blocked on the key. */
robj *o = lookupKeyWrite(rl->db,rl->key); robj *o = lookupKeyWrite(rl->db,rl->key);

View File

@ -3869,10 +3869,6 @@ void processEventsWhileBlocked(int iel) {
listNode *ln; listNode *ln;
listRewind(g_pserver->clients, &li); listRewind(g_pserver->clients, &li);
/* Update our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
updateCachedTime(0);
// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
// so unlock here, and save them for reacquisition later // so unlock here, and save them for reacquisition later
while ((ln = listNext(&li)) != nullptr) while ((ln = listNext(&li)) != nullptr)

View File

@ -63,6 +63,7 @@
#include <algorithm> #include <algorithm>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include <mutex> #include <mutex>
#include <condition_variable>
#include "aelocker.h" #include "aelocker.h"
#include "motd.h" #include "motd.h"
#include "t_nhash.h" #include "t_nhash.h"
@ -94,6 +95,10 @@ struct redisServer server; /* Server global state */
redisServer *g_pserver = &GlobalHidden::server; redisServer *g_pserver = &GlobalHidden::server;
struct redisServerConst cserver; struct redisServerConst cserver;
__thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
std::mutex time_thread_mutex;
std::condition_variable time_thread_cv;
int sleeping_threads = 0;
void wakeTimeThread();
/* Our command table. /* Our command table.
* *
@ -2007,7 +2012,7 @@ void databasesCron(bool fMainThread) {
* info or not using the 'update_daylight_info' argument. Normally we update * info or not using the 'update_daylight_info' argument. Normally we update
* such info only when calling this function from serverCron() but not when * such info only when calling this function from serverCron() but not when
* calling it from call(). */ * calling it from call(). */
void updateCachedTime(int update_daylight_info) { void updateCachedTime() {
long long t = ustime(); long long t = ustime();
__atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED); __atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED);
t /= 1000; t /= 1000;
@ -2020,12 +2025,10 @@ void updateCachedTime(int update_daylight_info) {
* context is safe since we will never fork() while here, in the main * context is safe since we will never fork() while here, in the main
* thread. The logging function will call a thread safe version of * thread. The logging function will call a thread safe version of
* localtime that has no locks. */ * localtime that has no locks. */
if (update_daylight_info) {
struct tm tm; struct tm tm;
time_t ut = g_pserver->unixtime; time_t ut = g_pserver->unixtime;
localtime_r(&ut,&tm); localtime_r(&ut,&tm);
__atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED); __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED);
}
} }
void checkChildrenDone(void) { void checkChildrenDone(void) {
@ -2152,9 +2155,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* handler if we don't return here fast enough. */ * handler if we don't return here fast enough. */
if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period); if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period);
/* Update the time cache. */
updateCachedTime(1);
g_pserver->hz = g_pserver->config_hz; g_pserver->hz = g_pserver->config_hz;
/* Adapt the g_pserver->hz value to the number of configured clients. If we have /* Adapt the g_pserver->hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */ * many clients, we want to call serverCron() with an higher frequency. */
@ -2412,7 +2412,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
void blockingOperationStarts() { void blockingOperationStarts() {
if(!g_pserver->blocking_op_nesting++){ if(!g_pserver->blocking_op_nesting++){
updateCachedTime(0);
g_pserver->blocked_last_cron = g_pserver->mstime; g_pserver->blocked_last_cron = g_pserver->mstime;
} }
} }
@ -2623,6 +2622,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
locker.disarm(); locker.disarm();
if (!fSentReplies) if (!fSentReplies)
handleClientsWithPendingWrites(iel, aof_state); handleClientsWithPendingWrites(iel, aof_state);
// Scope lock_guard
{
std::lock_guard<std::mutex> lock(time_thread_mutex);
sleeping_threads++;
serverAssert(sleeping_threads <= cserver.cthreads);
}
/* Determine whether the modules are enabled before sleeping, and use that result /* Determine whether the modules are enabled before sleeping, and use that result
both here, and after wakeup to avoid double acquire or release of the GIL */ both here, and after wakeup to avoid double acquire or release of the GIL */
serverTL->modulesEnabledThisAeLoop = !!moduleCount(); serverTL->modulesEnabledThisAeLoop = !!moduleCount();
@ -2642,6 +2649,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
Don't check here that modules are enabled, rather use the result from beforeSleep Don't check here that modules are enabled, rather use the result from beforeSleep
Otherwise you may double acquire the GIL and cause deadlocks in the module */ Otherwise you may double acquire the GIL and cause deadlocks in the module */
if (!ProcessingEventsWhileBlocked) { if (!ProcessingEventsWhileBlocked) {
wakeTimeThread();
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
} }
} }
@ -2839,7 +2847,7 @@ void initMasterInfo(redisMaster *master)
void initServerConfig(void) { void initServerConfig(void) {
int j; int j;
updateCachedTime(true); updateCachedTime();
getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE);
g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId(); changeReplicationId();
@ -4006,7 +4014,6 @@ void call(client *c, int flags) {
prev_err_count = g_pserver->stat_total_error_replies; prev_err_count = g_pserver->stat_total_error_replies;
g_pserver->fixed_time_expire++; g_pserver->fixed_time_expire++;
updateCachedTime(0);
incrementMvccTstamp(); incrementMvccTstamp();
elapsedStart(&call_timer); elapsedStart(&call_timer);
try { try {
@ -6534,6 +6541,31 @@ void OnTerminate()
serverPanic("std::teminate() called"); serverPanic("std::teminate() called");
} }
void wakeTimeThread() {
updateCachedTime();
std::lock_guard<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads)
time_thread_cv.notify_one();
sleeping_threads--;
serverAssert(sleeping_threads >= 0);
}
void *timeThreadMain(void*) {
timespec delay;
delay.tv_sec = 0;
delay.tv_nsec = 100;
while (true) {
{
std::unique_lock<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads) {
time_thread_cv.wait(lock);
}
}
updateCachedTime();
clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL);
}
}
void *workerThreadMain(void *parg) void *workerThreadMain(void *parg)
{ {
int iel = (int)((int64_t)parg); int iel = (int)((int64_t)parg);
@ -6930,6 +6962,8 @@ int main(int argc, char **argv) {
setOOMScoreAdj(-1); setOOMScoreAdj(-1);
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
pthread_attr_t tattr; pthread_attr_t tattr;
pthread_attr_init(&tattr); pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB

View File

@ -1536,6 +1536,7 @@ struct redisServerConst {
pid_t pid; /* Main process pid. */ pid_t pid; /* Main process pid. */
time_t stat_starttime; /* Server start time */ time_t stat_starttime; /* Server start time */
pthread_t main_thread_id; /* Main thread id */ pthread_t main_thread_id; /* Main thread id */
pthread_t time_thread_id;
char *configfile; /* Absolute config file path, or NULL */ char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */ char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */ char **exec_argv; /* Executable argv vector (copy). */
@ -2637,7 +2638,7 @@ void resetErrorTableStats(void);
void adjustOpenFilesLimit(void); void adjustOpenFilesLimit(void);
void incrementErrorCount(const char *fullerr, size_t namelen); void incrementErrorCount(const char *fullerr, size_t namelen);
void closeListeningSockets(int unlink_unix_socket); void closeListeningSockets(int unlink_unix_socket);
void updateCachedTime(int update_daylight_info); void updateCachedTime(void);
void resetServerStats(void); void resetServerStats(void);
void activeDefragCycle(void); void activeDefragCycle(void);
unsigned int getLRUClock(void); unsigned int getLRUClock(void);