Some rework of #7234.
This commit is contained in:
parent
9da134cd88
commit
8bf660af90
7
src/ae.c
7
src/ae.c
@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||
* if flags has AE_DONT_WAIT set the function returns ASAP until all
|
||||
* the events that's possible to process without to wait are processed.
|
||||
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
|
||||
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
|
||||
*
|
||||
* The function returns the number of events processed. */
|
||||
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
@ -428,7 +429,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
tvp = &tv;
|
||||
}
|
||||
|
||||
if (eventLoop->beforesleep != NULL)
|
||||
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
|
||||
eventLoop->beforesleep(eventLoop);
|
||||
|
||||
/* Call the multiplexing API, will return only on timeout or when
|
||||
@ -525,7 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) {
|
||||
void aeMain(aeEventLoop *eventLoop) {
|
||||
eventLoop->stop = 0;
|
||||
while (!eventLoop->stop) {
|
||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
|
||||
AE_CALL_BEFORE_SLEEP|
|
||||
AE_CALL_AFTER_SLEEP);
|
||||
}
|
||||
}
|
||||
|
||||
|
9
src/ae.h
9
src/ae.h
@ -47,11 +47,12 @@
|
||||
things to disk before sending replies, and want
|
||||
to do that in a group fashion. */
|
||||
|
||||
#define AE_FILE_EVENTS 1
|
||||
#define AE_TIME_EVENTS 2
|
||||
#define AE_FILE_EVENTS (1<<0)
|
||||
#define AE_TIME_EVENTS (1<<1)
|
||||
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
|
||||
#define AE_DONT_WAIT 4
|
||||
#define AE_CALL_AFTER_SLEEP 8
|
||||
#define AE_DONT_WAIT (1<<2)
|
||||
#define AE_CALL_BEFORE_SLEEP (1<<3)
|
||||
#define AE_CALL_AFTER_SLEEP (1<<4)
|
||||
|
||||
#define AE_NOMORE -1
|
||||
#define AE_DELETED_EVENT_ID -1
|
||||
|
@ -2863,8 +2863,9 @@ int processEventsWhileBlocked(void) {
|
||||
ProcessingEventsWhileBlocked = 1;
|
||||
while (iterations--) {
|
||||
int events = 0;
|
||||
events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
||||
events += handleClientsWithPendingWrites();
|
||||
events += aeProcessEvents(server.el,
|
||||
AE_FILE_EVENTS|AE_DONT_WAIT|
|
||||
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
|
||||
if (!events) break;
|
||||
count += events;
|
||||
}
|
||||
|
111
src/server.c
111
src/server.c
@ -2092,22 +2092,33 @@ extern int ProcessingEventsWhileBlocked;
|
||||
/* This function gets called every time Redis is entering the
|
||||
* main loop of the event driven library, that is, before to sleep
|
||||
* for ready file descriptors.
|
||||
*
|
||||
* Note: This function is (currently) called from two functions:
|
||||
* 1. aeMain - The main server loop
|
||||
* 2. processEventsWhileBlocked - Process clients during RDB/AOF load
|
||||
*
|
||||
* If it was called from processEventsWhileBlocked we don't want
|
||||
* to perform all actions (For example, we don't want to expire
|
||||
* keys), but we do need to perform some actions.
|
||||
*
|
||||
* The most important is freeClientsInAsyncFreeQueue but we also
|
||||
* call some other low-risk functions. */
|
||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
UNUSED(eventLoop);
|
||||
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
handleBlockedClientsTimeout();
|
||||
/* Just call a subset of vital functions in case we are re-entering
|
||||
* the event loop from processEventsWhileBlocked(). */
|
||||
if (ProcessingEventsWhileBlocked) {
|
||||
handleClientsWithPendingReadsUsingThreads();
|
||||
tlsProcessPendingData();
|
||||
handleClientsWithPendingWrites();
|
||||
freeClientsInAsyncFreeQueue();
|
||||
return;
|
||||
}
|
||||
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
handleBlockedClientsTimeout();
|
||||
|
||||
/* We should handle pending reads clients ASAP after event loop. */
|
||||
handleClientsWithPendingReadsUsingThreads();
|
||||
|
||||
@ -2117,69 +2128,65 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
/* If tls still has pending unread data don't sleep at all. */
|
||||
aeSetDontWait(server.el, tlsHasPendingData());
|
||||
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
/* Call the Redis Cluster before sleep function. Note that this function
|
||||
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
||||
* so it's a good idea to call it before serving the unblocked clients
|
||||
* later in this function. */
|
||||
if (server.cluster_enabled) clusterBeforeSleep();
|
||||
/* Call the Redis Cluster before sleep function. Note that this function
|
||||
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
||||
* so it's a good idea to call it before serving the unblocked clients
|
||||
* later in this function. */
|
||||
if (server.cluster_enabled) clusterBeforeSleep();
|
||||
|
||||
/* Run a fast expire cycle (the called function will return
|
||||
* ASAP if a fast cycle is not needed). */
|
||||
if (server.active_expire_enabled && server.masterhost == NULL)
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||
/* Run a fast expire cycle (the called function will return
|
||||
* ASAP if a fast cycle is not needed). */
|
||||
if (server.active_expire_enabled && server.masterhost == NULL)
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||
|
||||
/* Unblock all the clients blocked for synchronous replication
|
||||
* in WAIT. */
|
||||
if (listLength(server.clients_waiting_acks))
|
||||
processClientsWaitingReplicas();
|
||||
/* Unblock all the clients blocked for synchronous replication
|
||||
* in WAIT. */
|
||||
if (listLength(server.clients_waiting_acks))
|
||||
processClientsWaitingReplicas();
|
||||
|
||||
/* Check if there are clients unblocked by modules that implement
|
||||
* blocking commands. */
|
||||
if (moduleCount()) moduleHandleBlockedClients();
|
||||
/* Check if there are clients unblocked by modules that implement
|
||||
* blocking commands. */
|
||||
if (moduleCount()) moduleHandleBlockedClients();
|
||||
|
||||
/* Try to process pending commands for clients that were just unblocked. */
|
||||
if (listLength(server.unblocked_clients))
|
||||
processUnblockedClients();
|
||||
/* Try to process pending commands for clients that were just unblocked. */
|
||||
if (listLength(server.unblocked_clients))
|
||||
processUnblockedClients();
|
||||
|
||||
/* Send all the slaves an ACK request if at least one client blocked
|
||||
* during the previous event loop iteration. Note that we do this after
|
||||
* processUnblockedClients(), so if there are multiple pipelined WAITs
|
||||
* and the just unblocked WAIT gets blocked again, we don't have to wait
|
||||
* a server cron cycle in absence of other event loop events. See #6623. */
|
||||
if (server.get_ack_from_slaves) {
|
||||
robj *argv[3];
|
||||
/* Send all the slaves an ACK request if at least one client blocked
|
||||
* during the previous event loop iteration. Note that we do this after
|
||||
* processUnblockedClients(), so if there are multiple pipelined WAITs
|
||||
* and the just unblocked WAIT gets blocked again, we don't have to wait
|
||||
* a server cron cycle in absence of other event loop events. See #6623. */
|
||||
if (server.get_ack_from_slaves) {
|
||||
robj *argv[3];
|
||||
|
||||
argv[0] = createStringObject("REPLCONF",8);
|
||||
argv[1] = createStringObject("GETACK",6);
|
||||
argv[2] = createStringObject("*",1); /* Not used argument. */
|
||||
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
|
||||
decrRefCount(argv[0]);
|
||||
decrRefCount(argv[1]);
|
||||
decrRefCount(argv[2]);
|
||||
server.get_ack_from_slaves = 0;
|
||||
}
|
||||
|
||||
/* Send the invalidation messages to clients participating to the
|
||||
* client side caching protocol in broadcasting (BCAST) mode. */
|
||||
trackingBroadcastInvalidationMessages();
|
||||
|
||||
/* Write the AOF buffer on disk */
|
||||
flushAppendOnlyFile(0);
|
||||
argv[0] = createStringObject("REPLCONF",8);
|
||||
argv[1] = createStringObject("GETACK",6);
|
||||
argv[2] = createStringObject("*",1); /* Not used argument. */
|
||||
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
|
||||
decrRefCount(argv[0]);
|
||||
decrRefCount(argv[1]);
|
||||
decrRefCount(argv[2]);
|
||||
server.get_ack_from_slaves = 0;
|
||||
}
|
||||
|
||||
/* Send the invalidation messages to clients participating to the
|
||||
* client side caching protocol in broadcasting (BCAST) mode. */
|
||||
trackingBroadcastInvalidationMessages();
|
||||
|
||||
/* Write the AOF buffer on disk */
|
||||
flushAppendOnlyFile(0);
|
||||
|
||||
/* Handle writes with pending output buffers. */
|
||||
handleClientsWithPendingWritesUsingThreads();
|
||||
|
||||
/* Close clients that need to be closed asynchronous */
|
||||
freeClientsInAsyncFreeQueue();
|
||||
|
||||
if (!ProcessingEventsWhileBlocked) {
|
||||
/* Before we are going to sleep, let the threads access the dataset by
|
||||
* releasing the GIL. Redis main thread will not touch anything at this
|
||||
* time. */
|
||||
if (moduleCount()) moduleReleaseGIL();
|
||||
}
|
||||
/* Before we are going to sleep, let the threads access the dataset by
|
||||
* releasing the GIL. Redis main thread will not touch anything at this
|
||||
* time. */
|
||||
if (moduleCount()) moduleReleaseGIL();
|
||||
}
|
||||
|
||||
/* This function is called immadiately after the event loop multiplexing
|
||||
|
Loading…
x
Reference in New Issue
Block a user