Threaded IO: parsing WIP 2: refactoring to parse from thread.

This commit is contained in:
antirez 2019-04-26 19:29:50 +02:00
parent 647a66ebba
commit 6ab6a97fe6
2 changed files with 61 additions and 29 deletions

View File

@ -1558,13 +1558,47 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}
/* This function calls processCommand(), but also performs a few sub tasks
* that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
* 2. In the case of master clients, the replication offset is updated.
* 3. The client is reset unless there are reasons to avoid doing it.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c);
}
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
int deadclient = 0;
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
@ -1573,6 +1607,10 @@ void processInputBuffer(client *c) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
if (c->flags & CLIENT_PENDING_COMMAND) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
@ -1618,35 +1656,26 @@ void processInputBuffer(client *c) {
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
server.current_client = c;
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c);
}
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
if (deadclient) break;
}
}
/* Trim to pos */
if (!deadclient && c->qb_pos) {
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
@ -1737,9 +1766,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
// FIXME: This may be called from an I/O thread and it is not safe to
// log from there for now.
// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
@ -2747,6 +2774,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
processCommandAndResetClient(c);
}
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);

View File

@ -288,6 +288,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
#define CLIENT_PENDING_COMMAND (1<<30) /* */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */