Track events processed while blocked globally.

Related to #7234.
This commit is contained in:
antirez 2020-05-14 10:02:57 +02:00
parent 27e25e9d1e
commit bc4667acbc
5 changed files with 32 additions and 17 deletions

View File

@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
/* Helpers for tls special considerations */ /* Helpers for tls special considerations */
int tlsHasPendingData(); int tlsHasPendingData();
void tlsProcessPendingData(); int tlsProcessPendingData();
#endif /* __REDIS_CONNECTION_H */ #endif /* __REDIS_CONNECTION_H */

View File

@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) {
pthread_mutex_unlock(&async_free_queue_mutex); pthread_mutex_unlock(&async_free_queue_mutex);
} }
void freeClientsInAsyncFreeQueue(void) { /* Free the clietns marked as CLOSE_ASAP, return the number of clients
* freed. */
int freeClientsInAsyncFreeQueue(void) {
int freed = listLength(server.clients_to_close);
while (listLength(server.clients_to_close)) { while (listLength(server.clients_to_close)) {
listNode *ln = listFirst(server.clients_to_close); listNode *ln = listFirst(server.clients_to_close);
client *c = listNodeValue(ln); client *c = listNodeValue(ln);
@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) {
freeClient(c); freeClient(c);
listDelNode(server.clients_to_close,ln); listDelNode(server.clients_to_close,ln);
} }
return freed;
} }
/* Return a client by ID, or NULL if the client ID is not in the set /* Return a client by ID, or NULL if the client ID is not in the set
@ -2852,9 +2856,8 @@ int clientsArePaused(void) {
* write, close sequence needed to serve a client. * write, close sequence needed to serve a client.
* *
* The function returns the total number of events processed. */ * The function returns the total number of events processed. */
int processEventsWhileBlocked(void) { void processEventsWhileBlocked(void) {
int iterations = 4; /* See the function top-comment. */ int iterations = 4; /* See the function top-comment. */
int count = 0;
/* Note: when we are processing events while blocked (for instance during /* Note: when we are processing events while blocked (for instance during
* busy Lua scripts), we set a global flag. When such flag is set, we * busy Lua scripts), we set a global flag. When such flag is set, we
@ -2862,15 +2865,17 @@ int processEventsWhileBlocked(void) {
* See https://github.com/antirez/redis/issues/6988 for more info. */ * See https://github.com/antirez/redis/issues/6988 for more info. */
ProcessingEventsWhileBlocked = 1; ProcessingEventsWhileBlocked = 1;
while (iterations--) { while (iterations--) {
int events = 0; long long startval = server.events_processed_while_blocked;
events += aeProcessEvents(server.el, long long ae_events = aeProcessEvents(server.el,
AE_FILE_EVENTS|AE_DONT_WAIT| AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
/* Note that server.events_processed_while_blocked will also get
* incremeted by callbacks called by the event loop handlers. */
server.events_processed_while_blocked += ae_events;
long long events = server.events_processed_while_blocked - startval;
if (!events) break; if (!events) break;
count += events;
} }
ProcessingEventsWhileBlocked = 0; ProcessingEventsWhileBlocked = 0;
return count;
} }
/* ========================================================================== /* ==========================================================================

View File

@ -2101,12 +2101,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
/* Just call a subset of vital functions in case we are re-entering /* Just call a subset of vital functions in case we are re-entering
* the event loop from processEventsWhileBlocked(). */ * the event loop from processEventsWhileBlocked(). Note that in this
* case we keep track of the number of events we are processing, since
* processEventsWhileBlocked() wants to stop ASAP if there are no longer
* events to handle. */
if (ProcessingEventsWhileBlocked) { if (ProcessingEventsWhileBlocked) {
handleClientsWithPendingReadsUsingThreads(); uint64_t processed = 0;
tlsProcessPendingData(); processed += handleClientsWithPendingReadsUsingThreads();
handleClientsWithPendingWrites(); processed += tlsProcessPendingData();
freeClientsInAsyncFreeQueue(); processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
return; return;
} }
@ -2757,6 +2762,7 @@ void initServer(void) {
server.clients_waiting_acks = listCreate(); server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0; server.get_ack_from_slaves = 0;
server.clients_paused = 0; server.clients_paused = 0;
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size(); server.system_memory_size = zmalloc_get_memory_size();
if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) {

View File

@ -1095,6 +1095,7 @@ struct redisServer {
queries. Will still serve RESP2 queries. */ queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */ int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_do_reads; /* Read and parse from IO threads? */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */ int loading; /* We are loading data from disk if true */
@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval); void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv); void replaceClientCommandVector(client *c, int argc, robj **argv);
unsigned long getClientOutputBufferMemoryUsage(client *c); unsigned long getClientOutputBufferMemoryUsage(client *c);
void freeClientsInAsyncFreeQueue(void); int freeClientsInAsyncFreeQueue(void);
void asyncCloseClientOnOutputBufferLimitReached(client *c); void asyncCloseClientOnOutputBufferLimitReached(client *c);
int getClientType(client *c); int getClientType(client *c);
int getClientTypeByName(char *name); int getClientTypeByName(char *name);
@ -1662,7 +1663,7 @@ void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count); int listenToPort(int port, int *fds, int *count);
void pauseClients(mstime_t duration); void pauseClients(mstime_t duration);
int clientsArePaused(void); int clientsArePaused(void);
int processEventsWhileBlocked(void); void processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void); int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void);

View File

@ -768,15 +768,17 @@ int tlsHasPendingData() {
return listLength(pending_list) > 0; return listLength(pending_list) > 0;
} }
void tlsProcessPendingData() { int tlsProcessPendingData() {
listIter li; listIter li;
listNode *ln; listNode *ln;
int processed = listLength(pending_list);
listRewind(pending_list,&li); listRewind(pending_list,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
tls_connection *conn = listNodeValue(ln); tls_connection *conn = listNodeValue(ln);
tlsHandleEvent(conn, AE_READABLE); tlsHandleEvent(conn, AE_READABLE);
} }
return processed;
} }
#else /* USE_OPENSSL */ #else /* USE_OPENSSL */
@ -804,7 +806,8 @@ int tlsHasPendingData() {
return 0; return 0;
} }
void tlsProcessPendingData() { int tlsProcessPendingData() {
return 0;
} }
#endif #endif