diff --git a/src/ae.c b/src/ae.c index 7aa204250..379cfd1e6 100644 --- a/src/ae.c +++ b/src/ae.c @@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) { * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) @@ -428,7 +429,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } - if (eventLoop->beforesleep != NULL) + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when @@ -525,7 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); + aeProcessEvents(eventLoop, AE_ALL_EVENTS| + AE_CALL_BEFORE_SLEEP| + AE_CALL_AFTER_SLEEP); } } diff --git a/src/ae.h b/src/ae.h index 9acd72434..63b306615 100644 --- a/src/ae.h +++ b/src/ae.h @@ -47,11 +47,12 @@ things to disk before sending replies, and want to do that in a group fashion. */ -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 +#define AE_FILE_EVENTS (1<<0) +#define AE_TIME_EVENTS (1<<1) #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 -#define AE_CALL_AFTER_SLEEP 8 +#define AE_DONT_WAIT (1<<2) +#define AE_CALL_BEFORE_SLEEP (1<<3) +#define AE_CALL_AFTER_SLEEP (1<<4) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 diff --git a/src/networking.c b/src/networking.c index 75c0c16b1..ce5b0ae38 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2863,8 +2863,9 @@ int processEventsWhileBlocked(void) { ProcessingEventsWhileBlocked = 1; while (iterations--) { int events = 0; - events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(); + events += aeProcessEvents(server.el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); if (!events) break; count += events; } diff --git a/src/server.c b/src/server.c index d272a6fe0..73a4a9f55 100644 --- a/src/server.c +++ b/src/server.c @@ -2092,22 +2092,33 @@ extern int ProcessingEventsWhileBlocked; /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. + * * Note: This function is (currently) called from two functions: * 1. aeMain - The main server loop * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * * If it was called from processEventsWhileBlocked we don't want * to perform all actions (For example, we don't want to expire * keys), but we do need to perform some actions. + * * The most important is freeClientsInAsyncFreeQueue but we also * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (!ProcessingEventsWhileBlocked) { - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + /* Just call a subset of vital functions in case we are re-entering + * the event loop from processEventsWhileBlocked(). */ + if (ProcessingEventsWhileBlocked) { + handleClientsWithPendingReadsUsingThreads(); + tlsProcessPendingData(); + handleClientsWithPendingWrites(); + freeClientsInAsyncFreeQueue(); + return; } + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2117,69 +2128,65 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); - if (!ProcessingEventsWhileBlocked) { - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; - } - - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; } + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); + /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - if (!ProcessingEventsWhileBlocked) { - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); - } + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); } /* This function is called immadiately after the event loop multiplexing