Unify beforeSleep handling

Former-commit-id: 1cb48c7bf6a7e91e728a677902a7bfc64fe80dd6
This commit is contained in:
John Sully 2020-05-29 17:44:55 -04:00
parent 343745f218
commit e380087cdb
5 changed files with 31 additions and 30 deletions

View File

@ -751,6 +751,7 @@ struct client *createAOFClient(void) {
c->bufpos = 0; c->bufpos = 0;
c->flags = 0; c->flags = 0;
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->btype = BLOCKED_NONE; c->btype = BLOCKED_NONE;
/* We set the fake client as a replica waiting for the synchronization /* We set the fake client as a replica waiting for the synchronization
* so that Redis will not try to send replies to this client. */ * so that Redis will not try to send replies to this client. */

View File

@ -135,6 +135,7 @@ client *createClient(connection *conn, int iel) {
c->sentlenAsync = 0; c->sentlenAsync = 0;
c->flags = 0; c->flags = 0;
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->ctime = c->lastinteraction = g_pserver->unixtime; c->ctime = c->lastinteraction = g_pserver->unixtime;
/* If the default user does not require authentication, the user is /* If the default user does not require authentication, the user is
* directly authenticated. */ * directly authenticated. */
@ -1850,29 +1851,21 @@ void ProcessPendingAsyncWrites()
std::atomic_thread_fence(std::memory_order_seq_cst); std::atomic_thread_fence(std::memory_order_seq_cst);
if (c->casyncOpsPending == 0)
{
if (FCorrectThread(c)) if (FCorrectThread(c))
{ {
prepareClientToWrite(c, false); // queue an event prepareClientToWrite(c, false); // queue an event
} }
else else
{ {
// We need to start the write on the client's thread if (!c->fPendingAsyncWriteHandler) {
if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{ c->fPendingAsyncWriteHandler = true;
// Install a write handler. Don't do the actual write here since we don't want bool fResult = c->postFunction([](client *c) {
// to duplicate the throttling and safety mechanisms of the normal write code c->fPendingAsyncWriteHandler = false;
std::lock_guard<decltype(c->lock)> lock(c->lock);
serverAssert(c->casyncOpsPending > 0);
c->casyncOpsPending--;
connSetWriteHandler(c->conn, sendReplyToClient, true); connSetWriteHandler(c->conn, sendReplyToClient, true);
}, false) == AE_ERR });
)
{ if (!fResult)
// Posting the function failed c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset
continue; // We can retry later in the cron
}
++c->casyncOpsPending; // race is handled by the client lock in the lambda
} }
} }
} }

View File

@ -1908,8 +1908,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
processUnblockedClients(IDX_EVENT_LOOP_MAIN); processUnblockedClients(IDX_EVENT_LOOP_MAIN);
} }
ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up
/* Software watchdog: deliver the SIGALRM that will reach the signal /* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */ * handler if we don't return here fast enough. */
if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period); if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period);
@ -2143,6 +2141,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
&ei); &ei);
/* CRON functions may trigger async writes, so do this last */
ProcessPendingAsyncWrites();
g_pserver->cronloops++; g_pserver->cronloops++;
return 1000/g_pserver->hz; return 1000/g_pserver->hz;
} }
@ -2192,6 +2193,7 @@ extern int ProcessingEventsWhileBlocked;
* call some other low-risk functions. */ * call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) { void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
int iel = ielFromEventLoop(eventLoop);
/* Handle precise timeouts of blocked clients. */ /* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout(); handleBlockedClientsTimeout();
@ -2225,9 +2227,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (moduleCount()) moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); if (moduleCount()) moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients))
{ {
processUnblockedClients(IDX_EVENT_LOOP_MAIN); processUnblockedClients(iel);
} }
/* Send all the slaves an ACK request if at least one client blocked /* Send all the slaves an ACK request if at least one client blocked
@ -2258,11 +2260,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
int aof_state = g_pserver->aof_state; int aof_state = g_pserver->aof_state;
aeReleaseLock(); aeReleaseLock();
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN, aof_state); handleClientsWithPendingWrites(iel, aof_state);
aeAcquireLock(); aeAcquireLock();
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); freeClientsInAsyncFreeQueue(iel);
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
@ -2958,7 +2960,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->tlsfd_count = 0; pvar->tlsfd_count = 0;
pvar->cclients = 0; pvar->cclients = 0;
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
aeSetBeforeSleepProc(pvar->el, fMain ? beforeSleep : beforeSleepLite, fMain ? 0 : AE_SLEEP_THREADSAFE); aeSetBeforeSleepProc(pvar->el, beforeSleep, 0);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
pvar->current_client = nullptr; pvar->current_client = nullptr;
pvar->clients_paused = 0; pvar->clients_paused = 0;

View File

@ -1268,6 +1268,7 @@ typedef struct client {
std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */ std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */
int casyncOpsPending; int casyncOpsPending;
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
int fPendingAsyncWriteHandler;
int authenticated; /* Needed when the default user requires auth. */ int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a replica. */ int replstate; /* Replication state if this is a replica. */
int repl_put_online_on_ack; /* Install replica write handler on ACK. */ int repl_put_online_on_ack; /* Install replica write handler on ACK. */

View File

@ -428,6 +428,7 @@ proc signal_idle_client fd {
lappend ::active_clients $fd lappend ::active_clients $fd
incr ::next_test incr ::next_test
if {$::loop && $::next_test == [llength $::all_tests]} { if {$::loop && $::next_test == [llength $::all_tests]} {
incr ::loop -1
set ::next_test 0 set ::next_test 0
} }
} elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} { } elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} {
@ -612,7 +613,10 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--stop}} { } elseif {$opt eq {--stop}} {
set ::stop_on_failure 1 set ::stop_on_failure 1
} elseif {$opt eq {--loop}} { } elseif {$opt eq {--loop}} {
set ::loop 1 set ::loop 1000
} elseif {$opt eq {--loopn}} {
set ::loop [expr $arg - 1]
incr j
} elseif {$opt eq {--timeout}} { } elseif {$opt eq {--timeout}} {
set ::timeout $arg set ::timeout $arg
incr j incr j