Multithreading works!

This commit is contained in:
John Sully 2019-02-20 01:20:26 -05:00
parent 357a9e92e2
commit 29c1105132
18 changed files with 269 additions and 191 deletions

View File

@ -109,9 +109,8 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
auto cb = read(fd, &cmd, sizeof(aeCommand));
if (cb != sizeof(cmd))
{
if (errno == EAGAIN)
AE_ASSERT(errno == EAGAIN);
break;
fprintf(stderr, "Failed to read pipe.\n");
}
switch (cmd.op)
{
@ -168,6 +167,8 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
if (eventLoop == g_eventLoopThisThread)
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
int ret = AE_OK;
aeCommand cmd;
cmd.op = AE_ASYNC_OP::CreateFileEvent;
cmd.fd = fd;
@ -182,14 +183,19 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
if (fSynchronous)
cmd.pctl->mutexcv.lock();
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(size == sizeof(cmd));
int ret = AE_OK;
if (size != sizeof(cmd))
{
AE_ASSERT(errno == EAGAIN);
ret = AE_ERR;
}
if (fSynchronous)
{
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
delete cmd.pctl;
}
return ret;
}
@ -265,7 +271,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (pipe(rgfd) < 0)
goto err;
eventLoop->fdCmdRead = rgfd[0];
eventLoop->fdCmdWrite = rgfd[1];;
eventLoop->fdCmdWrite = rgfd[1];
fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK);
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
eventLoop->cevents = 0;
@ -389,6 +395,7 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
}
extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
//AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];
@ -468,6 +475,7 @@ extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
@ -629,6 +637,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
int processed = 0, numevents;
/* Nothing to do? return ASAP */

View File

@ -96,7 +96,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
aofrwblock *block;
ssize_t nwritten;
serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el);
serverAssert(aeThreadOwnsLock());
UNUSED(el);
UNUSED(fd);
@ -123,15 +123,6 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
static void QueueAofPipeWrite(void *arg)
{
UNUSED(arg);
if (aeGetFileEvents(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
@ -173,7 +164,10 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
/* Install a file event to send data to the rewrite child if there is
* not one already. */
aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, QueueAofPipeWrite, NULL);
if (aeGetFileEvents(serverTL->el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(serverTL->el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
/* Write the buffer (possibly composed of multiple blocks) into the specified
@ -691,6 +685,7 @@ int loadAppendOnlyFile(char *filename) {
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
serverAssert(serverTL != NULL); // This happens early in boot, ensure serverTL was setup
if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
@ -747,7 +742,7 @@ int loadAppendOnlyFile(char *filename) {
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
processEventsWhileBlocked(serverTL - server.rgthreadvar);
}
if (fgets(buf,sizeof(buf),fp) == NULL) {
@ -1497,12 +1492,13 @@ int aofCreatePipes(void) {
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateRemoteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL, TRUE) == AE_ERR) goto error;
if (aeCreateFileEvent(serverTL->el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.el_alf_pip_read_ack_from_child = serverTL->el;
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
@ -1516,8 +1512,8 @@ error:
}
void aofClosePipes(void) {
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE);
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
aeDeleteFileEventAsync(server.el_alf_pip_read_ack_from_child,server.aof_pipe_read_ack_from_child,AE_READABLE);
aeDeleteFileEventAsync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
close(server.aof_pipe_write_data_to_child);
close(server.aof_pipe_read_data_from_parent);
close(server.aof_pipe_write_ack_to_parent);

View File

@ -100,6 +100,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
serverAssert(aeThreadOwnsLock());
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
@ -122,6 +123,7 @@ void processUnblockedClients(int iel) {
serverAssert(ln != NULL);
c = ln->value;
listDelNode(unblocked_clients,ln);
AssertCorrectThread(c);
c->flags &= ~CLIENT_UNBLOCKED;
/* Process remaining data in the input buffer, unless the client
@ -165,6 +167,7 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
serverAssert(aeThreadOwnsLock());
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
@ -210,6 +213,7 @@ void replyToBlockedClientTimedOut(client *c) {
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
serverAssert(aeThreadOwnsLock());
listNode *ln;
listIter li;

View File

@ -5390,6 +5390,7 @@ socket_err:
* the target instance. See the Redis Cluster specification for more
* information. */
void askingCommand(client *c) {
serverAssert(aeThreadOwnsLock());
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
@ -5402,6 +5403,7 @@ void askingCommand(client *c) {
* In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */
void readonlyCommand(client *c) {
serverAssert(aeThreadOwnsLock());
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
@ -5412,6 +5414,7 @@ void readonlyCommand(client *c) {
/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
serverAssert(aeThreadOwnsLock());
c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok);
}
@ -5455,6 +5458,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
serverAssert(aeThreadOwnsLock());
/* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
@ -5663,6 +5667,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* longer handles, the client is sent a redirection error, and the function
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
serverAssert(aeThreadOwnsLock());
if (c->flags & CLIENT_BLOCKED &&
(c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||

View File

@ -961,6 +961,7 @@ void configSetCommand(client *c) {
/* Try to check if the OS is capable of supporting so many FDs. */
server.maxclients = ll;
serverAssert(FALSE);
if (ll > orig_value) {
adjustOpenFilesLimit();
if (server.maxclients != ll) {

View File

@ -99,6 +99,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
* expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
serverAssert(aeThreadOwnsLock());
if (expireIfNeeded(db,key) == 1) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
@ -1072,6 +1073,7 @@ int removeExpire(redisDb *db, robj *key) {
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
serverAssert(aeThreadOwnsLock());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));

View File

@ -3,6 +3,7 @@
#include <sys/syscall.h>
#include <sys/types.h>
#include <sched.h>
#include <atomic>
static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough");
@ -21,6 +22,8 @@ extern "C" void fastlock_init(struct fastlock *lock)
}
extern "C" void fastlock_lock(struct fastlock *lock)
{
if (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1))
{
if (lock->m_pidOwner == gettid())
{
@ -32,8 +35,11 @@ extern "C" void fastlock_lock(struct fastlock *lock)
{
sched_yield();
}
}
lock->m_depth = 1;
lock->m_pidOwner = gettid();
__sync_synchronize();
}
extern "C" void fastlock_unlock(struct fastlock *lock)
@ -42,7 +48,9 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
if (lock->m_depth == 0)
{
lock->m_pidOwner = -1;
__sync_bool_compare_and_swap(&lock->m_lock, 1, 0);
__sync_synchronize();
if (!__sync_bool_compare_and_swap(&lock->m_lock, 1, 0))
*((volatile int*)0) = -1;
}
}
@ -55,5 +63,10 @@ extern "C" void fastlock_free(struct fastlock *lock)
bool fastlock::fOwnLock()
{
if (__sync_bool_compare_and_swap(&m_lock, 0, 1))
{
__sync_bool_compare_and_swap(&m_lock, 1, 0);
return false; // it was never locked
}
return gettid() == m_pidOwner;
}

View File

@ -484,6 +484,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
* details needed to correctly replicate commands. */
void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
client *c = ctx->client;
serverAssert(aeThreadOwnsLock());
if (c->flags & CLIENT_LUA) return;
@ -3623,6 +3624,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
void moduleHandleBlockedClients(void) {
listNode *ln;
RedisModuleBlockedClient *bc;
serverAssert(aeThreadOwnsLock());
pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations

View File

@ -72,6 +72,7 @@ void queueMultiCommand(client *c) {
}
void discardTransaction(client *c) {
serverAssert(aeThreadOwnsLock());
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
@ -81,11 +82,13 @@ void discardTransaction(client *c) {
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
* Should be called every time there is an error while queueing a command. */
void flagTransaction(client *c) {
serverAssert(aeThreadOwnsLock());
if (c->flags & CLIENT_MULTI)
c->flags |= CLIENT_DIRTY_EXEC;
}
void multiCommand(client *c) {
serverAssert(aeThreadOwnsLock());
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
@ -291,6 +294,7 @@ void unwatchAllKeys(client *c) {
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
serverAssert(aeThreadOwnsLock());
list *clients;
listIter li;
listNode *ln;
@ -316,6 +320,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
void touchWatchedKeysOnFlush(int dbid) {
listIter li1, li2;
listNode *ln;
serverAssert(aeThreadOwnsLock());
/* For every client, check all the waited keys */
listRewind(server.clients,&li1);
@ -350,6 +355,7 @@ void watchCommand(client *c) {
void unwatchCommand(client *c) {
unwatchAllKeys(c);
serverAssert(aeThreadOwnsLock());
c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok);
}

View File

@ -50,13 +50,17 @@ public:
}
void arm()
{
if (!m_fArmed)
{
m_fArmed = true;
aeAcquireLock();
}
}
void disarm()
{
serverAssert(m_fArmed);
m_fArmed = false;
aeReleaseLock();
}
@ -118,6 +122,7 @@ void linkClient(client *c) {
client *createClient(int fd, int iel) {
client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL);
c->iel = iel;
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
@ -203,6 +208,7 @@ client *createClient(int fd, int iel) {
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) linkClient(c);
initClientMultiState(c);
AssertCorrectThread(c);
return c;
}
@ -217,6 +223,7 @@ void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
serverAssert(aeThreadOwnsLock());
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@ -234,6 +241,7 @@ void clientInstallWriteHandler(client *c) {
}
void clientInstallAsyncWriteHandler(client *c) {
serverAssert(aeThreadOwnsLock());
if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) {
c->flags |= CLIENT_PENDING_ASYNCWRITE;
listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
@ -263,6 +271,7 @@ void clientInstallAsyncWriteHandler(client *c) {
* data to the clients output buffers. If the function returns C_ERR no
* data should be appended to the output buffers. */
int prepareClientToWrite(client *c, bool fAsync) {
serverAssert(aeThreadOwnsLock());
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
/* If it's the Lua client we always return ok without installing any
@ -302,7 +311,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
if ((c->buflenAsync - c->bufposAsync) < (int)len)
{
int minsize = len + c->bufposAsync;
c->buflenAsync = std::max(minsize, c->buflenAsync*2);
c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync);
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
}
memcpy(c->bufAsync+c->bufposAsync,s,len);
@ -595,7 +604,7 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi
serverAssert(idxSplice <= c->bufposAsync);
if (c->buflenAsync < (c->bufposAsync + lenstr_len))
{
c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2);
c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2 - c->buflenAsync);
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
}
@ -713,13 +722,21 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
addReplyLongLongWithPrefixCore(c, ll, prefix, false);
}
void addReplyLongLong(client *c, long long ll) {
void addReplyLongLongCore(client *c, long long ll, bool fAsync) {
if (ll == 0)
addReply(c,shared.czero);
addReplyCore(c,shared.czero, fAsync);
else if (ll == 1)
addReply(c,shared.cone);
addReplyCore(c,shared.cone, fAsync);
else
addReplyLongLongWithPrefix(c,ll,':');
addReplyLongLongWithPrefixCore(c,ll,':', fAsync);
}
void addReplyLongLong(client *c, long long ll) {
addReplyLongLongCore(c, ll, false);
}
void addReplyLongLongAsync(client *c, long long ll) {
addReplyLongLongCore(c, ll, true);
}
void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) {
@ -1155,6 +1172,7 @@ void disconnectSlaves(void) {
void unlinkClient(client *c) {
listNode *ln;
AssertCorrectThread(c);
serverAssert(aeThreadOwnsLock());
/* If this is marked as current client unset it. */
if (server.current_client == c) server.current_client = NULL;
@ -1163,8 +1181,6 @@ void unlinkClient(client *c) {
* If the client was already unlinked or if it's a "fake client" the
* fd is already set to -1. */
if (c->fd != -1) {
AssertCorrectThread(c);
/* Remove from the list of active clients. */
if (c->client_list_node) {
uint64_t id = htonu64(c->id);
@ -1191,7 +1207,6 @@ void unlinkClient(client *c) {
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
AssertCorrectThread(c);
ln = listSearchKey(server.rgthreadvar[c->iel].unblocked_clients,c);
serverAssert(ln != NULL);
listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln);
@ -1199,9 +1214,16 @@ void unlinkClient(client *c) {
}
if (c->flags & CLIENT_PENDING_ASYNCWRITE) {
ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_asyncwrite,c);
ln = NULL;
int iel = 0;
for (; iel < server.cthreads; ++iel)
{
ln = listSearchKey(server.rgthreadvar[iel].clients_pending_asyncwrite,c);
if (ln)
break;
}
serverAssert(ln != NULL);
listDelNode(server.rgthreadvar[c->iel].clients_pending_asyncwrite,ln);
listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln);
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
}
}
@ -1316,8 +1338,8 @@ void freeClient(client *c) {
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
c->flags |= CLIENT_CLOSE_ASAP;
aeAcquireLock();
c->flags |= CLIENT_CLOSE_ASAP;
listAddNodeTail(server.clients_to_close,c);
aeReleaseLock();
}
@ -1368,6 +1390,11 @@ int writeToClient(int fd, client *c, int handler_installed) {
o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync));
if (o->used == 0) {
listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync));
if (listLength(c->listbufferDoneAsync) == 0)
{
fSendAsyncBuffer = 0;
locker.disarm();
}
continue;
}
@ -1485,17 +1512,59 @@ int writeToClient(int fd, client *c, int handler_installed) {
/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask);
AeLocker locker;
client *c = (client*)privdata;
serverAssert(ielFromEventLoop(el) == c->iel);
if (c->flags | CLIENT_SLAVE)
locker.arm();
writeToClient(fd,c,1);
}
void ProcessPendingAsyncWrites()
{
serverAssert(aeThreadOwnsLock());
while(listLength(serverTL->clients_pending_asyncwrite)) {
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite));
serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE);
// TODO: Append to end of reply block?
size_t size = c->bufposAsync;
clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL);
/* take over the allocation's internal fragmentation */
reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock);
reply->used = c->bufposAsync;
memcpy(reply->buf(), c->bufAsync, c->bufposAsync);
listAddNodeTail(c->listbufferDoneAsync, reply);
c->bufposAsync = 0;
c->buflenAsync = 0;
zfree(c->bufAsync);
c->bufAsync = nullptr;
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
// Now install the write event handler
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
if (!((c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
continue;
if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR)
continue; // We can retry later in the cron
}
}
/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
@ -1504,8 +1573,11 @@ int handleClientsWithPendingWrites(int iel) {
listIter li;
listNode *ln;
AeLocker locker(true);
list *list = server.rgthreadvar[iel].clients_pending_write;
int processed = listLength(list);
serverAssert(iel == (serverTL - server.rgthreadvar));
listRewind(list,&li);
while((ln = listNext(&li))) {
@ -1524,7 +1596,7 @@ int handleClientsWithPendingWrites(int iel) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c, TRUE)) {
int ae_flags = AE_WRITABLE;
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
@ -1540,6 +1612,9 @@ int handleClientsWithPendingWrites(int iel) {
freeClientAsync(c);
}
}
ProcessPendingAsyncWrites();
return processed;
}
@ -1947,10 +2022,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask);
serverAssert(mask & AE_READ_THREADSAFE);
serverAssert(c->iel == ielFromEventLoop(el));
AeLocker lockerSlave;
if (c->flags & CLIENT_SLAVE) // slaves are not async capable
lockerSlave.arm();
AeLocker locker;
AssertCorrectThread(c);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
@ -1974,22 +2047,19 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
locker.arm();
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
aeAcquireLock();
freeClient(c);
aeReleaseLock();
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
aeAcquireLock();
freeClient(c);
aeReleaseLock();
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
@ -2010,9 +2080,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
aeAcquireLock();
freeClient(c);
aeReleaseLock();
return;
}
@ -2022,9 +2090,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
* was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
aeAcquireLock();
processInputBufferAndReplicate(c);
aeReleaseLock();
ProcessPendingAsyncWrites();
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
@ -2649,6 +2716,9 @@ void flushSlavesOutputBuffers(void) {
client *slave = (client*)listNodeValue(ln);
int events;
if (!FCorrectThread(slave))
continue; // we cannot synchronously flush other thread's clients
/* Note that the following will not flush output buffers of slaves
* in STATE_ONLINE but having put_online_on_ack set to true: in this
* case the writable event is never installed, since the purpose

View File

@ -394,7 +394,7 @@ robj *resetRefCount(robj *obj) {
int checkType(client *c, robj *o, int type) {
if (o->type != type) {
addReply(c,shared.wrongtypeerr);
addReplyAsync(c,shared.wrongtypeerr);
return 1;
}
return 0;

View File

@ -325,6 +325,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
void subscribeCommand(client *c) {
int j;
serverAssert(aeThreadOwnsLock());
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
@ -345,6 +346,7 @@ void unsubscribeCommand(client *c) {
void psubscribeCommand(client *c) {
int j;
serverAssert(aeThreadOwnsLock());
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);

View File

@ -1862,7 +1862,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
processEventsWhileBlocked(serverTL - server.rgthreadvar);
}
}

View File

@ -124,6 +124,7 @@ void freeReplicationBacklog(void) {
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) {
serverAssert(aeThreadOwnsLock());
unsigned char *p = (unsigned char*)ptr;
server.master_repl_offset += len;
@ -175,6 +176,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
serverAssert(aeThreadOwnsLock());
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
@ -310,6 +312,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
serverAssert(aeThreadOwnsLock());
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
@ -717,6 +720,7 @@ void syncCommand(client *c) {
slave = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
@ -883,6 +887,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
serverAssert(slave->replpreamble[0] == '$');
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
@ -942,13 +947,14 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
* otherwise C_ERR is passed to the function.
* The 'type' argument is the type of the child that terminated
* (if it had a disk or socket target). */
void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel);
void updateSlavesWaitingBgsave(int bgsaveerr, int type)
{
listNode *ln;
listIter li;
int startbgsave = 0;
int mincapa = -1;
serverAssert(aeThreadOwnsLock());
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
@ -957,34 +963,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}
for (int iel = 0; iel < server.cthreads; ++iel)
{
if (iel == IDX_EVENT_LOOP_MAIN)
updateSlavesWaitingBgsaveThread(bgsaveerr, type, iel);
else
aePostFunction(server.rgthreadvar[iel].el, [=]{
updateSlavesWaitingBgsaveThread(bgsaveerr, type, iel);
});
}
if (startbgsave)
startBgsaveForReplication(mincapa);
}
void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) {
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
if (slave->iel != iel)
continue;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
/* If this was an RDB on disk save, we have to prepare to send
@ -1006,13 +985,19 @@ void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) {
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
} else {
if (bgsaveerr != C_OK) {
if (FCorrectThread(slave))
freeClient(slave);
else
freeClientAsync(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
if (FCorrectThread(slave))
freeClient(slave);
else
freeClientAsync(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
@ -1022,14 +1007,28 @@ void updateSlavesWaitingBgsaveThread(int bgsaveerr, int type, int iel) {
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
if (FCorrectThread(slave))
{
aeDeleteFileEvent(server.rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
else
{
aePostFunction(server.rgthreadvar[slave->iel].el, [slave]{
aeDeleteFileEvent(server.rgthreadvar[slave->iel].el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
}
});
}
}
}
}
if (startbgsave)
startBgsaveForReplication(mincapa);
}
/* Change the current instance replication ID with a new, random one.
@ -1107,7 +1106,7 @@ void replicationEmptyDbCallback(void *privdata) {
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd, IDX_EVENT_LOOP_MAIN);
server.master = createClient(fd, serverTL - server.rgthreadvar);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
@ -1144,6 +1143,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(privdata);
UNUSED(mask);
serverAssert(aeThreadOwnsLock());
/* Static vars used to hold the EOF mark, and the last bytes received
* form the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
@ -2042,7 +2043,6 @@ void replicationHandleMasterDisconnection(void) {
* the slaves only if we'll have to do a full resync with our master. */
}
void replicaofCommandCore(client *c);
void replicaofCommand(client *c) {
// Changing the master needs to be done on the main thread.
@ -2053,23 +2053,6 @@ void replicaofCommand(client *c) {
return;
}
if ((serverTL - server.rgthreadvar) == IDX_EVENT_LOOP_MAIN)
{
replicaofCommandCore(c);
}
else
{
aeReleaseLock();
aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [=]{
replicaofCommandCore(c);
}, true /*fSync*/);
aeAcquireLock();
}
}
void replicaofCommandCore(client *c) {
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") &&
@ -2164,7 +2147,8 @@ void roleCommand(client *c) {
/* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset. If we are not connected with a master, the command has
* no effects. */
void replicationSendAck(void) {
void replicationSendAck(void)
{
client *c = server.master;
if (c != NULL) {
@ -2200,6 +2184,7 @@ void replicationSendAck(void) {
void replicationCacheMaster(client *c) {
serverAssert(server.master != NULL && server.cached_master == NULL);
serverLog(LL_NOTICE,"Caching the disconnected master state.");
AssertCorrectThread(c);
/* Unlink the client from the server structures. */
unlinkClient(c);
@ -2288,9 +2273,13 @@ void replicationResurrectCachedMaster(int newfd) {
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Normally changing the thread of a client is a BIG NONO,
but this client was unlinked so its OK here */
server.master->iel = serverTL - server.rgthreadvar; // martial to this thread
/* Re-add to the list of clients. */
linkClient(server.master);
if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, newfd, AE_READABLE|AE_READ_THREADSAFE,
if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_READABLE|AE_READ_THREADSAFE,
readQueryFromClient, server.master)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
@ -2299,7 +2288,7 @@ void replicationResurrectCachedMaster(int newfd) {
/* We may also need to install the write handler as well if there is
* pending data in the write buffers. */
if (clientHasPendingReplies(server.master, TRUE)) {
if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, newfd, AE_WRITABLE,
if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE,
sendReplyToClient, server.master)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
@ -2535,7 +2524,7 @@ void processClientsWaitingReplicas(void) {
last_numreplicas > c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
addReplyLongLongAsync(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
@ -2543,7 +2532,7 @@ void processClientsWaitingReplicas(void) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
addReplyLongLongAsync(c,numreplicas);
}
}
}
@ -2680,7 +2669,7 @@ void replicationCron(void) {
{
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
replicationGetSlaveName(slave));
freeClient(slave);
freeClientAsync(slave);
}
}
}

View File

@ -370,6 +370,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
sds reply;
// Ensure our client is on the right thread
serverAssert(!(c->flags & CLIENT_PENDING_WRITE));
serverAssert(!(c->flags & CLIENT_UNBLOCKED));
serverAssert(aeThreadOwnsLock());
c->iel = serverTL - server.rgthreadvar;
/* Cached across calls. */

View File

@ -3976,6 +3976,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
/* Setup the master state to start a failover. */
void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(aeThreadOwnsLock());
serverAssert(master->flags & SRI_MASTER);
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
@ -4168,6 +4169,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
}
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
serverAssert(aeThreadOwnsLock());
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
/* We don't handle the timeout in this state as the function aborts
@ -4292,6 +4294,7 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int in_progress = 0;
serverAssert(aeThreadOwnsLock());
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {

View File

@ -1526,6 +1526,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
AssertCorrectThread(c);
size_t querybuf_size = sdsAllocSize(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;
@ -1620,16 +1621,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
*out_usage = o;
}
void AsyncClientCron(void *pv)
{
client *c = (client*)pv;
mstime_t now = mstime();
if (clientsCronHandleTimeout(c,now)) return;
if (clientsCronResizeQueryBuffer(c)) return;
if (clientsCronTrackExpansiveClients(c)) return;
}
/* This function is called by serverCron() and is used in order to perform
* operations on clients that are important to perform constantly. For instance
* we use this function in order to disconnect clients after a timeout, including
@ -1646,7 +1637,7 @@ void AsyncClientCron(void *pv)
* of clients per second, turning this function into a source of latency.
*/
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
void clientsCron(int iel) {
/* Try to process at least numclients/server.hz of clients
* per call. Since normally (if there are no big latency events) this
* function is called server.hz times per second, in the average case we
@ -1672,7 +1663,7 @@ void clientsCron(void) {
listRotate(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
if (c->iel == IDX_EVENT_LOOP_MAIN)
if (c->iel == iel)
{
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
@ -1681,11 +1672,6 @@ void clientsCron(void) {
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c)) continue;
}
else if (IDX_EVENT_LOOP_MAIN > 1)
{
aePostFunction(server.rgthreadvar[c->iel].el, AsyncClientCron, c);
}
}
}
@ -1787,6 +1773,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(id);
UNUSED(clientData);
ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up
/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
@ -1898,7 +1886,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
/* We need to do a few operations on clients asynchronously. */
clientsCron();
clientsCron(IDX_EVENT_LOOP_MAIN);
/* Handle background operations on Redis databases. */
databasesCron();
@ -2055,7 +2043,13 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN);
aeAcquireLock();
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
clientsCron(iel);
freeClientsInAsyncFreeQueue(iel);
aeReleaseLock();
return 1000/server.hz;
}
@ -2126,12 +2120,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
int iel = ielFromEventLoop(eventLoop);
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
aeAcquireLock();
if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
processUnblockedClients(iel);
aeReleaseLock();
}
aeReleaseLock();
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(iel);
@ -2752,9 +2745,18 @@ void resetServerStats(void) {
static void initNetworkingThread(int iel, int fReusePort)
{
/* Open the TCP listening socket for the user commands. */
if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN))
{
if (server.port != 0 &&
listenToPort(server.port,server.rgthreadvar[iel].ipfd,&server.rgthreadvar[iel].ipfd_count, fReusePort) == C_ERR)
exit(1);
}
else
{
// We use the main threads file descriptors
memcpy(server.rgthreadvar[iel].ipfd, server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd, sizeof(int)*CONFIG_BINDADDR_MAX);
server.rgthreadvar[iel].ipfd_count = server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count;
}
/* Create an event handler for accepting new connections in TCP */
for (int j = 0; j < server.rgthreadvar[iel].ipfd_count; j++) {
@ -3181,44 +3183,6 @@ void preventCommandReplication(client *c) {
c->flags |= CLIENT_PREVENT_REPL_PROP;
}
void ProcessPendingAsyncWrites()
{
while(listLength(serverTL->clients_pending_asyncwrite)) {
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite));
serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE);
// TODO: Append to end of reply block?
size_t size = c->bufposAsync;
clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL);
/* take over the allocation's internal fragmentation */
reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock);
reply->used = c->bufposAsync;
memcpy(reply->buf, c->bufAsync, c->bufposAsync);
listAddNodeTail(c->listbufferDoneAsync, reply);
c->bufposAsync = 0;
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
// Now install the write event handler
int ae_flags = AE_WRITABLE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR)
freeClientAsync(c);
}
}
/* Call() is the core of Redis execution of a command.
*
* The following flags can be passed:
@ -3260,6 +3224,7 @@ void call(client *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
serverAssert(aeThreadOwnsLock());
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
@ -3389,6 +3354,7 @@ void call(client *c, int flags) {
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
serverAssert(aeThreadOwnsLock());
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
@ -4517,6 +4483,7 @@ void infoCommand(client *c) {
void monitorCommand(client *c) {
/* ignore MONITOR if already slave or in monitor mode */
serverAssert(aeThreadOwnsLock());
if (c->flags & CLIENT_SLAVE) return;
c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
@ -4872,7 +4839,7 @@ void *workerThreadMain(void *parg)
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = server.rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, /*isMainThread ? 0 : AE_SLEEP_THREADSAFE*/ 0);
aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0);
aeMain(el);
aeDeleteEventLoop(el);
@ -4932,6 +4899,9 @@ int main(int argc, char **argv) {
{
initServerThread(server.rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
}
serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN];
aeAcquireLock(); // We own the lock on boot
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
basic networking code and client creation depends on it. */
moduleInitModulesSystem();
@ -5046,9 +5016,8 @@ int main(int argc, char **argv) {
initServer();
server.cthreads = 1; //testing
initNetworking(1 /* fReusePort */);
serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN];
server.cthreads = 4; //testing
initNetworking(0 /* fReusePort */);
if (background || server.pidfile) createPidFile();
redisSetProcTitle(argv[0]);
@ -5089,6 +5058,8 @@ int main(int argc, char **argv) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
aeReleaseLock(); //Finally we can dump the lock
serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS];
for (int iel = 0; iel < server.cthreads; ++iel)

View File

@ -1205,6 +1205,7 @@ struct redisServer {
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent;
int aof_pipe_read_ack_from_child;
aeEventLoop *el_alf_pip_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /* If true stop sending accumulated diffs
@ -1640,6 +1641,7 @@ void setDeferredArrayLenAsync(client *c, void *node, long length);
void addReplySdsAsync(client *c, sds s);
void addReplyBulkSdsAsync(client *c, sds s);
void addReplyPushLenAsync(client *c, long length);
void addReplyLongLongAsync(client *c, long long ll);
void ProcessPendingAsyncWrites(void);