diff --git a/src/ae.c b/src/ae.c index 1bf6cbfbf..379cfd1e6 100644 --- a/src/ae.c +++ b/src/ae.c @@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) { * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) @@ -428,6 +429,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) + eventLoop->beforesleep(eventLoop); + /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); @@ -522,9 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) - eventLoop->beforesleep(eventLoop); - aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); + aeProcessEvents(eventLoop, AE_ALL_EVENTS| + AE_CALL_BEFORE_SLEEP| + AE_CALL_AFTER_SLEEP); } } diff --git a/src/ae.h b/src/ae.h index 9acd72434..63b306615 100644 --- a/src/ae.h +++ b/src/ae.h @@ -47,11 +47,12 @@ things to disk before sending replies, and want to do that in a group fashion. */ -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 +#define AE_FILE_EVENTS (1<<0) +#define AE_TIME_EVENTS (1<<1) #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 -#define AE_CALL_AFTER_SLEEP 8 +#define AE_DONT_WAIT (1<<2) +#define AE_CALL_BEFORE_SLEEP (1<<3) +#define AE_CALL_AFTER_SLEEP (1<<4) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 diff --git a/src/connection.h b/src/connection.h index db09dfd83..0fd6c5f24 100644 --- a/src/connection.h +++ b/src/connection.h @@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len); /* Helpers for tls special considerations */ int tlsHasPendingData(); -void tlsProcessPendingData(); +int tlsProcessPendingData(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/networking.c b/src/networking.c index 75c0c16b1..671e374f4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } -void freeClientsInAsyncFreeQueue(void) { +/* Free the clietns marked as CLOSE_ASAP, return the number of clients + * freed. */ +int freeClientsInAsyncFreeQueue(void) { + int freed = listLength(server.clients_to_close); while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); @@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) { freeClient(c); listDelNode(server.clients_to_close,ln); } + return freed; } /* Return a client by ID, or NULL if the client ID is not in the set @@ -2852,9 +2856,8 @@ int clientsArePaused(void) { * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(void) { +void processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ - int count = 0; /* Note: when we are processing events while blocked (for instance during * busy Lua scripts), we set a global flag. When such flag is set, we @@ -2862,14 +2865,17 @@ int processEventsWhileBlocked(void) { * See https://github.com/antirez/redis/issues/6988 for more info. */ ProcessingEventsWhileBlocked = 1; while (iterations--) { - int events = 0; - events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(); + long long startval = server.events_processed_while_blocked; + long long ae_events = aeProcessEvents(server.el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that server.events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + server.events_processed_while_blocked += ae_events; + long long events = server.events_processed_while_blocked - startval; if (!events) break; - count += events; } ProcessingEventsWhileBlocked = 0; - return count; } /* ========================================================================== diff --git a/src/server.c b/src/server.c index 847a6a95a..5bc4666ee 100644 --- a/src/server.c +++ b/src/server.c @@ -2087,12 +2087,40 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +extern int ProcessingEventsWhileBlocked; + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep - * for ready file descriptors. */ + * for ready file descriptors. + * + * Note: This function is (currently) called from two functions: + * 1. aeMain - The main server loop + * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * + * If it was called from processEventsWhileBlocked we don't want + * to perform all actions (For example, we don't want to expire + * keys), but we do need to perform some actions. + * + * The most important is freeClientsInAsyncFreeQueue but we also + * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Just call a subset of vital functions in case we are re-entering + * the event loop from processEventsWhileBlocked(). Note that in this + * case we keep track of the number of events we are processing, since + * processEventsWhileBlocked() wants to stop ASAP if there are no longer + * events to handle. */ + if (ProcessingEventsWhileBlocked) { + uint64_t processed = 0; + processed += handleClientsWithPendingReadsUsingThreads(); + processed += tlsProcessPendingData(); + processed += handleClientsWithPendingWrites(); + processed += freeClientsInAsyncFreeQueue(); + server.events_processed_while_blocked += processed; + return; + } + /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2171,7 +2199,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * the different events callbacks. */ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (moduleCount()) moduleAcquireGIL(); + + if (!ProcessingEventsWhileBlocked) { + if (moduleCount()) moduleAcquireGIL(); + } } /* =========================== Server initialization ======================== */ @@ -2737,6 +2768,7 @@ void initServer(void) { server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; + server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { @@ -2879,6 +2911,11 @@ void initServer(void) { "blocked clients subsystem."); } + /* Register before and after sleep handlers (note this needs to be done + * before loading persistence since it is used by processEventsWhileBlocked. */ + aeSetBeforeSleepProc(server.el,beforeSleep); + aeSetAfterSleepProc(server.el,afterSleep); + /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, @@ -5135,8 +5172,6 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(server.server_cpulist); - aeSetBeforeSleepProc(server.el,beforeSleep); - aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; diff --git a/src/server.h b/src/server.h index 59cf1370e..55ee2d300 100644 --- a/src/server.h +++ b/src/server.h @@ -1095,6 +1095,7 @@ struct redisServer { queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? */ + long long events_processed_while_blocked; /* processEventsWhileBlocked() */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); -void freeClientsInAsyncFreeQueue(void); +int freeClientsInAsyncFreeQueue(void); void asyncCloseClientOnOutputBufferLimitReached(client *c); int getClientType(client *c); int getClientTypeByName(char *name); @@ -1662,7 +1663,7 @@ void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); -int processEventsWhileBlocked(void); +void processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); diff --git a/src/tls.c b/src/tls.c index c18aafebe..ee85bd302 100644 --- a/src/tls.c +++ b/src/tls.c @@ -768,15 +768,17 @@ int tlsHasPendingData() { return listLength(pending_list) > 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { listIter li; listNode *ln; + int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { tls_connection *conn = listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } + return processed; } #else /* USE_OPENSSL */ @@ -804,7 +806,8 @@ int tlsHasPendingData() { return 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { + return 0; } #endif diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index b364291ee..123e9c8b6 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -128,4 +128,56 @@ start_server {} { # make sure the server is still writable r set x xx } +} + +test {client freed during loading} { + start_server [list overrides [list key-load-delay 10 rdbcompression no]] { + # create a big rdb that will take long to load. it is important + # for keys to be big since the server processes events only once in 2mb. + # 100mb of rdb, 100k keys will load in more than 1 second + r debug populate 100000 key 1000 + + catch { + r debug restart + } + + set stdout [srv 0 stdout] + while 1 { + # check that the new server actually started and is ready for connections + if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} { + break + } + after 10 + } + # make sure it's still loading + assert_equal [s loading] 1 + + # connect and disconnect 10 clients + set clients {} + for {set j 0} {$j < 10} {incr j} { + lappend clients [redis_deferring_client] + } + foreach rd $clients { + $rd debug log bla + } + foreach rd $clients { + $rd read + } + foreach rd $clients { + $rd close + } + + # make sure the server freed the clients + wait_for_condition 100 100 { + [s connected_clients] < 3 + } else { + fail "clients didn't disconnect" + } + + # make sure it's still loading + assert_equal [s loading] 1 + + # no need to keep waiting for loading to complete + exec kill [srv 0 pid] + } } \ No newline at end of file