diff --git a/src/networking.c b/src/networking.c index 9b41ea56d..04389986d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -215,6 +215,23 @@ client *createClient(connection *conn) { return c; } +void installClientWriteHandler(client *c) { + int ae_barrier = 0; + /* For the fsync=always policy, we want that a given FD is never + * served for reading and writing in the same event loop iteration, + * so that in the middle of receiving the query, and serving it + * to the client, we'll call beforeSleep() that will do the + * actual fsync of AOF to disk. the write barrier ensures that. */ + if (server.aof_state == AOF_ON && + server.aof_fsync == AOF_FSYNC_ALWAYS) + { + ae_barrier = 1; + } + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { + freeClientAsync(c); + } +} + /* This function puts the client in the queue of clients that should write * their output buffers to the socket. Note that it does not *yet* install * the write handler, to start clients are put in a queue of clients that need @@ -222,7 +239,7 @@ client *createClient(connection *conn) { * handleClientsWithPendingWrites() function). * If we fail and there is more data to write, compared to what the socket * buffers can hold, then we'll really install the handler. */ -void clientInstallWriteHandler(client *c) { +void putClientInPendingWriteQueue(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the slave can actually receive * writes at this stage. */ @@ -285,11 +302,11 @@ int prepareClientToWrite(client *c) { * it should already be setup to do so (it has already pending data). * * If CLIENT_PENDING_READ is set, we're in an IO thread and should - * not install a write handler. Instead, it will be done by - * handleClientsWithPendingReadsUsingThreads() upon return. + * not put the client in pending write queue. Instead, it will be + * done by handleClientsWithPendingReadsUsingThreads() upon return. */ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -1995,20 +2012,7 @@ int handleClientsWithPendingWrites(void) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { - int ae_barrier = 0; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. the write barrier ensures that. */ - if (server.aof_state == AOF_ON && - server.aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_barrier = 1; - } - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { - freeClientAsync(c); - } + installClientWriteHandler(c); } } return processed; @@ -2075,7 +2079,7 @@ void unprotectClient(client *c) { c->flags &= ~CLIENT_PROTECTED; if (c->conn) { connSetReadHandler(c->conn,readQueryFromClient); - if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); } } } @@ -4212,10 +4216,8 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Install the write handler if there are pending writes in some * of the clients. */ - if (clientHasPendingReplies(c) && - connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) - { - freeClientAsync(c); + if (clientHasPendingReplies(c)) { + installClientWriteHandler(c); } } listEmpty(server.clients_pending_write); @@ -4327,10 +4329,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { } /* We may have pending replies if a thread readQueryFromClient() produced - * replies and did not install a write handler (it can't). + * replies and did not put the client in pending write queue (it can't). */ if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) - clientInstallWriteHandler(c); + putClientInPendingWriteQueue(c); } /* Update processed count on server */ diff --git a/src/replication.c b/src/replication.c index e9a754ab4..d183b7d4a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1285,7 +1285,7 @@ void replicaStartCommandStream(client *slave) { return; } - clientInstallWriteHandler(slave); + putClientInPendingWriteQueue(slave); } /* We call this function periodically to remove an RDB file that was diff --git a/src/server.c b/src/server.c index 21c96a453..fd5c84b49 100644 --- a/src/server.c +++ b/src/server.c @@ -1509,6 +1509,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { uint64_t processed = 0; processed += handleClientsWithPendingReadsUsingThreads(); processed += tlsProcessPendingData(); + if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) + flushAppendOnlyFile(0); processed += handleClientsWithPendingWrites(); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; @@ -1584,15 +1586,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); - /* Write the AOF buffer on disk */ + /* Try to process blocked clients every once in while. + * + * Example: A module calls RM_SignalKeyAsReady from within a timer callback + * (So we don't visit processCommand() at all). + * + * must be done before flushAppendOnlyFile, in case of appendfsync=always, + * since the unblocked clients may write data. */ + handleClientsBlockedOnKeys(); + + /* Write the AOF buffer on disk, + * must be done before handleClientsWithPendingWritesUsingThreads, + * in case of appendfsync=always. */ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0); - /* Try to process blocked clients every once in while. Example: A module - * calls RM_SignalKeyAsReady from within a timer callback (So we don't - * visit processCommand() at all). */ - handleClientsBlockedOnKeys(); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); diff --git a/src/server.h b/src/server.h index 02c00de5c..66f03c054 100644 --- a/src/server.h +++ b/src/server.h @@ -2510,7 +2510,7 @@ void unprotectClient(client *c); void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); -void clientInstallWriteHandler(client *c); +void putClientInPendingWriteQueue(client *c); #ifdef __GNUC__ void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...)