diff --git a/src/ae.cpp b/src/ae.cpp index 490826b8e..26b6085e9 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -109,9 +109,8 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) auto cb = read(fd, &cmd, sizeof(aeCommand)); if (cb != sizeof(cmd)) { - if (errno == EAGAIN) - break; - fprintf(stderr, "Failed to read pipe.\n"); + AE_ASSERT(errno == EAGAIN); + break; } switch (cmd.op) { @@ -167,6 +166,8 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, { if (eventLoop == g_eventLoopThisThread) return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData); + + int ret = AE_OK; aeCommand cmd; cmd.op = AE_ASYNC_OP::CreateFileEvent; @@ -182,14 +183,19 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) cmd.pctl->mutexcv.lock(); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); - int ret = AE_OK; + if (size != sizeof(cmd)) + { + AE_ASSERT(errno == EAGAIN); + ret = AE_ERR; + } + if (fSynchronous) { cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; delete cmd.pctl; } + return ret; } @@ -265,7 +271,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { if (pipe(rgfd) < 0) goto err; eventLoop->fdCmdRead = rgfd[0]; - eventLoop->fdCmdWrite = rgfd[1];; + eventLoop->fdCmdWrite = rgfd[1]; fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK); fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); eventLoop->cevents = 0; @@ -389,6 +395,7 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) } extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { + //AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); if (fd >= eventLoop->setsize) return 0; aeFileEvent *fe = &eventLoop->events[fd]; @@ -468,6 +475,7 @@ extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) */ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; @@ -629,6 +637,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); int processed = 0, numevents; /* Nothing to do? return ASAP */ diff --git a/src/aof.c b/src/aof.c index b0f52e2a6..7d53bdfdb 100644 --- a/src/aof.c +++ b/src/aof.c @@ -96,7 +96,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; - serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el); + serverAssert(aeThreadOwnsLock()); UNUSED(el); UNUSED(fd); @@ -123,15 +123,6 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } -static void QueueAofPipeWrite(void *arg) -{ - UNUSED(arg); - if (aeGetFileEvents(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.aof_pipe_write_data_to_child, - AE_WRITABLE, aofChildWriteDiffData, NULL); - } -} - /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); @@ -173,7 +164,10 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, QueueAofPipeWrite, NULL); + if (aeGetFileEvents(serverTL->el,server.aof_pipe_write_data_to_child) == 0) { + aeCreateFileEvent(serverTL->el, server.aof_pipe_write_data_to_child, + AE_WRITABLE, aofChildWriteDiffData, NULL); + } } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -691,6 +685,7 @@ int loadAppendOnlyFile(char *filename) { long loops = 0; off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */ off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */ + serverAssert(serverTL != NULL); // This happens early in boot, ensure serverTL was setup if (fp == NULL) { serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); @@ -747,7 +742,7 @@ int loadAppendOnlyFile(char *filename) { /* Serve the clients from time to time */ if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); - processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); + processEventsWhileBlocked(serverTL - server.rgthreadvar); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -1497,12 +1492,13 @@ int aofCreatePipes(void) { /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; - if (aeCreateRemoteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL, TRUE) == AE_ERR) goto error; + if (aeCreateFileEvent(serverTL->el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; server.aof_pipe_write_ack_to_parent = fds[3]; server.aof_pipe_read_ack_from_child = fds[2]; + server.el_alf_pip_read_ack_from_child = serverTL->el; server.aof_pipe_write_ack_to_child = fds[5]; server.aof_pipe_read_ack_from_parent = fds[4]; server.aof_stop_sending_diff = 0; @@ -1516,8 +1512,8 @@ error: } void aofClosePipes(void) { - aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE); + aeDeleteFileEventAsync(server.el_alf_pip_read_ack_from_child,server.aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEventAsync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE); close(server.aof_pipe_write_data_to_child); close(server.aof_pipe_read_data_from_parent); close(server.aof_pipe_write_ack_to_parent); diff --git a/src/blocked.c b/src/blocked.c index d96e817d9..05fd9ee8a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -100,6 +100,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { + serverAssert(aeThreadOwnsLock()); c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; @@ -122,6 +123,7 @@ void processUnblockedClients(int iel) { serverAssert(ln != NULL); c = ln->value; listDelNode(unblocked_clients,ln); + AssertCorrectThread(c); c->flags &= ~CLIENT_UNBLOCKED; /* Process remaining data in the input buffer, unless the client @@ -165,6 +167,7 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { + serverAssert(aeThreadOwnsLock()); if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { @@ -210,6 +213,7 @@ void replyToBlockedClientTimedOut(client *c) { * The semantics is to send an -UNBLOCKED error to the client, disconnecting * it at the same time. */ void disconnectAllBlockedClients(void) { + serverAssert(aeThreadOwnsLock()); listNode *ln; listIter li; diff --git a/src/cluster.c b/src/cluster.c index 5dc0abb2b..08beda483 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5390,6 +5390,7 @@ socket_err: * the target instance. See the Redis Cluster specification for more * information. */ void askingCommand(client *c) { + serverAssert(aeThreadOwnsLock()); if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); return; @@ -5402,6 +5403,7 @@ void askingCommand(client *c) { * In this mode slaves will not redirect clients as long as clients access * with read-only commands to keys that are served by the slave's master. */ void readonlyCommand(client *c) { + serverAssert(aeThreadOwnsLock()); if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); return; @@ -5412,6 +5414,7 @@ void readonlyCommand(client *c) { /* The READWRITE command just clears the READONLY command state. */ void readwriteCommand(client *c) { + serverAssert(aeThreadOwnsLock()); c->flags &= ~CLIENT_READONLY; addReply(c,shared.ok); } @@ -5455,6 +5458,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in multiState *ms, _ms; multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; + serverAssert(aeThreadOwnsLock()); /* Allow any key to be set if a module disabled cluster redirections. */ if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) @@ -5663,6 +5667,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * longer handles, the client is sent a redirection error, and the function * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { + serverAssert(aeThreadOwnsLock()); if (c->flags & CLIENT_BLOCKED && (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || diff --git a/src/config.c b/src/config.c index 721f343b1..c9fd2dc74 100644 --- a/src/config.c +++ b/src/config.c @@ -961,6 +961,7 @@ void configSetCommand(client *c) { /* Try to check if the OS is capable of supporting so many FDs. */ server.maxclients = ll; + serverAssert(FALSE); if (ll > orig_value) { adjustOpenFilesLimit(); if (server.maxclients != ll) { diff --git a/src/db.c b/src/db.c index c8553c985..c31d4898e 100644 --- a/src/db.c +++ b/src/db.c @@ -99,6 +99,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * expiring our key via DELs in the replication link. */ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { robj *val; + serverAssert(aeThreadOwnsLock()); if (expireIfNeeded(db,key) == 1) { /* Key expired. If we are in the context of a master, expireIfNeeded() @@ -1072,6 +1073,7 @@ int removeExpire(redisDb *db, robj *key) { * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; + serverAssert(aeThreadOwnsLock()); /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->pdict,ptrFromObj(key)); diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4f9af9ab5..ced0b3e31 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -3,6 +3,7 @@ #include #include #include +#include static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); @@ -22,18 +23,23 @@ extern "C" void fastlock_init(struct fastlock *lock) extern "C" void fastlock_lock(struct fastlock *lock) { - if (lock->m_pidOwner == gettid()) + if (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1)) { - ++lock->m_depth; - return; + if (lock->m_pidOwner == gettid()) + { + ++lock->m_depth; + return; + } + + while (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1)) + { + sched_yield(); + } } - while (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1)) - { - sched_yield(); - } lock->m_depth = 1; lock->m_pidOwner = gettid(); + __sync_synchronize(); } extern "C" void fastlock_unlock(struct fastlock *lock) @@ -42,7 +48,9 @@ extern "C" void fastlock_unlock(struct fastlock *lock) if (lock->m_depth == 0) { lock->m_pidOwner = -1; - __sync_bool_compare_and_swap(&lock->m_lock, 1, 0); + __sync_synchronize(); + if (!__sync_bool_compare_and_swap(&lock->m_lock, 1, 0)) + *((volatile int*)0) = -1; } } @@ -55,5 +63,10 @@ extern "C" void fastlock_free(struct fastlock *lock) bool fastlock::fOwnLock() { + if (__sync_bool_compare_and_swap(&m_lock, 0, 1)) + { + __sync_bool_compare_and_swap(&m_lock, 1, 0); + return false; // it was never locked + } return gettid() == m_pidOwner; } \ No newline at end of file diff --git a/src/module.c b/src/module.c index b932a8da3..756fe5538 100644 --- a/src/module.c +++ b/src/module.c @@ -484,6 +484,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) { * details needed to correctly replicate commands. */ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { client *c = ctx->client; + serverAssert(aeThreadOwnsLock()); if (c->flags & CLIENT_LUA) return; @@ -3623,6 +3624,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec void moduleHandleBlockedClients(void) { listNode *ln; RedisModuleBlockedClient *bc; + serverAssert(aeThreadOwnsLock()); pthread_mutex_lock(&moduleUnblockedClientsMutex); /* Here we unblock all the pending clients blocked in modules operations diff --git a/src/multi.c b/src/multi.c index 6d722b8af..4f7711f6c 100644 --- a/src/multi.c +++ b/src/multi.c @@ -72,6 +72,7 @@ void queueMultiCommand(client *c) { } void discardTransaction(client *c) { + serverAssert(aeThreadOwnsLock()); freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); @@ -81,11 +82,13 @@ void discardTransaction(client *c) { /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(client *c) { + serverAssert(aeThreadOwnsLock()); if (c->flags & CLIENT_MULTI) c->flags |= CLIENT_DIRTY_EXEC; } void multiCommand(client *c) { + serverAssert(aeThreadOwnsLock()); if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; @@ -291,6 +294,7 @@ void unwatchAllKeys(client *c) { /* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ void touchWatchedKey(redisDb *db, robj *key) { + serverAssert(aeThreadOwnsLock()); list *clients; listIter li; listNode *ln; @@ -316,6 +320,7 @@ void touchWatchedKey(redisDb *db, robj *key) { void touchWatchedKeysOnFlush(int dbid) { listIter li1, li2; listNode *ln; + serverAssert(aeThreadOwnsLock()); /* For every client, check all the waited keys */ listRewind(server.clients,&li1); @@ -350,6 +355,7 @@ void watchCommand(client *c) { void unwatchCommand(client *c) { unwatchAllKeys(c); + serverAssert(aeThreadOwnsLock()); c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } diff --git a/src/networking.cpp b/src/networking.cpp index 659fb65ba..f5b95b352 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -51,12 +51,16 @@ public: void arm() { - m_fArmed = true; - aeAcquireLock(); + if (!m_fArmed) + { + m_fArmed = true; + aeAcquireLock(); + } } void disarm() { + serverAssert(m_fArmed); m_fArmed = false; aeReleaseLock(); } @@ -118,6 +122,7 @@ void linkClient(client *c) { client *createClient(int fd, int iel) { client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL); + c->iel = iel; /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the commands needs to be executed * in the context of a client. When commands are executed in other @@ -203,6 +208,7 @@ client *createClient(int fd, int iel) { listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (fd != -1) linkClient(c); initClientMultiState(c); + AssertCorrectThread(c); return c; } @@ -217,6 +223,7 @@ void clientInstallWriteHandler(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the slave can actually receive * writes at this stage. */ + serverAssert(aeThreadOwnsLock()); if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -234,6 +241,7 @@ void clientInstallWriteHandler(client *c) { } void clientInstallAsyncWriteHandler(client *c) { + serverAssert(aeThreadOwnsLock()); if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) { c->flags |= CLIENT_PENDING_ASYNCWRITE; listAddNodeHead(serverTL->clients_pending_asyncwrite,c); @@ -263,6 +271,7 @@ void clientInstallAsyncWriteHandler(client *c) { * data to the clients output buffers. If the function returns C_ERR no * data should be appended to the output buffers. */ int prepareClientToWrite(client *c, bool fAsync) { + serverAssert(aeThreadOwnsLock()); fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread /* If it's the Lua client we always return ok without installing any @@ -302,7 +311,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { if ((c->buflenAsync - c->bufposAsync) < (int)len) { int minsize = len + c->bufposAsync; - c->buflenAsync = std::max(minsize, c->buflenAsync*2); + c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync); c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); } memcpy(c->bufAsync+c->bufposAsync,s,len); @@ -595,7 +604,7 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi serverAssert(idxSplice <= c->bufposAsync); if (c->buflenAsync < (c->bufposAsync + lenstr_len)) { - c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2); + c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2 - c->buflenAsync); c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); } @@ -713,13 +722,21 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { addReplyLongLongWithPrefixCore(c, ll, prefix, false); } -void addReplyLongLong(client *c, long long ll) { +void addReplyLongLongCore(client *c, long long ll, bool fAsync) { if (ll == 0) - addReply(c,shared.czero); + addReplyCore(c,shared.czero, fAsync); else if (ll == 1) - addReply(c,shared.cone); + addReplyCore(c,shared.cone, fAsync); else - addReplyLongLongWithPrefix(c,ll,':'); + addReplyLongLongWithPrefixCore(c,ll,':', fAsync); +} + +void addReplyLongLong(client *c, long long ll) { + addReplyLongLongCore(c, ll, false); +} + +void addReplyLongLongAsync(client *c, long long ll) { + addReplyLongLongCore(c, ll, true); } void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) { @@ -1155,6 +1172,7 @@ void disconnectSlaves(void) { void unlinkClient(client *c) { listNode *ln; AssertCorrectThread(c); + serverAssert(aeThreadOwnsLock()); /* If this is marked as current client unset it. */ if (server.current_client == c) server.current_client = NULL; @@ -1163,8 +1181,6 @@ void unlinkClient(client *c) { * If the client was already unlinked or if it's a "fake client" the * fd is already set to -1. */ if (c->fd != -1) { - AssertCorrectThread(c); - /* Remove from the list of active clients. */ if (c->client_list_node) { uint64_t id = htonu64(c->id); @@ -1191,7 +1207,6 @@ void unlinkClient(client *c) { /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { - AssertCorrectThread(c); ln = listSearchKey(server.rgthreadvar[c->iel].unblocked_clients,c); serverAssert(ln != NULL); listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln); @@ -1199,9 +1214,16 @@ void unlinkClient(client *c) { } if (c->flags & CLIENT_PENDING_ASYNCWRITE) { - ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_asyncwrite,c); + ln = NULL; + int iel = 0; + for (; iel < server.cthreads; ++iel) + { + ln = listSearchKey(server.rgthreadvar[iel].clients_pending_asyncwrite,c); + if (ln) + break; + } serverAssert(ln != NULL); - listDelNode(server.rgthreadvar[c->iel].clients_pending_asyncwrite,ln); + listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln); c->flags &= ~CLIENT_PENDING_ASYNCWRITE; } } @@ -1316,8 +1338,8 @@ void freeClient(client *c) { * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; - c->flags |= CLIENT_CLOSE_ASAP; aeAcquireLock(); + c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(server.clients_to_close,c); aeReleaseLock(); } @@ -1368,6 +1390,11 @@ int writeToClient(int fd, client *c, int handler_installed) { o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync)); if (o->used == 0) { listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync)); + if (listLength(c->listbufferDoneAsync) == 0) + { + fSendAsyncBuffer = 0; + locker.disarm(); + } continue; } @@ -1485,17 +1512,59 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Write event handler. Just send data to the client. */ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); - AeLocker locker; client *c = (client*)privdata; serverAssert(ielFromEventLoop(el) == c->iel); - - if (c->flags | CLIENT_SLAVE) - locker.arm(); - writeToClient(fd,c,1); } +void ProcessPendingAsyncWrites() +{ + serverAssert(aeThreadOwnsLock()); + + while(listLength(serverTL->clients_pending_asyncwrite)) { + client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); + listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite)); + + serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE); + + // TODO: Append to end of reply block? + + size_t size = c->bufposAsync; + clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL); + /* take over the allocation's internal fragmentation */ + reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock); + reply->used = c->bufposAsync; + memcpy(reply->buf(), c->bufAsync, c->bufposAsync); + listAddNodeTail(c->listbufferDoneAsync, reply); + c->bufposAsync = 0; + c->buflenAsync = 0; + zfree(c->bufAsync); + c->bufAsync = nullptr; + c->flags &= ~CLIENT_PENDING_ASYNCWRITE; + + // Now install the write event handler + int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; + /* For the fsync=always policy, we want that a given FD is never + * served for reading and writing in the same event loop iteration, + * so that in the middle of receiving the query, and serving it + * to the client, we'll call beforeSleep() that will do the + * actual fsync of AOF to disk. AE_BARRIER ensures that. */ + if (server.aof_state == AOF_ON && + server.aof_fsync == AOF_FSYNC_ALWAYS) + { + ae_flags |= AE_BARRIER; + } + + if (!((c->replstate == REPL_STATE_NONE || + (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) + continue; + + if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) + continue; // We can retry later in the cron + } +} + /* This function is called just before entering the event loop, in the hope * we can just write the replies to the client output buffer without any * need to use a syscall in order to install the writable event handler, @@ -1504,8 +1573,11 @@ int handleClientsWithPendingWrites(int iel) { listIter li; listNode *ln; + AeLocker locker(true); + list *list = server.rgthreadvar[iel].clients_pending_write; int processed = listLength(list); + serverAssert(iel == (serverTL - server.rgthreadvar)); listRewind(list,&li); while((ln = listNext(&li))) { @@ -1524,7 +1596,7 @@ int handleClientsWithPendingWrites(int iel) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c, TRUE)) { - int ae_flags = AE_WRITABLE; + int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, * so that in the middle of receiving the query, and serving it @@ -1540,6 +1612,9 @@ int handleClientsWithPendingWrites(int iel) { freeClientAsync(c); } } + + ProcessPendingAsyncWrites(); + return processed; } @@ -1947,10 +2022,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); serverAssert(mask & AE_READ_THREADSAFE); serverAssert(c->iel == ielFromEventLoop(el)); - - AeLocker lockerSlave; - if (c->flags & CLIENT_SLAVE) // slaves are not async capable - lockerSlave.arm(); + AeLocker locker; + AssertCorrectThread(c); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -1974,22 +2047,19 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = read(fd, c->querybuf+qblen, readlen); + locker.arm(); if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - aeAcquireLock(); freeClient(c); - aeReleaseLock(); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - aeAcquireLock(); freeClient(c); - aeReleaseLock(); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -2010,9 +2080,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); - aeAcquireLock(); freeClient(c); - aeReleaseLock(); return; } @@ -2022,9 +2090,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * was actually applied to the master state: this quantity, and its * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ - aeAcquireLock(); processInputBufferAndReplicate(c); - aeReleaseLock(); + ProcessPendingAsyncWrites(); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2649,6 +2716,9 @@ void flushSlavesOutputBuffers(void) { client *slave = (client*)listNodeValue(ln); int events; + if (!FCorrectThread(slave)) + continue; // we cannot synchronously flush other thread's clients + /* Note that the following will not flush output buffers of slaves * in STATE_ONLINE but having put_online_on_ack set to true: in this * case the writable event is never installed, since the purpose diff --git a/src/object.c b/src/object.c index 14c43fe1a..ec211cbf0 100644 --- a/src/object.c +++ b/src/object.c @@ -394,7 +394,7 @@ robj *resetRefCount(robj *obj) { int checkType(client *c, robj *o, int type) { if (o->type != type) { - addReply(c,shared.wrongtypeerr); + addReplyAsync(c,shared.wrongtypeerr); return 1; } return 0; diff --git a/src/pubsub.c b/src/pubsub.c index 0fee17bdc..ccd631c28 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -325,6 +325,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { void subscribeCommand(client *c) { int j; + serverAssert(aeThreadOwnsLock()); for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); @@ -345,6 +346,7 @@ void unsubscribeCommand(client *c) { void psubscribeCommand(client *c) { int j; + serverAssert(aeThreadOwnsLock()); for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); diff --git a/src/rdb.c b/src/rdb.c index 590cc7cd5..f863da6f5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1862,7 +1862,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); - processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); + processEventsWhileBlocked(serverTL - server.rgthreadvar); } } diff --git a/src/replication.cpp b/src/replication.cpp index 179a48028..337cb0889 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -124,6 +124,7 @@ void freeReplicationBacklog(void) { * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ void feedReplicationBacklog(void *ptr, size_t len) { + serverAssert(aeThreadOwnsLock()); unsigned char *p = (unsigned char*)ptr; server.master_repl_offset += len; @@ -175,6 +176,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listIter li; int j, len; char llstr[LONG_STR_SIZE]; + serverAssert(aeThreadOwnsLock()); /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to @@ -310,6 +312,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, sds cmdrepr = sdsnew("+"); robj *cmdobj; struct timeval tv; + serverAssert(aeThreadOwnsLock()); gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); @@ -717,6 +720,7 @@ void syncCommand(client *c) { slave = (client*)ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; } + /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { @@ -883,6 +887,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { * replication process. Currently the preamble is just the bulk count of * the file in the form "$\r\n". */ if (slave->replpreamble) { + serverAssert(slave->replpreamble[0] == '$'); nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); if (nwritten == -1) { serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s", @@ -942,13 +947,14 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { * otherwise C_ERR is passed to the function. * The 'type' argument is the type of the child that terminated * (if it had a disk or socket target). */ -void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel); void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; listIter li; int startbgsave = 0; int mincapa = -1; + serverAssert(aeThreadOwnsLock()); + listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = (client*)ln->value; @@ -957,34 +963,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) startbgsave = 1; mincapa = (mincapa == -1) ? slave->slave_capa : (mincapa & slave->slave_capa); - } - } - - for (int iel = 0; iel < server.cthreads; ++iel) - { - if (iel == IDX_EVENT_LOOP_MAIN) - updateSlavesWaitingBgsaveThread(bgsaveerr, type, iel); - else - aePostFunction(server.rgthreadvar[iel].el, [=]{ - updateSlavesWaitingBgsaveThread(bgsaveerr, type, iel); - }); - } - - if (startbgsave) - startBgsaveForReplication(mincapa); -} - -void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) { - listNode *ln; - listIter li; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = (client*)ln->value; - if (slave->iel != iel) - continue; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { + } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { struct redis_stat buf; /* If this was an RDB on disk save, we have to prepare to send @@ -1006,13 +985,19 @@ void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) { slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ } else { if (bgsaveerr != C_OK) { - freeClient(slave); + if (FCorrectThread(slave)) + freeClient(slave); + else + freeClientAsync(slave); serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error"); continue; } if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || redis_fstat(slave->repldbfd,&buf) == -1) { - freeClient(slave); + if (FCorrectThread(slave)) + freeClient(slave); + else + freeClientAsync(slave); serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); continue; } @@ -1022,14 +1007,28 @@ void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) { slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", (unsigned long long) slave->repldbsize); - aeDeleteFileEvent(server.rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { - freeClient(slave); - continue; + if (FCorrectThread(slave)) + { + aeDeleteFileEvent(server.rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + freeClient(slave); + } + } + else + { + aePostFunction(server.rgthreadvar[slave->iel].el, [slave]{ + aeDeleteFileEvent(server.rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + freeClient(slave); + } + }); } } } } + + if (startbgsave) + startBgsaveForReplication(mincapa); } /* Change the current instance replication ID with a new, random one. @@ -1107,7 +1106,7 @@ void replicationEmptyDbCallback(void *privdata) { * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. */ void replicationCreateMasterClient(int fd, int dbid) { - server.master = createClient(fd, IDX_EVENT_LOOP_MAIN); + server.master = createClient(fd, serverTL - server.rgthreadvar); server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; @@ -1144,6 +1143,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(privdata); UNUSED(mask); + serverAssert(aeThreadOwnsLock()); + /* Static vars used to hold the EOF mark, and the last bytes received * form the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; @@ -2042,7 +2043,6 @@ void replicationHandleMasterDisconnection(void) { * the slaves only if we'll have to do a full resync with our master. */ } -void replicaofCommandCore(client *c); void replicaofCommand(client *c) { // Changing the master needs to be done on the main thread. @@ -2053,23 +2053,6 @@ void replicaofCommand(client *c) { return; } - if ((serverTL - server.rgthreadvar) == IDX_EVENT_LOOP_MAIN) - { - replicaofCommandCore(c); - } - else - { - aeReleaseLock(); - aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [=]{ - replicaofCommandCore(c); - }, true /*fSync*/); - aeAcquireLock(); - } -} - -void replicaofCommandCore(client *c) { - - /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && @@ -2164,7 +2147,8 @@ void roleCommand(client *c) { /* Send a REPLCONF ACK command to the master to inform it about the current * processed offset. If we are not connected with a master, the command has * no effects. */ -void replicationSendAck(void) { +void replicationSendAck(void) +{ client *c = server.master; if (c != NULL) { @@ -2200,6 +2184,7 @@ void replicationSendAck(void) { void replicationCacheMaster(client *c) { serverAssert(server.master != NULL && server.cached_master == NULL); serverLog(LL_NOTICE,"Caching the disconnected master state."); + AssertCorrectThread(c); /* Unlink the client from the server structures. */ unlinkClient(c); @@ -2288,9 +2273,13 @@ void replicationResurrectCachedMaster(int newfd) { server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* Normally changing the thread of a client is a BIG NONO, + but this client was unlinked so its OK here */ + server.master->iel = serverTL - server.rgthreadvar; // martial to this thread + /* Re-add to the list of clients. */ linkClient(server.master); - if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, newfd, AE_READABLE|AE_READ_THREADSAFE, + if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_READABLE|AE_READ_THREADSAFE, readQueryFromClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ @@ -2299,7 +2288,7 @@ void replicationResurrectCachedMaster(int newfd) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ if (clientHasPendingReplies(server.master, TRUE)) { - if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, newfd, AE_WRITABLE, + if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ @@ -2535,7 +2524,7 @@ void processClientsWaitingReplicas(void) { last_numreplicas > c->bpop.numreplicas) { unblockClient(c); - addReplyLongLong(c,last_numreplicas); + addReplyLongLongAsync(c,last_numreplicas); } else { int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); @@ -2543,7 +2532,7 @@ void processClientsWaitingReplicas(void) { last_offset = c->bpop.reploffset; last_numreplicas = numreplicas; unblockClient(c); - addReplyLongLong(c,numreplicas); + addReplyLongLongAsync(c,numreplicas); } } } @@ -2680,7 +2669,7 @@ void replicationCron(void) { { serverLog(LL_WARNING, "Disconnecting timedout replica: %s", replicationGetSlaveName(slave)); - freeClient(slave); + freeClientAsync(slave); } } } diff --git a/src/scripting.c b/src/scripting.c index a3d290bbf..87577fb7d 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -370,6 +370,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { sds reply; // Ensure our client is on the right thread + serverAssert(!(c->flags & CLIENT_PENDING_WRITE)); + serverAssert(!(c->flags & CLIENT_UNBLOCKED)); + serverAssert(aeThreadOwnsLock()); c->iel = serverTL - server.rgthreadvar; /* Cached across calls. */ diff --git a/src/sentinel.c b/src/sentinel.c index 4ad79fbac..c3c905058 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -3976,6 +3976,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { /* Setup the master state to start a failover. */ void sentinelStartFailover(sentinelRedisInstance *master) { + serverAssert(aeThreadOwnsLock()); serverAssert(master->flags & SRI_MASTER); master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; @@ -4168,6 +4169,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { } void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { + serverAssert(aeThreadOwnsLock()); sentinelRedisInstance *slave = sentinelSelectSlave(ri); /* We don't handle the timeout in this state as the function aborts @@ -4292,6 +4294,7 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { dictIterator *di; dictEntry *de; int in_progress = 0; + serverAssert(aeThreadOwnsLock()); di = dictGetIterator(master->slaves); while((de = dictNext(di)) != NULL) { diff --git a/src/server.c b/src/server.c index a2fd4b337..44fa01e7f 100644 --- a/src/server.c +++ b/src/server.c @@ -1526,6 +1526,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + AssertCorrectThread(c); size_t querybuf_size = sdsAllocSize(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -1620,16 +1621,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { *out_usage = o; } - -void AsyncClientCron(void *pv) -{ - client *c = (client*)pv; - mstime_t now = mstime(); - if (clientsCronHandleTimeout(c,now)) return; - if (clientsCronResizeQueryBuffer(c)) return; - if (clientsCronTrackExpansiveClients(c)) return; -} - /* This function is called by serverCron() and is used in order to perform * operations on clients that are important to perform constantly. For instance * we use this function in order to disconnect clients after a timeout, including @@ -1646,7 +1637,7 @@ void AsyncClientCron(void *pv) * of clients per second, turning this function into a source of latency. */ #define CLIENTS_CRON_MIN_ITERATIONS 5 -void clientsCron(void) { +void clientsCron(int iel) { /* Try to process at least numclients/server.hz of clients * per call. Since normally (if there are no big latency events) this * function is called server.hz times per second, in the average case we @@ -1672,7 +1663,7 @@ void clientsCron(void) { listRotate(server.clients); head = listFirst(server.clients); c = listNodeValue(head); - if (c->iel == IDX_EVENT_LOOP_MAIN) + if (c->iel == iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was @@ -1680,12 +1671,7 @@ void clientsCron(void) { if (clientsCronHandleTimeout(c,now)) continue; if (clientsCronResizeQueryBuffer(c)) continue; if (clientsCronTrackExpansiveClients(c)) continue; - } - else if (IDX_EVENT_LOOP_MAIN > 1) - { - aePostFunction(server.rgthreadvar[c->iel].el, AsyncClientCron, c); - } - + } } } @@ -1787,6 +1773,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); + ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up + /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); @@ -1898,7 +1886,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* We need to do a few operations on clients asynchronously. */ - clientsCron(); + clientsCron(IDX_EVENT_LOOP_MAIN); /* Handle background operations on Redis databases. */ databasesCron(); @@ -2055,7 +2043,13 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); + + aeAcquireLock(); + ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves + clientsCron(iel); + freeClientsInAsyncFreeQueue(iel); + aeReleaseLock(); return 1000/server.hz; } @@ -2126,13 +2120,12 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) int iel = ielFromEventLoop(eventLoop); /* Try to process pending commands for clients that were just unblocked. */ + aeAcquireLock(); if (listLength(server.rgthreadvar[iel].unblocked_clients)) { - aeAcquireLock(); processUnblockedClients(iel); - aeReleaseLock(); } - - + aeReleaseLock(); + /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); } @@ -2752,9 +2745,18 @@ void resetServerStats(void) { static void initNetworkingThread(int iel, int fReusePort) { /* Open the TCP listening socket for the user commands. */ - if (server.port != 0 && - listenToPort(server.port,server.rgthreadvar[iel].ipfd,&server.rgthreadvar[iel].ipfd_count, fReusePort) == C_ERR) - exit(1); + if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN)) + { + if (server.port != 0 && + listenToPort(server.port,server.rgthreadvar[iel].ipfd,&server.rgthreadvar[iel].ipfd_count, fReusePort) == C_ERR) + exit(1); + } + else + { + // We use the main threads file descriptors + memcpy(server.rgthreadvar[iel].ipfd, server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd, sizeof(int)*CONFIG_BINDADDR_MAX); + server.rgthreadvar[iel].ipfd_count = server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count; + } /* Create an event handler for accepting new connections in TCP */ for (int j = 0; j < server.rgthreadvar[iel].ipfd_count; j++) { @@ -3181,44 +3183,6 @@ void preventCommandReplication(client *c) { c->flags |= CLIENT_PREVENT_REPL_PROP; } -void ProcessPendingAsyncWrites() -{ - while(listLength(serverTL->clients_pending_asyncwrite)) { - client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); - listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite)); - - serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE); - - // TODO: Append to end of reply block? - - size_t size = c->bufposAsync; - clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL); - /* take over the allocation's internal fragmentation */ - reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock); - reply->used = c->bufposAsync; - memcpy(reply->buf, c->bufAsync, c->bufposAsync); - listAddNodeTail(c->listbufferDoneAsync, reply); - c->bufposAsync = 0; - c->flags &= ~CLIENT_PENDING_ASYNCWRITE; - - // Now install the write event handler - int ae_flags = AE_WRITABLE; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. AE_BARRIER ensures that. */ - if (server.aof_state == AOF_ON && - server.aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_flags |= AE_BARRIER; - } - - if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) - freeClientAsync(c); - } -} - /* Call() is the core of Redis execution of a command. * * The following flags can be passed: @@ -3260,6 +3224,7 @@ void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; + serverAssert(aeThreadOwnsLock()); /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ @@ -3389,6 +3354,7 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { + serverAssert(aeThreadOwnsLock()); /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in @@ -4517,6 +4483,7 @@ void infoCommand(client *c) { void monitorCommand(client *c) { /* ignore MONITOR if already slave or in monitor mode */ + serverAssert(aeThreadOwnsLock()); if (c->flags & CLIENT_SLAVE) return; c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); @@ -4872,7 +4839,7 @@ void *workerThreadMain(void *parg) int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = server.rgthreadvar[iel].el; - aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); + aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, /*isMainThread ? 0 : AE_SLEEP_THREADSAFE*/ 0); aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0); aeMain(el); aeDeleteEventLoop(el); @@ -4932,6 +4899,9 @@ int main(int argc, char **argv) { { initServerThread(server.rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN); } + serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN]; + aeAcquireLock(); // We own the lock on boot + ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); @@ -5046,9 +5016,8 @@ int main(int argc, char **argv) { initServer(); - server.cthreads = 1; //testing - initNetworking(1 /* fReusePort */); - serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN]; + server.cthreads = 4; //testing + initNetworking(0 /* fReusePort */); if (background || server.pidfile) createPidFile(); redisSetProcTitle(argv[0]); @@ -5089,6 +5058,8 @@ int main(int argc, char **argv) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } + aeReleaseLock(); //Finally we can dump the lock + serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; for (int iel = 0; iel < server.cthreads; ++iel) diff --git a/src/server.h b/src/server.h index 10fa38c55..155ac10db 100644 --- a/src/server.h +++ b/src/server.h @@ -1205,6 +1205,7 @@ struct redisServer { int aof_pipe_read_data_from_parent; int aof_pipe_write_ack_to_parent; int aof_pipe_read_ack_from_child; + aeEventLoop *el_alf_pip_read_ack_from_child; int aof_pipe_write_ack_to_child; int aof_pipe_read_ack_from_parent; int aof_stop_sending_diff; /* If true stop sending accumulated diffs @@ -1640,6 +1641,7 @@ void setDeferredArrayLenAsync(client *c, void *node, long length); void addReplySdsAsync(client *c, sds s); void addReplyBulkSdsAsync(client *c, sds s); void addReplyPushLenAsync(client *c, long length); +void addReplyLongLongAsync(client *c, long long ll); void ProcessPendingAsyncWrites(void);