From c5618e7fdd1db874b785c19f53f01e78ea56007a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 4 Dec 2013 15:52:20 +0100 Subject: [PATCH] WAIT command: synchronous replication for Redis. --- src/blocked.c | 4 ++ src/networking.c | 1 + src/redis.c | 29 +++++++++- src/redis.h | 10 +++- src/replication.c | 137 +++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 177 insertions(+), 4 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 667b75a62..4cd632bd3 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -130,6 +130,8 @@ void processUnblockedClients(void) { void unblockClient(redisClient *c) { if (c->btype == REDIS_BLOCKED_LIST) { unblockClientWaitingData(c); + } else if (c->btype == REDIS_BLOCKED_WAIT) { + unblockClientWaitingReplicas(c); } else { redisPanic("Unknown btype in unblockClient()."); } @@ -147,6 +149,8 @@ void unblockClient(redisClient *c) { void replyToBlockedClientTimedOut(redisClient *c) { if (c->btype == REDIS_BLOCKED_LIST) { addReply(c,shared.nullmultibulk); + } else if (c->btype == REDIS_BLOCKED_WAIT) { + addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); } else { redisPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/networking.c b/src/networking.c index 7ed8d2c09..fd8ab3c16 100644 --- a/src/networking.c +++ b/src/networking.c @@ -114,6 +114,7 @@ redisClient *createClient(int fd) { c->bpop.target = NULL; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; + c->woff = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); diff --git a/src/redis.c b/src/redis.c index f29397769..72c9f661f 100644 --- a/src/redis.c +++ b/src/redis.c @@ -263,7 +263,8 @@ struct redisCommand redisCommandTable[] = { {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}, {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}, {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, - {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0} + 
{"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}, + {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0} }; /*============================ Utility functions ============================ */ @@ -1200,8 +1201,29 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; + + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; + } + + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); + /* Try to process pending commands for clients that were just unblocked. */ - processUnblockedClients(); + if (listLength(server.unblocked_clients)) + processUnblockedClients(); /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); @@ -1557,6 +1579,8 @@ void initServer() { server.slaveseldb = -1; /* Force to emit the first SELECT command. 
*/ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); + server.clients_waiting_acks = listCreate(); + server.get_ack_from_slaves = 0; createSharedObjects(); adjustOpenFilesLimit(); @@ -2079,6 +2103,7 @@ int processCommand(redisClient *c) { addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); + c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } diff --git a/src/redis.h b/src/redis.h index bf968e3c7..a9b06838b 100644 --- a/src/redis.h +++ b/src/redis.h @@ -495,7 +495,8 @@ typedef struct redisClient { int slave_listening_port; /* As configured with: SLAVECONF listening-port */ multiState mstate; /* MULTI/EXEC state */ int btype; /* Type of blocking op if REDIS_BLOCKED. */ - blockingState bpop; /* blocking state */ + blockingState bpop; /* blocking state */ + long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ @@ -750,6 +751,9 @@ struct redisServer { dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ int repl_scriptcache_size; /* Max number of elements. */ + /* Synchronous replication. */ + list *clients_waiting_acks; /* Clients waiting in WAIT command. */ + int get_ack_from_slaves; /* If true we send REPLCONF GETACK. 
*/ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ @@ -1063,6 +1067,9 @@ void replicationScriptCacheInit(void); void replicationScriptCacheFlush(void); void replicationScriptCacheAdd(sds sha1); int replicationScriptCacheExists(sds sha1); +void processClientsWaitingReplicas(void); +void unblockClientWaitingReplicas(redisClient *c); +int replicationCountAcksByOffset(long long offset); /* Generic persistence functions */ void startLoading(FILE *fp); @@ -1398,6 +1405,7 @@ void timeCommand(redisClient *c); void bitopCommand(redisClient *c); void bitcountCommand(redisClient *c); void replconfCommand(redisClient *c); +void waitCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/replication.c b/src/replication.c index 7357ae5ec..c555d1d3a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -39,6 +39,7 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); +void replicationSendAck(void); /* ---------------------------------- MASTER -------------------------------- */ @@ -560,6 +561,11 @@ void replconfCommand(redisClient *c) { c->repl_ack_time = server.unixtime; /* Note: this command does not reply anything! */ return; + } else if (!strcasecmp(c->argv[j]->ptr,"getack")) { + /* REPLCONF GETACK is used in order to request an ACK ASAP + * to the slave. */ + if (server.masterhost && server.master) replicationSendAck(); + /* Note: this command does not reply anything! 
*/
         } else {
             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
                 (char*)c->argv[j]->ptr);
@@ -1495,7 +1501,136 @@ int replicationScriptCacheExists(sds sha1) {
     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
 }
 
-/* --------------------------- REPLICATION CRON ----------------------------- */
+/* ----------------------- SYNCHRONOUS REPLICATION --------------------------
+ * Redis synchronous replication design can be summarized in points:
+ *
+ * - Redis masters have a global replication offset, used by PSYNC.
+ * - Masters increment the offset every time new commands are sent to slaves.
+ * - Slaves ping back masters with the offset processed so far.
+ *
+ * So synchronous replication adds a new WAIT command in the form:
+ *
+ *   WAIT <num_replicas> <milliseconds_timeout>
+ *
+ * That returns the number of replicas that processed the query when
+ * we finally have at least num_replicas, or when the timeout was
+ * reached.
+ *
+ * The command is implemented in this way:
+ *
+ * - Every time a client processes a command, we remember the replication
+ *   offset after sending that command to the slaves.
+ * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
+ *   The client is blocked at the same time (see blocked.c).
+ * - Once we receive enough ACKs for a given offset or when the timeout
+ *   is reached, the WAIT command is unblocked and the reply sent to the
+ *   client.
+ */
+
+/* This just sets a flag so that we broadcast a REPLCONF GETACK command
+ * to all the slaves in the beforeSleep() function. Note that this way
+ * we "group" all the clients that want to wait for synchronous replication
+ * in a given event loop iteration, and send a single GETACK for them all. */
+void replicationRequestAckFromSlaves(void) {
+    server.get_ack_from_slaves = 1;
+}
+
+/* Return the number of slaves that already acknowledged the specified
+ * replication offset. 
*/
+int replicationCountAcksByOffset(long long offset) {
+    int acked = 0;
+    listIter iter;
+    listNode *node;
+
+    listRewind(server.slaves,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        redisClient *replica = node->value;
+        /* Slaves not in the online state never count as ACKs. */
+        if (replica->replstate == REDIS_REPL_ONLINE &&
+            replica->repl_ack_off >= offset) acked++;
+    }
+    return acked;
+}
+
+/* WAIT <numreplicas> <timeout>: reply with the number of replicas that
+ * acknowledged the client's last write offset, blocking if needed. */
+void waitCommand(redisClient *c) {
+    long long target_offset = c->woff;
+    long wanted, acked;
+    mstime_t timeout;
+
+    /* Parse the number of replicas and the timeout (milliseconds). */
+    if (getLongFromObjectOrReply(c,c->argv[1],&wanted,NULL) != REDIS_OK)
+        return;
+    if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,
+            UNIT_MILLISECONDS) != REDIS_OK) return;
+
+    /* Never block if enough ACKs are in, or REDIS_MULTI is set. */
+    acked = replicationCountAcksByOffset(c->woff);
+    if ((c->flags & REDIS_MULTI) || acked >= wanted) {
+        addReplyLongLong(c,acked);
+        return;
+    }
+
+    /* Otherwise record what this client is waiting for, queue it in the
+     * list of clients waiting for slave ACKs, and block it. */
+    c->bpop.numreplicas = wanted;
+    c->bpop.reploffset = target_offset;
+    c->bpop.timeout = timeout;
+    listAddNodeTail(server.clients_waiting_acks,c);
+    blockClient(c,REDIS_BLOCKED_WAIT);
+
+    /* Flag the server so that an ACK request is sent to all the slaves
+     * before returning to the event loop. */
+    replicationRequestAckFromSlaves();
+}
+
+/* Blocking-type specific cleanup for clients blocked in WAIT: drop the
+ * client from the list of clients waiting for replica ACKs. The client is
+ * asserted to be in that list. This is invoked by unblockClient(); do not
+ * call it directly. */
+void unblockClientWaitingReplicas(redisClient *c) {
+    listNode *node = listSearchKey(server.clients_waiting_acks,c);
+    redisAssert(node != NULL);
+    listDelNode(server.clients_waiting_acks,node);
+}
+
+/* Check if there are clients blocked in WAIT that can be unblocked since
+ * we received enough ACKs from slaves. 
*/
+void processClientsWaitingReplicas(void) {
+    long long last_offset = 0;
+    int last_numreplicas = 0;
+
+    listIter li;
+    listNode *ln;
+
+    listRewind(server.clients_waiting_acks,&li);
+    while((ln = listNext(&li))) {
+        redisClient *c = ln->value;
+
+        /* Every time we find a client that is satisfied for a given
+         * offset and number of replicas, we remember it so the next client
+         * may be unblocked without calling replicationCountAcksByOffset()
+         * if its requested offset and replicas are both equal or less. */
+        if (last_offset && last_offset >= c->bpop.reploffset &&
+            last_numreplicas >= c->bpop.numreplicas)
+        {
+            unblockClient(c);
+            addReplyLongLong(c,last_numreplicas);
+        } else {
+            int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
+
+            if (numreplicas >= c->bpop.numreplicas) {
+                last_offset = c->bpop.reploffset;
+                last_numreplicas = numreplicas;
+                unblockClient(c);
+                addReplyLongLong(c,numreplicas);
+            }
+        }
+    }
+}
+
+/* --------------------------- REPLICATION CRON ---------------------------- */
 
 /* Replication cron funciton, called 1 time per second. */
 void replicationCron(void) {