From 905e28ee87eb0bbea448ec28c11dc74991359bf5 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 May 2020 19:13:47 +0300 Subject: [PATCH 1/3] fix redis 6.0 not freeing closed connections during loading. This bug was introduced by a recent change in which readQueryFromClient is using freeClientAsync, and despite the fact that now freeClientsInAsyncFreeQueue is in beforeSleep, that's not enough since it's not called during loading in processEventsWhileBlocked. furthermore, afterSleep was called in that case but beforeSleep wasn't. This bug also caused slowness sine the level-triggered mode of epoll kept signaling these connections as readable causing us to keep doing connRead again and again for ll of these, which keep accumulating. now both before and after sleep are called, but not all of their actions are performed during loading, some are only reserved for the main loop. fixes issue #7215 --- src/ae.c | 5 +- src/server.c | 124 ++++++++++++++++++++++---------------- tests/integration/rdb.tcl | 52 ++++++++++++++++ 3 files changed, 128 insertions(+), 53 deletions(-) diff --git a/src/ae.c b/src/ae.c index 1bf6cbfbf..7aa204250 100644 --- a/src/ae.c +++ b/src/ae.c @@ -428,6 +428,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } + if (eventLoop->beforesleep != NULL) + eventLoop->beforesleep(eventLoop); + /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); @@ -522,8 +525,6 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) - eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } diff --git a/src/server.c b/src/server.c index e2b4b6f3d..fd75f25d6 100644 --- a/src/server.c +++ b/src/server.c @@ -2081,14 +2081,26 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +extern int ProcessingEventsWhileBlocked; + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep - * for ready file descriptors. */ + * for ready file descriptors. + * Note: This function is (currently) called from two functions: + * 1. aeMain - The main server loop + * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * If it was called from processEventsWhileBlocked we don't want + * to perform all actions (For example, we don't want to expire + * keys), but we do need to perform some actions. + * The most important is freeClientsInAsyncFreeQueue but we also + * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + if (!ProcessingEventsWhileBlocked) { + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + } /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2099,65 +2111,69 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (!ProcessingEventsWhileBlocked) { + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; + } + + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); } - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); + if (!ProcessingEventsWhileBlocked) { + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); + } } /* This function is called immadiately after the event loop multiplexing @@ -2165,7 +2181,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * the different events callbacks. */ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (moduleCount()) moduleAcquireGIL(); + + if (!ProcessingEventsWhileBlocked) { + if (moduleCount()) moduleAcquireGIL(); + } } /* =========================== Server initialization ======================== */ @@ -2873,6 +2892,11 @@ void initServer(void) { "blocked clients subsystem."); } + /* Register before and after sleep handlers (note this needs to be done + * before loading persistence since it is used by processEventsWhileBlocked. */ + aeSetBeforeSleepProc(server.el,beforeSleep); + aeSetAfterSleepProc(server.el,afterSleep); + /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, @@ -5129,8 +5153,6 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(server.server_cpulist); - aeSetBeforeSleepProc(server.el,beforeSleep); - aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index b364291ee..123e9c8b6 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -128,4 +128,56 @@ start_server {} { # make sure the server is still writable r set x xx } +} + +test {client freed during loading} { + start_server [list overrides [list key-load-delay 10 rdbcompression no]] { + # create a big rdb that will take long to load. it is important + # for keys to be big since the server processes events only once in 2mb. + # 100mb of rdb, 100k keys will load in more than 1 second + r debug populate 100000 key 1000 + + catch { + r debug restart + } + + set stdout [srv 0 stdout] + while 1 { + # check that the new server actually started and is ready for connections + if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} { + break + } + after 10 + } + # make sure it's still loading + assert_equal [s loading] 1 + + # connect and disconnect 10 clients + set clients {} + for {set j 0} {$j < 10} {incr j} { + lappend clients [redis_deferring_client] + } + foreach rd $clients { + $rd debug log bla + } + foreach rd $clients { + $rd read + } + foreach rd $clients { + $rd close + } + + # make sure the server freed the clients + wait_for_condition 100 100 { + [s connected_clients] < 3 + } else { + fail "clients didn't disconnect" + } + + # make sure it's still loading + assert_equal [s loading] 1 + + # no need to keep waiting for loading to complete + exec kill [srv 0 pid] + } } \ No newline at end of file From 27e25e9d1ed1b0ba3f639eb64c8f27b852753045 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 12 May 2020 13:07:44 +0200 Subject: [PATCH 2/3] Some rework of #7234. --- src/ae.c | 7 ++- src/ae.h | 9 ++-- src/networking.c | 5 ++- src/server.c | 111 +++++++++++++++++++++++++---------------------- 4 files changed, 72 insertions(+), 60 deletions(-) diff --git a/src/ae.c b/src/ae.c index 7aa204250..379cfd1e6 100644 --- a/src/ae.c +++ b/src/ae.c @@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) { * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) @@ -428,7 +429,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } - if (eventLoop->beforesleep != NULL) + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when @@ -525,7 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); + aeProcessEvents(eventLoop, AE_ALL_EVENTS| + AE_CALL_BEFORE_SLEEP| + AE_CALL_AFTER_SLEEP); } } diff --git a/src/ae.h b/src/ae.h index 9acd72434..63b306615 100644 --- a/src/ae.h +++ b/src/ae.h @@ -47,11 +47,12 @@ things to disk before sending replies, and want to do that in a group fashion. */ -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 +#define AE_FILE_EVENTS (1<<0) +#define AE_TIME_EVENTS (1<<1) #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 -#define AE_CALL_AFTER_SLEEP 8 +#define AE_DONT_WAIT (1<<2) +#define AE_CALL_BEFORE_SLEEP (1<<3) +#define AE_CALL_AFTER_SLEEP (1<<4) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 diff --git a/src/networking.c b/src/networking.c index 75c0c16b1..ce5b0ae38 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2863,8 +2863,9 @@ int processEventsWhileBlocked(void) { ProcessingEventsWhileBlocked = 1; while (iterations--) { int events = 0; - events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(); + events += aeProcessEvents(server.el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); if (!events) break; count += events; } diff --git a/src/server.c b/src/server.c index fd75f25d6..380a6d5b6 100644 --- a/src/server.c +++ b/src/server.c @@ -2086,22 +2086,33 @@ extern int ProcessingEventsWhileBlocked; /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. + * * Note: This function is (currently) called from two functions: * 1. aeMain - The main server loop * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * * If it was called from processEventsWhileBlocked we don't want * to perform all actions (For example, we don't want to expire * keys), but we do need to perform some actions. + * * The most important is freeClientsInAsyncFreeQueue but we also * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (!ProcessingEventsWhileBlocked) { - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + /* Just call a subset of vital functions in case we are re-entering + * the event loop from processEventsWhileBlocked(). */ + if (ProcessingEventsWhileBlocked) { + handleClientsWithPendingReadsUsingThreads(); + tlsProcessPendingData(); + handleClientsWithPendingWrites(); + freeClientsInAsyncFreeQueue(); + return; } + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2111,69 +2122,65 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); - if (!ProcessingEventsWhileBlocked) { - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; - } - - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; } + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); + /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - if (!ProcessingEventsWhileBlocked) { - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); - } + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); } /* This function is called immadiately after the event loop multiplexing From bc4667acbcbeced700d0f64735e8ba8c7ffb8357 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 May 2020 10:02:57 +0200 Subject: [PATCH 3/3] Track events processed while blocked globally. Related to #7234. --- src/connection.h | 2 +- src/networking.c | 19 ++++++++++++------- src/server.c | 16 +++++++++++----- src/server.h | 5 +++-- src/tls.c | 7 +++++-- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/connection.h b/src/connection.h index db09dfd83..0fd6c5f24 100644 --- a/src/connection.h +++ b/src/connection.h @@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len); /* Helpers for tls special considerations */ int tlsHasPendingData(); -void tlsProcessPendingData(); +int tlsProcessPendingData(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/networking.c b/src/networking.c index ce5b0ae38..671e374f4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } -void freeClientsInAsyncFreeQueue(void) { +/* Free the clietns marked as CLOSE_ASAP, return the number of clients + * freed. */ +int freeClientsInAsyncFreeQueue(void) { + int freed = listLength(server.clients_to_close); while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); @@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) { freeClient(c); listDelNode(server.clients_to_close,ln); } + return freed; } /* Return a client by ID, or NULL if the client ID is not in the set @@ -2852,9 +2856,8 @@ int clientsArePaused(void) { * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(void) { +void processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ - int count = 0; /* Note: when we are processing events while blocked (for instance during * busy Lua scripts), we set a global flag. When such flag is set, we @@ -2862,15 +2865,17 @@ int processEventsWhileBlocked(void) { * See https://github.com/antirez/redis/issues/6988 for more info. */ ProcessingEventsWhileBlocked = 1; while (iterations--) { - int events = 0; - events += aeProcessEvents(server.el, + long long startval = server.events_processed_while_blocked; + long long ae_events = aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT| AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that server.events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + server.events_processed_while_blocked += ae_events; + long long events = server.events_processed_while_blocked - startval; if (!events) break; - count += events; } ProcessingEventsWhileBlocked = 0; - return count; } /* ========================================================================== diff --git a/src/server.c b/src/server.c index 380a6d5b6..1bed3959d 100644 --- a/src/server.c +++ b/src/server.c @@ -2101,12 +2101,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Just call a subset of vital functions in case we are re-entering - * the event loop from processEventsWhileBlocked(). */ + * the event loop from processEventsWhileBlocked(). Note that in this + * case we keep track of the number of events we are processing, since + * processEventsWhileBlocked() wants to stop ASAP if there are no longer + * events to handle. */ if (ProcessingEventsWhileBlocked) { - handleClientsWithPendingReadsUsingThreads(); - tlsProcessPendingData(); - handleClientsWithPendingWrites(); - freeClientsInAsyncFreeQueue(); + uint64_t processed = 0; + processed += handleClientsWithPendingReadsUsingThreads(); + processed += tlsProcessPendingData(); + processed += handleClientsWithPendingWrites(); + processed += freeClientsInAsyncFreeQueue(); + server.events_processed_while_blocked += processed; return; } @@ -2757,6 +2762,7 @@ void initServer(void) { server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; + server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { diff --git a/src/server.h b/src/server.h index 59cf1370e..55ee2d300 100644 --- a/src/server.h +++ b/src/server.h @@ -1095,6 +1095,7 @@ struct redisServer { queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? */ + long long events_processed_while_blocked; /* processEventsWhileBlocked() */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); -void freeClientsInAsyncFreeQueue(void); +int freeClientsInAsyncFreeQueue(void); void asyncCloseClientOnOutputBufferLimitReached(client *c); int getClientType(client *c); int getClientTypeByName(char *name); @@ -1662,7 +1663,7 @@ void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); -int processEventsWhileBlocked(void); +void processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); diff --git a/src/tls.c b/src/tls.c index c18aafebe..ee85bd302 100644 --- a/src/tls.c +++ b/src/tls.c @@ -768,15 +768,17 @@ int tlsHasPendingData() { return listLength(pending_list) > 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { listIter li; listNode *ln; + int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { tls_connection *conn = listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } + return processed; } #else /* USE_OPENSSL */ @@ -804,7 +806,8 @@ int tlsHasPendingData() { return 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { + return 0; } #endif