diff --git a/src/aof.c b/src/aof.c index 39195b510..0a7fdd9ab 100644 --- a/src/aof.c +++ b/src/aof.c @@ -96,6 +96,8 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; + serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el); + UNUSED(el); UNUSED(fd); UNUSED(privdata); @@ -105,7 +107,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { ln = listFirst(server.aof_rewrite_buf_blocks); block = ln ? ln->value : NULL; if (server.aof_stop_sending_diff || !block) { - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child, + aeDeleteFileEvent(el,server.aof_pipe_write_data_to_child, AE_WRITABLE); return; } @@ -121,6 +123,15 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +static void QueueAofPipeWrite(void *arg) +{ + UNUSED(arg); + if (aeGetFileEvents(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child) == 0) { + aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.aof_pipe_write_data_to_child, + AE_WRITABLE, aofChildWriteDiffData, NULL); + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); @@ -162,10 +173,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. 
*/ - if (aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.aof_pipe_write_data_to_child, - AE_WRITABLE, aofChildWriteDiffData, NULL); - } + aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, QueueAofPipeWrite, NULL); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -631,6 +639,7 @@ struct client *createFakeClient(void) { selectDb(c,0); c->fd = -1; + c->iel = IDX_EVENT_LOOP_MAIN; c->name = NULL; c->querybuf = sdsempty(); c->querybuf_peak = 0; @@ -1470,7 +1479,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Remove the handler since this can be called only one time during a * rewrite. */ - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEventAsync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE); } /* Create the pipes used for parent - child process IPC during rewrite. @@ -1488,7 +1497,7 @@ int aofCreatePipes(void) { /* Parent -> children data is non blocking. 
*/ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; - if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; + if (aeCreateRemoteFileEventSync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; @@ -1507,8 +1516,8 @@ error: } void aofClosePipes(void) { - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,AE_WRITABLE); + aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE); close(server.aof_pipe_write_data_to_child); close(server.aof_pipe_read_data_from_parent); close(server.aof_pipe_write_ack_to_parent); diff --git a/src/blocked.c b/src/blocked.c index 7b9800496..aad18ee31 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -112,12 +112,14 @@ void blockClient(client *c, int btype) { void processUnblockedClients(int iel) { listNode *ln; client *c; + list *unblocked_clients = server.rgthreadvar[iel].unblocked_clients; + serverAssert(iel == (serverTL - server.rgthreadvar)); - while (listLength(server.rgunblocked_clients[iel])) { - ln = listFirst(server.rgunblocked_clients[iel]); + while (listLength(unblocked_clients)) { + ln = listFirst(unblocked_clients); serverAssert(ln != NULL); c = ln->value; - listDelNode(server.rgunblocked_clients[iel],ln); + listDelNode(unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; /* Process remaining data in the input buffer, unless the client @@ -125,9 +127,11 @@ void processUnblockedClients(int iel) { * client is not blocked before to proceed, but 
things may change and * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { + aeAcquireLock(); if (c->querybuf && sdslen(c->querybuf) > 0) { processInputBufferAndReplicate(c); } + aeReleaseLock(); } } } @@ -151,9 +155,10 @@ void processUnblockedClients(int iel) { void queueClientForReprocessing(client *c) { /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ + AssertCorrectThread(c); if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; - listAddNodeTail(server.rgunblocked_clients[c->iel],c); + listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c); } } @@ -213,7 +218,7 @@ void disconnectAllBlockedClients(void) { client *c = listNodeValue(ln); if (c->flags & CLIENT_BLOCKED) { - addReplySds(c,sdsnew( + addReplySdsAsync(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> replica?)\r\n")); unblockClient(c); @@ -407,7 +412,7 @@ void handleClientsBlockedOnKeys(void) { /* If the group was not found, send an error * to the consumer. */ if (!group) { - addReplyError(receiver, + addReplyErrorAsync(receiver, "-NOGROUP the consumer group this client " "was blocked on no longer exists"); unblockClient(receiver); @@ -437,12 +442,12 @@ void handleClientsBlockedOnKeys(void) { * extracted from it. Wrapped in a single-item * array, since we have just one key. 
*/ if (receiver->resp == 2) { - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); + addReplyArrayLenAsync(receiver,1); + addReplyArrayLenAsync(receiver,2); } else { - addReplyMapLen(receiver,1); + addReplyMapLenAsync(receiver,1); } - addReplyBulk(receiver,rl->key); + addReplyBulkAsync(receiver,rl->key); streamPropInfo pi = { rl->key, diff --git a/src/cluster.c b/src/cluster.c index 6cadecc39..5dc0abb2b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -493,7 +493,7 @@ void clusterInit(void) { int j; for (j = 0; j < server.cfd_count; j++) { - if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.cfd[j], AE_READABLE, + if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL) == AE_ERR) serverPanic("Unrecoverable error creating Redis Cluster " "file event."); @@ -601,7 +601,7 @@ clusterLink *createClusterLink(clusterNode *node) { * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { if (link->fd != -1) { - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_READABLE|AE_WRITABLE); + aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE); } sdsfree(link->sndbuf); sdsfree(link->rcvbuf); @@ -645,7 +645,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * node identity. 
*/ link = createClusterLink(NULL); link->fd = cfd; - aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],cfd,AE_READABLE,clusterReadHandler,link); + aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,cfd,AE_READABLE,clusterReadHandler,link); } } @@ -2132,7 +2132,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } sdsrange(link->sndbuf,nwritten,-1); if (sdslen(link->sndbuf) == 0) - aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_WRITABLE); + aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE); } /* Read data. Try to read the first field of the header first to check the @@ -2208,7 +2208,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) - aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_WRITABLE|AE_BARRIER, + aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER, clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); @@ -3402,7 +3402,7 @@ void clusterCron(void) { link = createClusterLink(node); link->fd = fd; node->link = link; - aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_READABLE, + aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. 
diff --git a/src/config.c b/src/config.c index bc1b88cc9..721f343b1 100644 --- a/src/config.c +++ b/src/config.c @@ -968,12 +968,12 @@ void configSetCommand(client *c) { server.maxclients = orig_value; return; } - if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) < + if ((unsigned int) aeGetSetSize(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el) < server.maxclients + CONFIG_FDSET_INCR) { - for (int iel = 0; iel < server.cel; ++iel) + for (int iel = 0; iel < server.cthreads; ++iel) { - if (aeResizeSetSize(server.rgel[iel], + if (aeResizeSetSize(server.rgthreadvar[iel].el, server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) { addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients"); diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 9c4e4f1c7..4f9af9ab5 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -51,3 +51,9 @@ extern "C" void fastlock_free(struct fastlock *lock) // NOP (void)lock; } + + +bool fastlock::fOwnLock() +{ + return gettid() == m_pidOwner; +} \ No newline at end of file diff --git a/src/fastlock.h b/src/fastlock.h index b5ddc597f..553bfbecc 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -37,5 +37,7 @@ struct fastlock { fastlock_unlock(this); } + + bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only #endif }; diff --git a/src/module.c b/src/module.c index 64c9fb917..9f9c5d15c 100644 --- a/src/module.c +++ b/src/module.c @@ -3633,9 +3633,15 @@ void moduleHandleBlockedClients(void) { ln = listFirst(moduleUnblockedClients); bc = ln->value; client *c = bc->client; + serverAssert(c->iel == IDX_EVENT_LOOP_MAIN); listDelNode(moduleUnblockedClients,ln); pthread_mutex_unlock(&moduleUnblockedClientsMutex); + if (c) + { + AssertCorrectThread(c); + } + /* Release the lock during the loop, as long as we don't * touch the shared list. 
*/ @@ -3688,11 +3694,11 @@ void moduleHandleBlockedClients(void) { /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ - if (clientHasPendingReplies(c) && + if (clientHasPendingReplies(c, TRUE) && !(c->flags & CLIENT_PENDING_WRITE)) { c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.rgclients_pending_write[IDX_EVENT_LOOP_MAIN],c); + listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); } } @@ -4300,7 +4306,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. */ - aeDeleteTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],aeTimer); + aeDeleteTimeEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); aeTimer = -1; } raxStop(&ri); @@ -4309,7 +4315,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod /* If we have no main timer (the old one was invalidated, or this is the * first module timer we have), install one. 
*/ if (aeTimer == -1) - aeTimer = aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],period,moduleTimerHandler,NULL,NULL); + aeTimer = aeCreateTimeEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); return key; } diff --git a/src/networking.cpp b/src/networking.cpp index 7188631be..2363ae2a6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -34,6 +34,38 @@ #include static void setProtocolError(const char *errstr, client *c); +void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); +void addReplyBulkCStringCore(client *c, const char *s, bool fAsync); + +class AeLocker +{ + bool m_fArmed = false; + +public: + AeLocker(bool fArm = false) + { + if (fArm) + arm(); + } + + void arm() + { + m_fArmed = true; + aeAcquireLock(); + } + + void disarm() + { + m_fArmed = false; + aeReleaseLock(); + } + + ~AeLocker() + { + if (m_fArmed) + aeReleaseLock(); + } +}; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. 
This function is used in order to compute @@ -94,7 +126,7 @@ client *createClient(int fd, int iel) { anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); - if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_READ_THREADSAFE, + if (aeCreateFileEvent(server.rgthreadvar[iel].el,fd,AE_READABLE|AE_READ_THREADSAFE, readQueryFromClient, c) == AE_ERR) { close(fd); @@ -124,6 +156,7 @@ client *createClient(int fd, int iel) { c->multibulklen = 0; c->bulklen = -1; c->sentlen = 0; + c->sentlenAsync = 0; c->flags = 0; c->ctime = c->lastinteraction = server.unixtime; /* If the default user does not require authentication, the user is @@ -158,6 +191,13 @@ client *createClient(int fd, int iel) { c->pubsub_patterns = listCreate(); c->peerid = NULL; c->client_list_node = NULL; + c->bufAsync = NULL; + c->buflenAsync = 0; + c->bufposAsync = 0; + c->listbufferDoneAsync = listCreate(); + listSetFreeMethod(c->listbufferDoneAsync,freeClientReplyValue); + listSetDupMethod(c->listbufferDoneAsync,dupClientReplyValue); + listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (fd != -1) linkClient(c); @@ -180,6 +220,7 @@ void clientInstallWriteHandler(client *c) { (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { + AssertCorrectThread(c); /* Here instead of installing the write handler, we just flag the * client and put it into a list of clients that have something * to write to the socket. This way before re-entering the event @@ -187,10 +228,20 @@ void clientInstallWriteHandler(client *c) { * a system call. We'll only really install the write handler if * we'll not be able to write the whole reply at once. 
*/ c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.rgclients_pending_write[c->iel],c); + listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); } } +void clientInstallAsyncWriteHandler(client *c) { + UNUSED(c); +#if 0 // Not yet + if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) { + c->flags |= CLIENT_PENDING_ASYNCWRITE; + listAddNodeHead(serverTL->clients_pending_asyncwrite,c); + } +#endif +} + /* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * @@ -213,7 +264,9 @@ void clientInstallWriteHandler(client *c) { * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns C_ERR no * data should be appended to the output buffers. */ -int prepareClientToWrite(client *c) { +int prepareClientToWrite(client *c, bool fAsync) { + fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread + /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; @@ -230,7 +283,8 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c); + if (fAsync && !(c->flags & CLIENT_PENDING_ASYNCWRITE)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -240,25 +294,42 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. 
* -------------------------------------------------------------------------- */ -int _addReplyToBuffer(client *c, const char *s, size_t len) { - size_t available = sizeof(c->buf)-c->bufpos; - +int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; - /* If there already are entries in the reply list, we cannot - * add anything more to the static buffer. */ - if (listLength(c->reply) > 0) return C_ERR; + fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread + if (fAsync) + { + serverAssert(aeThreadOwnsLock()); + if ((c->buflenAsync - c->bufposAsync) < (int)len) + { + int minsize = len + c->bufposAsync; + c->buflenAsync = std::max(minsize, c->buflenAsync*2); + c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); + } + memcpy(c->bufAsync+c->bufposAsync,s,len); + c->bufposAsync += len; + } + else + { + size_t available = sizeof(c->buf)-c->bufpos; - /* Check that the buffer has enough space available for this string. */ - if (len > available) return C_ERR; + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return C_ERR; - memcpy(c->buf+c->bufpos,s,len); - c->bufpos+=len; + /* Check that the buffer has enough space available for this string. */ + if (len > available) return C_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + } return C_OK; } void _addReplyProtoToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; + AssertCorrectThread(c); listNode *ln = listLast(c->reply); clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL); @@ -297,13 +368,11 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) { * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. 
* -------------------------------------------------------------------------- */ - -/* Add the object 'obj' string representation to the client output buffer. */ -void addReply(client *c, robj *obj) { - if (prepareClientToWrite(c) != C_OK) return; +void addReplyCore(client *c, robj *obj, bool fAsync) { + if (prepareClientToWrite(c, fAsync) != C_OK) return; if (sdsEncodedObject(obj)) { - if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))) != C_OK) + if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)),fAsync) != C_OK) _addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))); } else if (obj->encoding == OBJ_ENCODING_INT) { /* For integer encoded strings we just convert it into a string @@ -311,26 +380,44 @@ void addReply(client *c, robj *obj) { * to the output buffer. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj)); - if (_addReplyToBuffer(c,buf,len) != C_OK) + if (_addReplyToBuffer(c,buf,len,fAsync) != C_OK) _addReplyProtoToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } } +/* Add the object 'obj' string representation to the client output buffer. */ +void addReply(client *c, robj *obj) +{ + addReplyCore(c, obj, false); +} +void addReplyAsync(client *c, robj *obj) +{ + addReplyCore(c, obj, true); +} + /* Add the SDS 's' string to the client output buffer, as a side effect * the SDS string is freed. */ -void addReplySds(client *c, sds s) { - if (prepareClientToWrite(c) != C_OK) { +void addReplySdsCore(client *c, sds s, bool fAsync) { + if (prepareClientToWrite(c, fAsync) != C_OK) { /* The caller expects the sds to be free'd. 
*/ sdsfree(s); return; } - if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) + if (_addReplyToBuffer(c,s,sdslen(s), fAsync) != C_OK) _addReplyProtoToList(c,s,sdslen(s)); sdsfree(s); } +void addReplySds(client *c, sds s) { + addReplySdsCore(c, s, false); +} + +void addReplySdsAsync(client *c, sds s) { + addReplySdsCore(c, s, true); +} + /* This low level function just adds whatever protocol you send it to the * client buffer, trying the static buffer initially, and using the string * of objects if not possible. @@ -339,12 +426,16 @@ void addReplySds(client *c, sds s) { * if not needed. The object will only be created by calling * _addReplyProtoToList() if we fail to extend the existing tail object * in the list of objects. */ -void addReplyProto(client *c, const char *s, size_t len) { - if (prepareClientToWrite(c) != C_OK) return; - if (_addReplyToBuffer(c,s,len) != C_OK) +void addReplyProtoCore(client *c, const char *s, size_t len, bool fAsync) { + if (prepareClientToWrite(c, fAsync) != C_OK) return; + if (_addReplyToBuffer(c,s,len,fAsync) != C_OK) _addReplyProtoToList(c,s,len); } +void addReplyProto(client *c, const char *s, size_t len) { + addReplyProtoCore(c, s, len, false); +} + /* Low level function called by the addReplyError...() functions. * It emits the protocol for a Redis error, in the form: * @@ -353,12 +444,12 @@ void addReplyProto(client *c, const char *s, size_t len) { * If the error code is already passed in the string 's', the error * code provided is used, otherwise the string "-ERR " for the generic * error code is automatically added. */ -void addReplyErrorLength(client *c, const char *s, size_t len) { +void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) { /* If the string already starts with "-..." then the error code * is provided by the caller. Otherwise we use "-ERR". 
*/ - if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); + if (!len || s[0] != '-') addReplyProtoCore(c,"-ERR ",5,fAsync); + addReplyProtoCore(c,s,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); /* Sometimes it could be normal that a slave replies to a master with * an error and this function gets called. Actually the error will never @@ -380,8 +471,17 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { } } +void addReplyErrorLength(client *c, const char *s, size_t len) +{ + addReplyErrorLengthCore(c, s, len, false); +} + void addReplyError(client *c, const char *err) { - addReplyErrorLength(c,err,strlen(err)); + addReplyErrorLengthCore(c,err,strlen(err), false); +} + +void addReplyErrorAsync(client *c, const char *err) { + addReplyErrorLengthCore(c, err, strlen(err), true); } void addReplyErrorFormat(client *c, const char *fmt, ...) { @@ -425,11 +525,18 @@ void *addReplyDeferredLen(client *c) { /* Note that we install the write event here even if the object is not * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ - if (prepareClientToWrite(c) != C_OK) return NULL; + if (prepareClientToWrite(c, false) != C_OK) return NULL; listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } +void *addReplyDeferredLenAsync(client *c) { + if (FCorrectThread(c)) + return addReplyDeferredLen(c); + + return (void*)((ssize_t)c->bufposAsync); +} + /* Populate the length object and try gluing it to the next chunk. 
*/ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { listNode *ln = (listNode*)node; @@ -472,10 +579,37 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { asyncCloseClientOnOutputBufferLimitReached(c); } +void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefix) +{ + if (FCorrectThread(c)) { + setDeferredAggregateLen(c, node, length, prefix); + return; + } + + char lenstr[128]; + int lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); + + ssize_t idxSplice = (ssize_t)node; + serverAssert(idxSplice <= c->bufposAsync); + if (c->buflenAsync < (c->bufposAsync + lenstr_len)) + { + c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2); + c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); + } + + memmove(c->bufAsync + idxSplice + lenstr_len, c->bufAsync + idxSplice, c->bufposAsync - idxSplice); + memcpy(c->bufAsync + idxSplice, lenstr, lenstr_len); + c->bufposAsync += lenstr_len; +} + void setDeferredArrayLen(client *c, void *node, long length) { setDeferredAggregateLen(c,node,length,'*'); } +void setDeferredArrayLenAsync(client *c, void *node, long length) { + setDeferredAggregateLenAsync(c, node, length, '*'); +} + void setDeferredMapLen(client *c, void *node, long length) { int prefix = c->resp == 2 ? '*' : '%'; if (c->resp == 2) length *= 2; @@ -499,15 +633,15 @@ void setDeferredPushLen(client *c, void *node, long length) { } /* Add a double as a bulk reply */ -void addReplyDouble(client *c, double d) { +void addReplyDoubleCore(client *c, double d, bool fAsync) { if (isinf(d)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a * different way, so better to handle it in an explicit way. */ if (c->resp == 2) { - addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); + addReplyBulkCStringCore(c, d > 0 ? "inf" : "-inf", fAsync); } else { - addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n", - d > 0 ? 
6 : 7); + addReplyProtoCore(c, d > 0 ? ",inf\r\n" : "-inf\r\n", + d > 0 ? 6 : 7, fAsync); } } else { char dbuf[MAX_LONG_DOUBLE_CHARS+3], @@ -516,14 +650,22 @@ void addReplyDouble(client *c, double d) { if (c->resp == 2) { dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); - addReplyProto(c,sbuf,slen); + addReplyProtoCore(c,sbuf,slen,fAsync); } else { dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d); - addReplyProto(c,dbuf,dlen); + addReplyProtoCore(c,dbuf,dlen,fAsync); } } } +void addReplyDouble(client *c, double d) { + addReplyDoubleCore(c, d, false); +} + +void addReplyDoubleAsync(client *c, double d) { + addReplyDoubleCore(c, d, true); +} + /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ @@ -543,7 +685,7 @@ void addReplyHumanLongDouble(client *c, long double d) { /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ -void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { +void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { char buf[128]; int len; @@ -551,10 +693,10 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { * so we have a few shared objects to use if the integer is small * like it is most of the times. 
*/ if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.mbulkhdr[ll]); + addReplyCore(c,shared.mbulkhdr[ll], fAsync); return; } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.bulkhdr[ll]); + addReplyCore(c,shared.bulkhdr[ll], fAsync); return; } @@ -562,7 +704,11 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplyProto(c,buf,len+3); + addReplyProtoCore(c,buf,len+3, fAsync); +} + +void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { + addReplyLongLongWithPrefixCore(c, ll, prefix, false); } void addReplyLongLong(client *c, long long ll) { @@ -574,21 +720,41 @@ void addReplyLongLong(client *c, long long ll) { addReplyLongLongWithPrefix(c,ll,':'); } -void addReplyAggregateLen(client *c, long length, int prefix) { +void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) { if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN) - addReply(c,shared.mbulkhdr[length]); + addReplyCore(c,shared.mbulkhdr[length], fAsync); else - addReplyLongLongWithPrefix(c,length,prefix); + addReplyLongLongWithPrefixCore(c,length,prefix, fAsync); +} + +void addReplyAggregateLen(client *c, long length, int prefix) { + addReplyAggregateLenCore(c, length, prefix, false); +} + +void addReplyArrayLenCore(client *c, long length, bool fAsync) { + addReplyAggregateLenCore(c,length,'*', fAsync); } void addReplyArrayLen(client *c, long length) { - addReplyAggregateLen(c,length,'*'); + addReplyArrayLenCore(c, length, false); +} + +void addReplyArrayLenAsync(client *c, long length) { + addReplyArrayLenCore(c, length, true); +} + +void addReplyMapLenCore(client *c, long length, bool fAsync) { + int prefix = c->resp == 2 ? 
'*' : '%'; + if (c->resp == 2) length *= 2; + addReplyAggregateLenCore(c,length,prefix,fAsync); } void addReplyMapLen(client *c, long length) { - int prefix = c->resp == 2 ? '*' : '%'; - if (c->resp == 2) length *= 2; - addReplyAggregateLen(c,length,prefix); + addReplyMapLenCore(c, length, false); +} + +void addReplyMapLenAsync(client *c, long length) { + addReplyMapLenCore(c, length, true); } void addReplySetLen(client *c, long length) { @@ -602,17 +768,33 @@ void addReplyAttributeLen(client *c, long length) { addReplyAggregateLen(c,length,prefix); } -void addReplyPushLen(client *c, long length) { +void addReplyPushLenCore(client *c, long length, bool fAsync) { int prefix = c->resp == 2 ? '*' : '>'; - addReplyAggregateLen(c,length,prefix); + addReplyAggregateLenCore(c,length,prefix, fAsync); +} + +void addReplyPushLen(client *c, long length) { + addReplyPushLenCore(c, length, false); +} + +void addReplyPushLenAsync(client *c, long length) { + addReplyPushLenCore(c, length, true); +} + +void addReplyNullCore(client *c, bool fAsync) { + if (c->resp == 2) { + addReplyProtoCore(c,"$-1\r\n",5,fAsync); + } else { + addReplyProtoCore(c,"_\r\n",3,fAsync); + } } void addReplyNull(client *c) { - if (c->resp == 2) { - addReplyProto(c,"$-1\r\n",5); - } else { - addReplyProto(c,"_\r\n",3); - } + addReplyNullCore(c, false); +} + +void addReplyNullAsync(client *c) { + addReplyNullCore(c, true); } void addReplyBool(client *c, int b) { @@ -636,7 +818,7 @@ void addReplyNullArray(client *c) { } /* Create the length prefix of a bulk reply, example: $2234 */ -void addReplyBulkLen(client *c, robj *obj) { +void addReplyBulkLenCore(client *c, robj *obj, bool fAsync) { size_t len; if (sdsEncodedObject(obj)) { @@ -656,41 +838,76 @@ void addReplyBulkLen(client *c, robj *obj) { } if (len < OBJ_SHARED_BULKHDR_LEN) - addReply(c,shared.bulkhdr[len]); + addReplyCore(c,shared.bulkhdr[len], fAsync); else - addReplyLongLongWithPrefix(c,len,'$'); + addReplyLongLongWithPrefixCore(c,len,'$', fAsync); 
+} + +void addReplyBulkLen(client *c, robj *obj) +{ + addReplyBulkLenCore(c, obj, false); } /* Add a Redis Object as a bulk reply */ -void addReplyBulk(client *c, robj *obj) { - addReplyBulkLen(c,obj); - addReply(c,obj); - addReply(c,shared.crlf); +void addReplyBulkCore(client *c, robj *obj, bool fAsync) { + addReplyBulkLenCore(c,obj,fAsync); + addReplyCore(c,obj,fAsync); + addReplyCore(c,shared.crlf,fAsync); +} + +void addReplyBulk(client *c, robj *obj) +{ + addReplyBulkCore(c, obj, false); +} + +void addReplyBulkAsync(client *c, robj *obj) +{ + addReplyBulkCore(c, obj, true); } /* Add a C buffer as bulk reply */ +void addReplyBulkCBufferCore(client *c, const void *p, size_t len, bool fAsync) { + addReplyLongLongWithPrefixCore(c,len,'$',fAsync); + addReplyProtoCore(c,(const char*)p,len,fAsync); + addReplyCore(c,shared.crlf,fAsync); +} + void addReplyBulkCBuffer(client *c, const void *p, size_t len) { - addReplyLongLongWithPrefix(c,len,'$'); - addReplyProto(c,(const char*)p,len); - addReply(c,shared.crlf); + addReplyBulkCBufferCore(c, p, len, false); +} + +void addReplyBulkCBufferAsync(client *c, const void *p, size_t len) { + addReplyBulkCBufferCore(c, p, len, true); } /* Add sds to reply (takes ownership of sds and frees it) */ -void addReplyBulkSds(client *c, sds s) { - addReplyLongLongWithPrefix(c,sdslen(s),'$'); - addReplySds(c,s); - addReply(c,shared.crlf); +void addReplyBulkSdsCore(client *c, sds s, bool fAsync) { + addReplyLongLongWithPrefixCore(c,sdslen(s),'$', fAsync); + addReplySdsCore(c,s,fAsync); + addReplyCore(c,shared.crlf,fAsync); +} + +void addReplyBulkSds(client *c, sds s) { + addReplyBulkSdsCore(c, s, false); +} + +void addReplyBulkSdsAsync(client *c, sds s) { + addReplyBulkSdsCore(c, s, true); } /* Add a C null term string as bulk reply */ -void addReplyBulkCString(client *c, const char *s) { +void addReplyBulkCStringCore(client *c, const char *s, bool fAsync) { if (s == NULL) { - addReplyNull(c); + addReplyNullCore(c,fAsync); } else { - 
addReplyBulkCBuffer(c,s,strlen(s)); + addReplyBulkCBufferCore(c,s,strlen(s),fAsync); } } +void addReplyBulkCString(client *c, const char *s) { + addReplyBulkCStringCore(c, s, false); +} + /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -775,8 +992,8 @@ void copyClientOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ -int clientHasPendingReplies(client *c) { - return c->bufpos || listLength(c->reply); +int clientHasPendingReplies(client *c, int fIncludeAsync) { + return c->bufpos || listLength(c->reply) || (fIncludeAsync && listLength(c->listbufferDoneAsync)); } #define MAX_ACCEPTS_PER_CALL 1000 @@ -929,6 +1146,8 @@ void unlinkClient(client *c) { * If the client was already unlinked or if it's a "fake client" the * fd is already set to -1. */ if (c->fd != -1) { + AssertCorrectThread(c); + /* Remove from the list of active clients. */ if (c->client_list_node) { uint64_t id = htonu64(c->id); @@ -938,32 +1157,35 @@ void unlinkClient(client *c) { } /* Unregister async I/O handlers and close the socket. */ - aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_READABLE); - aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_WRITABLE); + aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); close(c->fd); c->fd = -1; } /* Remove from the list of pending writes if needed. */ if (c->flags & CLIENT_PENDING_WRITE) { - ln = listSearchKey(server.rgclients_pending_write[c->iel],c); + ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_write,c); serverAssert(ln != NULL); - listDelNode(server.rgclients_pending_write[c->iel],ln); + listDelNode(server.rgthreadvar[c->iel].clients_pending_write,ln); c->flags &= ~CLIENT_PENDING_WRITE; } /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. 
*/ if (c->flags & CLIENT_UNBLOCKED) { - ln = listSearchKey(server.rgunblocked_clients[c->iel],c); + AssertCorrectThread(c); + ln = listSearchKey(server.rgthreadvar[c->iel].unblocked_clients,c); serverAssert(ln != NULL); - listDelNode(server.rgunblocked_clients[c->iel],ln); + listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; } } void freeClient(client *c) { listNode *ln; + serverAssert(aeThreadOwnsLock()); + AssertCorrectThread(c); /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ @@ -1055,6 +1277,8 @@ void freeClient(client *c) { /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ + zfree(c->bufAsync); + listRelease(c->listbufferDoneAsync); if (c->name) decrRefCount(c->name); zfree(c->argv); freeClientMultiState(c); @@ -1069,17 +1293,25 @@ void freeClient(client *c) { void freeClientAsync(client *c) { if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; + aeAcquireLock(); listAddNodeTail(server.clients_to_close,c); + aeReleaseLock(); } -void freeClientsInAsyncFreeQueue(void) { - while (listLength(server.clients_to_close)) { - listNode *ln = listFirst(server.clients_to_close); +void freeClientsInAsyncFreeQueue(int iel) { + listIter li; + listNode *ln; + listRewind(server.clients_to_close,&li); + + while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); + if (c->iel != iel) + continue; // wrong thread c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); listDelNode(server.clients_to_close,ln); + listRewind(server.clients_to_close,&li); } } @@ -1096,11 +1328,45 @@ client *lookupClientByID(uint64_t id) { * is still valid after the call, C_ERR if it was freed. 
*/ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; - size_t objlen; clientReplyBlock *o; + AssertCorrectThread(c); - while(clientHasPendingReplies(c)) { - if (c->bufpos > 0) { + // Decide up front if we are sending the done buffer. This prevents us from completing + // a transmission on another thread while transmitting the thread local buffer, resulting in us + // overlapping messages + AeLocker locker(true); + int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0); + if (!fSendAsyncBuffer) + locker.disarm(); + + while(fSendAsyncBuffer || clientHasPendingReplies(c, FALSE)) { + if (fSendAsyncBuffer) { + o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync)); + if (o->used == 0) { + listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync)); + continue; + } + + nwritten = write(fd, o->buf() + c->sentlenAsync, o->used - c->sentlenAsync); /* resume at the async block's own offset: sentlenAsync is the counter advanced and compared below, sentlen belongs to the non-async buffers */ + if (nwritten <= 0) + break; + c->sentlenAsync += nwritten; + totwritten += nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlenAsync == o->used) { + listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync)); + c->sentlenAsync = 0; + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. 
*/ + if (listLength(c->listbufferDoneAsync) == 0) + { + fSendAsyncBuffer = 0; + locker.disarm(); + continue; + } + } + } else if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; @@ -1115,27 +1381,26 @@ int writeToClient(int fd, client *c, int handler_installed) { } } else { o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); - objlen = o->used; - - if (objlen == 0) { + if (o->used == 0) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } - nwritten = write(fd, o->buf() + c->sentlen, objlen - c->sentlen); - - if (nwritten <= 0) break; + nwritten = write(fd, o->buf() + c->sentlen, o->used - c->sentlen); + if (nwritten <= 0) + break; + c->sentlen += nwritten; totwritten += nwritten; - + /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { + if (c->sentlen == o->used) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ + * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) serverAssert(c->reply_bytes == 0); } @@ -1165,7 +1430,9 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return C_ERR; } } @@ -1176,13 +1443,15 @@ int writeToClient(int fd, client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. 
*/ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; } - if (!clientHasPendingReplies(c)) { + if (!clientHasPendingReplies(c, TRUE)) { c->sentlen = 0; - if (handler_installed) aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); + if (handler_installed) aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return C_ERR; } } @@ -1191,9 +1460,16 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Write event handler. Just send data to the client. */ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { - UNUSED(el); UNUSED(mask); - writeToClient(fd,(client*)privdata,1); + AeLocker locker; + client *c = (client*)privdata; + + serverAssert(ielFromEventLoop(el) == c->iel); + + if (c->flags & CLIENT_SLAVE) /* test the flag with bitwise AND, as readQueryFromClient does; NOTE(review): the previous '|' was always true, so the lock was armed for every client - confirm writeToClient is safe without it for non-replica clients */ + locker.arm(); + + writeToClient(fd,c,1); } /* This function is called just before entering the event loop, in the hope @@ -1203,14 +1479,16 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { int handleClientsWithPendingWrites(int iel) { listIter li; listNode *ln; - list *pending_writes = server.rgclients_pending_write[iel]; - int processed = listLength(pending_writes); + + list *list = server.rgthreadvar[iel].clients_pending_write; + int processed = listLength(list); - listRewind(pending_writes,&li); + listRewind(list,&li); while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; - listDelNode(pending_writes,ln); + listDelNode(list,ln); + AssertCorrectThread(c); /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ @@ -1221,7 +1499,7 @@ int handleClientsWithPendingWrites(int iel) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. 
*/ - if (clientHasPendingReplies(c)) { + if (clientHasPendingReplies(c, TRUE)) { int ae_flags = AE_WRITABLE; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, @@ -1233,11 +1511,9 @@ int handleClientsWithPendingWrites(int iel) { { ae_flags |= AE_BARRIER; } - if (aeCreateFileEvent(server.rgel[c->iel], c->fd, ae_flags, - sendReplyToClient, c) == AE_ERR) - { - freeClientAsync(c); - } + + if (aeCreateFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) + freeClientAsync(c); } } return processed; @@ -1282,16 +1558,18 @@ void resetClient(client *c) { * path, it is not really released, but only marked for later release. */ void protectClient(client *c) { c->flags |= CLIENT_PROTECTED; - aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_READABLE); - aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); + AssertCorrectThread(c); + aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); } /* This will undo the client protection done by protectClient() */ void unprotectClient(client *c) { + AssertCorrectThread(c); if (c->flags & CLIENT_PROTECTED) { c->flags &= ~CLIENT_PROTECTED; - aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c); - if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); + aeCreateFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c); + if (clientHasPendingReplies(c, TRUE)) clientInstallWriteHandler(c); } } @@ -1645,6 +1923,10 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); serverAssert(mask & AE_READ_THREADSAFE); serverAssert(c->iel == ielFromEventLoop(el)); + + AeLocker lockerSlave; + if (c->flags & CLIENT_SLAVE) // slaves are not async capable + lockerSlave.arm(); readlen = PROTO_IOBUF_LEN; /* If this is a multi 
bulk request, and we are processing a bulk reply @@ -1801,7 +2083,7 @@ sds catClientInfoString(sds s, client *client) { if (p == flags) *p++ = 'N'; *p++ = '\0'; - emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgel[client->iel],client->fd); + emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgthreadvar[client->iel].el,client->fd); p = events; if (emask & AE_READABLE) *p++ = 'r'; if (emask & AE_WRITABLE) *p++ = 'w'; @@ -1969,7 +2251,7 @@ NULL if (c == client) { close_this_client = 1; } else { - freeClient(client); + freeClientAsync(client); } killed++; } @@ -2349,10 +2631,10 @@ void flushSlavesOutputBuffers(void) { * of put_online_on_ack is to postpone the moment it is installed. * This is what we want since slaves in this state should not receive * writes before the first ACK. */ - events = aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd); + events = aeGetFileEvents(server.rgthreadvar[slave->iel].el,slave->fd); if (events & AE_WRITABLE && slave->replstate == SLAVE_STATE_ONLINE && - clientHasPendingReplies(slave)) + clientHasPendingReplies(slave, TRUE)) { writeToClient(slave->fd,slave,0); } @@ -2426,7 +2708,7 @@ int processEventsWhileBlocked(int iel) { int count = 0; while (iterations--) { int events = 0; - events += aeProcessEvents(server.rgel[iel], AE_FILE_EVENTS|AE_DONT_WAIT); + events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); events += handleClientsWithPendingWrites(iel); if (!events) break; count += events; diff --git a/src/pubsub.c b/src/pubsub.c index c2c3c9a82..e516bde4b 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -38,12 +38,12 @@ int clientSubscriptionsCount(client *c); /* Send a pubsub message of type "message" to the client. 
*/ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) - addReply(c,shared.mbulkhdr[3]); + addReplyAsync(c,shared.mbulkhdr[3]); else - addReplyPushLen(c,3); - addReply(c,shared.messagebulk); - addReplyBulk(c,channel); - addReplyBulk(c,msg); + addReplyPushLenAsync(c,3); + addReplyAsync(c,shared.messagebulk); + addReplyBulkAsync(c,channel); + addReplyBulkAsync(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference diff --git a/src/replication.c b/src/replication.c deleted file mode 100644 index add44d318..000000000 --- a/src/replication.c +++ /dev/null @@ -1,2720 +0,0 @@ -/* Asynchronous replication implementation. - * - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -#include "server.h" - -#include -#include -#include -#include -#include - -void replicationDiscardCachedMaster(void); -void replicationResurrectCachedMaster(int newfd); -void replicationSendAck(void); -void putSlaveOnline(client *slave); -int cancelReplicationHandshake(void); - -/* --------------------------- Utility functions ---------------------------- */ - -/* Return the pointer to a string representing the slave ip:listening_port - * pair. Mostly useful for logging, since we want to log a slave using its - * IP address and its listening port which is more clear for the user, for - * example: "Closing connection with replica 10.1.2.3:6380". 
*/ -char *replicationGetSlaveName(client *c) { - static char buf[NET_PEER_ID_LEN]; - char ip[NET_IP_STR_LEN]; - - ip[0] = '\0'; - buf[0] = '\0'; - if (c->slave_ip[0] != '\0' || - anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) - { - /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */ - if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip)); - - if (c->slave_listening_port) - anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); - else - snprintf(buf,sizeof(buf),"%s:",ip); - } else { - snprintf(buf,sizeof(buf),"client id #%llu", - (unsigned long long) c->id); - } - return buf; -} - -/* ---------------------------------- MASTER -------------------------------- */ - -void createReplicationBacklog(void) { - serverAssert(server.repl_backlog == NULL); - server.repl_backlog = zmalloc(server.repl_backlog_size, MALLOC_LOCAL); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - - /* We don't have any data inside our buffer, but virtually the first - * byte we have is the next byte that will be generated for the - * replication stream. */ - server.repl_backlog_off = server.master_repl_offset+1; -} - -/* This function is called when the user modifies the replication backlog - * size at runtime. It is up to the function to both update the - * server.repl_backlog_size and to resize the buffer and setup it so that - * it contains the same data as the previous one (possibly less data, but - * the most recent bytes, or the same data and more free space in case the - * buffer is enlarged). */ -void resizeReplicationBacklog(long long newsize) { - if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) - newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; - if (server.repl_backlog_size == newsize) return; - - server.repl_backlog_size = newsize; - if (server.repl_backlog != NULL) { - /* What we actually do is to flush the old buffer and realloc a new - * empty one. It will refill with new data incrementally. 
- * The reason is that copying a few gigabytes adds latency and even - * worse often we need to alloc additional space before freeing the - * old buffer. */ - zfree(server.repl_backlog); - server.repl_backlog = zmalloc(server.repl_backlog_size, MALLOC_LOCAL); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - /* Next byte we have is... the next since the buffer is empty. */ - server.repl_backlog_off = server.master_repl_offset+1; - } -} - -void freeReplicationBacklog(void) { - serverAssert(listLength(server.slaves) == 0); - zfree(server.repl_backlog); - server.repl_backlog = NULL; -} - -/* Add data to the replication backlog. - * This function also increments the global replication offset stored at - * server.master_repl_offset, because there is no case where we want to feed - * the backlog without incrementing the offset. */ -void feedReplicationBacklog(void *ptr, size_t len) { - unsigned char *p = ptr; - - server.master_repl_offset += len; - - /* This is a circular buffer, so write as much data we can at every - * iteration and rewind the "idx" index if we reach the limit. */ - while(len) { - size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; - if (thislen > len) thislen = len; - memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); - server.repl_backlog_idx += thislen; - if (server.repl_backlog_idx == server.repl_backlog_size) - server.repl_backlog_idx = 0; - len -= thislen; - p += thislen; - server.repl_backlog_histlen += thislen; - } - if (server.repl_backlog_histlen > server.repl_backlog_size) - server.repl_backlog_histlen = server.repl_backlog_size; - /* Set the offset of the first byte we have in the backlog. */ - server.repl_backlog_off = server.master_repl_offset - - server.repl_backlog_histlen + 1; -} - -/* Wrapper for feedReplicationBacklog() that takes Redis string objects - * as input. 
*/ -void feedReplicationBacklogWithObject(robj *o) { - char llstr[LONG_STR_SIZE]; - void *p; - size_t len; - - if (o->encoding == OBJ_ENCODING_INT) { - len = ll2string(llstr,sizeof(llstr),(long)ptrFromObj(o)); - p = llstr; - } else { - len = sdslen(ptrFromObj(o)); - p = ptrFromObj(o); - } - feedReplicationBacklog(p,len); -} - -/* Propagate write commands to slaves, and populate the replication backlog - * as well. This function is used if the instance is a master: we use - * the commands received by our clients in order to create the replication - * stream. Instead if the instance is a slave and has sub-slaves attached, - * we use replicationFeedSlavesFromMaster() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln; - listIter li; - int j, len; - char llstr[LONG_STR_SIZE]; - - /* If the instance is not a top level master, return ASAP: we'll just proxy - * the stream of data we receive from our master instead, in order to - * propagate *identical* replication stream. In this way this slave can - * advertise the same replication ID as the master (since it shares the - * master replication history and has the same backlog and offsets). */ - if (server.masterhost != NULL) return; - - /* If there aren't slaves, and there is no backlog buffer to populate, - * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) return; - - /* We can't have slaves attached and no backlog. */ - serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); - - /* Send SELECT command to every slave if needed. */ - if (server.slaveseldb != dictid) { - robj *selectcmd; - - /* For a few DBs we have pre-computed SELECT command. 
*/ - if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { - selectcmd = shared.select[dictid]; - } else { - int dictid_len; - - dictid_len = ll2string(llstr,sizeof(llstr),dictid); - selectcmd = createObject(OBJ_STRING, - sdscatprintf(sdsempty(), - "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", - dictid_len, llstr)); - } - - /* Add the SELECT command into the backlog. */ - if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); - - /* Send it to slaves. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReply(slave,selectcmd); - } - - if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) - decrRefCount(selectcmd); - } - server.slaveseldb = dictid; - - /* Write the command to the replication backlog if any. */ - if (server.repl_backlog) { - char aux[LONG_STR_SIZE+3]; - - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - - for (j = 0; j < argc; j++) { - long objlen = stringObjectLen(argv[j]); - - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - feedReplicationBacklogWithObject(argv[j]); - feedReplicationBacklog(aux+len+1,2); - } - } - - /* Write the command to every slave. 
*/ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ - - /* Add the multi bulk length. */ - addReplyArrayLen(slave,argc); - - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (j = 0; j < argc; j++) - addReplyBulk(slave,argv[j]); - } -} - -/* This function is used in order to proxy what we receive from our master - * to our sub-slaves. */ -#include -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { - listNode *ln; - listIter li; - - /* Debugging: this is handy to see the stream sent from master - * to slaves. Disabled with if(0). */ - if (0) { - printf("%zu:",buflen); - for (size_t j = 0; j < buflen; j++) { - printf("%c", isprint(buf[j]) ? 
buf[j] : '.'); - } - printf("\n"); - } - - if (server.repl_backlog) feedReplicationBacklog(buf,buflen); - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReplyProto(slave,buf,buflen); - } -} - -void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { - listNode *ln; - listIter li; - int j; - sds cmdrepr = sdsnew("+"); - robj *cmdobj; - struct timeval tv; - - gettimeofday(&tv,NULL); - cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); - if (c->flags & CLIENT_LUA) { - cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); - } else if (c->flags & CLIENT_UNIX_SOCKET) { - cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); - } else { - cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); - } - - for (j = 0; j < argc; j++) { - if (argv[j]->encoding == OBJ_ENCODING_INT) { - cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)ptrFromObj(argv[j])); - } else { - cmdrepr = sdscatrepr(cmdrepr,(char*)ptrFromObj(argv[j]), - sdslen(ptrFromObj(argv[j]))); - } - if (j != argc-1) - cmdrepr = sdscatlen(cmdrepr," ",1); - } - cmdrepr = sdscatlen(cmdrepr,"\r\n",2); - cmdobj = createObject(OBJ_STRING,cmdrepr); - - listRewind(monitors,&li); - while((ln = listNext(&li))) { - client *monitor = ln->value; - addReply(monitor,cmdobj); - } - decrRefCount(cmdobj); -} - -/* Feed the slave 'c' with the replication backlog starting from the - * specified 'offset' up to the end of the backlog. 
*/ -long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; - - serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); - - if (server.repl_backlog_histlen == 0) { - serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); - return 0; - } - - serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", - server.repl_backlog_size); - serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", - server.repl_backlog_off); - serverLog(LL_DEBUG, "[PSYNC] History len: %lld", - server.repl_backlog_histlen); - serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", - server.repl_backlog_idx); - - /* Compute the amount of bytes we need to discard. */ - skip = offset - server.repl_backlog_off; - serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - - /* Point j to the oldest byte, that is actually our - * server.repl_backlog_off byte. */ - j = (server.repl_backlog_idx + - (server.repl_backlog_size-server.repl_backlog_histlen)) % - server.repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); - - /* Discard the amount of data to seek to the specified 'offset'. */ - j = (j + skip) % server.repl_backlog_size; - - /* Feed slave with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. */ - len = server.repl_backlog_histlen - skip; - serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - while(len) { - long long thislen = - ((server.repl_backlog_size - j) < len) ? - (server.repl_backlog_size - j) : len; - - serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); - addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); - len -= thislen; - j = 0; - } - return server.repl_backlog_histlen - skip; -} - -/* Return the offset to provide as reply to the PSYNC command received - * from the slave. The returned value is only valid immediately after - * the BGSAVE process started and before executing any other command - * from clients. 
*/ -long long getPsyncInitialOffset(void) { - return server.master_repl_offset; -} - -/* Send a FULLRESYNC reply in the specific case of a full resynchronization, - * as a side effect setup the slave for a full sync in different ways: - * - * 1) Remember, into the slave client structure, the replication offset - * we sent here, so that if new slaves will later attach to the same - * background RDB saving process (by duplicating this client output - * buffer), we can get the right offset from this slave. - * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that - * we start accumulating differences from this point. - * 3) Force the replication stream to re-emit a SELECT statement so - * the new slave incremental differences will start selecting the - * right database number. - * - * Normally this function should be called immediately after a successful - * BGSAVE for replication was started, or when there is one already in - * progress that we attached our slave to. */ -int replicationSetupSlaveForFullResync(client *slave, long long offset) { - char buf[128]; - int buflen; - - slave->psync_initial_offset = offset; - slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; - /* We are going to accumulate the incremental changes for this - * slave as well. Set slaveseldb to -1 in order to force to re-emit - * a SELECT statement in the replication stream. */ - server.slaveseldb = -1; - - /* Don't send this reply to slaves that approached us with - * the old SYNC command. */ - if (!(slave->flags & CLIENT_PRE_PSYNC)) { - buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", - server.replid,offset); - if (write(slave->fd,buf,buflen) != buflen) { - freeClientAsync(slave); - return C_ERR; - } - } - return C_OK; -} - -/* This function handles the PSYNC command from the point of view of a - * master receiving a request for partial resynchronization. - * - * On success return C_OK, otherwise C_ERR is returned and we proceed - * with the usual full resync. 
*/ -int masterTryPartialResynchronization(client *c) { - long long psync_offset, psync_len; - char *master_replid = ptrFromObj(c->argv[1]); - char buf[128]; - int buflen; - - /* Parse the replication offset asked by the slave. Go to full sync - * on parse error: this should never happen but we try to handle - * it in a robust way compared to aborting. */ - if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != - C_OK) goto need_full_resync; - - /* Is the replication ID of this master the same advertised by the wannabe - * slave via PSYNC? If the replication ID changed this master has a - * different replication history, and there is no way to continue. - * - * Note that there are two potentially valid replication IDs: the ID1 - * and the ID2. The ID2 however is only valid up to a specific offset. */ - if (strcasecmp(master_replid, server.replid) && - (strcasecmp(master_replid, server.replid2) || - psync_offset > server.second_replid_offset)) - { - /* Run id "?" is used by slaves that want to force a full resync. */ - if (master_replid[0] != '?') { - if (strcasecmp(master_replid, server.replid) && - strcasecmp(master_replid, server.replid2)) - { - serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Replication ID mismatch (Replica asked for '%s', my " - "replication IDs are '%s' and '%s')", - master_replid, server.replid, server.replid2); - } else { - serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Requested offset for second ID was %lld, but I can reply " - "up to %lld", psync_offset, server.second_replid_offset); - } - } else { - serverLog(LL_NOTICE,"Full resync requested by replica %s", - replicationGetSlaveName(c)); - } - goto need_full_resync; - } - - /* We still have the data our slave is asking for? 
*/ - if (!server.repl_backlog || - psync_offset < server.repl_backlog_off || - psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) - { - serverLog(LL_NOTICE, - "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); - if (psync_offset > server.master_repl_offset) { - serverLog(LL_WARNING, - "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); - } - goto need_full_resync; - } - - /* If we reached this point, we are able to perform a partial resync: - * 1) Set client state to make it a slave. - * 2) Inform the client we can continue with +CONTINUE - * 3) Send the backlog data (from the offset to the end) to the slave. */ - c->flags |= CLIENT_SLAVE; - c->replstate = SLAVE_STATE_ONLINE; - c->repl_ack_time = server.unixtime; - c->repl_put_online_on_ack = 0; - listAddNodeTail(server.slaves,c); - /* We can't use the connection buffers since they are used to accumulate - * new commands at this stage. But we are sure the socket send buffer is - * empty so this write will never fail actually. */ - if (c->slave_capa & SLAVE_CAPA_PSYNC2) { - buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); - } else { - buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); - } - if (write(c->fd,buf,buflen) != buflen) { - freeClientAsync(c); - return C_OK; - } - psync_len = addReplyReplicationBacklog(c,psync_offset); - serverLog(LL_NOTICE, - "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", - replicationGetSlaveName(c), - psync_len, psync_offset); - /* Note that we don't need to set the selected DB at server.slaveseldb - * to -1 to force the master to emit SELECT, since the slave already - * has this state from the previous connection with the master. 
*/ - - refreshGoodSlavesCount(); - return C_OK; /* The caller can return, no full resync needed. */ - -need_full_resync: - /* We need a full resync for some reason... Note that we can't - * reply to PSYNC right now if a full SYNC is needed. The reply - * must include the master offset at the time the RDB file we transfer - * is generated, so we need to delay the reply to that moment. */ - return C_ERR; -} - -/* Start a BGSAVE for replication goals, which is, selecting the disk or - * socket target depending on the configuration, and making sure that - * the script cache is flushed before to start. - * - * The mincapa argument is the bitwise AND among all the slaves capabilities - * of the slaves waiting for this BGSAVE, so represents the slave capabilities - * all the slaves support. Can be tested via SLAVE_CAPA_* macros. - * - * Side effects, other than starting a BGSAVE: - * - * 1) Handle the slaves in WAIT_START state, by preparing them for a full - * sync if the BGSAVE was successfully started, or sending them an error - * and dropping them from the list of slaves. - * - * 2) Flush the Lua scripting script cache if the BGSAVE was actually - * started. - * - * Returns C_OK on success or C_ERR otherwise. */ -int startBgsaveForReplication(int mincapa) { - int retval; - int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); - listIter li; - listNode *ln; - - serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", - socket_target ? "replicas sockets" : "disk"); - - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - /* Only do rdbSave* when rsiptr is not NULL, - * otherwise slave will miss repl-stream-db. */ - if (rsiptr) { - if (socket_target) - retval = rdbSaveToSlavesSockets(rsiptr); - else - retval = rdbSaveBackground(rsiptr); - } else { - serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. 
Try later."); - retval = C_ERR; - } - - /* If we failed to BGSAVE, remove the slaves waiting for a full - * resynchorinization from the list of salves, inform them with - * an error about what happened, close the connection ASAP. */ - if (retval == C_ERR) { - serverLog(LL_WARNING,"BGSAVE for replication failed"); - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - slave->flags &= ~CLIENT_SLAVE; - listDelNode(server.slaves,ln); - addReplyError(slave, - "BGSAVE failed, replication can't continue"); - slave->flags |= CLIENT_CLOSE_AFTER_REPLY; - } - } - return retval; - } - - /* If the target is socket, rdbSaveToSlavesSockets() already setup - * the salves for a full resync. Otherwise for disk target do it now.*/ - if (!socket_target) { - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - replicationSetupSlaveForFullResync(slave, - getPsyncInitialOffset()); - } - } - } - - /* Flush the script cache, since we need that slave differences are - * accumulated without requiring slaves to match our cached scripts. */ - if (retval == C_OK) replicationScriptCacheFlush(); - return retval; -} - -/* SYNC and PSYNC command implemenation. */ -void syncCommand(client *c) { - /* ignore SYNC if already slave or in monitor mode */ - if (c->flags & CLIENT_SLAVE) return; - - /* Refuse SYNC requests if we are a slave but the link with our master - * is not ok... */ - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { - addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); - return; - } - - /* SYNC can't be issued when the server has pending data to send to - * the client about already issued commands. 
We need a fresh reply - * buffer registering the differences between the BGSAVE and the current - * dataset, so that we can copy to other slaves if needed. */ - if (clientHasPendingReplies(c)) { - addReplyError(c,"SYNC and PSYNC are invalid with pending output"); - return; - } - - serverLog(LL_NOTICE,"Replica %s asks for synchronization", - replicationGetSlaveName(c)); - - /* Try a partial resynchronization if this is a PSYNC command. - * If it fails, we continue with usual full resynchronization, however - * when this happens masterTryPartialResynchronization() already - * replied with: - * - * +FULLRESYNC - * - * So the slave knows the new replid and offset to try a PSYNC later - * if the connection with the master is lost. */ - if (!strcasecmp(ptrFromObj(c->argv[0]),"psync")) { - if (masterTryPartialResynchronization(c) == C_OK) { - server.stat_sync_partial_ok++; - return; /* No full resync needed, return. */ - } else { - char *master_replid = ptrFromObj(c->argv[1]); - - /* Increment stats for failed PSYNCs, but only if the - * replid is not "?", as this is used by slaves to force a full - * resync on purpose when they are not albe to partially - * resync. */ - if (master_replid[0] != '?') server.stat_sync_partial_err++; - } - } else { - /* If a slave uses SYNC, we are dealing with an old implementation - * of the replication protocol (like keydb-cli --slave). Flag the client - * so that we don't expect to receive REPLCONF ACK feedbacks. */ - c->flags |= CLIENT_PRE_PSYNC; - } - - /* Full resynchronization. */ - server.stat_sync_full++; - - /* Setup the slave as one waiting for BGSAVE to start. The following code - * paths will change the state if we handle the slave differently. */ - c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - if (server.repl_disable_tcp_nodelay) - anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. 
*/ - c->repldbfd = -1; - c->flags |= CLIENT_SLAVE; - listAddNodeTail(server.slaves,c); - - /* Create the replication backlog if needed. */ - if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { - /* When we create the backlog from scratch, we always use a new - * replication ID and clear the ID2, since there is no valid - * past history. */ - changeReplicationId(); - clearReplicationId2(); - createReplicationBacklog(); - } - - /* CASE 1: BGSAVE is in progress, with disk target. */ - if (server.rdb_child_pid != -1 && - server.rdb_child_type == RDB_CHILD_TYPE_DISK) - { - /* Ok a background save is in progress. Let's check if it is a good - * one for replication, i.e. if there is another slave that is - * registering differences since the server forked to save. */ - client *slave; - listNode *ln; - listIter li; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; - } - /* To attach this slave, we check that it has at least all the - * capabilities of the slave that triggered the current BGSAVE. */ - if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { - /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. */ - copyClientOutputBuffer(c,slave); - replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); - serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); - } else { - /* No way, we need to wait for the next BGSAVE in order to - * register differences. */ - serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); - } - - /* CASE 2: BGSAVE is in progress, with socket target. */ - } else if (server.rdb_child_pid != -1 && - server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) - { - /* There is an RDB child process but it is writing directly to - * children sockets. 
We need to wait for the next BGSAVE - * in order to synchronize. */ - serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); - - /* CASE 3: There is no BGSAVE is progress. */ - } else { - if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { - /* Diskless replication RDB child is created inside - * replicationCron() since we want to delay its start a - * few seconds to wait for more slaves to arrive. */ - if (server.repl_diskless_sync_delay) - serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); - } else { - /* Target is disk (or the slave is not capable of supporting - * diskless replication) and we don't have a BGSAVE in progress, - * let's start one. */ - if (server.aof_child_pid == -1) { - startBgsaveForReplication(c->slave_capa); - } else { - serverLog(LL_NOTICE, - "No BGSAVE in progress, but an AOF rewrite is active. " - "BGSAVE for replication delayed"); - } - } - } - return; -} - -/* REPLCONF