Wait for replicas when shutting down (#9872)
To avoid data loss, this commit adds a grace period for lagging replicas to catch up the replication offset. Done: * Wait for replicas when shutdown is triggered by SIGTERM and SIGINT. * Wait for replicas when shutdown is triggered by the SHUTDOWN command. A new blocked client type BLOCKED_SHUTDOWN is introduced, allowing multiple clients to call SHUTDOWN in parallel. Note that they don't expect a response unless an error happens and shutdown is aborted. * Log warning for each replica lagging behind when finishing shutdown. * CLIENT_PAUSE_WRITE while waiting for replicas. * Configurable grace period 'shutdown-timeout' in seconds (default 10). * New flags for the SHUTDOWN command: - NOW disables the grace period for lagging replicas. - FORCE ignores errors writing the RDB or AOF files which would normally prevent a shutdown. - ABORT cancels ongoing shutdown. Can't be combined with other flags. * New field in the output of the INFO command: 'shutdown_in_milliseconds'. The value is the remaining maximum time to wait for lagging replicas before finishing the shutdown. This field is present in the Server section **only** during shutdown. Not directly related: * When shutting down, if there is an AOF saving child, it is killed **even** if AOF is disabled. This can happen if BGREWRITEAOF is used when AOF is off. * Client pause now has end time and type (WRITE or ALL) per purpose. The different pause purposes are *CLIENT PAUSE command*, *failover* and *shutdown*. If clients are unpaused for one purpose, it doesn't affect client pause for other purposes. For example, the CLIENT UNPAUSE command doesn't affect client pause initiated by the failover or shutdown procedures. A completed failover or a failed shutdown doesn't unpause clients paused by the CLIENT PAUSE command. Notes: * DEBUG RESTART doesn't wait for replicas. * We already have a warning logged when a replica disconnects. This means that if any replica connection is lost during the shutdown, it is either logged as disconnected or as lagging at the time of exit. Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
1bf6d6f11e
commit
45a155bd0f
2
.github/.codespellrc
vendored
2
.github/.codespellrc
vendored
@ -1,5 +1,5 @@
|
|||||||
[codespell]
|
[codespell]
|
||||||
quiet-level = 2
|
quiet-level = 2
|
||||||
count =
|
count =
|
||||||
skip = ./deps,./src/crc16_slottable.h
|
skip = ./deps,./src/crc16_slottable.h,tmp*,./.git,./lcov-html
|
||||||
ignore-words = ./.github/wordlist.txt
|
ignore-words = ./.github/wordlist.txt
|
||||||
|
14
redis.conf
14
redis.conf
@ -1442,6 +1442,20 @@ aof-use-rdb-preamble yes
|
|||||||
# the AOF format in a way that may not be compatible with existing AOF parsers.
|
# the AOF format in a way that may not be compatible with existing AOF parsers.
|
||||||
aof-timestamp-enabled no
|
aof-timestamp-enabled no
|
||||||
|
|
||||||
|
################################ SHUTDOWN #####################################
|
||||||
|
|
||||||
|
# Maximum time to wait for replicas when shutting down, in seconds.
|
||||||
|
#
|
||||||
|
# During shut down, a grace period allows any lagging replicas to catch up with
|
||||||
|
# the latest replication offset before the master exists. This period can
|
||||||
|
# prevent data loss, especially for deployments without configured disk backups.
|
||||||
|
#
|
||||||
|
# The 'shutdown-timeout' value is the grace period's duration in seconds. It is
|
||||||
|
# only applicable when the instance has replicas. To disable the feature, set
|
||||||
|
# the value to 0.
|
||||||
|
#
|
||||||
|
# shutdown-timeout 10
|
||||||
|
|
||||||
################################ LUA SCRIPTING ###############################
|
################################ LUA SCRIPTING ###############################
|
||||||
|
|
||||||
# Max execution time of a Lua script in milliseconds.
|
# Max execution time of a Lua script in milliseconds.
|
||||||
|
@ -191,6 +191,8 @@ void unblockClient(client *c) {
|
|||||||
} else if (c->btype == BLOCKED_PAUSE) {
|
} else if (c->btype == BLOCKED_PAUSE) {
|
||||||
listDelNode(server.paused_clients,c->paused_list_node);
|
listDelNode(server.paused_clients,c->paused_list_node);
|
||||||
c->paused_list_node = NULL;
|
c->paused_list_node = NULL;
|
||||||
|
} else if (c->btype == BLOCKED_SHUTDOWN) {
|
||||||
|
/* No special cleanup. */
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown btype in unblockClient().");
|
serverPanic("Unknown btype in unblockClient().");
|
||||||
}
|
}
|
||||||
@ -231,6 +233,22 @@ void replyToBlockedClientTimedOut(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If one or more clients are blocked on the SHUTDOWN command, this function
|
||||||
|
* sends them an error reply and unblocks them. */
|
||||||
|
void replyToClientsBlockedOnShutdown(void) {
|
||||||
|
if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return;
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
listRewind(server.clients, &li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
client *c = listNodeValue(ln);
|
||||||
|
if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) {
|
||||||
|
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
|
||||||
|
unblockClient(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Mass-unblock clients because something changed in the instance that makes
|
/* Mass-unblock clients because something changed in the instance that makes
|
||||||
* blocking no longer safe. For example clients blocked in list operations
|
* blocking no longer safe. For example clients blocked in list operations
|
||||||
* in an instance which turns from master to slave is unsafe, so this function
|
* in an instance which turns from master to slave is unsafe, so this function
|
||||||
|
@ -2325,7 +2325,9 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
resetManualFailover();
|
resetManualFailover();
|
||||||
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
|
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
|
||||||
server.cluster->mf_slave = sender;
|
server.cluster->mf_slave = sender;
|
||||||
pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE);
|
pauseClients(PAUSE_DURING_FAILOVER,
|
||||||
|
now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
|
||||||
|
CLIENT_PAUSE_WRITE);
|
||||||
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
|
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
|
||||||
sender->name);
|
sender->name);
|
||||||
/* We need to send a ping message to the replica, as it would carry
|
/* We need to send a ping message to the replica, as it would carry
|
||||||
@ -3590,7 +3592,7 @@ void resetManualFailover(void) {
|
|||||||
if (server.cluster->mf_slave) {
|
if (server.cluster->mf_slave) {
|
||||||
/* We were a master failing over, so we paused clients. Regardless
|
/* We were a master failing over, so we paused clients. Regardless
|
||||||
* of the outcome we unpause now to allow traffic again. */
|
* of the outcome we unpause now to allow traffic again. */
|
||||||
unpauseClients();
|
unpauseClients(PAUSE_DURING_FAILOVER);
|
||||||
}
|
}
|
||||||
server.cluster->mf_end = 0; /* No manual failover in progress. */
|
server.cluster->mf_end = 0; /* No manual failover in progress. */
|
||||||
server.cluster->mf_can_start = 0;
|
server.cluster->mf_can_start = 0;
|
||||||
|
@ -4321,7 +4321,10 @@ struct redisCommandArg REPLICAOF_Args[] = {
|
|||||||
/********** SHUTDOWN ********************/
|
/********** SHUTDOWN ********************/
|
||||||
|
|
||||||
/* SHUTDOWN history */
|
/* SHUTDOWN history */
|
||||||
#define SHUTDOWN_History NULL
|
commandHistory SHUTDOWN_History[] = {
|
||||||
|
{"7.0","Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting."},
|
||||||
|
{0}
|
||||||
|
};
|
||||||
|
|
||||||
/* SHUTDOWN hints */
|
/* SHUTDOWN hints */
|
||||||
#define SHUTDOWN_Hints NULL
|
#define SHUTDOWN_Hints NULL
|
||||||
@ -4336,6 +4339,9 @@ struct redisCommandArg SHUTDOWN_nosave_save_Subargs[] = {
|
|||||||
/* SHUTDOWN argument table */
|
/* SHUTDOWN argument table */
|
||||||
struct redisCommandArg SHUTDOWN_Args[] = {
|
struct redisCommandArg SHUTDOWN_Args[] = {
|
||||||
{"nosave_save",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SHUTDOWN_nosave_save_Subargs},
|
{"nosave_save",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SHUTDOWN_nosave_save_Subargs},
|
||||||
|
{"now",ARG_TYPE_PURE_TOKEN,-1,"NOW",NULL,NULL,CMD_ARG_OPTIONAL},
|
||||||
|
{"force",ARG_TYPE_PURE_TOKEN,-1,"FORCE",NULL,NULL,CMD_ARG_OPTIONAL},
|
||||||
|
{"abort",ARG_TYPE_PURE_TOKEN,-1,"ABORT",NULL,NULL,CMD_ARG_OPTIONAL},
|
||||||
{0}
|
{0}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -6542,7 +6548,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_WRITE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
|
{"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_WRITE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
|
||||||
{"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS},
|
{"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS},
|
||||||
{"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0},
|
{"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0},
|
||||||
{"shutdown","Synchronously save the dataset to disk and then shut down the server",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
|
{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
|
||||||
{"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args},
|
{"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args},
|
||||||
{"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands},
|
{"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands},
|
||||||
{"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args},
|
{"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args},
|
||||||
|
@ -1,10 +1,17 @@
|
|||||||
{
|
{
|
||||||
"SHUTDOWN": {
|
"SHUTDOWN": {
|
||||||
"summary": "Synchronously save the dataset to disk and then shut down the server",
|
"summary": "Synchronously save the dataset to disk and then shut down the server",
|
||||||
|
"complexity": "O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)",
|
||||||
"group": "server",
|
"group": "server",
|
||||||
"since": "1.0.0",
|
"since": "1.0.0",
|
||||||
"arity": -1,
|
"arity": -1,
|
||||||
"function": "shutdownCommand",
|
"function": "shutdownCommand",
|
||||||
|
"history": [
|
||||||
|
[
|
||||||
|
"7.0",
|
||||||
|
"Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting."
|
||||||
|
]
|
||||||
|
],
|
||||||
"command_flags": [
|
"command_flags": [
|
||||||
"ADMIN",
|
"ADMIN",
|
||||||
"NOSCRIPT",
|
"NOSCRIPT",
|
||||||
@ -29,6 +36,24 @@
|
|||||||
"token": "SAVE"
|
"token": "SAVE"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "now",
|
||||||
|
"type": "pure-token",
|
||||||
|
"token": "NOW",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "force",
|
||||||
|
"type": "pure-token",
|
||||||
|
"token": "FORCE",
|
||||||
|
"optional": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "abort",
|
||||||
|
"type": "pure-token",
|
||||||
|
"token": "ABORT",
|
||||||
|
"optional": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
@ -2717,6 +2717,7 @@ standardConfig configs[] = {
|
|||||||
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
||||||
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
|
||||||
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
|
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
|
||||||
|
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
|
||||||
|
|
||||||
/* Unsigned int configs */
|
/* Unsigned int configs */
|
||||||
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
|
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
|
||||||
|
54
src/db.c
54
src/db.c
@ -1033,23 +1033,59 @@ void typeCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void shutdownCommand(client *c) {
|
void shutdownCommand(client *c) {
|
||||||
int flags = 0;
|
int flags = SHUTDOWN_NOFLAGS;
|
||||||
|
int abort = 0;
|
||||||
if (c->argc > 2) {
|
for (int i = 1; i < c->argc; i++) {
|
||||||
addReplyErrorObject(c,shared.syntaxerr);
|
if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
|
||||||
return;
|
|
||||||
} else if (c->argc == 2) {
|
|
||||||
if (!strcasecmp(c->argv[1]->ptr,"nosave")) {
|
|
||||||
flags |= SHUTDOWN_NOSAVE;
|
flags |= SHUTDOWN_NOSAVE;
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"save")) {
|
} else if (!strcasecmp(c->argv[i]->ptr,"save")) {
|
||||||
flags |= SHUTDOWN_SAVE;
|
flags |= SHUTDOWN_SAVE;
|
||||||
|
} else if (!strcasecmp(c->argv[i]->ptr, "now")) {
|
||||||
|
flags |= SHUTDOWN_NOW;
|
||||||
|
} else if (!strcasecmp(c->argv[i]->ptr, "force")) {
|
||||||
|
flags |= SHUTDOWN_FORCE;
|
||||||
|
} else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
|
||||||
|
abort = 1;
|
||||||
} else {
|
} else {
|
||||||
addReplyErrorObject(c,shared.syntaxerr);
|
addReplyErrorObject(c,shared.syntaxerr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if ((abort && flags != SHUTDOWN_NOFLAGS) ||
|
||||||
|
(flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
|
||||||
|
{
|
||||||
|
/* Illegal combo. */
|
||||||
|
addReplyErrorObject(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (abort) {
|
||||||
|
if (abortShutdown() == C_OK)
|
||||||
|
addReply(c, shared.ok);
|
||||||
|
else
|
||||||
|
addReplyError(c, "No shutdown in progress.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
|
||||||
|
addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(flags & SHUTDOWN_NOSAVE) && scriptIsTimedout()) {
|
||||||
|
/* Script timed out. Shutdown allowed only with the NOSAVE flag. See
|
||||||
|
* also processCommand where these errors are returned. */
|
||||||
|
if (scriptIsEval())
|
||||||
|
addReplyErrorObject(c, shared.slowevalerr);
|
||||||
|
else
|
||||||
|
addReplyErrorObject(c, shared.slowscripterr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockClient(c, BLOCKED_SHUTDOWN);
|
||||||
if (prepareForShutdown(flags) == C_OK) exit(0);
|
if (prepareForShutdown(flags) == C_OK) exit(0);
|
||||||
addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
|
/* If we're here, then shutdown is ongoing (the client is still blocked) or
|
||||||
|
* failed (the client has received an error). */
|
||||||
}
|
}
|
||||||
|
|
||||||
void renameGenericCommand(client *c, int nx) {
|
void renameGenericCommand(client *c, int nx) {
|
||||||
|
@ -2873,7 +2873,7 @@ NULL
|
|||||||
addReplyNull(c);
|
addReplyNull(c);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
|
||||||
/* CLIENT UNPAUSE */
|
/* CLIENT UNPAUSE */
|
||||||
unpauseClients();
|
unpauseClients(PAUSE_BY_CLIENT_COMMAND);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
|
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
|
||||||
c->argc == 4))
|
c->argc == 4))
|
||||||
@ -2895,7 +2895,7 @@ NULL
|
|||||||
|
|
||||||
if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
|
if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
|
||||||
UNIT_MILLISECONDS) != C_OK) return;
|
UNIT_MILLISECONDS) != C_OK) return;
|
||||||
pauseClients(end, type);
|
pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||||
@ -3539,6 +3539,48 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Compute current most restictive pause type and its end time, aggregated for
|
||||||
|
* all pause purposes. */
|
||||||
|
static void updateClientPauseTypeAndEndTime(void) {
|
||||||
|
pause_type old_type = server.client_pause_type;
|
||||||
|
pause_type type = CLIENT_PAUSE_OFF;
|
||||||
|
mstime_t end = 0;
|
||||||
|
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
|
||||||
|
pause_event *p = server.client_pause_per_purpose[i];
|
||||||
|
if (p == NULL) {
|
||||||
|
/* Nothing to do. */
|
||||||
|
} else if (p->end < server.mstime) {
|
||||||
|
/* This one expired. */
|
||||||
|
zfree(p);
|
||||||
|
server.client_pause_per_purpose[i] = NULL;
|
||||||
|
} else if (p->type > type) {
|
||||||
|
/* This type is the most restrictive so far. */
|
||||||
|
type = p->type;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Find the furthest end time among the pause purposes of the most
|
||||||
|
* restrictive type */
|
||||||
|
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
|
||||||
|
pause_event *p = server.client_pause_per_purpose[i];
|
||||||
|
if (p != NULL && p->type == type && p->end > end) end = p->end;
|
||||||
|
}
|
||||||
|
server.client_pause_type = type;
|
||||||
|
server.client_pause_end_time = end;
|
||||||
|
|
||||||
|
/* If the pause type is less restrictive than before, we unblock all clients
|
||||||
|
* so they are reprocessed (may get re-paused). */
|
||||||
|
if (type < old_type) {
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
listRewind(server.paused_clients, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
client *c = listNodeValue(ln);
|
||||||
|
unblockClient(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Pause clients up to the specified unixtime (in ms) for a given type of
|
/* Pause clients up to the specified unixtime (in ms) for a given type of
|
||||||
* commands.
|
* commands.
|
||||||
*
|
*
|
||||||
@ -3552,14 +3594,18 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
* The function always succeed, even if there is already a pause in progress.
|
* The function always succeed, even if there is already a pause in progress.
|
||||||
* In such a case, the duration is set to the maximum and new end time and the
|
* In such a case, the duration is set to the maximum and new end time and the
|
||||||
* type is set to the more restrictive type of pause. */
|
* type is set to the more restrictive type of pause. */
|
||||||
void pauseClients(mstime_t end, pause_type type) {
|
void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) {
|
||||||
if (type > server.client_pause_type) {
|
/* Manage pause type and end time per pause purpose. */
|
||||||
server.client_pause_type = type;
|
if (server.client_pause_per_purpose[purpose] == NULL) {
|
||||||
}
|
server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event));
|
||||||
|
server.client_pause_per_purpose[purpose]->type = type;
|
||||||
if (end > server.client_pause_end_time) {
|
server.client_pause_per_purpose[purpose]->end = end;
|
||||||
server.client_pause_end_time = end;
|
} else {
|
||||||
|
pause_event *p = server.client_pause_per_purpose[purpose];
|
||||||
|
p->type = max(p->type, type);
|
||||||
|
p->end = max(p->end, end);
|
||||||
}
|
}
|
||||||
|
updateClientPauseTypeAndEndTime();
|
||||||
|
|
||||||
/* We allow write commands that were queued
|
/* We allow write commands that were queued
|
||||||
* up before and after to execute. We need
|
* up before and after to execute. We need
|
||||||
@ -3571,20 +3617,11 @@ void pauseClients(mstime_t end, pause_type type) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Unpause clients and queue them for reprocessing. */
|
/* Unpause clients and queue them for reprocessing. */
|
||||||
void unpauseClients(void) {
|
void unpauseClients(pause_purpose purpose) {
|
||||||
listNode *ln;
|
if (server.client_pause_per_purpose[purpose] == NULL) return;
|
||||||
listIter li;
|
zfree(server.client_pause_per_purpose[purpose]);
|
||||||
client *c;
|
server.client_pause_per_purpose[purpose] = NULL;
|
||||||
|
updateClientPauseTypeAndEndTime();
|
||||||
server.client_pause_type = CLIENT_PAUSE_OFF;
|
|
||||||
server.client_pause_end_time = 0;
|
|
||||||
|
|
||||||
/* Unblock all of the clients so they are reprocessed. */
|
|
||||||
listRewind(server.paused_clients,&li);
|
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
|
||||||
c = listNodeValue(ln);
|
|
||||||
unblockClient(c);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns true if clients are paused and false otherwise. */
|
/* Returns true if clients are paused and false otherwise. */
|
||||||
@ -3599,7 +3636,7 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
|
|||||||
if (!areClientsPaused())
|
if (!areClientsPaused())
|
||||||
return 0;
|
return 0;
|
||||||
if (server.client_pause_end_time < server.mstime) {
|
if (server.client_pause_end_time < server.mstime) {
|
||||||
unpauseClients();
|
updateClientPauseTypeAndEndTime();
|
||||||
}
|
}
|
||||||
return areClientsPaused();
|
return areClientsPaused();
|
||||||
}
|
}
|
||||||
|
@ -3786,7 +3786,7 @@ void clearFailoverState() {
|
|||||||
server.target_replica_host = NULL;
|
server.target_replica_host = NULL;
|
||||||
server.target_replica_port = 0;
|
server.target_replica_port = 0;
|
||||||
server.failover_state = NO_FAILOVER;
|
server.failover_state = NO_FAILOVER;
|
||||||
unpauseClients();
|
unpauseClients(PAUSE_DURING_FAILOVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Abort an ongoing failover if one is going on. */
|
/* Abort an ongoing failover if one is going on. */
|
||||||
@ -3935,7 +3935,7 @@ void failoverCommand(client *c) {
|
|||||||
server.force_failover = force_flag;
|
server.force_failover = force_flag;
|
||||||
server.failover_state = FAILOVER_WAIT_FOR_SYNC;
|
server.failover_state = FAILOVER_WAIT_FOR_SYNC;
|
||||||
/* Cluster failover will unpause eventually */
|
/* Cluster failover will unpause eventually */
|
||||||
pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE);
|
pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
249
src/server.c
249
src/server.c
@ -83,6 +83,13 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
|||||||
/* Global vars */
|
/* Global vars */
|
||||||
struct redisServer server; /* Server global state */
|
struct redisServer server; /* Server global state */
|
||||||
|
|
||||||
|
/*============================ Internal prototypes ========================== */
|
||||||
|
|
||||||
|
static inline int isShutdownInitiated(void);
|
||||||
|
int isReadyToShutdown(void);
|
||||||
|
int finishShutdown(void);
|
||||||
|
const char *replstateToString(int replstate);
|
||||||
|
|
||||||
/*============================ Utility functions ============================ */
|
/*============================ Utility functions ============================ */
|
||||||
|
|
||||||
/* We use a private localtime implementation which is fork-safe. The logging
|
/* We use a private localtime implementation which is fork-safe. The logging
|
||||||
@ -1137,10 +1144,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
|
|
||||||
/* We received a SIGTERM, shutting down here in a safe way, as it is
|
/* We received a SIGTERM, shutting down here in a safe way, as it is
|
||||||
* not ok doing so inside the signal handler. */
|
* not ok doing so inside the signal handler. */
|
||||||
if (server.shutdown_asap) {
|
if (server.shutdown_asap && !isShutdownInitiated()) {
|
||||||
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
|
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
|
||||||
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
|
} else if (isShutdownInitiated()) {
|
||||||
server.shutdown_asap = 0;
|
if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
|
||||||
|
if (finishShutdown() == C_OK) exit(0);
|
||||||
|
/* Shutdown failed. Continue running. An error has been logged. */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Show some info about non-empty databases */
|
/* Show some info about non-empty databases */
|
||||||
@ -1383,6 +1393,14 @@ void whileBlockedCron() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void sendGetackToReplicas(void) {
|
||||||
|
robj *argv[3];
|
||||||
|
argv[0] = shared.replconf;
|
||||||
|
argv[1] = shared.getack;
|
||||||
|
argv[2] = shared.special_asterick; /* Not used argument. */
|
||||||
|
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
|
||||||
|
}
|
||||||
|
|
||||||
extern int ProcessingEventsWhileBlocked;
|
extern int ProcessingEventsWhileBlocked;
|
||||||
|
|
||||||
/* This function gets called every time Redis is entering the
|
/* This function gets called every time Redis is entering the
|
||||||
@ -1467,12 +1485,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
* increment the replication backlog, they'll be sent after the pause
|
* increment the replication backlog, they'll be sent after the pause
|
||||||
* if we are still the master. */
|
* if we are still the master. */
|
||||||
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
|
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
|
||||||
robj *argv[3];
|
sendGetackToReplicas();
|
||||||
|
|
||||||
argv[0] = shared.replconf;
|
|
||||||
argv[1] = shared.getack;
|
|
||||||
argv[2] = shared.special_asterick; /* Not used argument. */
|
|
||||||
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
|
|
||||||
server.get_ack_from_slaves = 0;
|
server.get_ack_from_slaves = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1750,6 +1763,8 @@ void initServerConfig(void) {
|
|||||||
memset(server.blocked_clients_by_type,0,
|
memset(server.blocked_clients_by_type,0,
|
||||||
sizeof(server.blocked_clients_by_type));
|
sizeof(server.blocked_clients_by_type));
|
||||||
server.shutdown_asap = 0;
|
server.shutdown_asap = 0;
|
||||||
|
server.shutdown_flags = 0;
|
||||||
|
server.shutdown_mstime = 0;
|
||||||
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
|
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
|
||||||
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType);
|
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType);
|
||||||
server.next_client_id = 1; /* Client IDs, start from 1 .*/
|
server.next_client_id = 1; /* Client IDs, start from 1 .*/
|
||||||
@ -1852,9 +1867,9 @@ int restartServer(int flags, mstime_t delay) {
|
|||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Perform a proper shutdown. */
|
/* Perform a proper shutdown. We don't wait for lagging replicas though. */
|
||||||
if (flags & RESTART_SERVER_GRACEFULLY &&
|
if (flags & RESTART_SERVER_GRACEFULLY &&
|
||||||
prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
|
prepareForShutdown(SHUTDOWN_NOW) != C_OK)
|
||||||
{
|
{
|
||||||
serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
|
serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
@ -2266,6 +2281,8 @@ void initServer(void) {
|
|||||||
server.get_ack_from_slaves = 0;
|
server.get_ack_from_slaves = 0;
|
||||||
server.client_pause_type = CLIENT_PAUSE_OFF;
|
server.client_pause_type = CLIENT_PAUSE_OFF;
|
||||||
server.client_pause_end_time = 0;
|
server.client_pause_end_time = 0;
|
||||||
|
memset(server.client_pause_per_purpose, 0,
|
||||||
|
sizeof(server.client_pause_per_purpose));
|
||||||
server.paused_clients = listCreate();
|
server.paused_clients = listCreate();
|
||||||
server.events_processed_while_blocked = 0;
|
server.events_processed_while_blocked = 0;
|
||||||
server.system_memory_size = zmalloc_get_memory_size();
|
server.system_memory_size = zmalloc_get_memory_size();
|
||||||
@ -3532,9 +3549,7 @@ int processCommand(client *c) {
|
|||||||
c->cmd->proc != unwatchCommand &&
|
c->cmd->proc != unwatchCommand &&
|
||||||
c->cmd->proc != quitCommand &&
|
c->cmd->proc != quitCommand &&
|
||||||
c->cmd->proc != resetCommand &&
|
c->cmd->proc != resetCommand &&
|
||||||
!(c->cmd->proc == shutdownCommand &&
|
c->cmd->proc != shutdownCommand && /* more checks in shutdownCommand */
|
||||||
c->argc == 2 &&
|
|
||||||
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
|
|
||||||
!(c->cmd->proc == scriptCommand &&
|
!(c->cmd->proc == scriptCommand &&
|
||||||
c->argc == 2 &&
|
c->argc == 2 &&
|
||||||
tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
|
tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
|
||||||
@ -3620,7 +3635,32 @@ void closeListeningSockets(int unlink_unix_socket) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Prepare for shutting down the server. Flags:
|
||||||
|
*
|
||||||
|
* - SHUTDOWN_SAVE: Save a database dump even if the server is configured not to
|
||||||
|
* save any dump.
|
||||||
|
*
|
||||||
|
* - SHUTDOWN_NOSAVE: Don't save any database dump even if the server is
|
||||||
|
* configured to save one.
|
||||||
|
*
|
||||||
|
* - SHUTDOWN_NOW: Don't wait for replicas to catch up before shutting down.
|
||||||
|
*
|
||||||
|
* - SHUTDOWN_FORCE: Ignore errors writing AOF and RDB files on disk, which
|
||||||
|
* would normally prevent a shutdown.
|
||||||
|
*
|
||||||
|
* Unless SHUTDOWN_NOW is set and if any replicas are lagging behind, C_ERR is
|
||||||
|
* returned and server.shutdown_mstime is set to a timestamp to allow a grace
|
||||||
|
* period for the replicas to catch up. This is checked and handled by
|
||||||
|
* serverCron() which completes the shutdown as soon as possible.
|
||||||
|
*
|
||||||
|
* If shutting down fails due to errors writing RDB or AOF files, C_ERR is
|
||||||
|
* returned and an error is logged. If the flag SHUTDOWN_FORCE is set, these
|
||||||
|
* errors are logged but ignored and C_OK is returned.
|
||||||
|
*
|
||||||
|
* On success, this function returns C_OK and then it's OK to call exit(0). */
|
||||||
int prepareForShutdown(int flags) {
|
int prepareForShutdown(int flags) {
|
||||||
|
if (isShutdownInitiated()) return C_ERR;
|
||||||
|
|
||||||
/* When SHUTDOWN is called while the server is loading a dataset in
|
/* When SHUTDOWN is called while the server is loading a dataset in
|
||||||
* memory we need to make sure no attempt is performed to save
|
* memory we need to make sure no attempt is performed to save
|
||||||
* the dataset on shutdown (otherwise it could overwrite the current DB
|
* the dataset on shutdown (otherwise it could overwrite the current DB
|
||||||
@ -3630,13 +3670,108 @@ int prepareForShutdown(int flags) {
|
|||||||
if (server.loading || server.sentinel_mode)
|
if (server.loading || server.sentinel_mode)
|
||||||
flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
|
flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
|
||||||
|
|
||||||
int save = flags & SHUTDOWN_SAVE;
|
server.shutdown_flags = flags;
|
||||||
int nosave = flags & SHUTDOWN_NOSAVE;
|
|
||||||
|
|
||||||
serverLog(LL_WARNING,"User requested shutdown...");
|
serverLog(LL_WARNING,"User requested shutdown...");
|
||||||
if (server.supervised_mode == SUPERVISED_SYSTEMD)
|
if (server.supervised_mode == SUPERVISED_SYSTEMD)
|
||||||
redisCommunicateSystemd("STOPPING=1\n");
|
redisCommunicateSystemd("STOPPING=1\n");
|
||||||
|
|
||||||
|
/* If we have any replicas, let them catch up the replication offset before
|
||||||
|
* we shut down, to avoid data loss. */
|
||||||
|
if (!(flags & SHUTDOWN_NOW) &&
|
||||||
|
server.shutdown_timeout != 0 &&
|
||||||
|
!isReadyToShutdown())
|
||||||
|
{
|
||||||
|
server.shutdown_mstime = server.mstime + server.shutdown_timeout * 1000;
|
||||||
|
if (!areClientsPaused()) sendGetackToReplicas();
|
||||||
|
pauseClients(PAUSE_DURING_SHUTDOWN, LLONG_MAX, CLIENT_PAUSE_WRITE);
|
||||||
|
serverLog(LL_NOTICE, "Waiting for replicas before shutting down.");
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return finishShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int isShutdownInitiated(void) {
|
||||||
|
return server.shutdown_mstime != 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns 0 if there are any replicas which are lagging in replication which we
|
||||||
|
* need to wait for before shutting down. Returns 1 if we're ready to shut
|
||||||
|
* down now. */
|
||||||
|
int isReadyToShutdown(void) {
|
||||||
|
if (listLength(server.slaves) == 0) return 1; /* No replicas. */
|
||||||
|
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(server.slaves, &li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
client *replica = listNodeValue(ln);
|
||||||
|
if (replica->repl_ack_off != server.master_repl_offset) return 0;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cancelShutdown(void) {
|
||||||
|
server.shutdown_asap = 0;
|
||||||
|
server.shutdown_flags = 0;
|
||||||
|
server.shutdown_mstime = 0;
|
||||||
|
replyToClientsBlockedOnShutdown();
|
||||||
|
unpauseClients(PAUSE_DURING_SHUTDOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns C_OK if shutdown was aborted and C_ERR if shutdown wasn't ongoing. */
|
||||||
|
int abortShutdown(void) {
|
||||||
|
if (isShutdownInitiated()) {
|
||||||
|
cancelShutdown();
|
||||||
|
} else if (server.shutdown_asap) {
|
||||||
|
/* Signal handler has requested shutdown, but it hasn't been initiated
|
||||||
|
* yet. Just clear the flag. */
|
||||||
|
server.shutdown_asap = 0;
|
||||||
|
} else {
|
||||||
|
/* Shutdown neither initiated nor requested. */
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
serverLog(LL_NOTICE, "Shutdown manually aborted.");
|
||||||
|
return C_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* The final step of the shutdown sequence. Returns C_OK if the shutdown
|
||||||
|
* sequence was successful and it's OK to call exit(). If C_ERR is returned,
|
||||||
|
* it's not safe to call exit(). */
|
||||||
|
int finishShutdown(void) {
|
||||||
|
|
||||||
|
int save = server.shutdown_flags & SHUTDOWN_SAVE;
|
||||||
|
int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE;
|
||||||
|
int force = server.shutdown_flags & SHUTDOWN_FORCE;
|
||||||
|
|
||||||
|
/* Log a warning for each replica that is lagging. */
|
||||||
|
listIter replicas_iter;
|
||||||
|
listNode *replicas_list_node;
|
||||||
|
int num_replicas = 0, num_lagging_replicas = 0;
|
||||||
|
listRewind(server.slaves, &replicas_iter);
|
||||||
|
while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
|
||||||
|
client *replica = listNodeValue(replicas_list_node);
|
||||||
|
num_replicas++;
|
||||||
|
if (replica->repl_ack_off != server.master_repl_offset) {
|
||||||
|
num_lagging_replicas++;
|
||||||
|
long lag = replica->replstate == SLAVE_STATE_ONLINE ?
|
||||||
|
time(NULL) - replica->repl_ack_time : 0;
|
||||||
|
serverLog(LL_WARNING,
|
||||||
|
"Lagging replica %s reported offset %lld behind master, lag=%ld, state=%s.",
|
||||||
|
replicationGetSlaveName(replica),
|
||||||
|
server.master_repl_offset - replica->repl_ack_off,
|
||||||
|
lag,
|
||||||
|
replstateToString(replica->replstate));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (num_replicas > 0) {
|
||||||
|
serverLog(LL_NOTICE,
|
||||||
|
"%d of %d replicas are in sync when shutting down.",
|
||||||
|
num_replicas - num_lagging_replicas,
|
||||||
|
num_replicas);
|
||||||
|
}
|
||||||
|
|
||||||
/* Kill all the Lua debugger forked sessions. */
|
/* Kill all the Lua debugger forked sessions. */
|
||||||
ldbKillForkedSessions();
|
ldbKillForkedSessions();
|
||||||
|
|
||||||
@ -3661,20 +3796,24 @@ int prepareForShutdown(int flags) {
|
|||||||
TerminateModuleForkChild(server.child_pid,0);
|
TerminateModuleForkChild(server.child_pid,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.aof_state != AOF_OFF) {
|
/* Kill the AOF saving child as the AOF we already have may be longer
|
||||||
/* Kill the AOF saving child as the AOF we already have may be longer
|
* but contains the full dataset anyway. */
|
||||||
* but contains the full dataset anyway. */
|
if (server.child_type == CHILD_TYPE_AOF) {
|
||||||
if (server.child_type == CHILD_TYPE_AOF) {
|
/* If we have AOF enabled but haven't written the AOF yet, don't
|
||||||
/* If we have AOF enabled but haven't written the AOF yet, don't
|
* shutdown or else the dataset will be lost. */
|
||||||
* shutdown or else the dataset will be lost. */
|
if (server.aof_state == AOF_WAIT_REWRITE) {
|
||||||
if (server.aof_state == AOF_WAIT_REWRITE) {
|
if (force) {
|
||||||
|
serverLog(LL_WARNING, "Writing initial AOF. Exit anyway.");
|
||||||
|
} else {
|
||||||
serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
|
serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
|
||||||
return C_ERR;
|
goto error;
|
||||||
}
|
}
|
||||||
serverLog(LL_WARNING,
|
|
||||||
"There is a child rewriting the AOF. Killing it!");
|
|
||||||
killAppendOnlyChild();
|
|
||||||
}
|
}
|
||||||
|
serverLog(LL_WARNING,
|
||||||
|
"There is a child rewriting the AOF. Killing it!");
|
||||||
|
killAppendOnlyChild();
|
||||||
|
}
|
||||||
|
if (server.aof_state != AOF_OFF) {
|
||||||
/* Append only file: flush buffers and fsync() the AOF at exit */
|
/* Append only file: flush buffers and fsync() the AOF at exit */
|
||||||
serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
|
serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
|
||||||
flushAppendOnlyFile(1);
|
flushAppendOnlyFile(1);
|
||||||
@ -3698,10 +3837,14 @@ int prepareForShutdown(int flags) {
|
|||||||
* in the next cron() Redis will be notified that the background
|
* in the next cron() Redis will be notified that the background
|
||||||
* saving aborted, handling special stuff like slaves pending for
|
* saving aborted, handling special stuff like slaves pending for
|
||||||
* synchronization... */
|
* synchronization... */
|
||||||
serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
|
if (force) {
|
||||||
if (server.supervised_mode == SUPERVISED_SYSTEMD)
|
serverLog(LL_WARNING,"Error trying to save the DB. Exit anyway.");
|
||||||
redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
|
} else {
|
||||||
return C_ERR;
|
serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
|
||||||
|
if (server.supervised_mode == SUPERVISED_SYSTEMD)
|
||||||
|
redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3723,6 +3866,11 @@ int prepareForShutdown(int flags) {
|
|||||||
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
|
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
|
||||||
server.sentinel_mode ? "Sentinel" : "Redis");
|
server.sentinel_mode ? "Sentinel" : "Redis");
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
|
error:
|
||||||
|
serverLog(LL_WARNING, "Errors trying to shut down the server. Check the logs for more information.");
|
||||||
|
cancelShutdown();
|
||||||
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*================================== Commands =============================== */
|
/*================================== Commands =============================== */
|
||||||
@ -4404,6 +4552,20 @@ sds getFullCommandName(struct redisCommand *cmd) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char *replstateToString(int replstate) {
|
||||||
|
switch (replstate) {
|
||||||
|
case SLAVE_STATE_WAIT_BGSAVE_START:
|
||||||
|
case SLAVE_STATE_WAIT_BGSAVE_END:
|
||||||
|
return "wait_bgsave";
|
||||||
|
case SLAVE_STATE_SEND_BULK:
|
||||||
|
return "send_bulk";
|
||||||
|
case SLAVE_STATE_ONLINE:
|
||||||
|
return "online";
|
||||||
|
default:
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Characters we sanitize on INFO output to maintain expected format. */
|
/* Characters we sanitize on INFO output to maintain expected format. */
|
||||||
static char unsafe_info_chars[] = "#:\n\r";
|
static char unsafe_info_chars[] = "#:\n\r";
|
||||||
static char unsafe_info_chars_substs[] = "____"; /* Must be same length as above */
|
static char unsafe_info_chars_substs[] = "____"; /* Must be same length as above */
|
||||||
@ -4550,6 +4712,13 @@ sds genRedisInfoString(const char *section) {
|
|||||||
server.executable ? server.executable : "",
|
server.executable ? server.executable : "",
|
||||||
server.configfile ? server.configfile : "",
|
server.configfile ? server.configfile : "",
|
||||||
server.io_threads_active);
|
server.io_threads_active);
|
||||||
|
|
||||||
|
/* Conditional properties */
|
||||||
|
if (isShutdownInitiated()) {
|
||||||
|
info = sdscatfmt(info,
|
||||||
|
"shutdown_in_milliseconds:%I\r\n",
|
||||||
|
(int64_t)(server.shutdown_mstime - server.mstime));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Clients */
|
/* Clients */
|
||||||
@ -5047,7 +5216,6 @@ sds genRedisInfoString(const char *section) {
|
|||||||
listRewind(server.slaves,&li);
|
listRewind(server.slaves,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *slave = listNodeValue(ln);
|
client *slave = listNodeValue(ln);
|
||||||
char *state = NULL;
|
|
||||||
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_addr;
|
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_addr;
|
||||||
int port;
|
int port;
|
||||||
long lag = 0;
|
long lag = 0;
|
||||||
@ -5057,19 +5225,8 @@ sds genRedisInfoString(const char *section) {
|
|||||||
continue;
|
continue;
|
||||||
slaveip = ip;
|
slaveip = ip;
|
||||||
}
|
}
|
||||||
switch(slave->replstate) {
|
const char *state = replstateToString(slave->replstate);
|
||||||
case SLAVE_STATE_WAIT_BGSAVE_START:
|
if (state[0] == '\0') continue;
|
||||||
case SLAVE_STATE_WAIT_BGSAVE_END:
|
|
||||||
state = "wait_bgsave";
|
|
||||||
break;
|
|
||||||
case SLAVE_STATE_SEND_BULK:
|
|
||||||
state = "send_bulk";
|
|
||||||
break;
|
|
||||||
case SLAVE_STATE_ONLINE:
|
|
||||||
state = "online";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (state == NULL) continue;
|
|
||||||
if (slave->replstate == SLAVE_STATE_ONLINE)
|
if (slave->replstate == SLAVE_STATE_ONLINE)
|
||||||
lag = time(NULL) - slave->repl_ack_time;
|
lag = time(NULL) - slave->repl_ack_time;
|
||||||
|
|
||||||
@ -5589,7 +5746,7 @@ static void sigShutdownHandler(int sig) {
|
|||||||
/* SIGINT is often delivered via Ctrl+C in an interactive session.
|
/* SIGINT is often delivered via Ctrl+C in an interactive session.
|
||||||
* If we receive the signal the second time, we interpret this as
|
* If we receive the signal the second time, we interpret this as
|
||||||
* the user really wanting to quit ASAP without waiting to persist
|
* the user really wanting to quit ASAP without waiting to persist
|
||||||
* on disk. */
|
* on disk and without waiting for lagging replicas. */
|
||||||
if (server.shutdown_asap && sig == SIGINT) {
|
if (server.shutdown_asap && sig == SIGINT) {
|
||||||
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
|
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
|
||||||
rdbRemoveTempFile(getpid(), 1);
|
rdbRemoveTempFile(getpid(), 1);
|
||||||
|
32
src/server.h
32
src/server.h
@ -321,7 +321,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
|||||||
#define BLOCKED_STREAM 4 /* XREAD. */
|
#define BLOCKED_STREAM 4 /* XREAD. */
|
||||||
#define BLOCKED_ZSET 5 /* BZPOP et al. */
|
#define BLOCKED_ZSET 5 /* BZPOP et al. */
|
||||||
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
|
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
|
||||||
#define BLOCKED_NUM 7 /* Number of blocked states. */
|
#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */
|
||||||
|
#define BLOCKED_NUM 8 /* Number of blocked states. */
|
||||||
|
|
||||||
/* Client request types */
|
/* Client request types */
|
||||||
#define PROTO_REQ_INLINE 1
|
#define PROTO_REQ_INLINE 1
|
||||||
@ -484,6 +485,8 @@ typedef enum {
|
|||||||
#define SHUTDOWN_SAVE 1 /* Force SAVE on SHUTDOWN even if no save
|
#define SHUTDOWN_SAVE 1 /* Force SAVE on SHUTDOWN even if no save
|
||||||
points are configured. */
|
points are configured. */
|
||||||
#define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */
|
#define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */
|
||||||
|
#define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */
|
||||||
|
#define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */
|
||||||
|
|
||||||
/* Command call flags, see call() function */
|
/* Command call flags, see call() function */
|
||||||
#define CMD_CALL_NONE 0
|
#define CMD_CALL_NONE 0
|
||||||
@ -508,6 +511,19 @@ typedef enum {
|
|||||||
CLIENT_PAUSE_ALL /* Pause all commands */
|
CLIENT_PAUSE_ALL /* Pause all commands */
|
||||||
} pause_type;
|
} pause_type;
|
||||||
|
|
||||||
|
/* Client pause purposes. Each purpose has its own end time and pause type. */
|
||||||
|
typedef enum {
|
||||||
|
PAUSE_BY_CLIENT_COMMAND = 0,
|
||||||
|
PAUSE_DURING_SHUTDOWN,
|
||||||
|
PAUSE_DURING_FAILOVER,
|
||||||
|
NUM_PAUSE_PURPOSES /* This value is the number of purposes above. */
|
||||||
|
} pause_purpose;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pause_type type;
|
||||||
|
mstime_t end;
|
||||||
|
} pause_event;
|
||||||
|
|
||||||
/* RDB active child save type. */
|
/* RDB active child save type. */
|
||||||
#define RDB_CHILD_TYPE_NONE 0
|
#define RDB_CHILD_TYPE_NONE 0
|
||||||
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
|
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
|
||||||
@ -1353,7 +1369,9 @@ struct redisServer {
|
|||||||
aeEventLoop *el;
|
aeEventLoop *el;
|
||||||
rax *errors; /* Errors table */
|
rax *errors; /* Errors table */
|
||||||
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
|
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
|
||||||
volatile sig_atomic_t shutdown_asap; /* SHUTDOWN needed ASAP */
|
volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */
|
||||||
|
mstime_t shutdown_mstime; /* Timestamp to limit graceful shutdown. */
|
||||||
|
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
|
||||||
int activerehashing; /* Incremental rehash in serverCron() */
|
int activerehashing; /* Incremental rehash in serverCron() */
|
||||||
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
|
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
|
||||||
char *pidfile; /* PID file path */
|
char *pidfile; /* PID file path */
|
||||||
@ -1413,6 +1431,7 @@ struct redisServer {
|
|||||||
pause_type client_pause_type; /* True if clients are currently paused */
|
pause_type client_pause_type; /* True if clients are currently paused */
|
||||||
list *paused_clients; /* List of pause clients */
|
list *paused_clients; /* List of pause clients */
|
||||||
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
|
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
|
||||||
|
pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES];
|
||||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||||
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
|
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
|
||||||
@ -1613,6 +1632,9 @@ struct redisServer {
|
|||||||
int memcheck_enabled; /* Enable memory check on crash. */
|
int memcheck_enabled; /* Enable memory check on crash. */
|
||||||
int use_exit_on_panic; /* Use exit() on panic and assert rather than
|
int use_exit_on_panic; /* Use exit() on panic and assert rather than
|
||||||
* abort(). useful for Valgrind. */
|
* abort(). useful for Valgrind. */
|
||||||
|
/* Shutdown */
|
||||||
|
int shutdown_timeout; /* Graceful shutdown time limit in seconds. */
|
||||||
|
|
||||||
/* Replication (master) */
|
/* Replication (master) */
|
||||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
|
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
|
||||||
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
|
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
|
||||||
@ -2344,8 +2366,8 @@ void flushSlavesOutputBuffers(void);
|
|||||||
void disconnectSlaves(void);
|
void disconnectSlaves(void);
|
||||||
void evictClients(void);
|
void evictClients(void);
|
||||||
int listenToPort(int port, socketFds *fds);
|
int listenToPort(int port, socketFds *fds);
|
||||||
void pauseClients(mstime_t duration, pause_type type);
|
void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
|
||||||
void unpauseClients(void);
|
void unpauseClients(pause_purpose purpose);
|
||||||
int areClientsPaused(void);
|
int areClientsPaused(void);
|
||||||
int checkClientPauseTimeoutAndReturnIfPaused(void);
|
int checkClientPauseTimeoutAndReturnIfPaused(void);
|
||||||
void processEventsWhileBlocked(void);
|
void processEventsWhileBlocked(void);
|
||||||
@ -2708,6 +2730,8 @@ void preventCommandAOF(client *c);
|
|||||||
void preventCommandReplication(client *c);
|
void preventCommandReplication(client *c);
|
||||||
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
|
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
|
||||||
int prepareForShutdown(int flags);
|
int prepareForShutdown(int flags);
|
||||||
|
void replyToClientsBlockedOnShutdown(void);
|
||||||
|
int abortShutdown(void);
|
||||||
void afterCommand(client *c);
|
void afterCommand(client *c);
|
||||||
int inNestedCall(void);
|
int inNestedCall(void);
|
||||||
#ifdef __GNUC__
|
#ifdef __GNUC__
|
||||||
|
@ -42,7 +42,9 @@ start_server {} {
|
|||||||
$replica config resetstat
|
$replica config resetstat
|
||||||
|
|
||||||
catch {
|
catch {
|
||||||
restart_server 0 true false
|
# SHUTDOWN NOW ensures master doesn't send GETACK to replicas before
|
||||||
|
# shutting down which would affect the replication offset.
|
||||||
|
restart_server 0 true false true now
|
||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
}
|
}
|
||||||
wait_for_condition 50 1000 {
|
wait_for_condition 50 1000 {
|
||||||
@ -77,9 +79,14 @@ start_server {} {
|
|||||||
|
|
||||||
after 20
|
after 20
|
||||||
|
|
||||||
|
# Wait until master has received ACK from replica. If the master thinks
|
||||||
|
# that any replica is lagging when it shuts down, master would send
|
||||||
|
# GETACK to the replicas, affecting the replication offset.
|
||||||
|
set offset [status $master master_repl_offset]
|
||||||
wait_for_condition 500 100 {
|
wait_for_condition 500 100 {
|
||||||
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
|
[string match "*slave0:*,offset=$offset,*" [$master info replication]] &&
|
||||||
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
|
$offset == [status $replica master_repl_offset] &&
|
||||||
|
$offset == [status $sub_replica master_repl_offset]
|
||||||
} else {
|
} else {
|
||||||
show_cluster_status
|
show_cluster_status
|
||||||
fail "Replicas and master offsets were unable to match *exactly*."
|
fail "Replicas and master offsets were unable to match *exactly*."
|
||||||
@ -89,6 +96,11 @@ start_server {} {
|
|||||||
$replica config resetstat
|
$replica config resetstat
|
||||||
|
|
||||||
catch {
|
catch {
|
||||||
|
# Unlike the test above, here we use SIGTERM, which behaves
|
||||||
|
# differently compared to SHUTDOWN NOW if there are lagging
|
||||||
|
# replicas. This is just to increase coverage and let each test use
|
||||||
|
# a different shutdown approach. In this case there are no lagging
|
||||||
|
# replicas though.
|
||||||
restart_server 0 true false
|
restart_server 0 true false
|
||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
}
|
}
|
||||||
@ -136,6 +148,9 @@ start_server {} {
|
|||||||
$replica config resetstat
|
$replica config resetstat
|
||||||
|
|
||||||
catch {
|
catch {
|
||||||
|
# Unlike the test above, here we use SIGTERM. This is just to
|
||||||
|
# increase coverage and let each test use a different shutdown
|
||||||
|
# approach.
|
||||||
restart_server 0 true false
|
restart_server 0 true false
|
||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
}
|
}
|
||||||
|
238
tests/integration/shutdown.tcl
Normal file
238
tests/integration/shutdown.tcl
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
# This test suite tests shutdown when there are lagging replicas connected.
|
||||||
|
|
||||||
|
# Fill up the OS socket send buffer for the replica connection 1M at a time.
|
||||||
|
# When the replication buffer memory increases beyond 2M (often after writing 4M
|
||||||
|
# or so), we assume it's because the OS socket send buffer can't swallow
|
||||||
|
# anymore.
|
||||||
|
proc fill_up_os_socket_send_buffer_for_repl {idx} {
|
||||||
|
set i 0
|
||||||
|
while {1} {
|
||||||
|
incr i
|
||||||
|
populate 1024 junk$i: 1024 $idx
|
||||||
|
after 10
|
||||||
|
set buf_size [s $idx mem_total_replication_buffers]
|
||||||
|
if {$buf_size > 2*1024*1024} {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach how {sigterm shutdown} {
|
||||||
|
test "Shutting down master waits for replica to catch up ($how)" {
|
||||||
|
start_server {} {
|
||||||
|
start_server {} {
|
||||||
|
set master [srv -1 client]
|
||||||
|
set master_host [srv -1 host]
|
||||||
|
set master_port [srv -1 port]
|
||||||
|
set master_pid [srv -1 pid]
|
||||||
|
set replica [srv 0 client]
|
||||||
|
set replica_pid [srv 0 pid]
|
||||||
|
|
||||||
|
# Config master.
|
||||||
|
$master config set shutdown-timeout 300; # 5min for slow CI
|
||||||
|
$master config set repl-backlog-size 1; # small as possible
|
||||||
|
$master config set hz 100; # cron runs every 10ms
|
||||||
|
|
||||||
|
# Config replica.
|
||||||
|
$replica replicaof $master_host $master_port
|
||||||
|
wait_for_sync $replica
|
||||||
|
|
||||||
|
# Preparation: Set k to 1 on both master and replica.
|
||||||
|
$master set k 1
|
||||||
|
wait_for_ofs_sync $master $replica
|
||||||
|
|
||||||
|
# Pause the replica.
|
||||||
|
exec kill -SIGSTOP $replica_pid
|
||||||
|
after 10
|
||||||
|
|
||||||
|
# Fill up the OS socket send buffer for the replica connection
|
||||||
|
# to prevent the following INCR from reaching the replica via
|
||||||
|
# the OS.
|
||||||
|
fill_up_os_socket_send_buffer_for_repl -1
|
||||||
|
|
||||||
|
# Incr k and immediately shutdown master.
|
||||||
|
$master incr k
|
||||||
|
switch $how {
|
||||||
|
sigterm {
|
||||||
|
exec kill -SIGTERM $master_pid
|
||||||
|
}
|
||||||
|
shutdown {
|
||||||
|
set rd [redis_deferring_client -1]
|
||||||
|
$rd shutdown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[s -1 shutdown_in_milliseconds] > 0
|
||||||
|
} else {
|
||||||
|
fail "Master not indicating ongoing shutdown."
|
||||||
|
}
|
||||||
|
|
||||||
|
# Wake up replica and check if master has waited for it.
|
||||||
|
after 20; # 2 cron intervals
|
||||||
|
exec kill -SIGCONT $replica_pid
|
||||||
|
wait_for_condition 300 1000 {
|
||||||
|
[$replica get k] eq 2
|
||||||
|
} else {
|
||||||
|
fail "Master exited before replica could catch up."
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check shutdown log messages on master
|
||||||
|
wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 500
|
||||||
|
assert_equal 0 [count_log_message -1 "*Lagging replica*"]
|
||||||
|
verify_log_message -1 "*1 of 1 replicas are in sync*" 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} {} {repl external:skip}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Shutting down master waits for replica timeout} {
|
||||||
|
start_server {} {
|
||||||
|
start_server {} {
|
||||||
|
set master [srv -1 client]
|
||||||
|
set master_host [srv -1 host]
|
||||||
|
set master_port [srv -1 port]
|
||||||
|
set master_pid [srv -1 pid]
|
||||||
|
set replica [srv 0 client]
|
||||||
|
set replica_pid [srv 0 pid]
|
||||||
|
|
||||||
|
# Config master.
|
||||||
|
$master config set shutdown-timeout 1; # second
|
||||||
|
|
||||||
|
# Config replica.
|
||||||
|
$replica replicaof $master_host $master_port
|
||||||
|
wait_for_sync $replica
|
||||||
|
|
||||||
|
# Preparation: Set k to 1 on both master and replica.
|
||||||
|
$master set k 1
|
||||||
|
wait_for_ofs_sync $master $replica
|
||||||
|
|
||||||
|
# Pause the replica.
|
||||||
|
exec kill -SIGSTOP $replica_pid
|
||||||
|
after 10
|
||||||
|
|
||||||
|
# Fill up the OS socket send buffer for the replica connection to
|
||||||
|
# prevent the following INCR k from reaching the replica via the OS.
|
||||||
|
fill_up_os_socket_send_buffer_for_repl -1
|
||||||
|
|
||||||
|
# Incr k and immediately shutdown master.
|
||||||
|
$master incr k
|
||||||
|
exec kill -SIGTERM $master_pid
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[s -1 shutdown_in_milliseconds] > 0
|
||||||
|
} else {
|
||||||
|
fail "Master not indicating ongoing shutdown."
|
||||||
|
}
|
||||||
|
|
||||||
|
# Let master finish shutting down and check log.
|
||||||
|
wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 100
|
||||||
|
verify_log_message -1 "*Lagging replica*" 0
|
||||||
|
verify_log_message -1 "*0 of 1 replicas are in sync*" 0
|
||||||
|
|
||||||
|
# Wake up replica.
|
||||||
|
exec kill -SIGCONT $replica_pid
|
||||||
|
assert_equal 1 [$replica get k]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} {} {repl external:skip}
|
||||||
|
|
||||||
|
test "Shutting down master waits for replica then fails" {
|
||||||
|
start_server {} {
|
||||||
|
start_server {} {
|
||||||
|
set master [srv -1 client]
|
||||||
|
set master_host [srv -1 host]
|
||||||
|
set master_port [srv -1 port]
|
||||||
|
set master_pid [srv -1 pid]
|
||||||
|
set replica [srv 0 client]
|
||||||
|
set replica_pid [srv 0 pid]
|
||||||
|
|
||||||
|
# Config master and replica.
|
||||||
|
$replica replicaof $master_host $master_port
|
||||||
|
wait_for_sync $replica
|
||||||
|
|
||||||
|
# Pause the replica and write a key on master.
|
||||||
|
exec kill -SIGSTOP $replica_pid
|
||||||
|
after 10
|
||||||
|
$master incr k
|
||||||
|
|
||||||
|
# Two clients call blocking SHUTDOWN in parallel.
|
||||||
|
set rd1 [redis_deferring_client -1]
|
||||||
|
set rd2 [redis_deferring_client -1]
|
||||||
|
$rd1 shutdown
|
||||||
|
$rd2 shutdown
|
||||||
|
set info_clients [$master info clients]
|
||||||
|
assert_match "*connected_clients:3*" $info_clients
|
||||||
|
assert_match "*blocked_clients:2*" $info_clients
|
||||||
|
|
||||||
|
# Start a very slow initial AOFRW, which will prevent shutdown.
|
||||||
|
$master config set rdb-key-save-delay 30000000; # 30 seconds
|
||||||
|
$master config set appendonly yes
|
||||||
|
|
||||||
|
# Wake up replica, causing master to continue shutting down.
|
||||||
|
exec kill -SIGCONT $replica_pid
|
||||||
|
|
||||||
|
# SHUTDOWN returns an error to both clients blocking on SHUTDOWN.
|
||||||
|
catch { $rd1 read } e1
|
||||||
|
catch { $rd2 read } e2
|
||||||
|
assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1
|
||||||
|
assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2
|
||||||
|
$rd1 close
|
||||||
|
$rd2 close
|
||||||
|
|
||||||
|
# Check shutdown log messages on master.
|
||||||
|
verify_log_message -1 "*1 of 1 replicas are in sync*" 0
|
||||||
|
verify_log_message -1 "*Writing initial AOF, can't exit*" 0
|
||||||
|
verify_log_message -1 "*Errors trying to shut down*" 0
|
||||||
|
|
||||||
|
# Let master to exit fast, without waiting for the very slow AOFRW.
|
||||||
|
catch {$master shutdown nosave force}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} {} {repl external:skip}
|
||||||
|
|
||||||
|
test "Shutting down master waits for replica then aborted" {
|
||||||
|
start_server {} {
|
||||||
|
start_server {} {
|
||||||
|
set master [srv -1 client]
|
||||||
|
set master_host [srv -1 host]
|
||||||
|
set master_port [srv -1 port]
|
||||||
|
set master_pid [srv -1 pid]
|
||||||
|
set replica [srv 0 client]
|
||||||
|
set replica_pid [srv 0 pid]
|
||||||
|
|
||||||
|
# Config master and replica.
|
||||||
|
$replica replicaof $master_host $master_port
|
||||||
|
wait_for_sync $replica
|
||||||
|
|
||||||
|
# Pause the replica and write a key on master.
|
||||||
|
exec kill -SIGSTOP $replica_pid
|
||||||
|
after 10
|
||||||
|
$master incr k
|
||||||
|
|
||||||
|
# Two clients call blocking SHUTDOWN in parallel.
|
||||||
|
set rd1 [redis_deferring_client -1]
|
||||||
|
set rd2 [redis_deferring_client -1]
|
||||||
|
$rd1 shutdown
|
||||||
|
$rd2 shutdown
|
||||||
|
set info_clients [$master info clients]
|
||||||
|
assert_match "*connected_clients:3*" $info_clients
|
||||||
|
assert_match "*blocked_clients:2*" $info_clients
|
||||||
|
|
||||||
|
# Abort the shutdown
|
||||||
|
$master shutdown abort
|
||||||
|
|
||||||
|
# Wake up replica, causing master to continue shutting down.
|
||||||
|
exec kill -SIGCONT $replica_pid
|
||||||
|
|
||||||
|
# SHUTDOWN returns an error to both clients blocking on SHUTDOWN.
|
||||||
|
catch { $rd1 read } e1
|
||||||
|
catch { $rd2 read } e2
|
||||||
|
assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1
|
||||||
|
assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2
|
||||||
|
$rd1 close
|
||||||
|
$rd2 close
|
||||||
|
|
||||||
|
# Check shutdown log messages on master.
|
||||||
|
verify_log_message -1 "*Shutdown manually aborted*" 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} {} {repl external:skip}
|
@ -674,8 +674,12 @@ proc start_server {options {code undefined}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
proc restart_server {level wait_ready rotate_logs {reconnect 1}} {
|
proc restart_server {level wait_ready rotate_logs {reconnect 1} {shutdown sigterm}} {
|
||||||
set srv [lindex $::servers end+$level]
|
set srv [lindex $::servers end+$level]
|
||||||
|
if {$shutdown ne {sigterm}} {
|
||||||
|
catch {[dict get $srv "client"] shutdown $shutdown}
|
||||||
|
}
|
||||||
|
# Kill server doesn't mind if the server is already dead
|
||||||
kill_server $srv
|
kill_server $srv
|
||||||
# Remove the default client from the server
|
# Remove the default client from the server
|
||||||
dict unset srv "client"
|
dict unset srv "client"
|
||||||
|
@ -591,8 +591,11 @@ proc stop_bg_complex_data {handle} {
|
|||||||
catch {exec /bin/kill -9 $handle}
|
catch {exec /bin/kill -9 $handle}
|
||||||
}
|
}
|
||||||
|
|
||||||
proc populate {num {prefix key:} {size 3}} {
|
# Write num keys with the given key prefix and value size (in bytes). If idx is
|
||||||
set rd [redis_deferring_client]
|
# given, it's the index (AKA level) used with the srv procedure and it specifies
|
||||||
|
# to which Redis instance to write the keys.
|
||||||
|
proc populate {num {prefix key:} {size 3} {idx 0}} {
|
||||||
|
set rd [redis_deferring_client $idx]
|
||||||
for {set j 0} {$j < $num} {incr j} {
|
for {set j 0} {$j < $num} {incr j} {
|
||||||
$rd set $prefix$j [string repeat A $size]
|
$rd set $prefix$j [string repeat A $size]
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,7 @@ set ::all_tests {
|
|||||||
integration/replication-4
|
integration/replication-4
|
||||||
integration/replication-psync
|
integration/replication-psync
|
||||||
integration/replication-buffer
|
integration/replication-buffer
|
||||||
|
integration/shutdown
|
||||||
integration/aof
|
integration/aof
|
||||||
integration/rdb
|
integration/rdb
|
||||||
integration/corrupt-dump
|
integration/corrupt-dump
|
||||||
|
@ -26,6 +26,17 @@ start_server {tags {"shutdown external:skip"}} {
|
|||||||
}
|
}
|
||||||
|
|
||||||
start_server {tags {"shutdown external:skip"}} {
|
start_server {tags {"shutdown external:skip"}} {
|
||||||
|
test {SHUTDOWN ABORT can cancel SIGTERM} {
|
||||||
|
r debug pause-cron 1
|
||||||
|
set pid [s process_id]
|
||||||
|
exec kill -SIGTERM $pid
|
||||||
|
after 10; # Give signal handler some time to run
|
||||||
|
r shutdown abort
|
||||||
|
verify_log_message 0 "*Shutdown manually aborted*" 0
|
||||||
|
r debug pause-cron 0
|
||||||
|
r ping
|
||||||
|
} {PONG}
|
||||||
|
|
||||||
test {Temp rdb will be deleted in signal handle} {
|
test {Temp rdb will be deleted in signal handle} {
|
||||||
for {set i 0} {$i < 20} {incr i} {
|
for {set i 0} {$i < 20} {incr i} {
|
||||||
r set $i $i
|
r set $i $i
|
||||||
|
Loading…
x
Reference in New Issue
Block a user