From e380087cdb00da8a88c47c7a56e4f0a5266942d5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 29 May 2020 17:44:55 -0400 Subject: [PATCH] Unify beforeSleep handling Former-commit-id: 1cb48c7bf6a7e91e728a677902a7bfc64fe80dd6 --- src/aof.cpp | 1 + src/networking.cpp | 37 +++++++++++++++---------------------- src/server.cpp | 16 +++++++++------- src/server.h | 1 + tests/test_helper.tcl | 6 +++++- 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index e320f6cfd..9b33caf39 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -751,6 +751,7 @@ struct client *createAOFClient(void) { c->bufpos = 0; c->flags = 0; c->fPendingAsyncWrite = FALSE; + c->fPendingAsyncWriteHandler = FALSE; c->btype = BLOCKED_NONE; /* We set the fake client as a replica waiting for the synchronization * so that Redis will not try to send replies to this client. */ diff --git a/src/networking.cpp b/src/networking.cpp index 8211c8d88..ac248d41f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -135,6 +135,7 @@ client *createClient(connection *conn, int iel) { c->sentlenAsync = 0; c->flags = 0; c->fPendingAsyncWrite = FALSE; + c->fPendingAsyncWriteHandler = FALSE; c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -1850,29 +1851,21 @@ void ProcessPendingAsyncWrites() std::atomic_thread_fence(std::memory_order_seq_cst); - if (c->casyncOpsPending == 0) + if (FCorrectThread(c)) { - if (FCorrectThread(c)) - { - prepareClientToWrite(c, false); // queue an event - } - else - { - // We need to start the write on the client's thread - if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{ - // Install a write handler. 
Don't do the actual write here since we don't want - // to duplicate the throttling and safety mechanisms of the normal write code - std::lock_guard<decltype(c->lock)> lock(c->lock); - serverAssert(c->casyncOpsPending > 0); - c->casyncOpsPending--; - connSetWriteHandler(c->conn, sendReplyToClient, true); - }, false) == AE_ERR - ) - { - // Posting the function failed - continue; // We can retry later in the cron - } - ++c->casyncOpsPending; // race is handled by the client lock in the lambda + prepareClientToWrite(c, false); // queue an event + } + else + { + if (!c->fPendingAsyncWriteHandler) { + c->fPendingAsyncWriteHandler = true; + bool fResult = c->postFunction([](client *c) { + c->fPendingAsyncWriteHandler = false; + connSetWriteHandler(c->conn, sendReplyToClient, true); + }); + + if (!fResult) + c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset } } } diff --git a/src/server.cpp b/src/server.cpp index 8b2b03b43..92bff8cd7 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1907,8 +1907,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { { processUnblockedClients(IDX_EVENT_LOOP_MAIN); } - - ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ @@ -2143,6 +2141,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { &ei); + /* CRON functions may trigger async writes, so do this last */ + ProcessPendingAsyncWrites(); + g_pserver->cronloops++; return 1000/g_pserver->hz; } @@ -2192,6 +2193,7 @@ extern int ProcessingEventsWhileBlocked; * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + int iel = ielFromEventLoop(eventLoop); /* Handle precise timeouts of blocked clients. 
*/ handleBlockedClientsTimeout(); @@ -2225,9 +2227,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (moduleCount()) moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) + if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { - processUnblockedClients(IDX_EVENT_LOOP_MAIN); + processUnblockedClients(iel); } /* Send all the slaves an ACK request if at least one client blocked @@ -2258,11 +2260,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle writes with pending output buffers. */ int aof_state = g_pserver->aof_state; aeReleaseLock(); - handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN, aof_state); + handleClientsWithPendingWrites(iel, aof_state); aeAcquireLock(); /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); + freeClientsInAsyncFreeQueue(iel); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2958,7 +2960,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->tlsfd_count = 0; pvar->cclients = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); - aeSetBeforeSleepProc(pvar->el, fMain ? beforeSleep : beforeSleepLite, fMain ? 0 : AE_SLEEP_THREADSAFE); + aeSetBeforeSleepProc(pvar->el, beforeSleep, 0); aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); pvar->current_client = nullptr; pvar->clients_paused = 0; diff --git a/src/server.h b/src/server.h index 29bc3ffdd..712ffacb4 100644 --- a/src/server.h +++ b/src/server.h @@ -1268,6 +1268,7 @@ typedef struct client { std::atomic flags; /* Client flags: CLIENT_* macros. 
*/ int casyncOpsPending; int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ + int fPendingAsyncWriteHandler; int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a replica. */ int repl_put_online_on_ack; /* Install replica write handler on ACK. */ diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 724149d94..85b89f711 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -428,6 +428,7 @@ proc signal_idle_client fd { lappend ::active_clients $fd incr ::next_test if {$::loop && $::next_test == [llength $::all_tests]} { + incr ::loop -1 set ::next_test 0 } } elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} { @@ -612,7 +613,10 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--stop}} { set ::stop_on_failure 1 } elseif {$opt eq {--loop}} { - set ::loop 1 + set ::loop 1000 + } elseif {$opt eq {--loopn}} { + set ::loop [expr $arg - 1] + incr j } elseif {$opt eq {--timeout}} { set ::timeout $arg incr j