diff --git a/src/ae.c b/src/ae.c index 1bf6cbfbf..7aa204250 100644 --- a/src/ae.c +++ b/src/ae.c @@ -428,6 +428,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } + if (eventLoop->beforesleep != NULL) + eventLoop->beforesleep(eventLoop); + /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); @@ -522,8 +525,6 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) - eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } diff --git a/src/server.c b/src/server.c index e2b4b6f3d..fd75f25d6 100644 --- a/src/server.c +++ b/src/server.c @@ -2081,14 +2081,26 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +extern int ProcessingEventsWhileBlocked; + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep - * for ready file descriptors. */ + * for ready file descriptors. + * Note: This function is (currently) called from two functions: + * 1. aeMain - The main server loop + * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * If it was called from processEventsWhileBlocked we don't want + * to perform all actions (For example, we don't want to expire + * keys), but we do need to perform some actions. + * The most important is freeClientsInAsyncFreeQueue but we also + * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + if (!ProcessingEventsWhileBlocked) { + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + } /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2099,65 +2111,69 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (!ProcessingEventsWhileBlocked) { + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; + } + + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); } - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); + if (!ProcessingEventsWhileBlocked) { + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); + } } /* This function is called immadiately after the event loop multiplexing @@ -2165,7 +2181,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * the different events callbacks. */ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (moduleCount()) moduleAcquireGIL(); + + if (!ProcessingEventsWhileBlocked) { + if (moduleCount()) moduleAcquireGIL(); + } } /* =========================== Server initialization ======================== */ @@ -2873,6 +2892,11 @@ void initServer(void) { "blocked clients subsystem."); } + /* Register before and after sleep handlers (note this needs to be done + * before loading persistence since it is used by processEventsWhileBlocked. */ + aeSetBeforeSleepProc(server.el,beforeSleep); + aeSetAfterSleepProc(server.el,afterSleep); + /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, @@ -5129,8 +5153,6 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(server.server_cpulist); - aeSetBeforeSleepProc(server.el,beforeSleep); - aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index b364291ee..123e9c8b6 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -128,4 +128,56 @@ start_server {} { # make sure the server is still writable r set x xx } +} + +test {client freed during loading} { + start_server [list overrides [list key-load-delay 10 rdbcompression no]] { + # create a big rdb that will take long to load. it is important + # for keys to be big since the server processes events only once in 2mb. + # 100mb of rdb, 100k keys will load in more than 1 second + r debug populate 100000 key 1000 + + catch { + r debug restart + } + + set stdout [srv 0 stdout] + while 1 { + # check that the new server actually started and is ready for connections + if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} { + break + } + after 10 + } + # make sure it's still loading + assert_equal [s loading] 1 + + # connect and disconnect 10 clients + set clients {} + for {set j 0} {$j < 10} {incr j} { + lappend clients [redis_deferring_client] + } + foreach rd $clients { + $rd debug log bla + } + foreach rd $clients { + $rd read + } + foreach rd $clients { + $rd close + } + + # make sure the server freed the clients + wait_for_condition 100 100 { + [s connected_clients] < 3 + } else { + fail "clients didn't disconnect" + } + + # make sure it's still loading + assert_equal [s loading] 1 + + # no need to keep waiting for loading to complete + exec kill [srv 0 pid] + } } \ No newline at end of file