diff --git a/src/debug.cpp b/src/debug.cpp
index 4e588a254..f4791f593 100644
--- a/src/debug.cpp
+++ b/src/debug.cpp
@@ -706,7 +706,7 @@ void _serverAssertPrintClientInfo(const client *c) {
 
     bugReportStart();
     serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
-    serverLog(LL_WARNING,"client->flags = %d", c->flags);
+    serverLog(LL_WARNING,"client->flags = %d", static_cast<int>(c->flags));
     serverLog(LL_WARNING,"client->fd = %d", c->fd);
     serverLog(LL_WARNING,"client->argc = %d", c->argc);
     for (j=0; j < c->argc; j++) {
diff --git a/src/networking.cpp b/src/networking.cpp
index 36b2aa9de..7cf0ae55f 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -1476,15 +1476,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
             serverLog(LL_VERBOSE,
                 "Error writing to client: %s", strerror(errno));
             lock.unlock();
-            if (aeTryAcquireLock(true /*fWeak*/))
-            {
-                freeClient(c);
-                aeReleaseLock();
-            }
-            else
-            {
-                freeClientAsync(c);
-            }
+            freeClientAsync(c);
             return C_ERR;
         }
     }
@@ -1503,15 +1495,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
         /* Close connection after entire reply has been sent. */
         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
             lock.unlock();
-            if (aeTryAcquireLock(true /*fWeak*/))
-            {
-                freeClient(c);
-                aeReleaseLock();
-            }
-            else
-            {
-                freeClientAsync(c);
-            }
+            freeClientAsync(c);
             return C_ERR;
         }
     }
@@ -1524,7 +1508,14 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     client *c = (client*)privdata;
     serverAssert(ielFromEventLoop(el) == c->iel);
 
-    writeToClient(fd,c,1);
+    if (writeToClient(fd,c,1) == C_ERR)
+    {
+        AeLocker ae;
+        c->lock.lock();
+        ae.arm(c);
+        if (c->flags & CLIENT_CLOSE_ASAP)
+            freeClient(c);
+    }
 }
 
 void ProcessPendingAsyncWrites()
@@ -1591,39 +1582,46 @@ int handleClientsWithPendingWrites(int iel) {
     int processed = (int)vec.size();
     serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
 
+    int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
+    /* For the fsync=always policy, we want that a given FD is never
+     * served for reading and writing in the same event loop iteration,
+     * so that in the middle of receiving the query, and serving it
+     * to the client, we'll call beforeSleep() that will do the
+     * actual fsync of AOF to disk. AE_BARRIER ensures that. */
+    if (g_pserver->aof_state == AOF_ON &&
+        g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
+    {
+        ae_flags |= AE_BARRIER;
+    }
+
     while(!vec.empty()) {
         client *c = vec.back();
-        std::unique_lock<decltype(c->lock)> lock(c->lock);
+        AssertCorrectThread(c);
         c->flags &= ~CLIENT_PENDING_WRITE;
         vec.pop_back();
-        AssertCorrectThread(c);
 
         /* If a client is protected, don't do anything,
          * that may trigger write error or recreate handler. */
         if (c->flags & CLIENT_PROTECTED) continue;
 
+        std::unique_lock<decltype(c->lock)> lock(c->lock);
+
         /* Try to write buffers to the client socket. */
         if (writeToClient(c->fd,c,0) == C_ERR) {
-            lock.release(); // client is free'd
+            if (c->flags & CLIENT_CLOSE_ASAP)
+            {
+                c->lock.lock();
+                AeLocker ae;
+                ae.arm(c);
+                freeClient(c); // writeToClient will only async close, but there's no need to wait
+            }
             continue;
         }
 
         /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
         if (clientHasPendingReplies(c)) {
-            int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
-            /* For the fsync=always policy, we want that a given FD is never
-             * served for reading and writing in the same event loop iteration,
-             * so that in the middle of receiving the query, and serving it
-             * to the client, we'll call beforeSleep() that will do the
-             * actual fsync of AOF to disk. AE_BARRIER ensures that. */
-            if (g_pserver->aof_state == AOF_ON &&
-                g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
-            {
-                ae_flags |= AE_BARRIER;
-            }
-
             if (aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags,
                 sendReplyToClient, c) == AE_ERR)
                     freeClientAsync(c);
         }
diff --git a/src/server.h b/src/server.h
index 824eef04f..8a762e239 100644
--- a/src/server.h
+++ b/src/server.h
@@ -921,7 +921,7 @@ typedef struct client {
     time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
     time_t obuf_soft_limit_reached_time;
-    int flags;              /* Client flags: CLIENT_* macros. */
+    std::atomic<int> flags; /* Client flags: CLIENT_* macros. */
     int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
     int authenticated;      /* Needed when the default user requires auth. */
     int replstate;          /* Replication state if this is a slave. */
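
With client::flags now a std::atomic<int>, the compound bitwise updates used in the
networking.cpp hunks (c->flags &= ~CLIENT_PENDING_WRITE, c->flags & CLIENT_CLOSE_ASAP)
keep working through the atomic's operators, but the value has to be converted to a
plain int before reaching a varargs call such as serverLog, which is why the debug.cpp
hunk adds the cast. A minimal standalone sketch of that pattern, using a hypothetical
ClientLike type and FLAG_CLOSE_ASAP constant rather than KeyDB's own definitions:

    #include <atomic>
    #include <cstdio>

    constexpr int FLAG_CLOSE_ASAP = 1 << 10;   // hypothetical flag bit, not a KeyDB value

    struct ClientLike {
        std::atomic<int> flags{0};             // mirrors the new client::flags type
    };

    int main() {
        ClientLike c;
        c.flags |= FLAG_CLOSE_ASAP;            // atomic read-modify-write via operator|=
        if (c.flags & FLAG_CLOSE_ASAP) {       // implicit load, then bitwise test
            // std::atomic<int> cannot be passed through "..." directly;
            // read it into a plain int first, as the patch does with static_cast.
            std::printf("flags = %d\n", c.flags.load());
        }
        return 0;
    }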