diff --git a/src/ae.cpp b/src/ae.cpp
index 941debf00..d074d702a 100644
--- a/src/ae.cpp
+++ b/src/ae.cpp
@@ -30,6 +30,8 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include
+#include
 #include
 #include
 #include
@@ -75,18 +77,32 @@ enum class AE_ASYNC_OP
     PostFunction,
     PostCppFunction,
     DeleteFileEvent,
+    CreateFileEvent,
 };
-typedef struct aeCommand
+
+/* Hand-off block for a synchronous cross-thread request: the requester waits
+ * on 'cv' until the event loop thread has stored 'rval' and set 'fDone'. */
+struct aeCommandControl
+{
+    std::condition_variable cv; /* signalled by aeProcessCmd() when done */
+    std::atomic<int> rval;      /* return value of the remote aeCreateFileEvent() */
+    std::mutex mutexcv;         /* paired with 'cv', guards 'fDone' */
+    bool fDone = false;         /* set under 'mutexcv' once 'rval' is valid */
+};
+
+struct aeCommand
 {
     AE_ASYNC_OP op;
     int fd;
     int mask;
     union {
         aePostFunctionProc *proc;
+        aeFileProc *fproc;
         std::function *pfn;
     };
     void *clientData;
-} aeCommand;
+    aeCommandControl *pctl;
+};
 
 void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
 {
@@ -106,6 +122,15 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
             aeDeleteFileEvent(eventLoop, cmd.fd, cmd.mask);
             break;
 
+        case AE_ASYNC_OP::CreateFileEvent:
+        {
+            std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv);
+            std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
+            cmd.pctl->fDone = true; /* lets the waiter's predicate succeed */
+            cmd.pctl->cv.notify_all();
+        }
+            break;
+
         case AE_ASYNC_OP::PostFunction:
             {
             std::unique_lock ulock(g_lock);
@@ -114,15 +139,53 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
             }
 
         case AE_ASYNC_OP::PostCppFunction:
-        {
+            {
             std::unique_lock ulock(g_lock);
             (*cmd.pfn)();
             delete cmd.pfn;
-        }
+            }
+            break;
         }
     }
 }
 
+/* Register a file event on 'eventLoop' and block until it has been installed.
+ *
+ * If 'eventLoop' is the calling thread's own loop (g_eventLoopThisThread) this
+ * is a plain aeCreateFileEvent() call. Otherwise a CreateFileEvent command is
+ * written to eventLoop->fdCmdWrite and the caller blocks until aeProcessCmd()
+ * has executed it and published the result.
+ *
+ * Returns the value aeCreateFileEvent() produced for the request. */
+int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData)
+{
+    if (eventLoop == g_eventLoopThisThread)
+        return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
+
+    aeCommand cmd;
+    cmd.op = AE_ASYNC_OP::CreateFileEvent;
+    cmd.fd = fd;
+    cmd.mask = mask;
+    cmd.fproc = proc;
+    cmd.clientData = clientData;
+    cmd.pctl = new aeCommandControl();
+
+    /* Hold the mutex across the write so the reply cannot be signalled
+     * before we are waiting for it. */
+    std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv);
+    auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
+    AE_ASSERT(size == sizeof(cmd));
+    /* Wait on a predicate: a condition variable may wake up spuriously. */
+    cmd.pctl->cv.wait(ulock, [&cmd]{ return cmd.pctl->fDone; });
+    int ret = cmd.pctl->rval;
+    /* Unlock before freeing: 'ulock' must not touch the mutex once the
+     * control block that contains it has been deleted. */
+    ulock.unlock();
+    delete cmd.pctl;
+    return ret;
+}
+
 int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
 {
     aeCommand cmd;
@@ -295,7 +358,6 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
 }
 
 extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
-    AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
     if (fd >= eventLoop->setsize) return 0;
     aeFileEvent *fe = &eventLoop->events[fd];
 
@@ -673,4 +735,10 @@ void aeAcquireLock()
 void aeReleaseLock()
 {
     g_lock.unlock();
+}
+
+/* Thin wrapper around g_lock.fOwnLock(); presumably non-zero when the calling thread holds the lock. */
+int aeThreadOwnsLock()
+{
+    return g_lock.fOwnLock();
 }
\ No newline at end of file
diff --git a/src/ae.h b/src/ae.h
index 5a9a359a7..3eb0af502 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -138,6 +138,8 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop);
 void aeStop(aeEventLoop *eventLoop);
 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         aeFileProc *proc, void *clientData);
+int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData);
 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
 void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
 int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
@@ -156,6 +158,7 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
 
 void aeAcquireLock();
 void aeReleaseLock();
+int aeThreadOwnsLock();
 
 #ifdef __cplusplus
 }
diff --git a/src/replication.cpp b/src/replication.cpp
new file mode 100644
index 000000000..645088399
--- /dev/null
+++ b/src/replication.cpp
@@ -0,0 +1,2752 @@
+/* Asynchronous replication implementation.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "server.h" + +#include +#include +#include +#include +#include + +void replicationDiscardCachedMaster(void); +void replicationResurrectCachedMaster(int newfd); +void replicationSendAck(void); +void putSlaveOnline(client *slave); +int cancelReplicationHandshake(void); + +/* --------------------------- Utility functions ---------------------------- */ + +/* Return the pointer to a string representing the slave ip:listening_port + * pair. Mostly useful for logging, since we want to log a slave using its + * IP address and its listening port which is more clear for the user, for + * example: "Closing connection with replica 10.1.2.3:6380". 
*/
+char *replicationGetSlaveName(client *c) {
+    /* Static storage: the returned pointer stays valid only until the
+     * next call overwrites it. */
+    static char buf[NET_PEER_ID_LEN];
+    char ip[NET_IP_STR_LEN];
+    const int announced = (c->slave_ip[0] != '\0');
+
+    ip[0] = '\0';
+    buf[0] = '\0';
+
+    /* No address at all: the slave announced none and the socket peer
+     * cannot be resolved. Fall back to the client id. */
+    if (!announced && anetPeerToString(c->fd,ip,sizeof(ip),NULL) == -1) {
+        snprintf(buf,sizeof(buf),"client id #%llu",
+            (unsigned long long) c->id);
+        return buf;
+    }
+
+    /* An announced address replaces whatever is in 'ip'. Note that the
+     * 'ip' buffer is always larger than 'c->slave_ip'. */
+    if (announced) memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
+
+    if (c->slave_listening_port)
+        anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
+    else
+        snprintf(buf,sizeof(buf),"%s:",ip);
+    return buf;
+}
+
+/* ---------------------------------- MASTER -------------------------------- */
+
+/* Allocate the backlog buffer (server.repl_backlog_size bytes) and mark it
+ * as empty. Asserts that no backlog exists yet. */
+void createReplicationBacklog(void) {
+    serverAssert(server.repl_backlog == NULL);
+    server.repl_backlog = (char*)zmalloc(server.repl_backlog_size, MALLOC_LOCAL);
+    server.repl_backlog_idx = 0;
+    server.repl_backlog_histlen = 0;
+
+    /* The buffer holds nothing yet, so the first byte it will ever contain
+     * is the next byte generated for the replication stream. */
+    server.repl_backlog_off = server.master_repl_offset+1;
+}
+
+/* Called when the user changes the replication backlog size at runtime.
+ * 'newsize' is raised to CONFIG_REPL_BACKLOG_MIN_SIZE when smaller, then
+ * stored in server.repl_backlog_size. If a backlog buffer is allocated it
+ * is replaced by a new, empty buffer of the new size: the old content is
+ * discarded, not copied. */
+void resizeReplicationBacklog(long long newsize) {
+    if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
+        newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
+    if (newsize == server.repl_backlog_size) return;
+
+    server.repl_backlog_size = newsize;
+    /* No buffer allocated: recording the new size is all there is to do. */
+    if (server.repl_backlog == NULL) return;
+
+    /* Flush the old buffer and allocate a new empty one, which refills
+     * incrementally. Copying a few gigabytes would add latency and, even
+     * worse, would often need the additional space to be allocated before
+     * the old buffer can be freed. */
+    zfree(server.repl_backlog);
+    server.repl_backlog = (char*)zmalloc(server.repl_backlog_size, MALLOC_LOCAL);
+    server.repl_backlog_idx = 0;
+    server.repl_backlog_histlen = 0;
+    /* The buffer is empty, so the next byte we have is... the next one. */
+    server.repl_backlog_off = server.master_repl_offset+1;
+}
+
+/* Release the backlog buffer. Asserts that no slave is attached
+ * (server.slaves must be empty). */
+void freeReplicationBacklog(void) {
+    serverAssert(listLength(server.slaves) == 0);
+    zfree(server.repl_backlog);
+    server.repl_backlog = NULL;
+}
+
+/* Append 'len' bytes starting at 'ptr' to the replication backlog.
+ * server.master_repl_offset is advanced by 'len' as well, because there is
+ * no case where we want to feed the backlog without incrementing the
+ * offset. */
+void feedReplicationBacklog(void *ptr, size_t len) {
+    unsigned char *p = (unsigned char*)ptr;
+
+    server.master_repl_offset += len;
+
+    /* Circular buffer: at every iteration copy as much as fits before the
+     * end of the buffer, then wrap the write index back to zero. */
+    while(len) {
+        size_t room = server.repl_backlog_size - server.repl_backlog_idx;
+        size_t chunk = (room > len) ? len : room;
+        memcpy(server.repl_backlog+server.repl_backlog_idx,p,chunk);
+        server.repl_backlog_idx += chunk;
+        if (server.repl_backlog_idx == server.repl_backlog_size)
+            server.repl_backlog_idx = 0;
+        server.repl_backlog_histlen += chunk;
+        p += chunk;
+        len -= chunk;
+    }
+    /* The history can never be longer than the buffer itself. */
+    if (server.repl_backlog_histlen > server.repl_backlog_size)
+        server.repl_backlog_histlen = server.repl_backlog_size;
+    /* Offset of the first byte still present in the backlog. */
+    server.repl_backlog_off = server.master_repl_offset -
+                              server.repl_backlog_histlen + 1;
+}
+
+/* Wrapper for feedReplicationBacklog() that takes Redis string objects
+ * as input.
*/
+void feedReplicationBacklogWithObject(robj *o) {
+    char llstr[LONG_STR_SIZE];
+    void *p;
+    size_t len;
+
+    /* Integer-encoded objects carry the number in the pointer itself, so
+     * they are first rendered as a decimal string. */
+    if (o->encoding == OBJ_ENCODING_INT) {
+        len = ll2string(llstr,sizeof(llstr),(long)ptrFromObj(o));
+        p = llstr;
+    } else {
+        len = sdslen((sds)ptrFromObj(o));
+        p = ptrFromObj(o);
+    }
+    feedReplicationBacklog(p,len);
+}
+
+/* Propagate write commands to slaves, and populate the replication backlog
+ * as well. This function is used if the instance is a master: we use
+ * the commands received by our clients in order to create the replication
+ * stream. Instead if the instance is a slave and has sub-slaves attached,
+ * we use replicationFeedSlavesFromMasterStream() */
+void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
+    listNode *ln;
+    listIter li;
+    int j, len;
+    char llstr[LONG_STR_SIZE];
+
+    /* If the instance is not a top level master, return ASAP: we'll just proxy
+     * the stream of data we receive from our master instead, in order to
+     * propagate *identical* replication stream. In this way this slave can
+     * advertise the same replication ID as the master (since it shares the
+     * master replication history and has the same backlog and offsets). */
+    if (server.masterhost != NULL) return;
+
+    /* If there aren't slaves, and there is no backlog buffer to populate,
+     * we can return ASAP. */
+    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
+
+    /* We can't have slaves attached and no backlog. */
+    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
+
+    /* Send SELECT command to every slave if needed. */
+    if (server.slaveseldb != dictid) {
+        robj *selectcmd;
+
+        /* For a few DBs we have pre-computed SELECT command. */
+        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
+            selectcmd = shared.select[dictid];
+        } else {
+            int dictid_len;
+
+            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
+            selectcmd = createObject(OBJ_STRING,
+                sdscatprintf(sdsempty(),
+                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
+                dictid_len, llstr));
+        }
+
+        /* Add the SELECT command into the backlog. */
+        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
+
+        /* Send it to slaves. */
+        listRewind(slaves,&li);
+        while((ln = listNext(&li))) {
+            client *slave = (client*)ln->value;
+            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+            addReplyAsync(slave,selectcmd);
+        }
+
+        /* Only the object created above is ours to release; the shared
+         * pre-computed ones are not. */
+        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
+            decrRefCount(selectcmd);
+    }
+    server.slaveseldb = dictid;
+
+    /* Write the command to the replication backlog if any. */
+    if (server.repl_backlog) {
+        char aux[LONG_STR_SIZE+3];
+
+        /* Add the multi bulk reply length. */
+        aux[0] = '*';
+        len = ll2string(aux+1,sizeof(aux)-1,argc);
+        aux[len+1] = '\r';
+        aux[len+2] = '\n';
+        feedReplicationBacklog(aux,len+3);
+
+        for (j = 0; j < argc; j++) {
+            long objlen = stringObjectLen(argv[j]);
+
+            /* We need to feed the buffer with the object as a bulk reply
+             * not just as a plain string, so create the $..CRLF payload len
+             * and add the final CRLF */
+            aux[0] = '$';
+            len = ll2string(aux+1,sizeof(aux)-1,objlen);
+            aux[len+1] = '\r';
+            aux[len+2] = '\n';
+            feedReplicationBacklog(aux,len+3);
+            feedReplicationBacklogWithObject(argv[j]);
+            /* aux+len+1 points at the "\r\n" written just above. */
+            feedReplicationBacklog(aux+len+1,2);
+        }
+    }
+
+    /* Write the command to every slave. */
+    listRewind(slaves,&li);
+    while((ln = listNext(&li))) {
+        client *slave = (client*)ln->value;
+
+        /* Don't feed slaves that are still waiting for BGSAVE to start */
+        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+
+        /* Feed slaves that are waiting for the initial SYNC (so these commands
+         * are queued in the output buffer until the initial SYNC completes),
+         * or are already in sync with the master. */
+
+        /* Add the multi bulk length. */
+        addReplyArrayLenAsync(slave,argc);
+
+        /* Then every argument, each one as a bulk reply. */
+        for (j = 0; j < argc; j++)
+            addReplyBulkAsync(slave,argv[j]);
+    }
+}
+
+/* This function is used in order to proxy what we receive from our master
+ * to our sub-slaves: the 'buflen' bytes at 'buf' are appended to the backlog
+ * (when there is one) and sent as they are to every slave in 'slaves' that
+ * is not still waiting for a BGSAVE to start. */
+#include <ctype.h> /* isprint() */
+void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
+    listNode *ln;
+    listIter li;
+
+    /* Debugging: this is handy to see the stream sent from master
+     * to slaves. Disabled with if(0). */
+    if (0) {
+        printf("%zu:",buflen);
+        for (size_t j = 0; j < buflen; j++) {
+            /* isprint() is only defined for unsigned char values (or EOF),
+             * and the stream may contain bytes >= 0x80. */
+            printf("%c", isprint((unsigned char)buf[j]) ? buf[j] : '.');
+        }
+        printf("\n");
+    }
+
+    if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
+    listRewind(slaves,&li);
+    while((ln = listNext(&li))) {
+        client *slave = (client*)ln->value;
+
+        /* Don't feed slaves that are still waiting for BGSAVE to start */
+        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+        addReplyProto(slave,buf,buflen);
+    }
+}
+
+/* Send a textual representation of the command 'argv' to every client in
+ * the 'monitors' list. The line has the form
+ *
+ * +<seconds>.<microseconds> [<dictid> <origin>] <arg> <arg> ...\r\n
+ *
+ * where <origin> is "lua", "unix:<server.unixsocket>" or the peer id of
+ * 'c', depending on the flags of 'c'. */
+void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
+    listNode *ln;
+    listIter li;
+    int j;
+    sds cmdrepr = sdsnew("+");
+    robj *cmdobj;
+    struct timeval tv;
+
+    gettimeofday(&tv,NULL);
+    cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
+    if (c->flags & CLIENT_LUA) {
+        cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
+    } else if (c->flags & CLIENT_UNIX_SOCKET) {
+        cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
+    } else {
+        cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
+    }
+
+    /* Arguments are space separated: integer-encoded ones are printed in
+     * double quotes, the others go through sdscatrepr(). */
+    for (j = 0; j < argc; j++) {
+        if (argv[j]->encoding == OBJ_ENCODING_INT) {
+            cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)ptrFromObj(argv[j]));
+        } else {
+            cmdrepr = sdscatrepr(cmdrepr,(char*)ptrFromObj(argv[j]),
+                        sdslen((sds)ptrFromObj(argv[j])));
+        }
+        if (j != argc-1)
+            cmdrepr = sdscatlen(cmdrepr," ",1);
+    }
+    cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
+    cmdobj = createObject(OBJ_STRING,cmdrepr);
+
+    listRewind(monitors,&li);
+    while((ln = listNext(&li))) {
+        client *monitor = (client*)ln->value;
+        addReply(monitor,cmdobj);
+    }
+    decrRefCount(cmdobj);
+}
+
+/* Feed the slave 'c' with the replication backlog starting from the
+ * specified 'offset' up to the end of the backlog.
*/
+long long addReplyReplicationBacklog(client *c, long long offset) {
+    serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
+
+    /* An empty history means there is nothing to send. */
+    if (server.repl_backlog_histlen == 0) {
+        serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
+        return 0;
+    }
+
+    serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
+             server.repl_backlog_size);
+    serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
+             server.repl_backlog_off);
+    serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
+             server.repl_backlog_histlen);
+    serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
+             server.repl_backlog_idx);
+
+    /* Bytes between the oldest byte we hold and the requested offset:
+     * these are not sent. */
+    long long skip = offset - server.repl_backlog_off;
+    serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
+
+    /* Position, inside the circular buffer, of the oldest byte, that is
+     * the one whose offset is server.repl_backlog_off. */
+    long long pos = (server.repl_backlog_idx +
+        (server.repl_backlog_size-server.repl_backlog_histlen)) %
+        server.repl_backlog_size;
+    serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", pos);
+
+    /* Seek forward to the requested 'offset'. */
+    pos = (pos + skip) % server.repl_backlog_size;
+
+    /* Send the data. The buffer is circular, so a range crossing its end
+     * goes out as two replies: up to the end first, then from index 0. */
+    long long left = server.repl_backlog_histlen - skip;
+    serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", left);
+    while(left) {
+        long long tail = server.repl_backlog_size - pos;
+        long long chunk = (tail < left) ? tail : left;
+
+        serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", chunk);
+        addReplySds(c,sdsnewlen(server.repl_backlog + pos, chunk));
+        left -= chunk;
+        pos = 0;
+    }
+    /* Number of backlog bytes queued for the slave. */
+    return server.repl_backlog_histlen - skip;
+}
+
+/* Return the offset to provide as reply to the PSYNC command received
+ * from the slave. The returned value is only valid immediately after
+ * the BGSAVE process started and before executing any other command
+ * from clients.
*/
+long long getPsyncInitialOffset(void) {
+    /* Simply the current master replication offset. */
+    long long initial_offset = server.master_repl_offset;
+    return initial_offset;
+}
+
+/* Prepare 'slave' for a full resynchronization and, unless it connected with
+ * the old SYNC command, tell it so with a "+FULLRESYNC <replid> <offset>"
+ * line written straight to its socket.
+ *
+ * Side effects:
+ *
+ * 1) 'offset' is stored in slave->psync_initial_offset. syncCommand() reads
+ *    it back when it attaches another slave to the same BGSAVE.
+ * 2) The slave moves to SLAVE_STATE_WAIT_BGSAVE_END, so that we start
+ *    accumulating differences for it from this point.
+ * 3) server.slaveseldb is reset to -1, which makes replicationFeedSlaves()
+ *    emit a SELECT before the next propagated command.
+ *
+ * Normally called right after a BGSAVE for replication was started, or when
+ * the slave is attached to one that is already in progress.
+ *
+ * Returns C_OK, or C_ERR if the reply could not be written in full, in which
+ * case the slave is handed to freeClientAsync(). */
+int replicationSetupSlaveForFullResync(client *slave, long long offset) {
+    slave->psync_initial_offset = offset;
+    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+    server.slaveseldb = -1;
+
+    /* Slaves that approached us with the old SYNC command get no reply. */
+    if (slave->flags & CLIENT_PRE_PSYNC) return C_OK;
+
+    char reply[128];
+    int replylen = snprintf(reply,sizeof(reply),"+FULLRESYNC %s %lld\r\n",
+                            server.replid,offset);
+    if (write(slave->fd,reply,replylen) == replylen) return C_OK;
+
+    freeClientAsync(slave);
+    return C_ERR;
+}
+
+/* This function handles the PSYNC command from the point of view of a
+ * master receiving a request for partial resynchronization.
+ *
+ * On success return C_OK, otherwise C_ERR is returned and we proceed
+ * with the usual full resync.
*/
+int masterTryPartialResynchronization(client *c) {
+    long long psync_offset, psync_len;
+    char *master_replid = (char*)ptrFromObj(c->argv[1]);
+    char buf[128];
+    int buflen;
+
+    /* Parse the replication offset asked by the slave. Go to full sync
+     * on parse error: this should never happen but we try to handle
+     * it in a robust way compared to aborting. */
+    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
+       C_OK) goto need_full_resync;
+
+    /* Is the replication ID of this master the same advertised by the wannabe
+     * slave via PSYNC? If the replication ID changed this master has a
+     * different replication history, and there is no way to continue.
+     *
+     * Note that there are two potentially valid replication IDs: the ID1
+     * and the ID2. The ID2 however is only valid up to a specific offset. */
+    if (strcasecmp(master_replid, server.replid) &&
+        (strcasecmp(master_replid, server.replid2) ||
+         psync_offset > server.second_replid_offset))
+    {
+        /* Run id "?" is used by slaves that want to force a full resync. */
+        if (master_replid[0] != '?') {
+            /* Tell apart "unknown ID" from "ID2 matched, but the offset
+             * is past the point up to which ID2 is valid". */
+            if (strcasecmp(master_replid, server.replid) &&
+                strcasecmp(master_replid, server.replid2))
+            {
+                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
+                    "Replication ID mismatch (Replica asked for '%s', my "
+                    "replication IDs are '%s' and '%s')",
+                    master_replid, server.replid, server.replid2);
+            } else {
+                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
+                    "Requested offset for second ID was %lld, but I can reply "
+                    "up to %lld", psync_offset, server.second_replid_offset);
+            }
+        } else {
+            serverLog(LL_NOTICE,"Full resync requested by replica %s",
+                replicationGetSlaveName(c));
+        }
+        goto need_full_resync;
+    }
+
+    /* We still have the data our slave is asking for? */
+    if (!server.repl_backlog ||
+        psync_offset < server.repl_backlog_off ||
+        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
+    {
+        serverLog(LL_NOTICE,
+            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
+        if (psync_offset > server.master_repl_offset) {
+            serverLog(LL_WARNING,
+                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
+        }
+        goto need_full_resync;
+    }
+
+    /* If we reached this point, we are able to perform a partial resync:
+     * 1) Set client state to make it a slave.
+     * 2) Inform the client we can continue with +CONTINUE
+     * 3) Send the backlog data (from the offset to the end) to the slave. */
+    c->flags |= CLIENT_SLAVE;
+    c->replstate = SLAVE_STATE_ONLINE;
+    c->repl_ack_time = server.unixtime;
+    c->repl_put_online_on_ack = 0;
+    listAddNodeTail(server.slaves,c);
+    /* We can't use the connection buffers since they are used to accumulate
+     * new commands at this stage. But we are sure the socket send buffer is
+     * empty so this write will never fail actually. */
+    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
+        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
+    } else {
+        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
+    }
+    if (write(c->fd,buf,buflen) != buflen) {
+        freeClientAsync(c);
+        /* Still C_OK: syncCommand() would otherwise go on and set up a
+         * full resync for a client that was just handed to
+         * freeClientAsync(). */
+        return C_OK;
+    }
+    psync_len = addReplyReplicationBacklog(c,psync_offset);
+    serverLog(LL_NOTICE,
+        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
+            replicationGetSlaveName(c),
+            psync_len, psync_offset);
+    /* Note that we don't need to set the selected DB at server.slaveseldb
+     * to -1 to force the master to emit SELECT, since the slave already
+     * has this state from the previous connection with the master. */
+
+    refreshGoodSlavesCount();
+    return C_OK; /* The caller can return, no full resync needed. */
+
+need_full_resync:
+    /* We need a full resync for some reason... Note that we can't
+     * reply to PSYNC right now if a full SYNC is needed. The reply
+     * must include the master offset at the time the RDB file we transfer
+     * is generated, so we need to delay the reply to that moment. */
+    return C_ERR;
+}
+
+/* Start a BGSAVE for replication goals, which is, selecting the disk or
+ * socket target depending on the configuration, and making sure that
+ * the script cache is flushed once the BGSAVE has started.
+ *
+ * The mincapa argument is the bitwise AND among all the slaves capabilities
+ * of the slaves waiting for this BGSAVE, so represents the slave capabilities
+ * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
+ *
+ * Side effects, other than starting a BGSAVE:
+ *
+ * 1) Handle the slaves in WAIT_START state, by preparing them for a full
+ *    sync if the BGSAVE was successfully started, or sending them an error
+ *    and dropping them from the list of slaves.
+ *
+ * 2) Flush the Lua scripting script cache if the BGSAVE was actually
+ *    started.
+ *
+ * Returns C_OK on success or C_ERR otherwise. */
+int startBgsaveForReplication(int mincapa) {
+    int retval;
+    /* Diskless (socket) transfer needs both the configuration switch and
+     * SLAVE_CAPA_EOF in the capabilities common to all waiting slaves. */
+    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
+    listIter li;
+    listNode *ln;
+
+    serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
+        socket_target ? "replicas sockets" : "disk");
+
+    rdbSaveInfo rsi, *rsiptr;
+    rsiptr = rdbPopulateSaveInfo(&rsi);
+    /* Only do rdbSave* when rsiptr is not NULL,
+     * otherwise slave will miss repl-stream-db. */
+    if (rsiptr) {
+        if (socket_target)
+            retval = rdbSaveToSlavesSockets(rsiptr);
+        else
+            retval = rdbSaveBackground(rsiptr);
+    } else {
+        serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
+        retval = C_ERR;
+    }
+
+    /* If we failed to BGSAVE, remove the slaves waiting for a full
+     * resynchronization from the list of slaves, inform them with
+     * an error about what happened, close the connection ASAP. */
+    if (retval == C_ERR) {
+        serverLog(LL_WARNING,"BGSAVE for replication failed");
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            client *slave = (client*)ln->value;
+
+            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
+                slave->flags &= ~CLIENT_SLAVE;
+                listDelNode(server.slaves,ln);
+                addReplyError(slave,
+                    "BGSAVE failed, replication can't continue");
+                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
+            }
+        }
+        return retval;
+    }
+
+    /* If the target is socket, rdbSaveToSlavesSockets() already set up
+     * the slaves for a full resync. Otherwise for disk target do it now. */
+    if (!socket_target) {
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            client *slave = (client*)ln->value;
+
+            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
+                    replicationSetupSlaveForFullResync(slave,
+                            getPsyncInitialOffset());
+            }
+        }
+    }
+
+    /* Flush the script cache, since we need that slave differences are
+     * accumulated without requiring slaves to match our cached scripts. */
+    if (retval == C_OK) replicationScriptCacheFlush();
+    return retval;
+}
+
+/* SYNC and PSYNC command implementation. */
+void syncCommand(client *c) {
+    /* ignore SYNC if already slave or in monitor mode */
+    if (c->flags & CLIENT_SLAVE) return;
+
+    /* Refuse SYNC requests if we are a slave but the link with our master
+     * is not ok... */
+    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
+        addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
+        return;
+    }
+
+    /* SYNC can't be issued when the server has pending data to send to
+     * the client about already issued commands. We need a fresh reply
+     * buffer registering the differences between the BGSAVE and the current
+     * dataset, so that we can copy to other slaves if needed. */
+    if (clientHasPendingReplies(c, TRUE)) {
+        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
+        return;
+    }
+
+    serverLog(LL_NOTICE,"Replica %s asks for synchronization",
+        replicationGetSlaveName(c));
+
+    /* Try a partial resynchronization if this is a PSYNC command.
+     * If it fails, we continue with the usual full resynchronization. Note
+     * that masterTryPartialResynchronization() does not send the
+     *
+     * +FULLRESYNC <replid> <offset>
+     *
+     * line itself: it is written later by
+     * replicationSetupSlaveForFullResync(), so that the slave knows the
+     * new replid and offset to try a PSYNC later if the connection with
+     * the master is lost. */
+    if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) {
+        if (masterTryPartialResynchronization(c) == C_OK) {
+            server.stat_sync_partial_ok++;
+            return; /* No full resync needed, return. */
+        } else {
+            char *master_replid = (char*)ptrFromObj(c->argv[1]);
+
+            /* Increment stats for failed PSYNCs, but only if the
+             * replid is not "?", as this is used by slaves to force a full
+             * resync on purpose when they are not able to partially
+             * resync. */
+            if (master_replid[0] != '?') server.stat_sync_partial_err++;
+        }
+    } else {
+        /* If a slave uses SYNC, we are dealing with an old implementation
+         * of the replication protocol (like keydb-cli --slave). Flag the client
+         * so that we don't expect to receive REPLCONF ACK feedbacks. */
+        c->flags |= CLIENT_PRE_PSYNC;
+    }
+
+    /* Full resynchronization. */
+    server.stat_sync_full++;
+
+    /* Setup the slave as one waiting for BGSAVE to start. The following code
+     * paths will change the state if we handle the slave differently. */
+    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
+    if (server.repl_disable_tcp_nodelay)
+        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
+    c->repldbfd = -1;
+    c->flags |= CLIENT_SLAVE;
+    listAddNodeTail(server.slaves,c);
+
+    /* Create the replication backlog if needed. */
+    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
+        /* When we create the backlog from scratch, we always use a new
+         * replication ID and clear the ID2, since there is no valid
+         * past history. */
+        changeReplicationId();
+        clearReplicationId2();
+        createReplicationBacklog();
+    }
+
+    /* CASE 1: BGSAVE is in progress, with disk target. */
+    if (server.rdb_child_pid != -1 &&
+        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
+    {
+        /* Ok a background save is in progress. Let's check if it is a good
+         * one for replication, i.e. if there is another slave that is
+         * registering differences since the server forked to save. */
+        client *slave;
+        listNode *ln;
+        listIter li;
+
+        /* On exit 'ln' is NULL when no slave in WAIT_BGSAVE_END state was
+         * found; 'slave' is only meaningful when 'ln' is not NULL. */
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            slave = (client*)ln->value;
+            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
+        }
+        /* To attach this slave, we check that it has at least all the
+         * capabilities of the slave that triggered the current BGSAVE. */
+        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
+            /* Perfect, the server is already registering differences for
+             * another slave. Set the right state, and copy the buffer. */
+            copyClientOutputBuffer(c,slave);
+            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
+            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
+        } else {
+            /* No way, we need to wait for the next BGSAVE in order to
+             * register differences. */
+            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
+        }
+
+    /* CASE 2: BGSAVE is in progress, with socket target. */
+    } else if (server.rdb_child_pid != -1 &&
+               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
+    {
+        /* There is an RDB child process but it is writing directly to
+         * children sockets. We need to wait for the next BGSAVE
+         * in order to synchronize. */
+        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
+
+    /* CASE 3: There is no BGSAVE in progress. */
+    } else {
+        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
+            /* Diskless replication RDB child is created inside
+             * replicationCron() since we want to delay its start a
+             * few seconds to wait for more slaves to arrive. */
+            if (server.repl_diskless_sync_delay)
+                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
+        } else {
+            /* Target is disk (or the slave is not capable of supporting
+             * diskless replication) and we don't have a BGSAVE in progress,
+             * let's start one. */
+            if (server.aof_child_pid == -1) {
+                startBgsaveForReplication(c->slave_capa);
+            } else {
+                serverLog(LL_NOTICE,
+                    "No BGSAVE in progress, but an AOF rewrite is active. "
+                    "BGSAVE for replication delayed");
+            }
+        }
+    }
+    return;
+}
+
+/* REPLCONF