deadlock fixes

parent 2526d51d1a
commit acbad0c04e

src/ae.cpp | 38
@@ -121,9 +121,17 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
     case AE_ASYNC_OP::CreateFileEvent:
     {
-        std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv);
-        std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
-        cmd.pctl->cv.notify_all();
+        if (cmd.pctl != nullptr)
+        {
+            cmd.pctl->mutexcv.lock();
+            std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
+            cmd.pctl->cv.notify_all();
+            cmd.pctl->mutexcv.unlock();
+        }
+        else
+        {
+            aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
+        }
     }
     break;
 
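The guard above does two jobs. Fire-and-forget requests now arrive with cmd.pctl == nullptr, so the handler must not touch the control block at all; and when a waiter does exist, the result store and cv.notify_all() happen while mutexcv is held, so a requester blocked in cv.wait() cannot miss the signal. A minimal self-contained sketch of that wakeup pattern (illustrative only; the names are hypothetical, and a done flag is added to absorb spurious wakeups, which the hunk above does not show):

#include <condition_variable>
#include <mutex>

struct Control {
    std::mutex mutexcv;
    std::condition_variable cv;
    int rval = 0;
    bool done = false;   // lets the waiter tolerate spurious wakeups
};

// Runs on the event-loop thread: publish the result, then signal,
// all while holding the mutex so the waiter cannot miss the wakeup.
void complete(Control &ctl, int result) {
    std::lock_guard<std::mutex> guard(ctl.mutexcv);
    ctl.rval = result;
    ctl.done = true;
    ctl.cv.notify_all();
}

// Runs on the requesting thread: sleeps until complete() has fired.
int await(Control &ctl) {
    std::unique_lock<std::mutex> ulock(ctl.mutexcv);
    ctl.cv.wait(ulock, [&] { return ctl.done; });
    return ctl.rval;
}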
@@ -145,8 +153,8 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
     }
 }
 
-int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData)
+int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData, int fSynchronous)
 {
     if (eventLoop == g_eventLoopThisThread)
         return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
@@ -157,14 +165,22 @@ int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask,
     cmd.mask = mask;
     cmd.fproc = proc;
     cmd.clientData = clientData;
-    cmd.pctl = new aeCommandControl();
+    cmd.pctl = nullptr;
+    if (fSynchronous)
+        cmd.pctl = new aeCommandControl();
 
-    std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv);
+    std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
+    if (fSynchronous)
+        cmd.pctl->mutexcv.lock();
     auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
     AE_ASSERT(size == sizeof(cmd));
-    cmd.pctl->cv.wait(ulock);
-    int ret = cmd.pctl->rval;
-    delete cmd.pctl;
+    int ret = AE_OK;
+    if (fSynchronous)
+    {
+        cmd.pctl->cv.wait(ulock);
+        ret = cmd.pctl->rval;
+        delete cmd.pctl;
+    }
     return ret;
 }
 
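On the requesting side, std::defer_lock is what lets a single unique_lock serve both modes: it binds the mutex without acquiring it, the lock is then taken only when fSynchronous is set (before the command is written, so the completion cannot be signalled while nobody is ready to wait), and the destructor releases the mutex only if it was actually acquired. A tiny standalone demonstration of those semantics (not KeyDB code):

#include <cassert>
#include <mutex>

int main() {
    std::mutex m;
    bool fSynchronous = true;

    // Bind m without locking it.
    std::unique_lock<std::mutex> ulock(m, std::defer_lock);
    assert(!ulock.owns_lock());

    if (fSynchronous)
        ulock.lock();               // taken only on the blocking path
    assert(ulock.owns_lock() == fSynchronous);

    return 0;                       // ~unique_lock unlocks m iff it owns it
}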
@@ -691,7 +707,9 @@ void aeMain(aeEventLoop *eventLoop) {
             ulock.lock();
             eventLoop->beforesleep(eventLoop);
         }
+        AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing
         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
+        AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing
     }
 }
 
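aeMain now checks AE_ASSERT(!aeThreadOwnsLock()) on both sides of aeProcessEvents, turning the locking discipline (the global lock must be relinquished whenever control returns to the loop) into an enforced invariant rather than a convention. The implementation of aeThreadOwnsLock() is not part of this diff; a common way to support such an assertion is to record the owning thread next to the lock, roughly as follows (a sketch under that assumption, not the actual KeyDB lock):

#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// A mutex that remembers its owner so callers can assert (non-)ownership.
class OwnerTrackingLock {
    std::mutex m_;
    std::atomic<std::thread::id> owner_{};
public:
    void lock() {
        m_.lock();
        owner_.store(std::this_thread::get_id(), std::memory_order_relaxed);
    }
    void unlock() {
        owner_.store(std::thread::id(), std::memory_order_relaxed);
        m_.unlock();
    }
    bool ownedByCaller() const {
        return owner_.load(std::memory_order_relaxed) == std::this_thread::get_id();
    }
};

OwnerTrackingLock g_lock;

void eventLoopIteration() {
    assert(!g_lock.ownedByCaller());   // the aeMain-style invariant
    // ... process events, taking and releasing g_lock internally ...
    assert(!g_lock.ownedByCaller());
}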
src/ae.h | 6
@@ -138,8 +138,10 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop);
 void aeStop(aeEventLoop *eventLoop);
 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         aeFileProc *proc, void *clientData);
-int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData);
+
+int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData, int fSynchronous);
+
 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
 void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
 int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
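With the header change there is a single remote-registration entry point, and the caller states whether it can afford to block. Both modes appear later in this commit; condensed, the two call shapes are (adapted from the aofCreatePipes and ProcessPendingAsyncWrites hunks below, with el standing in for the full server.rgthreadvar[...].el expression):

/* Startup path (aofCreatePipes): the result is needed, so block for it. */
if (aeCreateRemoteFileEvent(el, fd, AE_READABLE, aofChildPipeReadable, NULL, TRUE) == AE_ERR)
    goto error;

/* Reply path (ProcessPendingAsyncWrites): waiting here could deadlock, so the
 * request is posted fire-and-forget and the call returns AE_OK immediately. */
aeCreateRemoteFileEvent(el, fd, ae_flags, sendReplyToClient, c, FALSE);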
@@ -97,7 +97,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
     aofrwblock *block;
     ssize_t nwritten;
+    serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el);
 
-
     UNUSED(el);
     UNUSED(fd);
     UNUSED(privdata);
@@ -1497,7 +1497,7 @@ int aofCreatePipes(void) {
     /* Parent -> children data is non blocking. */
     if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
     if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
-    if (aeCreateRemoteFileEventSync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
+    if (aeCreateRemoteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL, TRUE) == AE_ERR) goto error;
 
     server.aof_pipe_write_data_to_child = fds[1];
     server.aof_pipe_read_data_from_parent = fds[0];
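Note the pairing in these two AOF hunks: registration stays synchronous (TRUE) because aofCreatePipes must know whether the event was installed before committing to the pipe setup, while the new serverAssert in aofChildWriteDiffData documents the other half of the contract: the callback always fires on the main thread's event loop, the loop it was registered against.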
@@ -110,6 +110,8 @@ void blockClient(client *c, int btype) {
  * in order to process the pending input buffer of clients that were
  * unblocked after a blocking operation. */
 void processUnblockedClients(int iel) {
+    serverAssert(aeThreadOwnsLock());
+
     listNode *ln;
     client *c;
     list *unblocked_clients = server.rgthreadvar[iel].unblocked_clients;
@@ -127,11 +129,9 @@ void processUnblockedClients(int iel) {
          * client is not blocked before to proceed, but things may change and
          * the code is conceptually more correct this way. */
         if (!(c->flags & CLIENT_BLOCKED)) {
-            aeAcquireLock();
             if (c->querybuf && sdslen(c->querybuf) > 0) {
                 processInputBufferAndReplicate(c);
             }
-            aeReleaseLock();
         }
     }
 }
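This is the lock-hoisting half of the deadlock fix. processUnblockedClients used to take the global lock itself even though one of its callers already held it, and re-acquiring a non-recursive lock you already hold, or taking locks in a different order than another thread, is a classic self-deadlock. The lock is now acquired by the callers, and the callee merely asserts the precondition; the matching caller changes are in the beforeSleep and beforeSleepLite hunks further down. Schematically, the corrected shape looks like this (hypothetical helpers, assuming per-thread ownership tracking as in the earlier lock sketch):

#include <cassert>
#include <mutex>

std::mutex g_global;
thread_local bool g_owns = false;    // assumption: per-thread ownership flag

void acquireGlobalLock() { g_global.lock(); g_owns = true; }
void releaseGlobalLock() { g_owns = false; g_global.unlock(); }
bool threadOwnsGlobalLock() { return g_owns; }

// Callee: assert the contract instead of taking the lock internally.
void processUnblockedClientsSketch() {
    assert(threadOwnsGlobalLock());  // precondition, not acquisition
    /* ... drain the unblocked-clients list ... */
}

// beforeSleepLite's shape: does not hold the lock, so it brackets the call.
void callerWithoutLock() {
    acquireGlobalLock();
    processUnblockedClientsSketch();
    releaseGlobalLock();
}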
@@ -155,7 +155,7 @@ void processUnblockedClients(int iel) {
 void queueClientForReprocessing(client *c) {
     /* The client may already be into the unblocked list because of a previous
      * blocking operation, don't add back it into the list multiple times. */
-    AssertCorrectThread(c);
+    serverAssert(aeThreadOwnsLock());
     if (!(c->flags & CLIENT_UNBLOCKED)) {
         c->flags |= CLIENT_UNBLOCKED;
         listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c);
@@ -32,6 +32,7 @@
 #include <sys/uio.h>
 #include <math.h>
 #include <ctype.h>
+#include <vector>
 
 static void setProtocolError(const char *errstr, client *c);
 void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
@@ -233,13 +234,10 @@ void clientInstallWriteHandler(client *c) {
 }
 
 void clientInstallAsyncWriteHandler(client *c) {
-    UNUSED(c);
-#if 0 // Not yet
     if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) {
         c->flags |= CLIENT_PENDING_ASYNCWRITE;
         listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
     }
-#endif
 }
 
 /* This function is called every time we are going to transmit new data
@@ -436,6 +434,10 @@ void addReplyProto(client *c, const char *s, size_t len) {
     addReplyProtoCore(c, s, len, false);
 }
 
+void addReplyProtoAsync(client *c, const char *s, size_t len) {
+    addReplyProtoCore(c, s, len, true);
+}
+
 /* Low level function called by the addReplyError...() functions.
  * It emits the protocol for a Redis error, in the form:
  *
@@ -1127,10 +1129,24 @@ static void freeClientArgv(client *c) {
  * when we resync with our own master and want to force all our slaves to
  * resync with us as well. */
 void disconnectSlaves(void) {
-    while (listLength(server.slaves)) {
-        listNode *ln = listFirst(server.slaves);
-        freeClient((client*)ln->value);
-    }
+    std::vector<client*> vecfreeImmediate;
+    listNode *ln;
+    listIter li;
+    listRewind(server.slaves, &li);
+    while ((ln = listNext(&li))) {
+        client *c = (client*)ln->value;
+        if (c->iel == serverTL - server.rgthreadvar)
+        {
+            vecfreeImmediate.push_back(c);
+        }
+        else
+        {
+            freeClientAsync(c);
+        }
+    }
+
+    for (client *c : vecfreeImmediate)
+        freeClient(c);
 }
 
 /* Remove the specified client from global lists where the client could
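Two things happen in the disconnectSlaves rewrite. First, clients owned by other threads' event loops are no longer freed in place; they are handed to freeClientAsync, since tearing a client down from under another thread is unsafe. Clients belonging to the calling thread are collected into a vector and freed only after iteration completes, so server.slaves is not mutated while listNext is still walking it. Second, the ownership test c->iel == serverTL - server.rgthreadvar uses a compact idiom: serverTL points at this thread's slot inside the rgthreadvar array, so subtracting the array base yields the thread's event-loop index. The idiom in isolation (a hypothetical mini-model):

#include <cstddef>
#include <iostream>

struct ThreadVars { /* per-thread event-loop state */ };

ThreadVars rgthreadvar[4];                    // one slot per event-loop thread
thread_local ThreadVars *serverTL = nullptr;  // this thread's slot

int main() {
    serverTL = &rgthreadvar[2];                   // pretend we are thread 2
    std::ptrdiff_t iel = serverTL - rgthreadvar;  // pointer difference = index
    std::cout << iel << "\n";                     // prints 2
}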
@@ -2706,6 +2722,9 @@ int clientsArePaused(void) {
 int processEventsWhileBlocked(int iel) {
     int iterations = 4; /* See the function top-comment. */
     int count = 0;
+
+    // BUGBUG - This function isn't fair - why should clients on this thread get to run, but not clients elsewhere?
+    // We mix up replies when releasing the lock here so more work is needed to fix this
     while (iterations--) {
         int events = 0;
         events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
src/pubsub.c | 12
@@ -51,13 +51,13 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
  * this message format also includes the pattern that matched the message. */
 void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
     if (c->resp == 2)
-        addReply(c,shared.mbulkhdr[4]);
+        addReplyAsync(c,shared.mbulkhdr[4]);
     else
-        addReplyPushLen(c,4);
-    addReply(c,shared.pmessagebulk);
-    addReplyBulk(c,pat);
-    addReplyBulk(c,channel);
-    addReplyBulk(c,msg);
+        addReplyPushLenAsync(c,4);
+    addReplyAsync(c,shared.pmessagebulk);
+    addReplyBulkAsync(c,pat);
+    addReplyBulkAsync(c,channel);
+    addReplyBulkAsync(c,msg);
 }
 
 /* Send the pubsub subscription notification to the client. */
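The pubsub conversion follows directly from the threading model: PUBLISH runs on the publisher's thread, but each subscriber belongs to its own thread's event loop, and writing into another thread's reply buffers in place is a race. Every addReply*() call here becomes its addReply*Async() counterpart, the thread-safe family whose prototypes appear in the header hunk at the end of this commit (which also adds the missing addReplyProtoAsync declaration). Conceptually the Async variants queue data for the owning thread instead of touching the buffers directly; a rough model of the idea (not KeyDB's actual implementation):

#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

struct ClientModel {
    std::mutex lock;                          // protects the pending queue
    std::vector<std::string> pendingAsync;    // drained by the owning thread
};

// Safe to call from any thread: append under the client's lock.
void addReplyProtoAsyncModel(ClientModel *c, const char *s, size_t len) {
    std::lock_guard<std::mutex> g(c->lock);
    c->pendingAsync.emplace_back(s, len);
}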
@@ -296,7 +296,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen
 
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
-        addReplyProto(slave,buf,buflen);
+        addReplyProtoAsync(slave,buf,buflen);
     }
 }
 
@@ -334,7 +334,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc)
     listRewind(monitors,&li);
     while((ln = listNext(&li))) {
         client *monitor = (client*)ln->value;
-        addReply(monitor,cmdobj);
+        addReplyAsync(monitor,cmdobj);
     }
     decrRefCount(cmdobj);
 }
@@ -368,6 +368,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
     struct redisCommand *cmd;
     client *c = server.lua_client;
     sds reply;
+
+    // Ensure our client is on the right thread
+    c->iel = serverTL - server.rgthreadvar;
 
     /* Cached across calls. */
     static robj **argv = NULL;
@@ -1278,7 +1281,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
          * here when the EVAL command will return. */
         protectClient(server.lua_caller);
     }
-    if (server.lua_timedout) processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
+    if (server.lua_timedout) processEventsWhileBlocked(serverTL - server.rgthreadvar);
     if (server.lua_kill) {
         serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
         lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
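The same thread-awareness applies to the Lua hooks: a script can now time out on any thread, so luaMaskCountHook pumps the calling thread's own event loop rather than hard-coding IDX_EVENT_LOOP_MAIN, deriving the index with the same serverTL - server.rgthreadvar idiom sketched after the disconnectSlaves hunk; luaRedisGenericCommand likewise re-homes the shared Lua client onto the executing thread before running the command.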
@@ -2104,9 +2104,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Try to process pending commands for clients that were just unblocked. */
     if (listLength(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
     {
-        aeReleaseLock();
         processUnblockedClients(IDX_EVENT_LOOP_MAIN);
-        aeAcquireLock();
     }
 
     /* Write the AOF buffer on disk */
@@ -2129,9 +2127,12 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
 
     /* Try to process pending commands for clients that were just unblocked. */
     if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
+        aeAcquireLock();
         processUnblockedClients(iel);
+        aeReleaseLock();
     }
+
 
     /* Handle writes with pending output buffers. */
     handleClientsWithPendingWrites(iel);
 }
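These two hunks are the caller side of the processUnblockedClients contract change: beforeSleep already runs with the global lock held, so its release/re-acquire pair around the call is deleted outright (releasing there opened exactly the window that let another thread interleave), while beforeSleepLite runs without the lock and now brackets the call with aeAcquireLock()/aeReleaseLock(). Both paths therefore satisfy the serverAssert(aeThreadOwnsLock()) precondition added above.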
@@ -3213,7 +3214,7 @@ static void ProcessPendingAsyncWrites()
             ae_flags |= AE_BARRIER;
         }
 
-        if (aeCreateRemoteFileEventSync(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
+        if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR)
             freeClientAsync(c);
     }
 }
@@ -5045,7 +5046,7 @@ int main(int argc, char **argv) {
 
     initServer();
 
-    server.cthreads = 1; //testing
+    server.cthreads = 2; //testing
     initNetworking(1 /* fReusePort */);
 
     if (background || server.pidfile) createPidFile();
@@ -1044,7 +1044,7 @@ struct redisServerThreadVars {
     int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
     int ipfd_count;              /* Used slots in ipfd[] */
     list *clients_pending_write; /* There is to write or install handler. */
-    list *unblocked_clients;     /* list of clients to unblock before next loop */
+    list *unblocked_clients;     /* list of clients to unblock before next loop NOT THREADSAFE */
     list *clients_pending_asyncwrite;
 };
 
@@ -1628,6 +1628,7 @@ void unprotectClient(client *c);
 // Special Thread-safe addReply() commands for posting messages to clients from a different thread
 void addReplyAsync(client *c, robj *obj);
 void addReplyArrayLenAsync(client *c, long length);
+void addReplyProtoAsync(client *c, const char *s, size_t len);
 void addReplyBulkAsync(client *c, robj *obj);
 void addReplyBulkCBufferAsync(client *c, const void *p, size_t len);
 void addReplyErrorAsync(client *c, const char *err);