From 0a3410db7005c509cfb2ee1923b095c6d5ab7e83 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 21 May 2019 11:37:13 +0800 Subject: [PATCH 1/5] Threaded IO: use main thread to handle write work --- src/networking.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index a558ae91a..900e8cf87 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2656,7 +2656,7 @@ pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; int io_threads_active; /* Are the threads currently spinning waiting I/O? */ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ -list *io_threads_list[IO_THREADS_MAX_NUM]; +list *io_threads_list[IO_THREADS_MAX_NUM+1]; void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is @@ -2729,6 +2729,7 @@ void initThreadedIO(void) { } io_threads[i] = tid; } + io_threads_list[server.io_threads_num] = listCreate(); } void startThreadedIO(void) { @@ -2800,7 +2801,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; - int target_id = item_id % server.io_threads_num; + int target_id = item_id % (server.io_threads_num+1); listAddNodeTail(io_threads_list[target_id],c); item_id++; } @@ -2813,6 +2814,13 @@ int handleClientsWithPendingWritesUsingThreads(void) { io_threads_pending[j] = count; } + listRewind(io_threads_list[server.io_threads_num],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + } + listEmpty(io_threads_list[server.io_threads_num]); + /* Wait for all threads to end their work. */ while(1) { unsigned long pending = 0; From 1f76ff2fb92a2eb4717a847d304b8f2c3711a5b8 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 21 May 2019 11:42:10 +0800 Subject: [PATCH 2/5] Threaded IO: use main thread to handle read work --- src/networking.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 900e8cf87..f1a6b9910 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2885,7 +2885,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); - int target_id = item_id % server.io_threads_num; + int target_id = item_id % (server.io_threads_num+1); listAddNodeTail(io_threads_list[target_id],c); item_id++; } @@ -2898,6 +2898,13 @@ int handleClientsWithPendingReadsUsingThreads(void) { io_threads_pending[j] = count; } + listRewind(io_threads_list[server.io_threads_num],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + readQueryFromClient(NULL,c->fd,c,0); + } + listEmpty(io_threads_list[server.io_threads_num]); + /* Wait for all threads to end their work. */ while(1) { unsigned long pending = 0; From a6eb3f8bfb4f8e9a78081f50ff2f562d407d5e65 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 13 Jan 2020 12:50:26 +0100 Subject: [PATCH 3/5] A few comments about main thread serving I/O as well. Related to #6110. --- src/networking.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index f1a6b9910..96ab8e592 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2656,6 +2656,10 @@ pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; int io_threads_active; /* Are the threads currently spinning waiting I/O? */ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ + +/* This is the list of clients each thread will serve when threaded I/O is + * used. We spawn N threads, and the N+1 list is used for the clients that + * are processed by the main thread itself (this is why ther is "+1"). */ list *io_threads_list[IO_THREADS_MAX_NUM+1]; void *IOThreadMain(void *myid) { @@ -2729,7 +2733,7 @@ void initThreadedIO(void) { } io_threads[i] = tid; } - io_threads_list[server.io_threads_num] = listCreate(); + io_threads_list[server.io_threads_num] = listCreate(); /* For main thread */ } void startThreadedIO(void) { @@ -2814,6 +2818,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { io_threads_pending[j] = count; } + /* Also use the main thread to process a slide of clients. */ listRewind(io_threads_list[server.io_threads_num],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); @@ -2898,6 +2903,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { io_threads_pending[j] = count; } + /* Also use the main thread to process a slide of clients. */ listRewind(io_threads_list[server.io_threads_num],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); From 0b02b27705dfe5afc24581403f9accbe1cc5f1d5 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 13 Jan 2020 12:54:39 +0100 Subject: [PATCH 4/5] Port PR #6110 to new connection object code. --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 96ab8e592..73dc4afca 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2822,7 +2822,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { listRewind(io_threads_list[server.io_threads_num],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); + writeToClient(c,0); } listEmpty(io_threads_list[server.io_threads_num]); @@ -2907,7 +2907,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { listRewind(io_threads_list[server.io_threads_num],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - readQueryFromClient(NULL,c->fd,c,0); + readQueryFromClient(c->conn); } listEmpty(io_threads_list[server.io_threads_num]); From 87d059a7454aad2c3a9810ddafd33c3a9e4be383 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 13 Jan 2020 13:16:13 +0100 Subject: [PATCH 5/5] Jump to right label on AOF parsing error. Related to #6054. --- src/aof.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/aof.c b/src/aof.c index 9c2fa838b..63b34b43f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -789,12 +789,14 @@ int loadAppendOnlyFile(char *filename) { for (j = 0; j < argc; j++) { /* Parse the argument len. */ - if (fgets(buf,sizeof(buf),fp) == NULL || - buf[0] != '$') - { + char *readres = fgets(buf,sizeof(buf),fp); + if (readres == NULL || buf[0] != '$') { fakeClient->argc = j; /* Free up to j-1. */ freeFakeClientArgv(fakeClient); - goto readerr; + if (readres == NULL) + goto readerr; + else + goto fmterr; } len = strtol(buf+1,NULL,10);