/* Asynchronous replication implementation. * * Copyright (c) 2009-2012, Salvatore Sanfilippo * Copyright (c) 2019 John Sully * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "server.h" #include "cluster.h" #include "bio.h" #include #include #include #include #include #include #include #include #include #include #include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); void replicationSendAck(redisMaster *mi); void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); static void propagateMasterStaleKeys(); /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the replica ip:listening_port * pair. Mostly useful for logging, since we want to log a replica using its * IP address and its listening port which is more clear for the user, for * example: "Closing connection with replica 10.1.2.3:6380". */ char *replicationGetSlaveName(client *c) { static char buf[NET_PEER_ID_LEN]; char ip[NET_IP_STR_LEN]; ip[0] = '\0'; buf[0] = '\0'; if (c->slave_ip[0] != '\0' || connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1) { /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */ if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip)); if (c->slave_listening_port) anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port); else snprintf(buf,sizeof(buf),"%s:",ip); } else { snprintf(buf,sizeof(buf),"client id #%llu", (unsigned long long) c->id); } return buf; } static bool FSameUuidNoNil(const unsigned char *a, const unsigned char *b) { unsigned char zeroCheck = 0; for (int i = 0; i < UUID_BINARY_LEN; ++i) { if (a[i] != b[i]) return false; zeroCheck |= a[i]; } return (zeroCheck != 0); // if the UUID is nil then it is never equal } static bool FSameHost(client *clientA, client *clientB) { if (clientA == nullptr || clientB == nullptr) return false; const unsigned char *a = clientA->uuid; const unsigned char *b = clientB->uuid; return FSameUuidNoNil(a, b); } static bool FMasterHost(client *c) { listIter li; listNode *ln; listRewind(g_pserver->masters, &li); while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); if (FSameUuidNoNil(mi->master_uuid, c->uuid)) return true; } return false; } static bool FAnyDisconnectedMasters() { listIter li; listNode *ln; listRewind(g_pserver->masters, &li); while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); if (mi->repl_state != REPL_STATE_CONNECTED) return true; } return false; } client *replicaFromMaster(redisMaster *mi) { if (mi->master == nullptr) return nullptr; listIter liReplica; listNode *lnReplica; listRewind(g_pserver->slaves, &liReplica); while ((lnReplica = listNext(&liReplica)) != nullptr) { client *replica = (client*)listNodeValue(lnReplica); if (FSameHost(mi->master, replica)) return replica; } return nullptr; } /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { serverAssert(g_pserver->repl_backlog == NULL); g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the * g_pserver->repl_backlog_size and to resize the buffer and setup it so that * it contains the same data as the previous one (possibly less data, but * the most recent bytes, or the same data and more free space in case the * buffer is enlarged). */ void resizeReplicationBacklog(long long newsize) { if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; g_pserver->repl_backlog_size = newsize; if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. * The reason is that copying a few gigabytes adds latency and even * worse often we need to alloc additional space before freeing the * old buffer. */ zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; } } void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); while ((ln = listNext(&li))) { // g_pserver->slaves should be empty, or filled with clients pending close client *c = (client*)listNodeValue(ln); serverAssert(c->flags & CLIENT_CLOSE_ASAP || FMasterHost(c)); } zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = NULL; } /* Add data to the replication backlog. * This function also increments the global replication offset stored at * g_pserver->master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; g_pserver->master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ while(len) { size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx; if (thislen > len) thislen = len; memcpy(g_pserver->repl_backlog+g_pserver->repl_backlog_idx,p,thislen); g_pserver->repl_backlog_idx += thislen; if (g_pserver->repl_backlog_idx == g_pserver->repl_backlog_size) g_pserver->repl_backlog_idx = 0; len -= thislen; p += thislen; g_pserver->repl_backlog_histlen += thislen; } if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size) g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset - g_pserver->repl_backlog_histlen + 1; } /* Wrapper for feedReplicationBacklog() that takes Redis string objects * as input. */ void feedReplicationBacklogWithObject(robj *o) { char llstr[LONG_STR_SIZE]; void *p; size_t len; if (o->encoding == OBJ_ENCODING_INT) { len = ll2string(llstr,sizeof(llstr),(long)ptrFromObj(o)); p = llstr; } else { len = sdslen((sds)ptrFromObj(o)); p = ptrFromObj(o); } feedReplicationBacklog(p,len); } void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw) { char llstr[LONG_STR_SIZE]; std::unique_locklock)> lock(replica->lock); /* Send SELECT command to every replica if needed. */ if (g_pserver->replicaseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. */ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } /* Add the SELECT command into the backlog. */ /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves */ addReplyAsync(replica,selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } g_pserver->replicaseldb = dictid; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ /* Add the multi bulk length. */ addReplyArrayLenAsync(replica,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (int j = 0; j < argc; j++) addReplyBulkAsync(replica,argv[j]); } /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication * stream. Instead if the instance is a replica and has sub-slaves attached, * we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln, *lnReply; listIter li, liReply; int j, len; serverAssert(GlobalLocksAcquired()); if (dictid < 0) dictid = 0; // this can happen if we send a PING before any real operation /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this replica can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (g_pserver->repl_backlog == NULL && listLength(slaves) == 0) return; /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); client *fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar); fake->flags |= CLIENT_FORCE_REPLY; bool fSendRaw = !g_pserver->fActiveReplica; replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below long long cchbuf = fake->bufpos; listRewind(fake->reply, &liReply); while ((lnReply = listNext(&liReply))) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); cchbuf += reply->used; } serverAssert(argc > 0); serverAssert(cchbuf > 0); char uuid[40] = {'\0'}; uuid_unparse(cserver.uuid, uuid); char proto[1024]; int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) { if (fSendRaw) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } else { feedReplicationBacklog(proto, cchProto); feedReplicationBacklog(fake->buf, fake->bufpos); listRewind(fake->reply, &liReply); while ((lnReply = listNext(&liReply))) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); feedReplicationBacklog(reply->buf(), reply->used); } const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); feedReplicationBacklog(szMvcc, cchMvcc); } } /* Write the command to every replica. */ listRewind(slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; std::unique_locklock)> lock(replica->lock, std::defer_lock); // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() if (FCorrectThread(replica)) lock.lock(); if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) { replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; continue; } /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ if (!fSendRaw) addReplyProtoAsync(replica, proto, cchProto); addReplyProtoAsync(replica,fake->buf,fake->bufpos); listRewind(fake->reply, &liReply); while ((lnReply = listNext(&liReply))) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } if (!fSendRaw) { addReplyAsync(replica,shared.crlf); addReplyProtoAsync(replica, szDbNum, cchDbNum); addReplyProtoAsync(replica, szMvcc, cchMvcc); } } freeClient(fake); } /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { listNode *ln; listIter li; /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { printf("%zu:",buflen); for (size_t j = 0; j < buflen; j++) { printf("%c", isprint(buf[j]) ? buf[j] : '.'); } printf("\n"); } if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); listRewind(slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; std::unique_locklock)> ulock(replica->lock, std::defer_lock); if (FCorrectThread(replica)) ulock.lock(); if (FMasterHost(replica)) continue; // Active Active case, don't feed back /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; addReplyProtoAsync(replica,buf,buflen); } if (listLength(slaves)) ProcessPendingAsyncWrites(); // flush them to their respective threads } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j; sds cmdrepr = sdsnew("+"); robj *cmdobj; struct timeval tv; serverAssert(GlobalLocksAcquired()); gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); if (c->flags & CLIENT_LUA) { cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); } else if (c->flags & CLIENT_UNIX_SOCKET) { cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,g_pserver->unixsocket); } else { cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); } for (j = 0; j < argc; j++) { if (argv[j]->encoding == OBJ_ENCODING_INT) { cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)ptrFromObj(argv[j])); } else { cmdrepr = sdscatrepr(cmdrepr,(char*)ptrFromObj(argv[j]), sdslen((sds)ptrFromObj(argv[j]))); } if (j != argc-1) cmdrepr = sdscatlen(cmdrepr," ",1); } cmdrepr = sdscatlen(cmdrepr,"\r\n",2); cmdobj = createObject(OBJ_STRING,cmdrepr); listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; std::unique_locklock)> lock(monitor->lock, std::defer_lock); // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() if (FCorrectThread(c)) lock.lock(); addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); } /* Feed the replica 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { long long j, skip, len; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); if (g_pserver->repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", g_pserver->repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", g_pserver->repl_backlog_off); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", g_pserver->repl_backlog_histlen); serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", g_pserver->repl_backlog_idx); /* Compute the amount of bytes we need to discard. */ skip = offset - g_pserver->repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); /* Point j to the oldest byte, that is actually our * g_pserver->repl_backlog_off byte. */ j = (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) % g_pserver->repl_backlog_size; serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); /* Discard the amount of data to seek to the specified 'offset'. */ j = (j + skip) % g_pserver->repl_backlog_size; /* Feed replica with data. Since it is a circular buffer we have to * split the reply in two parts if we are cross-boundary. */ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); while(len) { long long thislen = ((g_pserver->repl_backlog_size - j) < len) ? (g_pserver->repl_backlog_size - j) : len; serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen)); len -= thislen; j = 0; } return g_pserver->repl_backlog_histlen - skip; } /* Return the offset to provide as reply to the PSYNC command received * from the replica. The returned value is only valid immediately after * the BGSAVE process started and before executing any other command * from clients. */ long long getPsyncInitialOffset(void) { return g_pserver->master_repl_offset; } /* Send a FULLRESYNC reply in the specific case of a full resynchronization, * as a side effect setup the replica for a full sync in different ways: * * 1) Remember, into the replica client structure, the replication offset * we sent here, so that if new slaves will later attach to the same * background RDB saving process (by duplicating this client output * buffer), we can get the right offset from this replica. * 2) Set the replication state of the replica to WAIT_BGSAVE_END so that * we start accumulating differences from this point. * 3) Force the replication stream to re-emit a SELECT statement so * the new replica incremental differences will start selecting the * right database number. * * Normally this function should be called immediately after a successful * BGSAVE for replication was started, or when there is one already in * progress that we attached our replica to. */ int replicationSetupSlaveForFullResync(client *replica, long long offset) { char buf[128]; int buflen; replica->psync_initial_offset = offset; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END; /* We are going to accumulate the incremental changes for this * replica as well. Set replicaseldb to -1 in order to force to re-emit * a SELECT statement in the replication stream. */ g_pserver->replicaseldb = -1; /* Don't send this reply to slaves that approached us with * the old SYNC command. */ if (!(replica->flags & CLIENT_PRE_PSYNC)) { buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", g_pserver->replid,offset); if (connWrite(replica->conn,buf,buflen) != buflen) { freeClientAsync(replica); return C_ERR; } } return C_OK; } /* This function handles the PSYNC command from the point of view of a * master receiving a request for partial resynchronization. * * On success return C_OK, otherwise C_ERR is returned and we proceed * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { serverAssert(GlobalLocksAcquired()); long long psync_offset, psync_len; char *master_replid = (char*)ptrFromObj(c->argv[1]); char buf[128]; int buflen; /* Parse the replication offset asked by the replica. Go to full sync * on parse error: this should never happen but we try to handle * it in a robust way compared to aborting. */ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK) goto need_full_resync; /* Is the replication ID of this master the same advertised by the wannabe * replica via PSYNC? If the replication ID changed this master has a * different replication history, and there is no way to continue. * * Note that there are two potentially valid replication IDs: the ID1 * and the ID2. The ID2 however is only valid up to a specific offset. */ if (strcasecmp(master_replid, g_pserver->replid) && (strcasecmp(master_replid, g_pserver->replid2) || psync_offset > g_pserver->second_replid_offset)) { /* Run id "?" is used by slaves that want to force a full resync. */ if (master_replid[0] != '?') { if (strcasecmp(master_replid, g_pserver->replid) && strcasecmp(master_replid, g_pserver->replid2)) { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Replication ID mismatch (Replica asked for '%s', my " "replication IDs are '%s' and '%s')", master_replid, g_pserver->replid, g_pserver->replid2); } else { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " "Requested offset for second ID was %lld, but I can reply " "up to %lld", psync_offset, g_pserver->second_replid_offset); } } else { serverLog(LL_NOTICE,"Full resync requested by replica %s", replicationGetSlaveName(c)); } goto need_full_resync; } /* We still have the data our replica is asking for? */ if (!g_pserver->repl_backlog || psync_offset < g_pserver->repl_backlog_off || psync_offset > (g_pserver->repl_backlog_off + g_pserver->repl_backlog_histlen)) { serverLog(LL_NOTICE, "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); if (psync_offset > g_pserver->master_repl_offset) { serverLog(LL_WARNING, "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); } goto need_full_resync; } /* If we reached this point, we are able to perform a partial resync: * 1) Set client state to make it a replica. * 2) Inform the client we can continue with +CONTINUE * 3) Send the backlog data (from the offset to the end) to the replica. */ c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ if (c->slave_capa & SLAVE_CAPA_PSYNC2) { buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", g_pserver->replid); } else { buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); } if (connWrite(c->conn,buf,buflen) != buflen) { if (FCorrectThread(c)) freeClient(c); else freeClientAsync(c); return C_OK; } psync_len = addReplyReplicationBacklog(c,psync_offset); serverLog(LL_NOTICE, "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset); /* Note that we don't need to set the selected DB at g_pserver->replicaseldb * to -1 to force the master to emit SELECT, since the replica already * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); /* Fire the replica change modules event. */ moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL); return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: /* We need a full resync for some reason... Note that we can't * reply to PSYNC right now if a full SYNC is needed. The reply * must include the master offset at the time the RDB file we transfer * is generated, so we need to delay the reply to that moment. */ return C_ERR; } /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. * * The mincapa argument is the bitwise AND among all the slaves capabilities * of the slaves waiting for this BGSAVE, so represents the replica capabilities * all the slaves support. Can be tested via SLAVE_CAPA_* macros. * * Side effects, other than starting a BGSAVE: * * 1) Handle the slaves in WAIT_START state, by preparing them for a full * sync if the BGSAVE was successfully started, or sending them an error * and dropping them from the list of slaves. * * 2) Flush the Lua scripting script cache if the BGSAVE was actually * started. * * Returns C_OK on success or C_ERR otherwise. */ int startBgsaveForReplication(int mincapa) { serverAssert(GlobalLocksAcquired()); int retval; int socket_target = g_pserver->repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); listIter li; listNode *ln; serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk"); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); /* Only do rdbSave* when rsiptr is not NULL, * otherwise replica will miss repl-stream-db. */ if (rsiptr) { if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); else retval = rdbSaveBackground(rsiptr); } else { serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); retval = C_ERR; } /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchronization from the list of slaves, inform them with * an error about what happened, close the connection ASAP. */ if (retval == C_ERR) { serverLog(LL_WARNING,"BGSAVE for replication failed"); listRewind(g_pserver->slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; std::unique_locklock)> lock(replica->lock); if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replica->replstate = REPL_STATE_NONE; replica->flags &= ~CLIENT_SLAVE; listDelNode(g_pserver->slaves,ln); addReplyError(replica, "BGSAVE failed, replication can't continue"); replica->flags |= CLIENT_CLOSE_AFTER_REPLY; } } return retval; } /* If the target is socket, rdbSaveToSlavesSockets() already setup * the slaves for a full resync. Otherwise for disk target do it now.*/ if (!socket_target) { listRewind(g_pserver->slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; std::unique_locklock)> lock(replica->lock); if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replicationSetupSlaveForFullResync(replica, getPsyncInitialOffset()); } } } /* Flush the script cache, since we need that replica differences are * accumulated without requiring slaves to match our cached scripts. */ if (retval == C_OK) replicationScriptCacheFlush(); return retval; } /* SYNC and PSYNC command implemenation. */ void syncCommand(client *c) { /* ignore SYNC if already replica or in monitor mode */ if (c->flags & CLIENT_SLAVE) return; /* Refuse SYNC requests if we are a replica but the link with our master * is not ok... */ if (!g_pserver->fActiveReplica) { if (FAnyDisconnectedMasters()) { addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); return; } } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } serverLog(LL_NOTICE,"Replica %s asks for synchronization", replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens masterTryPartialResynchronization() already * replied with: * * +FULLRESYNC * * So the replica knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. */ if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { g_pserver->stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { char *master_replid = (char*)ptrFromObj(c->argv[1]); /* Increment stats for failed PSYNCs, but only if the * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ if (master_replid[0] != '?') g_pserver->stat_sync_partial_err++; } } else { /* If a replica uses SYNC, we are dealing with an old implementation * of the replication protocol (like keydb-cli --replica). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ c->flags |= CLIENT_PRE_PSYNC; } /* Full resynchronization. */ g_pserver->stat_sync_full++; /* Setup the replica as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the replica differently. */ c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (g_pserver->repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(g_pserver->slaves,c); /* Create the replication backlog if needed. */ if (listLength(g_pserver->slaves) == 1 && g_pserver->repl_backlog == NULL) { /* When we create the backlog from scratch, we always use a new * replication ID and clear the ID2, since there is no valid * past history. */ changeReplicationId(); clearReplicationId2(); createReplicationBacklog(); } /* CASE 1: BGSAVE is in progress, with disk target. */ if (g_pserver->rdb_child_pid != -1 && g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another replica that is * registering differences since the server forked to save. */ client *replica; listNode *ln; listIter li; listRewind(g_pserver->slaves,&li); while((ln = listNext(&li))) { replica = (client*)ln->value; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; } /* To attach this replica, we check that it has at least all the * capabilities of the replica that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & replica->slave_capa) == replica->slave_capa)) { /* Perfect, the server is already registering differences for * another replica. Set the right state, and copy the buffer. */ copyClientOutputBuffer(c,replica); replicationSetupSlaveForFullResync(c,replica->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences. */ serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); } /* CASE 2: BGSAVE is in progress, with socket target. */ } else if (g_pserver->rdb_child_pid != -1 && g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET) { /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is progress. */ } else { if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ if (g_pserver->repl_diskless_sync_delay) serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); } else { /* Target is disk (or the replica is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, * let's start one. */ if (!hasActiveChildProcess()) { startBgsaveForReplication(c->slave_capa); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } } return; } void processReplconfUuid(client *c, robj *arg) { const char *remoteUUID = nullptr; if (arg->type != OBJ_STRING) goto LError; remoteUUID = (const char*)ptrFromObj(arg); if (strlen(remoteUUID) != 36) goto LError; if (uuid_parse(remoteUUID, c->uuid) != 0) goto LError; char szServerUUID[36 + 2]; // 1 for the '+', another for '\0' szServerUUID[0] = '+'; uuid_unparse(cserver.uuid, szServerUUID+1); addReplyProto(c, szServerUUID, 37); addReplyProto(c, "\r\n", 2); return; LError: addReplyError(c, "Invalid UUID"); return; } /* REPLCONF