From 647a66ebba5d12d461e830f174a1c90a4e96c5cd Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 12 Apr 2019 17:18:10 +0200 Subject: [PATCH] Threaded IO: parsing WIP 1: set current_client in a better scoped way. --- src/networking.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0e11e1f3f..3faaf4a12 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1563,7 +1563,7 @@ int processMultibulkBuffer(client *c) { * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { - server.current_client = c; + int deadclient = 0; /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { @@ -1619,6 +1619,7 @@ void processInputBuffer(client *c) { resetClient(c); } else { /* Only reset the client when the command was executed. */ + server.current_client = c; if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ @@ -1629,23 +1630,26 @@ void processInputBuffer(client *c) { * module blocking command, so that the reply callback will * still be able to access the client argv and argc field. * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { resetClient(c); + } } + if (server.current_client == NULL) deadclient = 1; + server.current_client = NULL; /* freeMemoryIfNeeded may flush slave output buffers. This may * result into a slave, that may be the active client, to be * freed. */ - if (server.current_client == NULL) break; + if (deadclient) break; } } /* Trim to pos */ - if (server.current_client != NULL && c->qb_pos) { + if (!deadclient && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } - - server.current_client = NULL; } /* This is a wrapper for processInputBuffer that also cares about handling @@ -1743,11 +1747,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { } /* There is more data in the client input buffer, continue parsing it - * in case to check if there is a full command to execute. - * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means - * we are currently in the context of an I/O thread. */ - if (!(c->flags & CLIENT_PENDING_READ)) - processInputBufferAndReplicate(c); + * in case to check if there is a full command to execute. */ + processInputBufferAndReplicate(c); } void getClientsMaxBuffers(unsigned long *longest_output_list,