Threaded IO: read side WIP 3.

This commit is contained in:
antirez 2019-03-31 21:59:50 +02:00
parent a2245f8ff1
commit 63a0ffd36a
3 changed files with 55 additions and 6 deletions

View File

@ -1711,12 +1711,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} else { } else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c); freeClientAsync(c);
return; return;
} }
} else if (nread == 0) { } else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection"); serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c); freeClientAsync(c);
return; return;
} else if (c->flags & CLIENT_MASTER) { } else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer /* Append the query buffer to the pending (not applied) buffer
@ -1739,7 +1739,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); // serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci); sdsfree(ci);
sdsfree(bytes); sdsfree(bytes);
freeClient(c); freeClientAsync(c);
return; return;
} }
@ -2538,8 +2538,10 @@ void *IOThreadMain(void *myid) {
client *c = listNodeValue(ln); client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) { if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c->fd,c,0); writeToClient(c->fd,c,0);
} else { } else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(NULL,c->fd,c,0); readQueryFromClient(NULL,c->fd,c,0);
} else {
serverPanic("io_threads_op value is unknown");
} }
} }
listEmpty(io_threads_list[id]); listEmpty(io_threads_list[id]);
@ -2632,7 +2634,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Start threads if needed. */ /* Start threads if needed. */
if (!io_threads_active) startThreadedIO(); if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL pending clients\n", processed); if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */ /* Distribute the clients across N different lists. */
listIter li; listIter li;
@ -2649,6 +2651,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the /* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */ * start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 0; j < server.io_threads_num; j++) { for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]); int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count; io_threads_pending[j] = count;
@ -2661,7 +2664,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
pending += io_threads_pending[j]; pending += io_threads_pending[j];
if (pending == 0) break; if (pending == 0) break;
} }
if (tio_debug) printf("All threads finshed\n"); if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where /* Run the list of clients again to install the write handler where
* needed. */ * needed. */
@ -2699,4 +2702,48 @@ int postponeClientRead(client *c) {
} }
int handleClientsWithPendingReadsUsingThreads(void) { int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 0; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Wait for all threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 0; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
return processed;
} }

View File

@ -2092,6 +2092,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
void afterSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL(); if (moduleCount()) moduleAcquireGIL();
handleClientsWithPendingReadsUsingThreads();
} }
/* =========================== Server initialization ======================== */ /* =========================== Server initialization ======================== */

View File

@ -1578,6 +1578,7 @@ int clientsArePaused(void);
int processEventsWhileBlocked(void); int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void); int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void); int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c); int clientHasPendingReplies(client *c);
void unlinkClient(client *c); void unlinkClient(client *c);