Add RREPLAY command and put the issue of packet storms to rest. Fixes issue #24

Former-commit-id: 68d2b648e0cfeac1ec6f7b68255631ba27d83739
This commit is contained in:
John Sully 2019-04-06 00:14:27 -04:00
parent fdbc361e58
commit d36db18e7c
4 changed files with 195 additions and 71 deletions

View File

@ -300,6 +300,9 @@ int prepareClientToWrite(client *c, bool fAsync) {
serverAssert(!fAsync || GlobalLocksAcquired());
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
// do not install a write handler
/* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
@ -1952,7 +1955,7 @@ int processMultibulkBuffer(client *c) {
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
void processInputBuffer(client *c, int callFlags) {
AssertCorrectThread(c);
bool fFreed = false;
@ -2014,7 +2017,7 @@ void processInputBuffer(client *c) {
server.current_client = c;
/* Only reset the client when the command was executed. */
if (processCommand(c) == C_OK) {
if (processCommand(c, callFlags) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
@ -2051,16 +2054,19 @@ void processInputBuffer(client *c) {
* raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
processInputBuffer(c, CMD_CALL_FULL);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
processInputBuffer(c, CMD_CALL_FULL);
size_t applied = c->reploff - prev_offset;
if (applied) {
aeAcquireLock();
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
aeReleaseLock();
if (!server.fActiveReplica)
{
aeAcquireLock();
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
aeReleaseLock();
}
sdsrange(c->pending_querybuf,applied,-1);
}
}

View File

@ -228,37 +228,10 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc)
{
char llstr[LONG_STR_SIZE];
serverAssert(GlobalLocksAcquired());
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (!server.fActiveReplica && listLength(server.masters)) return;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Get the lock on all slaves */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
((client*)ln->value)->lock.lock();
}
std::unique_lock<decltype(slave->lock)> lock(slave->lock);
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
@ -280,20 +253,56 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (server.current_client && FSameHost(server.current_client, slave)) continue;
addReplyAsync(slave,selectcmd);
}
/* Send it to slaves */
addReply(slave,selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyArrayLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (int j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln, *lnReply;
listIter li, liReply;
int j, len;
serverAssert(GlobalLocksAcquired());
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (!server.fActiveReplica && listLength(server.masters)) return;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
client *fake = createClient(-1, serverTL - server.rgthreadvar);
fake->flags |= CLIENT_FORCE_REPLY;
replicationFeedSlave(fake, dictid, argv, argc); // Note: updates the repl log, keep above the repl update code below
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
@ -321,6 +330,25 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
}
long long cchbuf = fake->bufpos;
listRewind(fake->reply, &liReply);
while ((lnReply = listNext(&liReply)))
{
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
cchbuf += reply->used;
}
bool fSendRaw = !server.fActiveReplica || (argc >= 1 && lookupCommand((sds)ptrFromObj(argv[0])) == server.rreplayCommand);
serverAssert(argc > 0);
serverAssert(cchbuf > 0);
char uuid[40] = {'\0'};
uuid_unparse(server.uuid, uuid);
char proto[1024];
int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
cchProto = std::min((int)sizeof(proto), cchProto);
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
@ -329,25 +357,23 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (server.current_client && FSameHost(server.current_client, slave)) continue;
std::unique_lock<decltype(slave->lock)> lock(slave->lock);
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
if (!fSendRaw)
addReplyProtoAsync(slave, proto, cchProto);
/* Add the multi bulk length. */
addReplyArrayLenAsync(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulkAsync(slave,argv[j]);
addReplyProtoAsync(slave,fake->buf,fake->bufpos);
listRewind(fake->reply, &liReply);
while ((lnReply = listNext(&liReply)))
{
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
addReplyProtoAsync(slave, reply->buf(), reply->used);
}
if (!fSendRaw)
addReplyAsync(slave,shared.crlf);
}
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
slave->lock.unlock();
}
freeClient(fake);
}
/* This function is used in order to proxy what we receive from our master
@ -369,6 +395,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
std::lock_guard<decltype(slave->lock)> ulock(slave->lock);
@ -377,6 +404,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyProtoAsync(slave,buf,buflen);
}
@ -682,6 +710,7 @@ int startBgsaveForReplication(int mincapa) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
std::unique_lock<decltype(slave->lock)> lock(slave->lock);
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
@ -701,6 +730,7 @@ int startBgsaveForReplication(int mincapa) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
std::unique_lock<decltype(slave->lock)> lock(slave->lock);
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
@ -1731,7 +1761,8 @@ int slaveTryPartialResynchronization(redisMaster *mi, aeEventLoop *el, int fd, i
memcpy(mi->cached_master->replid,sznew,sizeof(server.replid));
/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
if (!server.fActiveReplica)
disconnectSlaves();
}
}
@ -2035,8 +2066,23 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
if (!server.fActiveReplica)
{
disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
}
else
{
if (listLength(server.slaves))
{
changeReplicationId();
clearReplicationId2();
}
else
{
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
}
}
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
@ -2206,7 +2252,8 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
if (!server.fActiveReplica)
disconnectSlaves();
cancelReplicationHandshake(mi);
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
@ -2238,7 +2285,8 @@ void replicationUnsetMaster(redisMaster *mi) {
* of the replication ID change (see shiftReplicationId() call). However
* the slaves will be able to partially resync with us, so it will be
* a very fast reconnection. */
disconnectSlaves();
if (!server.fActiveReplica)
disconnectSlaves();
mi->repl_state = REPL_STATE_NONE;
/* We need to make sure the new master will start the replication stream
@ -3083,4 +3131,58 @@ redisMaster *MasterInfoFromClient(client *c)
return mi;
}
return nullptr;
}
void replicaReplayCommand(client *c)
{
// the replay command contains two arguments:
// 1: The UUID of the source
// 2: The raw command buffer to be replayed
if (!(c->flags & CLIENT_MASTER))
{
addReplyError(c, "Command must be sent from a master");
return;
}
/* First Validate Arguments */
if (c->argc != 3)
{
addReplyError(c, "Invalid number of arguments");
return;
}
unsigned char uuid[UUID_BINARY_LEN];
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
{
addReplyError(c, "Expected UUID arg1");
return;
}
if (c->argv[2]->type != OBJ_STRING)
{
addReplyError(c, "Expected command buffer arg2");
return;
}
if (FSameUuidNoNil(uuid, server.uuid))
{
addReply(c, shared.ok);
return; // Our own commands have come back to us. Ignore them.
}
// OK We've recieved a command lets execute
client *cFake = createClient(-1, c->iel);
cFake->lock.lock();
cFake->querybuf = sdscat(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
selectDb(cFake, c->db->id);
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
cFake->lock.unlock();
addReply(c, shared.ok);
freeClient(cFake);
// call() will not propogate this for us, so we do so here
alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL);
return;
}

View File

@ -1003,6 +1003,10 @@ struct redisCommand redisCommandTable[] = {
{"acl",aclCommand,-2,
"admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"rreplay",replicaReplayCommand,3,
"read-only fast noprop",
0,NULL,0,0,0,0,0,0}
};
@ -2456,6 +2460,7 @@ void initServerConfig(void) {
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
server.rreplayCommand = lookupCommandByCString("rreplay");
/* Slow log */
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
@ -3023,6 +3028,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags)
c->flags |= CMD_ASKING;
} else if (!strcasecmp(flag,"fast")) {
c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
} else if (!strcasecmp(flag,"noprop")) {
c->flags |= CMD_SKIP_PROPOGATE;
} else {
/* Parse ACL categories here if the flag name starts with @. */
uint64_t catflag;
@ -3344,6 +3351,9 @@ void call(client *c, int flags) {
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
if (c->cmd->flags & CMD_SKIP_PROPOGATE)
propagate_flags &= ~PROPAGATE_REPL;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
@ -3392,7 +3402,7 @@ void call(client *c, int flags) {
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
int processCommand(client *c, int callFlags) {
serverAssert(GlobalLocksAcquired());
moduleCallCommandFilters(c);
@ -3607,7 +3617,7 @@ int processCommand(client *c) {
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
call(c,callFlags);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();

View File

@ -266,6 +266,7 @@ extern "C" {
#define CMD_CATEGORY_CONNECTION (1ULL<<34)
#define CMD_CATEGORY_TRANSACTION (1ULL<<35)
#define CMD_CATEGORY_SCRIPTING (1ULL<<36)
#define CMD_SKIP_PROPOGATE (1ULL<<37) /* "noprop" flag */
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
@ -305,6 +306,7 @@ extern "C" {
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
#define CLIENT_FORCE_REPLY (1<<29) /* Should addReply be forced to write the text? */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -1171,7 +1173,7 @@ struct redisServer {
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand;
*xgroupCommand, *rreplayCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@ -1606,7 +1608,7 @@ void setDeferredMapLen(client *c, void *node, long length);
void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
void processInputBuffer(client *c);
void processInputBuffer(client *c, int callFlags);
void processInputBufferAndReplicate(client *c);
void processGopherRequest(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@ -1630,6 +1632,9 @@ void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
#endif
void addReplyArrayLen(client *c, long length);
void addReplyMapLen(client *c, long length);
void addReplySetLen(client *c, long length);
@ -1940,7 +1945,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void);
int freeMemoryIfNeededAndSafe(void);
int processCommand(client *c);
int processCommand(client *c, int callFlags);
void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(const char *s);
@ -2362,6 +2367,7 @@ void xdelCommand(client *c);
void xtrimCommand(client *c);
void lolwutCommand(client *c);
void aclCommand(client *c);
void replicaReplayCommand(client *c);
int FBrokenLinkToMaster();
int FActiveMaster(client *c);