Fix more locking deadlocks
Former-commit-id: 3081b6f98b5e7a9f3ef7cfe040236070398b081c
This commit is contained in:
parent
8f1e7d9d5b
commit
e78ed8cc64
@ -706,7 +706,7 @@ void _serverAssertPrintClientInfo(const client *c) {
|
|||||||
|
|
||||||
bugReportStart();
|
bugReportStart();
|
||||||
serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
|
serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ===");
|
||||||
serverLog(LL_WARNING,"client->flags = %d", c->flags);
|
serverLog(LL_WARNING,"client->flags = %d", static_cast<int>(c->flags));
|
||||||
serverLog(LL_WARNING,"client->fd = %d", c->fd);
|
serverLog(LL_WARNING,"client->fd = %d", c->fd);
|
||||||
serverLog(LL_WARNING,"client->argc = %d", c->argc);
|
serverLog(LL_WARNING,"client->argc = %d", c->argc);
|
||||||
for (j=0; j < c->argc; j++) {
|
for (j=0; j < c->argc; j++) {
|
||||||
|
@ -1476,15 +1476,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
serverLog(LL_VERBOSE,
|
serverLog(LL_VERBOSE,
|
||||||
"Error writing to client: %s", strerror(errno));
|
"Error writing to client: %s", strerror(errno));
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
if (aeTryAcquireLock(true /*fWeak*/))
|
freeClientAsync(c);
|
||||||
{
|
|
||||||
freeClient(c);
|
|
||||||
aeReleaseLock();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
freeClientAsync(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
@ -1503,15 +1495,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
/* Close connection after entire reply has been sent. */
|
/* Close connection after entire reply has been sent. */
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
if (aeTryAcquireLock(true /*fWeak*/))
|
freeClientAsync(c);
|
||||||
{
|
|
||||||
freeClient(c);
|
|
||||||
aeReleaseLock();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
freeClientAsync(c);
|
|
||||||
}
|
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1524,7 +1508,14 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
client *c = (client*)privdata;
|
client *c = (client*)privdata;
|
||||||
|
|
||||||
serverAssert(ielFromEventLoop(el) == c->iel);
|
serverAssert(ielFromEventLoop(el) == c->iel);
|
||||||
writeToClient(fd,c,1);
|
if (writeToClient(fd,c,1) == C_ERR)
|
||||||
|
{
|
||||||
|
AeLocker ae;
|
||||||
|
c->lock.lock();
|
||||||
|
ae.arm(c);
|
||||||
|
if (c->flags & CLIENT_CLOSE_ASAP)
|
||||||
|
freeClient(c);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ProcessPendingAsyncWrites()
|
void ProcessPendingAsyncWrites()
|
||||||
@ -1591,39 +1582,46 @@ int handleClientsWithPendingWrites(int iel) {
|
|||||||
int processed = (int)vec.size();
|
int processed = (int)vec.size();
|
||||||
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
|
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
|
||||||
|
|
||||||
|
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
|
||||||
|
/* For the fsync=always policy, we want that a given FD is never
|
||||||
|
* served for reading and writing in the same event loop iteration,
|
||||||
|
* so that in the middle of receiving the query, and serving it
|
||||||
|
* to the client, we'll call beforeSleep() that will do the
|
||||||
|
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
||||||
|
if (g_pserver->aof_state == AOF_ON &&
|
||||||
|
g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
|
||||||
|
{
|
||||||
|
ae_flags |= AE_BARRIER;
|
||||||
|
}
|
||||||
|
|
||||||
while(!vec.empty()) {
|
while(!vec.empty()) {
|
||||||
client *c = vec.back();
|
client *c = vec.back();
|
||||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
vec.pop_back();
|
vec.pop_back();
|
||||||
AssertCorrectThread(c);
|
|
||||||
|
|
||||||
/* If a client is protected, don't do anything,
|
/* If a client is protected, don't do anything,
|
||||||
* that may trigger write error or recreate handler. */
|
* that may trigger write error or recreate handler. */
|
||||||
if (c->flags & CLIENT_PROTECTED) continue;
|
if (c->flags & CLIENT_PROTECTED) continue;
|
||||||
|
|
||||||
|
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||||
|
|
||||||
/* Try to write buffers to the client socket. */
|
/* Try to write buffers to the client socket. */
|
||||||
if (writeToClient(c->fd,c,0) == C_ERR) {
|
if (writeToClient(c->fd,c,0) == C_ERR) {
|
||||||
lock.release(); // client is free'd
|
if (c->flags & CLIENT_CLOSE_ASAP)
|
||||||
|
{
|
||||||
|
c->lock.lock();
|
||||||
|
AeLocker ae;
|
||||||
|
ae.arm(c);
|
||||||
|
freeClient(c); // writeToClient will only async close, but there's no need to wait
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If after the synchronous writes above we still have data to
|
/* If after the synchronous writes above we still have data to
|
||||||
* output to the client, we need to install the writable handler. */
|
* output to the client, we need to install the writable handler. */
|
||||||
if (clientHasPendingReplies(c)) {
|
if (clientHasPendingReplies(c)) {
|
||||||
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
|
|
||||||
/* For the fsync=always policy, we want that a given FD is never
|
|
||||||
* served for reading and writing in the same event loop iteration,
|
|
||||||
* so that in the middle of receiving the query, and serving it
|
|
||||||
* to the client, we'll call beforeSleep() that will do the
|
|
||||||
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
|
||||||
if (g_pserver->aof_state == AOF_ON &&
|
|
||||||
g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
|
|
||||||
{
|
|
||||||
ae_flags |= AE_BARRIER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
|
if (aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
|
||||||
freeClientAsync(c);
|
freeClientAsync(c);
|
||||||
}
|
}
|
||||||
|
@ -921,7 +921,7 @@ typedef struct client {
|
|||||||
time_t ctime; /* Client creation time. */
|
time_t ctime; /* Client creation time. */
|
||||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||||
time_t obuf_soft_limit_reached_time;
|
time_t obuf_soft_limit_reached_time;
|
||||||
int flags; /* Client flags: CLIENT_* macros. */
|
std::atomic<int> flags; /* Client flags: CLIENT_* macros. */
|
||||||
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
|
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
|
||||||
int authenticated; /* Needed when the default user requires auth. */
|
int authenticated; /* Needed when the default user requires auth. */
|
||||||
int replstate; /* Replication state if this is a slave. */
|
int replstate; /* Replication state if this is a slave. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user