WAIT command: synchronous replication for Redis.
This commit is contained in:
parent
5f743cc4f8
commit
a7ebb0c7bf
@ -130,6 +130,8 @@ void processUnblockedClients(void) {
|
|||||||
void unblockClient(redisClient *c) {
|
void unblockClient(redisClient *c) {
|
||||||
if (c->btype == REDIS_BLOCKED_LIST) {
|
if (c->btype == REDIS_BLOCKED_LIST) {
|
||||||
unblockClientWaitingData(c);
|
unblockClientWaitingData(c);
|
||||||
|
} else if (c->btype == REDIS_BLOCKED_WAIT) {
|
||||||
|
unblockClientWaitingReplicas(c);
|
||||||
} else {
|
} else {
|
||||||
redisPanic("Unknown btype in unblockClient().");
|
redisPanic("Unknown btype in unblockClient().");
|
||||||
}
|
}
|
||||||
@ -147,6 +149,8 @@ void unblockClient(redisClient *c) {
|
|||||||
void replyToBlockedClientTimedOut(redisClient *c) {
|
void replyToBlockedClientTimedOut(redisClient *c) {
|
||||||
if (c->btype == REDIS_BLOCKED_LIST) {
|
if (c->btype == REDIS_BLOCKED_LIST) {
|
||||||
addReply(c,shared.nullmultibulk);
|
addReply(c,shared.nullmultibulk);
|
||||||
|
} else if (c->btype == REDIS_BLOCKED_WAIT) {
|
||||||
|
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
||||||
} else {
|
} else {
|
||||||
redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
||||||
}
|
}
|
||||||
|
@ -114,6 +114,7 @@ redisClient *createClient(int fd) {
|
|||||||
c->bpop.target = NULL;
|
c->bpop.target = NULL;
|
||||||
c->bpop.numreplicas = 0;
|
c->bpop.numreplicas = 0;
|
||||||
c->bpop.reploffset = 0;
|
c->bpop.reploffset = 0;
|
||||||
|
c->woff = 0;
|
||||||
c->watched_keys = listCreate();
|
c->watched_keys = listCreate();
|
||||||
c->pubsub_channels = dictCreate(&setDictType,NULL);
|
c->pubsub_channels = dictCreate(&setDictType,NULL);
|
||||||
c->pubsub_patterns = listCreate();
|
c->pubsub_patterns = listCreate();
|
||||||
|
29
src/redis.c
29
src/redis.c
@ -263,7 +263,8 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
|
{"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
|
||||||
{"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0},
|
{"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0},
|
||||||
{"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
|
{"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
|
||||||
{"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}
|
{"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
|
||||||
|
{"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*============================ Utility functions ============================ */
|
/*============================ Utility functions ============================ */
|
||||||
@ -1200,8 +1201,29 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
if (server.active_expire_enabled && server.masterhost == NULL)
|
if (server.active_expire_enabled && server.masterhost == NULL)
|
||||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||||
|
|
||||||
|
/* Send all the slaves an ACK request if at least one client blocked
|
||||||
|
* during the previous event loop iteration. */
|
||||||
|
if (server.get_ack_from_slaves) {
|
||||||
|
robj *argv[3];
|
||||||
|
|
||||||
|
argv[0] = createStringObject("REPLCONF",8);
|
||||||
|
argv[1] = createStringObject("GETACK",6);
|
||||||
|
argv[2] = createStringObject("*",1); /* Not used argument. */
|
||||||
|
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
|
||||||
|
decrRefCount(argv[0]);
|
||||||
|
decrRefCount(argv[1]);
|
||||||
|
decrRefCount(argv[2]);
|
||||||
|
server.get_ack_from_slaves = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unblock all the clients blocked for synchronous replication
|
||||||
|
* in WAIT. */
|
||||||
|
if (listLength(server.clients_waiting_acks))
|
||||||
|
processClientsWaitingReplicas();
|
||||||
|
|
||||||
/* Try to process pending commands for clients that were just unblocked. */
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
processUnblockedClients();
|
if (listLength(server.unblocked_clients))
|
||||||
|
processUnblockedClients();
|
||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
@ -1557,6 +1579,8 @@ void initServer() {
|
|||||||
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
||||||
server.unblocked_clients = listCreate();
|
server.unblocked_clients = listCreate();
|
||||||
server.ready_keys = listCreate();
|
server.ready_keys = listCreate();
|
||||||
|
server.clients_waiting_acks = listCreate();
|
||||||
|
server.get_ack_from_slaves = 0;
|
||||||
|
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
adjustOpenFilesLimit();
|
adjustOpenFilesLimit();
|
||||||
@ -2079,6 +2103,7 @@ int processCommand(redisClient *c) {
|
|||||||
addReply(c,shared.queued);
|
addReply(c,shared.queued);
|
||||||
} else {
|
} else {
|
||||||
call(c,REDIS_CALL_FULL);
|
call(c,REDIS_CALL_FULL);
|
||||||
|
c->woff = server.master_repl_offset;
|
||||||
if (listLength(server.ready_keys))
|
if (listLength(server.ready_keys))
|
||||||
handleClientsBlockedOnLists();
|
handleClientsBlockedOnLists();
|
||||||
}
|
}
|
||||||
|
10
src/redis.h
10
src/redis.h
@ -495,7 +495,8 @@ typedef struct redisClient {
|
|||||||
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
||||||
multiState mstate; /* MULTI/EXEC state */
|
multiState mstate; /* MULTI/EXEC state */
|
||||||
int btype; /* Type of blocking op if REDIS_BLOCKED. */
|
int btype; /* Type of blocking op if REDIS_BLOCKED. */
|
||||||
blockingState bpop; /* blocking state */
|
blockingState bpop; /* blocking state */
|
||||||
|
long long woff; /* Last write global replication offset. */
|
||||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||||
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
|
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
|
||||||
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
|
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
|
||||||
@ -750,6 +751,9 @@ struct redisServer {
|
|||||||
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
|
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
|
||||||
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
|
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
|
||||||
int repl_scriptcache_size; /* Max number of elements. */
|
int repl_scriptcache_size; /* Max number of elements. */
|
||||||
|
/* Synchronous replication. */
|
||||||
|
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
|
||||||
|
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
|
||||||
/* Limits */
|
/* Limits */
|
||||||
unsigned int maxclients; /* Max number of simultaneous clients */
|
unsigned int maxclients; /* Max number of simultaneous clients */
|
||||||
unsigned long long maxmemory; /* Max number of memory bytes to use */
|
unsigned long long maxmemory; /* Max number of memory bytes to use */
|
||||||
@ -1063,6 +1067,9 @@ void replicationScriptCacheInit(void);
|
|||||||
void replicationScriptCacheFlush(void);
|
void replicationScriptCacheFlush(void);
|
||||||
void replicationScriptCacheAdd(sds sha1);
|
void replicationScriptCacheAdd(sds sha1);
|
||||||
int replicationScriptCacheExists(sds sha1);
|
int replicationScriptCacheExists(sds sha1);
|
||||||
|
void processClientsWaitingReplicas(void);
|
||||||
|
void unblockClientWaitingReplicas(redisClient *c);
|
||||||
|
int replicationCountAcksByOffset(long long offset);
|
||||||
|
|
||||||
/* Generic persistence functions */
|
/* Generic persistence functions */
|
||||||
void startLoading(FILE *fp);
|
void startLoading(FILE *fp);
|
||||||
@ -1398,6 +1405,7 @@ void timeCommand(redisClient *c);
|
|||||||
void bitopCommand(redisClient *c);
|
void bitopCommand(redisClient *c);
|
||||||
void bitcountCommand(redisClient *c);
|
void bitcountCommand(redisClient *c);
|
||||||
void replconfCommand(redisClient *c);
|
void replconfCommand(redisClient *c);
|
||||||
|
void waitCommand(redisClient *c);
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
|
|
||||||
void replicationDiscardCachedMaster(void);
|
void replicationDiscardCachedMaster(void);
|
||||||
void replicationResurrectCachedMaster(int newfd);
|
void replicationResurrectCachedMaster(int newfd);
|
||||||
|
void replicationSendAck(void);
|
||||||
|
|
||||||
/* ---------------------------------- MASTER -------------------------------- */
|
/* ---------------------------------- MASTER -------------------------------- */
|
||||||
|
|
||||||
@ -560,6 +561,11 @@ void replconfCommand(redisClient *c) {
|
|||||||
c->repl_ack_time = server.unixtime;
|
c->repl_ack_time = server.unixtime;
|
||||||
/* Note: this command does not reply anything! */
|
/* Note: this command does not reply anything! */
|
||||||
return;
|
return;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
|
||||||
|
/* REPLCONF GETACK is used in order to request an ACK ASAP
|
||||||
|
* to the slave. */
|
||||||
|
if (server.masterhost && server.master) replicationSendAck();
|
||||||
|
/* Note: this command does not reply anything! */
|
||||||
} else {
|
} else {
|
||||||
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
||||||
(char*)c->argv[j]->ptr);
|
(char*)c->argv[j]->ptr);
|
||||||
@ -1495,7 +1501,136 @@ int replicationScriptCacheExists(sds sha1) {
|
|||||||
return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
|
return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* --------------------------- REPLICATION CRON ----------------------------- */
|
/* ----------------------- SYNCHRONOUS REPLICATION --------------------------
|
||||||
|
* Redis synchronous replication design can be summarized in points:
|
||||||
|
*
|
||||||
|
* - Redis masters have a global replication offset, used by PSYNC.
|
||||||
|
* - Master increment the offset every time new commands are sent to slaves.
|
||||||
|
* - Slaves ping back masters with the offset processed so far.
|
||||||
|
*
|
||||||
|
* So synchronous replication adds a new WAIT command in the form:
|
||||||
|
*
|
||||||
|
* WAIT <num_replicas> <milliseconds_timeout>
|
||||||
|
*
|
||||||
|
* That returns the number of replicas that processed the query when
|
||||||
|
* we finally have at least num_replicas, or when the timeout was
|
||||||
|
* reached.
|
||||||
|
*
|
||||||
|
* The command is implemented in this way:
|
||||||
|
*
|
||||||
|
* - Every time a client processes a command, we remember the replication
|
||||||
|
* offset after sending that command to the slaves.
|
||||||
|
* - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
|
||||||
|
* The client is blocked at the same time (see blocked.c).
|
||||||
|
* - Once we receive enough ACKs for a given offset or when the timeout
|
||||||
|
* is reached, the WAIT command is unblocked and the reply sent to the
|
||||||
|
* client.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* This just set a flag so that we broadcast a REPLCONF GETACK command
|
||||||
|
* to all the slaves in the beforeSleep() function. Note that this way
|
||||||
|
* we "group" all the clients that want to wait for synchronouns replication
|
||||||
|
* in a given event loop iteration, and send a single GETACK for them all. */
|
||||||
|
void replicationRequestAckFromSlaves(void) {
|
||||||
|
server.get_ack_from_slaves = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return the number of slaves that already acknowledged the specified
|
||||||
|
* replication offset. */
|
||||||
|
int replicationCountAcksByOffset(long long offset) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *slave = ln->value;
|
||||||
|
|
||||||
|
if (slave->replstate != REDIS_REPL_ONLINE) continue;
|
||||||
|
if (slave->repl_ack_off >= offset) count++;
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* WAIT for N replicas to acknowledge the processing of our latest
|
||||||
|
* write command (and all the previous commands). */
|
||||||
|
void waitCommand(redisClient *c) {
|
||||||
|
mstime_t timeout;
|
||||||
|
long numreplicas, ackreplicas;
|
||||||
|
long long offset = c->woff;
|
||||||
|
|
||||||
|
/* Argument parsing. */
|
||||||
|
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
|
||||||
|
return;
|
||||||
|
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
|
||||||
|
!= REDIS_OK) return;
|
||||||
|
|
||||||
|
/* First try without blocking at all. */
|
||||||
|
ackreplicas = replicationCountAcksByOffset(c->woff);
|
||||||
|
if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
|
||||||
|
addReplyLongLong(c,ackreplicas);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Otherwise block the client and put it into our list of clients
|
||||||
|
* waiting for ack from slaves. */
|
||||||
|
c->bpop.timeout = timeout;
|
||||||
|
c->bpop.reploffset = offset;
|
||||||
|
c->bpop.numreplicas = numreplicas;
|
||||||
|
listAddNodeTail(server.clients_waiting_acks,c);
|
||||||
|
blockClient(c,REDIS_BLOCKED_WAIT);
|
||||||
|
|
||||||
|
/* Make sure that the server will send an ACK request to all the slaves
|
||||||
|
* before returning to the event loop. */
|
||||||
|
replicationRequestAckFromSlaves();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This is called by unblockClient() to perform the blocking op type
|
||||||
|
* specific cleanup. We just remove the client from the list of clients
|
||||||
|
* waiting for replica acks. Never call it directly, call unblockClient()
|
||||||
|
* instead. */
|
||||||
|
void unblockClientWaitingReplicas(redisClient *c) {
|
||||||
|
listNode *ln = listSearchKey(server.clients_waiting_acks,c);
|
||||||
|
redisAssert(ln != NULL);
|
||||||
|
listDelNode(server.clients_waiting_acks,ln);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if there are clients blocked in WAIT that can be unblocked since
|
||||||
|
* we received enough ACKs from slaves. */
|
||||||
|
void processClientsWaitingReplicas(void) {
|
||||||
|
long long last_offset = 0;
|
||||||
|
int last_numreplicas = 0;
|
||||||
|
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
listRewind(server.clients_waiting_acks,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
redisClient *c = ln->value;
|
||||||
|
|
||||||
|
/* Every time we find a client that is satisfied for a given
|
||||||
|
* offset and number of replicas, we remember it so the next client
|
||||||
|
* may be unblocked without calling replicationCountAcksByOffset()
|
||||||
|
* if the requested offset / replicas were equal or less. */
|
||||||
|
if (last_offset && last_offset > c->bpop.reploffset &&
|
||||||
|
last_numreplicas > c->bpop.numreplicas)
|
||||||
|
{
|
||||||
|
unblockClient(c);
|
||||||
|
addReplyLongLong(c,last_numreplicas);
|
||||||
|
} else {
|
||||||
|
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
|
||||||
|
|
||||||
|
if (numreplicas >= c->bpop.numreplicas) {
|
||||||
|
last_offset = c->bpop.reploffset;
|
||||||
|
last_numreplicas = numreplicas;
|
||||||
|
unblockClient(c);
|
||||||
|
addReplyLongLong(c,numreplicas);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* --------------------------- REPLICATION CRON ---------------------------- */
|
||||||
|
|
||||||
/* Replication cron funciton, called 1 time per second. */
|
/* Replication cron funciton, called 1 time per second. */
|
||||||
void replicationCron(void) {
|
void replicationCron(void) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user