Merge pull request #375 from VivekSainiEQ/time_thread_backport

Backport the time thread code from the enterprise version of KeyDB.

Also fix an issue with clustering where connections are potentially written to after being closed.

Former-commit-id: 4b6bdcbccef3c4c93d85c62281b266e452912bac
This commit is contained in:
Vivek Saini 2021-11-15 12:57:15 -05:00 committed by GitHub
commit 2c6548213e
5 changed files with 53 additions and 20 deletions

View File

@ -592,7 +592,6 @@ void handleClientsBlockedOnKeys(void) {
* lookup, invalidating the first one. * lookup, invalidating the first one.
* See https://github.com/redis/redis/pull/6554. */ * See https://github.com/redis/redis/pull/6554. */
serverTL->fixed_time_expire++; serverTL->fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on the key. */ /* Serve clients blocked on the key. */
robj *o = lookupKeyWrite(rl->db,rl->key); robj *o = lookupKeyWrite(rl->db,rl->key);

View File

@ -2447,7 +2447,10 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0) if (sdslen(link->sndbuf) == 0 && msglen != 0)
{ {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link] { aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link] {
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1); /* The connection could be timed out before this posted function executes (thanks to TCP keepalive).
* So check that the connection is still there before setting the write handler, otherwise you segfault */
if (link->conn != nullptr)
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
}); });
} }

View File

@ -3867,10 +3867,6 @@ void processEventsWhileBlocked(int iel) {
listNode *ln; listNode *ln;
listRewind(g_pserver->clients, &li); listRewind(g_pserver->clients, &li);
/* Update our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
updateCachedTime(0);
// All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
// so unlock here, and save them for reacquisition later // so unlock here, and save them for reacquisition later
while ((ln = listNext(&li)) != nullptr) while ((ln = listNext(&li)) != nullptr)

View File

@ -63,6 +63,7 @@
#include <algorithm> #include <algorithm>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include <mutex> #include <mutex>
#include <condition_variable>
#include "aelocker.h" #include "aelocker.h"
#include "motd.h" #include "motd.h"
#include "t_nhash.h" #include "t_nhash.h"
@ -94,6 +95,10 @@ struct redisServer server; /* Server global state */
redisServer *g_pserver = &GlobalHidden::server; redisServer *g_pserver = &GlobalHidden::server;
struct redisServerConst cserver; struct redisServerConst cserver;
__thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
std::mutex time_thread_mutex;
std::condition_variable time_thread_cv;
int sleeping_threads = 0;
void wakeTimeThread();
/* Our command table. /* Our command table.
* *
@ -2007,7 +2012,7 @@ void databasesCron(bool fMainThread) {
* info or not using the 'update_daylight_info' argument. Normally we update * info or not using the 'update_daylight_info' argument. Normally we update
* such info only when calling this function from serverCron() but not when * such info only when calling this function from serverCron() but not when
* calling it from call(). */ * calling it from call(). */
void updateCachedTime(int update_daylight_info) { void updateCachedTime() {
long long t = ustime(); long long t = ustime();
__atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED); __atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED);
t /= 1000; t /= 1000;
@ -2020,12 +2025,10 @@ void updateCachedTime(int update_daylight_info) {
* context is safe since we will never fork() while here, in the main * context is safe since we will never fork() while here, in the main
* thread. The logging function will call a thread safe version of * thread. The logging function will call a thread safe version of
* localtime that has no locks. */ * localtime that has no locks. */
if (update_daylight_info) { struct tm tm;
struct tm tm; time_t ut = g_pserver->unixtime;
time_t ut = g_pserver->unixtime; localtime_r(&ut,&tm);
localtime_r(&ut,&tm); __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED);
__atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED);
}
} }
void checkChildrenDone(void) { void checkChildrenDone(void) {
@ -2152,9 +2155,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* handler if we don't return here fast enough. */ * handler if we don't return here fast enough. */
if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period); if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period);
/* Update the time cache. */
updateCachedTime(1);
g_pserver->hz = g_pserver->config_hz; g_pserver->hz = g_pserver->config_hz;
/* Adapt the g_pserver->hz value to the number of configured clients. If we have /* Adapt the g_pserver->hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */ * many clients, we want to call serverCron() with an higher frequency. */
@ -2412,7 +2412,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
void blockingOperationStarts() { void blockingOperationStarts() {
if(!g_pserver->blocking_op_nesting++){ if(!g_pserver->blocking_op_nesting++){
updateCachedTime(0);
g_pserver->blocked_last_cron = g_pserver->mstime; g_pserver->blocked_last_cron = g_pserver->mstime;
} }
} }
@ -2623,6 +2622,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
locker.disarm(); locker.disarm();
if (!fSentReplies) if (!fSentReplies)
handleClientsWithPendingWrites(iel, aof_state); handleClientsWithPendingWrites(iel, aof_state);
// Scope lock_guard
{
std::lock_guard<std::mutex> lock(time_thread_mutex);
sleeping_threads++;
serverAssert(sleeping_threads <= cserver.cthreads);
}
/* Determine whether the modules are enabled before sleeping, and use that result /* Determine whether the modules are enabled before sleeping, and use that result
both here, and after wakeup to avoid double acquire or release of the GIL */ both here, and after wakeup to avoid double acquire or release of the GIL */
serverTL->modulesEnabledThisAeLoop = !!moduleCount(); serverTL->modulesEnabledThisAeLoop = !!moduleCount();
@ -2642,6 +2649,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
Don't check here that modules are enabled, rather use the result from beforeSleep Don't check here that modules are enabled, rather use the result from beforeSleep
Otherwise you may double acquire the GIL and cause deadlocks in the module */ Otherwise you may double acquire the GIL and cause deadlocks in the module */
if (!ProcessingEventsWhileBlocked) { if (!ProcessingEventsWhileBlocked) {
wakeTimeThread();
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
} }
} }
@ -2839,7 +2847,7 @@ void initMasterInfo(redisMaster *master)
void initServerConfig(void) { void initServerConfig(void) {
int j; int j;
updateCachedTime(true); updateCachedTime();
getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE);
g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId(); changeReplicationId();
@ -4006,7 +4014,6 @@ void call(client *c, int flags) {
prev_err_count = g_pserver->stat_total_error_replies; prev_err_count = g_pserver->stat_total_error_replies;
g_pserver->fixed_time_expire++; g_pserver->fixed_time_expire++;
updateCachedTime(0);
incrementMvccTstamp(); incrementMvccTstamp();
elapsedStart(&call_timer); elapsedStart(&call_timer);
try { try {
@ -6534,6 +6541,31 @@ void OnTerminate()
serverPanic("std::teminate() called"); serverPanic("std::teminate() called");
} }
void wakeTimeThread() {
updateCachedTime();
std::lock_guard<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads)
time_thread_cv.notify_one();
sleeping_threads--;
serverAssert(sleeping_threads >= 0);
}
void *timeThreadMain(void*) {
timespec delay;
delay.tv_sec = 0;
delay.tv_nsec = 100;
while (true) {
{
std::unique_lock<std::mutex> lock(time_thread_mutex);
if (sleeping_threads >= cserver.cthreads) {
time_thread_cv.wait(lock);
}
}
updateCachedTime();
clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL);
}
}
void *workerThreadMain(void *parg) void *workerThreadMain(void *parg)
{ {
int iel = (int)((int64_t)parg); int iel = (int)((int64_t)parg);
@ -6930,6 +6962,8 @@ int main(int argc, char **argv) {
setOOMScoreAdj(-1); setOOMScoreAdj(-1);
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
pthread_attr_t tattr; pthread_attr_t tattr;
pthread_attr_init(&tattr); pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB

View File

@ -1536,6 +1536,7 @@ struct redisServerConst {
pid_t pid; /* Main process pid. */ pid_t pid; /* Main process pid. */
time_t stat_starttime; /* Server start time */ time_t stat_starttime; /* Server start time */
pthread_t main_thread_id; /* Main thread id */ pthread_t main_thread_id; /* Main thread id */
pthread_t time_thread_id;
char *configfile; /* Absolute config file path, or NULL */ char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */ char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */ char **exec_argv; /* Executable argv vector (copy). */
@ -2637,7 +2638,7 @@ void resetErrorTableStats(void);
void adjustOpenFilesLimit(void); void adjustOpenFilesLimit(void);
void incrementErrorCount(const char *fullerr, size_t namelen); void incrementErrorCount(const char *fullerr, size_t namelen);
void closeListeningSockets(int unlink_unix_socket); void closeListeningSockets(int unlink_unix_socket);
void updateCachedTime(int update_daylight_info); void updateCachedTime(void);
void resetServerStats(void); void resetServerStats(void);
void activeDefragCycle(void); void activeDefragCycle(void);
unsigned int getLRUClock(void); unsigned int getLRUClock(void);