diff --git a/src/networking.cpp b/src/networking.cpp index 2276300c2..ed750c0ae 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -300,6 +300,9 @@ int prepareClientToWrite(client *c, bool fAsync) { serverAssert(!fAsync || GlobalLocksAcquired()); serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer. + // do not install a write handler + /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; @@ -1952,7 +1955,7 @@ int processMultibulkBuffer(client *c) { * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ -void processInputBuffer(client *c) { +void processInputBuffer(client *c, int callFlags) { AssertCorrectThread(c); bool fFreed = false; @@ -2014,7 +2017,7 @@ void processInputBuffer(client *c) { server.current_client = c; /* Only reset the client when the command was executed. */ - if (processCommand(c) == C_OK) { + if (processCommand(c, callFlags) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; @@ -2051,16 +2054,19 @@ void processInputBuffer(client *c) { * raw processInputBuffer(). */ void processInputBufferAndReplicate(client *c) { if (!(c->flags & CLIENT_MASTER)) { - processInputBuffer(c); + processInputBuffer(c, CMD_CALL_FULL); } else { size_t prev_offset = c->reploff; - processInputBuffer(c); + processInputBuffer(c, CMD_CALL_FULL); size_t applied = c->reploff - prev_offset; if (applied) { - aeAcquireLock(); - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); - aeReleaseLock(); + if (!server.fActiveReplica) + { + aeAcquireLock(); + replicationFeedSlavesFromMasterStream(server.slaves, + c->pending_querybuf, applied); + aeReleaseLock(); + } sdsrange(c->pending_querybuf,applied,-1); } } diff --git a/src/replication.cpp b/src/replication.cpp index fa0d5cf3a..f7bc8bc3b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -228,37 +228,10 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } -/* Propagate write commands to slaves, and populate the replication backlog - * as well. This function is used if the instance is a master: we use - * the commands received by our clients in order to create the replication - * stream. Instead if the instance is a slave and has sub-slaves attached, - * we use replicationFeedSlavesFromMaster() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln; - listIter li; - int j, len; +void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc) +{ char llstr[LONG_STR_SIZE]; - serverAssert(GlobalLocksAcquired()); - - /* If the instance is not a top level master, return ASAP: we'll just proxy - * the stream of data we receive from our master instead, in order to - * propagate *identical* replication stream. In this way this slave can - * advertise the same replication ID as the master (since it shares the - * master replication history and has the same backlog and offsets). */ - if (!server.fActiveReplica && listLength(server.masters)) return; - - /* If there aren't slaves, and there is no backlog buffer to populate, - * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) return; - - /* We can't have slaves attached and no backlog. */ - serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); - - /* Get the lock on all slaves */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - ((client*)ln->value)->lock.lock(); - } + std::unique_locklock)> lock(slave->lock); /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { @@ -280,20 +253,56 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Add the SELECT command into the backlog. */ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); - /* Send it to slaves. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = (client*)ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (server.current_client && FSameHost(server.current_client, slave)) continue; - addReplyAsync(slave,selectcmd); - } + /* Send it to slaves */ + addReply(slave,selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; + /* Feed slaves that are waiting for the initial SYNC (so these commands + * are queued in the output buffer until the initial SYNC completes), + * or are already in sync with the master. */ + + /* Add the multi bulk length. */ + addReplyArrayLen(slave,argc); + + /* Finally any additional argument that was not stored inside the + * static buffer if any (from j to argc). */ + for (int j = 0; j < argc; j++) + addReplyBulk(slave,argv[j]); +} + +/* Propagate write commands to slaves, and populate the replication backlog + * as well. This function is used if the instance is a master: we use + * the commands received by our clients in order to create the replication + * stream. Instead if the instance is a slave and has sub-slaves attached, + * we use replicationFeedSlavesFromMaster() */ +void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { + listNode *ln, *lnReply; + listIter li, liReply; + int j, len; + serverAssert(GlobalLocksAcquired()); + + /* If the instance is not a top level master, return ASAP: we'll just proxy + * the stream of data we receive from our master instead, in order to + * propagate *identical* replication stream. In this way this slave can + * advertise the same replication ID as the master (since it shares the + * master replication history and has the same backlog and offsets). */ + if (!server.fActiveReplica && listLength(server.masters)) return; + + /* If there aren't slaves, and there is no backlog buffer to populate, + * we can return ASAP. */ + if (server.repl_backlog == NULL && listLength(slaves) == 0) return; + + /* We can't have slaves attached and no backlog. */ + serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + + client *fake = createClient(-1, serverTL - server.rgthreadvar); + fake->flags |= CLIENT_FORCE_REPLY; + replicationFeedSlave(fake, dictid, argv, argc); // Note: updates the repl log, keep above the repl update code below + /* Write the command to the replication backlog if any. */ if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; @@ -321,6 +330,25 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } } + long long cchbuf = fake->bufpos; + listRewind(fake->reply, &liReply); + while ((lnReply = listNext(&liReply))) + { + clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); + cchbuf += reply->used; + } + + bool fSendRaw = !server.fActiveReplica || (argc >= 1 && lookupCommand((sds)ptrFromObj(argv[0])) == server.rreplayCommand); + + serverAssert(argc > 0); + serverAssert(cchbuf > 0); + + char uuid[40] = {'\0'}; + uuid_unparse(server.uuid, uuid); + char proto[1024]; + int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + cchProto = std::min((int)sizeof(proto), cchProto); + /* Write the command to every slave. */ listRewind(slaves,&li); while((ln = listNext(&li))) { @@ -329,25 +357,23 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (server.current_client && FSameHost(server.current_client, slave)) continue; + std::unique_locklock)> lock(slave->lock); - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ + if (!fSendRaw) + addReplyProtoAsync(slave, proto, cchProto); - /* Add the multi bulk length. */ - addReplyArrayLenAsync(slave,argc); - - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (j = 0; j < argc; j++) - addReplyBulkAsync(slave,argv[j]); + addReplyProtoAsync(slave,fake->buf,fake->bufpos); + listRewind(fake->reply, &liReply); + while ((lnReply = listNext(&liReply))) + { + clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); + addReplyProtoAsync(slave, reply->buf(), reply->used); + } + if (!fSendRaw) + addReplyAsync(slave,shared.crlf); } - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = (client*)ln->value; - slave->lock.unlock(); - } + freeClient(fake); } /* This function is used in order to proxy what we receive from our master @@ -369,6 +395,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle if (server.repl_backlog) feedReplicationBacklog(buf,buflen); listRewind(slaves,&li); + while((ln = listNext(&li))) { client *slave = (client*)ln->value; std::lock_guardlock)> ulock(slave->lock); @@ -377,6 +404,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + addReplyProtoAsync(slave,buf,buflen); } @@ -682,6 +710,7 @@ int startBgsaveForReplication(int mincapa) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = (client*)ln->value; + std::unique_locklock)> lock(slave->lock); if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { slave->replstate = REPL_STATE_NONE; @@ -701,6 +730,7 @@ int startBgsaveForReplication(int mincapa) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = (client*)ln->value; + std::unique_locklock)> lock(slave->lock); if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { replicationSetupSlaveForFullResync(slave, @@ -1731,7 +1761,8 @@ int slaveTryPartialResynchronization(redisMaster *mi, aeEventLoop *el, int fd, i memcpy(mi->cached_master->replid,sznew,sizeof(server.replid)); /* Disconnect all the sub-slaves: they need to be notified. */ - disconnectSlaves(); + if (!server.fActiveReplica) + disconnectSlaves(); } } @@ -2035,8 +2066,23 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * as well, if we have any sub-slaves. The master may transfer us an * entirely different data set and we have no way to incrementally feed * our slaves after that. */ - disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ + if (!server.fActiveReplica) + { + disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ + freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ + } + else + { + if (listLength(server.slaves)) + { + changeReplicationId(); + clearReplicationId2(); + } + else + { + freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ + } + } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the server.master_replid and master_initial_offset are @@ -2206,7 +2252,8 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { /* Force our slaves to resync with us as well. They may hopefully be able * to partially resync with us, but we can notify the replid change. */ - disconnectSlaves(); + if (!server.fActiveReplica) + disconnectSlaves(); cancelReplicationHandshake(mi); /* Before destroying our master state, create a cached master using * our own parameters, to later PSYNC with the new master. */ @@ -2238,7 +2285,8 @@ void replicationUnsetMaster(redisMaster *mi) { * of the replication ID change (see shiftReplicationId() call). However * the slaves will be able to partially resync with us, so it will be * a very fast reconnection. */ - disconnectSlaves(); + if (!server.fActiveReplica) + disconnectSlaves(); mi->repl_state = REPL_STATE_NONE; /* We need to make sure the new master will start the replication stream @@ -3083,4 +3131,58 @@ redisMaster *MasterInfoFromClient(client *c) return mi; } return nullptr; +} + +void replicaReplayCommand(client *c) +{ + // the replay command contains two arguments: + // 1: The UUID of the source + // 2: The raw command buffer to be replayed + + if (!(c->flags & CLIENT_MASTER)) + { + addReplyError(c, "Command must be sent from a master"); + return; + } + + /* First Validate Arguments */ + if (c->argc != 3) + { + addReplyError(c, "Invalid number of arguments"); + return; + } + + unsigned char uuid[UUID_BINARY_LEN]; + if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 + || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + { + addReplyError(c, "Expected UUID arg1"); + return; + } + + if (c->argv[2]->type != OBJ_STRING) + { + addReplyError(c, "Expected command buffer arg2"); + return; + } + + if (FSameUuidNoNil(uuid, server.uuid)) + { + addReply(c, shared.ok); + return; // Our own commands have come back to us. Ignore them. + } + + // OK We've recieved a command lets execute + client *cFake = createClient(-1, c->iel); + cFake->lock.lock(); + cFake->querybuf = sdscat(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); + selectDb(cFake, c->db->id); + processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->lock.unlock(); + addReply(c, shared.ok); + freeClient(cFake); + + // call() will not propogate this for us, so we do so here + alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); + return; } \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 333e40c79..795cc570a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1003,6 +1003,10 @@ struct redisCommand redisCommandTable[] = { {"acl",aclCommand,-2, "admin no-script ok-loading ok-stale", + 0,NULL,0,0,0,0,0,0}, + + {"rreplay",replicaReplayCommand,3, + "read-only fast noprop", 0,NULL,0,0,0,0,0,0} }; @@ -2456,6 +2460,7 @@ void initServerConfig(void) { server.pexpireCommand = lookupCommandByCString("pexpire"); server.xclaimCommand = lookupCommandByCString("xclaim"); server.xgroupCommand = lookupCommandByCString("xgroup"); + server.rreplayCommand = lookupCommandByCString("rreplay"); /* Slow log */ server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; @@ -3023,6 +3028,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags) c->flags |= CMD_ASKING; } else if (!strcasecmp(flag,"fast")) { c->flags |= CMD_FAST | CMD_CATEGORY_FAST; + } else if (!strcasecmp(flag,"noprop")) { + c->flags |= CMD_SKIP_PROPOGATE; } else { /* Parse ACL categories here if the flag name starts with @. */ uint64_t catflag; @@ -3344,6 +3351,9 @@ void call(client *c, int flags) { !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; + if (c->cmd->flags & CMD_SKIP_PROPOGATE) + propagate_flags &= ~PROPAGATE_REPL; + /* Call propagate() only if at least one of AOF / replication * propagation is needed. Note that modules commands handle replication * in an explicit way, so we never replicate them automatically. */ @@ -3392,7 +3402,7 @@ void call(client *c, int flags) { * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ -int processCommand(client *c) { +int processCommand(client *c, int callFlags) { serverAssert(GlobalLocksAcquired()); moduleCallCommandFilters(c); @@ -3607,7 +3617,7 @@ int processCommand(client *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { - call(c,CMD_CALL_FULL); + call(c,callFlags); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); diff --git a/src/server.h b/src/server.h index b6c1ce87b..de4a5b2d6 100644 --- a/src/server.h +++ b/src/server.h @@ -266,6 +266,7 @@ extern "C" { #define CMD_CATEGORY_CONNECTION (1ULL<<34) #define CMD_CATEGORY_TRANSACTION (1ULL<<35) #define CMD_CATEGORY_SCRIPTING (1ULL<<36) +#define CMD_SKIP_PROPOGATE (1ULL<<37) /* "noprop" flag */ /* AOF states */ #define AOF_OFF 0 /* AOF is off */ @@ -305,6 +306,7 @@ extern "C" { #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ +#define CLIENT_FORCE_REPLY (1<<29) /* Should addReply be forced to write the text? */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1171,7 +1173,7 @@ struct redisServer { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand; + *xgroupCommand, *rreplayCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1606,7 +1608,7 @@ void setDeferredMapLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); -void processInputBuffer(client *c); +void processInputBuffer(client *c, int callFlags); void processInputBufferAndReplicate(client *c); void processGopherRequest(client *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@ -1630,6 +1632,9 @@ void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); +#ifdef __cplusplus +void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); +#endif void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); void addReplySetLen(client *c, long length); @@ -1940,7 +1945,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(void); int freeMemoryIfNeededAndSafe(void); -int processCommand(client *c); +int processCommand(client *c, int callFlags); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(const char *s); @@ -2362,6 +2367,7 @@ void xdelCommand(client *c); void xtrimCommand(client *c); void lolwutCommand(client *c); void aclCommand(client *c); +void replicaReplayCommand(client *c); int FBrokenLinkToMaster(); int FActiveMaster(client *c);