diff --git a/src/aof.c b/src/aof.c index 7d53bdfdb..133d1e7e9 100644 --- a/src/aof.c +++ b/src/aof.c @@ -641,6 +641,7 @@ struct client *createFakeClient(void) { c->argv = NULL; c->bufpos = 0; c->flags = 0; + c->fPendingAsyncWrite = FALSE; c->btype = BLOCKED_NONE; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ diff --git a/src/blocked.c b/src/blocked.c index 05fd9ee8a..9344cfeb2 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -124,6 +124,8 @@ void processUnblockedClients(int iel) { c = ln->value; listDelNode(unblocked_clients,ln); AssertCorrectThread(c); + + fastlock_lock(&c->lock); c->flags &= ~CLIENT_UNBLOCKED; /* Process remaining data in the input buffer, unless the client @@ -135,6 +137,7 @@ void processUnblockedClients(int iel) { processInputBufferAndReplicate(c); } } + fastlock_unlock(&c->lock); } } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index ced0b3e31..bdff8e777 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -31,15 +31,16 @@ extern "C" void fastlock_lock(struct fastlock *lock) return; } + int cloops = 1; while (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1)) { - sched_yield(); + if ((++cloops % (1024*1024)) == 0) /* yield once per 1024*1024 failed CAS attempts; the modulus must be parenthesized because % and * share precedence and associate left-to-right */ + sched_yield(); } } lock->m_depth = 1; lock->m_pidOwner = gettid(); - __sync_synchronize(); } extern "C" void fastlock_unlock(struct fastlock *lock) diff --git a/src/john@18.191.254.20 b/src/john@18.191.254.20 new file mode 100755 index 000000000..c57ce149b Binary files /dev/null and b/src/john@18.191.254.20 differ diff --git a/src/module.c b/src/module.c index 756fe5538..d2601e16f 100644 --- a/src/module.c +++ b/src/module.c @@ -3700,6 +3700,7 @@ void moduleHandleBlockedClients(void) { !(c->flags & CLIENT_PENDING_WRITE)) { c->flags |= CLIENT_PENDING_WRITE; + AssertCorrectThread(c); listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); } } diff --git a/src/networking.cpp b/src/networking.cpp index f5b95b352..031fab5f9 100644 
--- a/src/networking.cpp +++ b/src/networking.cpp @@ -33,6 +33,7 @@ #include #include #include +#include static void setProtocolError(const char *errstr, client *c); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); @@ -145,6 +146,7 @@ client *createClient(int fd, int iel) { uint64_t client_id; atomicGetIncr(server.next_client_id,client_id,1); c->iel = iel; + fastlock_init(&c->lock); c->id = client_id; c->resp = 2; c->fd = fd; @@ -164,6 +166,7 @@ client *createClient(int fd, int iel) { c->sentlen = 0; c->sentlenAsync = 0; c->flags = 0; + c->fPendingAsyncWrite = FALSE; c->ctime = c->lastinteraction = server.unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -223,7 +226,6 @@ void clientInstallWriteHandler(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the slave can actually receive * writes at this stage. */ - serverAssert(aeThreadOwnsLock()); if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -242,8 +244,8 @@ void clientInstallWriteHandler(client *c) { void clientInstallAsyncWriteHandler(client *c) { serverAssert(aeThreadOwnsLock()); - if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) { - c->flags |= CLIENT_PENDING_ASYNCWRITE; + if (!(c->fPendingAsyncWrite)) { + c->fPendingAsyncWrite = TRUE; listAddNodeHead(serverTL->clients_pending_asyncwrite,c); } } @@ -271,8 +273,8 @@ void clientInstallAsyncWriteHandler(client *c) { * data to the clients output buffers. If the function returns C_ERR no * data should be appended to the output buffers. 
*/ int prepareClientToWrite(client *c, bool fAsync) { - serverAssert(aeThreadOwnsLock()); fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread + serverAssert(!fAsync || aeThreadOwnsLock()); /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ @@ -291,7 +293,7 @@ int prepareClientToWrite(client *c, bool fAsync) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c); - if (fAsync && !(c->flags & CLIENT_PENDING_ASYNCWRITE)) clientInstallAsyncWriteHandler(c); + if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -1213,7 +1215,7 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_UNBLOCKED; } - if (c->flags & CLIENT_PENDING_ASYNCWRITE) { + if (c->fPendingAsyncWrite) { ln = NULL; int iel = 0; for (; iel < server.cthreads; ++iel) @@ -1224,7 +1226,7 @@ void unlinkClient(client *c) { } serverAssert(ln != NULL); listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln); - c->flags &= ~CLIENT_PENDING_ASYNCWRITE; + c->fPendingAsyncWrite = FALSE; } } @@ -1329,6 +1331,7 @@ void freeClient(client *c) { zfree(c->argv); freeClientMultiState(c); sdsfree(c->peerid); + fastlock_free(&c->lock); zfree(c); } @@ -1381,6 +1384,7 @@ int writeToClient(int fd, client *c, int handler_installed) { // a transmission on another thread while transmitting the thread local buffer, resulting in us // overlapping messages AeLocker locker(true); + std::lock_guard<decltype(c->lock)> lock(c->lock); // To prevent deadlocks this must be after we acquire the global lock int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0); if (!fSendAsyncBuffer) locker.disarm(); @@ -1525,8 +1529,9 @@ void 
ProcessPendingAsyncWrites() while(listLength(serverTL->clients_pending_asyncwrite)) { client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite)); + std::lock_guard<decltype(c->lock)> lock(c->lock); /* per-client lock, released when this loop iteration's scope ends */ - serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE); + serverAssert(c->fPendingAsyncWrite); // TODO: Append to end of reply block? @@ -1541,7 +1546,7 @@ void ProcessPendingAsyncWrites() c->buflenAsync = 0; zfree(c->bufAsync); c->bufAsync = nullptr; - c->flags &= ~CLIENT_PENDING_ASYNCWRITE; + c->fPendingAsyncWrite = FALSE; // Now install the write event handler int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; @@ -1572,8 +1577,6 @@ void ProcessPendingAsyncWrites() int handleClientsWithPendingWrites(int iel) { listIter li; listNode *ln; - - AeLocker locker(true); list *list = server.rgthreadvar[iel].clients_pending_write; int processed = listLength(list); @@ -1582,6 +1585,8 @@ int handleClientsWithPendingWrites(int iel) { listRewind(list,&li); while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); + std::lock_guard<decltype(c->lock)> lock(c->lock); /* per-client lock, released when this loop iteration's scope ends */ + c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(list,ln); AssertCorrectThread(c); @@ -1613,6 +1618,7 @@ int handleClientsWithPendingWrites(int iel) { } } + AeLocker locker(true); ProcessPendingAsyncWrites(); return processed; @@ -1921,8 +1927,8 @@ int processMultibulkBuffer(client *c) { * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { - server.current_client = c; - + AssertCorrectThread(c); + /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Return if clients are paused. 
*/ @@ -1960,6 +1966,8 @@ void processInputBuffer(client *c) { } else { serverPanic("Unknown request type"); } + AeLocker locker(true); + server.current_client = c; /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { @@ -2007,8 +2015,10 @@ void processInputBufferAndReplicate(client *c) { processInputBuffer(c); size_t applied = c->reploff - prev_offset; if (applied) { + aeAcquireLock(); replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); + aeReleaseLock(); sdsrange(c->pending_querybuf,applied,-1); } } @@ -2024,6 +2034,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverAssert(c->iel == ielFromEventLoop(el)); AeLocker locker; AssertCorrectThread(c); + std::lock_guard<decltype(c->lock)> lock(c->lock); /* NOTE(review): this guard is still in scope at the freeClient(c) calls below, and freeClient() ends with fastlock_free(&c->lock); zfree(c); -- confirm the unlock performed on return cannot touch the freed client */ readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2047,19 +2058,22 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = read(fd, c->querybuf+qblen, readlen); - locker.arm(); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -2080,7 +2094,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } @@ -2091,7 +2107,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated 
to * the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c); + aeAcquireLock(); ProcessPendingAsyncWrites(); + aeReleaseLock(); } void getClientsMaxBuffers(unsigned long *longest_output_list, diff --git a/src/replication.cpp b/src/replication.cpp index 337cb0889..b51a674a9 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -864,7 +864,7 @@ void putSlaveOnline(client *slave) { slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ //AssertCorrectThread(slave); - if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, + if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, slave) == AE_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(slave); @@ -2288,7 +2288,7 @@ void replicationResurrectCachedMaster(int newfd) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ if (clientHasPendingReplies(server.master, TRUE)) { - if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE, + if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ diff --git a/src/server.c b/src/server.c index 44fa01e7f..3ee93d4fb 100644 --- a/src/server.c +++ b/src/server.c @@ -4839,7 +4839,7 @@ void *workerThreadMain(void *parg) int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = server.rgthreadvar[iel].el; - aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, /*isMainThread ? 0 : AE_SLEEP_THREADSAFE*/ 0); + aeSetBeforeSleepProc(el, isMainThread ? 
beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0); aeMain(el); aeDeleteEventLoop(el); @@ -5017,7 +5017,7 @@ int main(int argc, char **argv) { initServer(); server.cthreads = 4; //testing - initNetworking(0 /* fReusePort */); + initNetworking(1 /* fReusePort */); if (background || server.pidfile) createPidFile(); redisSetProcTitle(argv[0]); diff --git a/src/server.h b/src/server.h index 155ac10db..db7f7366d 100644 --- a/src/server.h +++ b/src/server.h @@ -293,7 +293,6 @@ extern "C" { #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -#define CLIENT_PENDING_ASYNCWRITE (1<<29) /* client is in the async write list */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -833,6 +832,7 @@ typedef struct client { time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; int flags; /* Client flags: CLIENT_* macros. */ + int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ int repl_put_online_on_ack; /* Install slave write handler on ACK. */ @@ -873,6 +873,7 @@ typedef struct client { list *listbufferDoneAsync; int iel; /* the event loop index we're registered with */ + struct fastlock lock; } client; struct saveparam {