From bdb338cf77c1eb59363d165eb46bb7259367642c Mon Sep 17 00:00:00 2001 From: antirez Date: Sun, 15 Mar 2020 16:10:37 +0100 Subject: [PATCH 1/4] Aesthetic changes in PR #6989. --- src/networking.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 261c9b24a..c7dd2f23c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -36,7 +36,7 @@ static void setProtocolError(const char *errstr, client *c); int postponeClientRead(client *c); -int process_while_blocked; +int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -2739,7 +2739,12 @@ int clientsArePaused(void) { int processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ int count = 0; - process_while_blocked = 1; + + /* Note: when we are processing events while blocked (for instance during + * busy Lua scripts), we set a global flag. When such flag is set, we + * avoid handling the read part of clients using threaded I/O. + * See https://github.com/antirez/redis/issues/6988 for more info. */ + ProcessingEventsWhileBlocked = 1; while (iterations--) { int events = 0; events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); @@ -2747,7 +2752,7 @@ int processEventsWhileBlocked(void) { if (!events) break; count += events; } - process_while_blocked = 0; + ProcessingEventsWhileBlocked = 0; return count; } @@ -2819,7 +2824,6 @@ void *IOThreadMain(void *myid) { /* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { io_threads_active = 0; /* We start with threads not active. */ - process_while_blocked = 0; /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ @@ -2974,7 +2978,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { int postponeClientRead(client *c) { if (io_threads_active && server.io_threads_do_reads && - !process_while_blocked && + !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { c->flags |= CLIENT_PENDING_READ; From da8c7c49bf534f6c75c2cb2724a3060e8b6bbab2 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 16 Mar 2020 13:48:29 +0100 Subject: [PATCH 2/4] Example sentinel conf: document requirepass. --- sentinel.conf | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sentinel.conf b/sentinel.conf index bc9a705ac..796f45088 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -112,6 +112,14 @@ sentinel monitor mymaster 127.0.0.1 6379 2 # Default is 30 seconds. sentinel down-after-milliseconds mymaster 30000 +# requirepass +# +# You can configure Sentinel itself to require a password, however when doing +# so Sentinel will try to authenticate with the same password to all the +# other Sentinels. So you need to configure all your Sentinels in a given +# group with the same "requirepass" password. Check the following documentation +# for more info: https://redis.io/topics/sentinel + # sentinel parallel-syncs # # How many replicas we can reconfigure to point to the new replica simultaneously From 9cc7038e5473cdab8232e58002e9741c3637366f Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Mon, 16 Mar 2020 11:20:48 +0800 Subject: [PATCH 3/4] Threaded IO: handle pending reads clients ASAP after event loop --- src/server.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server.c b/src/server.c index a6d4b357e..f702da94a 100644 --- a/src/server.c +++ b/src/server.c @@ -2088,6 +2088,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* We should handle pending reads clients ASAP after event loop. */ + handleClientsWithPendingReadsUsingThreads(); + /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); /* If tls still has pending unread data don't sleep at all. */ @@ -2157,7 +2160,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(); - handleClientsWithPendingReadsUsingThreads(); } /* =========================== Server initialization ======================== */ From 7c07841632500dfdff31f5a69e5bd140abea646c Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Sun, 15 Mar 2020 23:30:25 +0800 Subject: [PATCH 4/4] Threaded IO: bugfix client kill may crash redis --- src/networking.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index c7dd2f23c..0690bbdf6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3040,16 +3040,22 @@ int handleClientsWithPendingReadsUsingThreads(void) { if (tio_debug) printf("I/O READ All threads finshed\n"); /* Run the list of clients again to process the new buffers. */ - listRewind(server.clients_pending_read,&li); - while((ln = listNext(&li))) { + while(listLength(server.clients_pending_read)) { + ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; + listDelNode(server.clients_pending_read,ln); + if (c->flags & CLIENT_PENDING_COMMAND) { - c->flags &= ~ CLIENT_PENDING_COMMAND; - processCommandAndResetClient(c); + c->flags &= ~CLIENT_PENDING_COMMAND; + if (processCommandAndResetClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid + * processing the client later. So we just go + * to the next. */ + continue; + } } processInputBufferAndReplicate(c); } - listEmpty(server.clients_pending_read); return processed; }