Add support for client pause WRITE (#8170)

Implementation of client pause WRITE and client unpause
This commit is contained in:
Madelyn Olson 2021-01-07 23:36:54 -08:00 committed by GitHub
parent b02780c41d
commit 47579bdf5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 435 additions and 107 deletions

View File

@ -89,6 +89,12 @@ void blockClient(client *c, int btype) {
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
if (btype == BLOCKED_PAUSE) {
listAddNodeTail(server.paused_clients, c);
c->paused_list_node = listLast(server.paused_clients);
/* Mark this client to execute its command */
c->flags |= CLIENT_PENDING_COMMAND;
}
}
/* This function is called in the beforeSleep() function of the event loop
@ -110,6 +116,11 @@ void processUnblockedClients(void) {
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
if (processPendingCommandsAndResetClient(c) == C_ERR) {
continue;
}
/* Then process client if it has more data in it's buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c);
}
@ -154,6 +165,9 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->btype == BLOCKED_PAUSE) {
listDelNode(server.paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
} else {
serverPanic("Unknown btype in unblockClient().");
}

View File

@ -2164,7 +2164,7 @@ int clusterProcessPacket(clusterLink *link) {
resetManualFailover();
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE);
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
/* We need to send a ping message to the replica, as it would carry
@ -3421,9 +3421,8 @@ void clusterHandleSlaveMigration(int max_slaves) {
* The function can be used both to initialize the manual failover state at
* startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
if (server.cluster->mf_end && clientsArePaused()) {
server.clients_pause_end_time = 0;
clientsArePaused(); /* Just use the side effect of the function. */
if (server.cluster->mf_end) {
checkClientPauseTimeoutAndReturnIfPaused();
}
server.cluster->mf_end = 0; /* No manual failover in progress. */
server.cluster->mf_can_start = 0;

View File

@ -1520,6 +1520,12 @@ int expireIfNeeded(redisDb *db, robj *key) {
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;
/* If clients are paused, we keep the current dataset constant,
* but return to the client what we believe is the right state. Typically,
* at the end of the pause we will properly expire the key OR we will
* have failed over and the new primary will send us the expire. */
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);

View File

@ -462,7 +462,7 @@ static int isSafeToPerformEvictions(void) {
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return 0;
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
return 1;
}

View File

@ -148,7 +148,7 @@ void activeExpireCycle(int type) {
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return;
if (checkClientPauseTimeoutAndReturnIfPaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit

View File

@ -747,6 +747,7 @@ int64_t commandFlagsFromString(char *s) {
else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG;
else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH;
else if (!strcasecmp(t,"may-replicate")) flags |= CMD_MAY_REPLICATE;
else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
else break;
@ -813,6 +814,8 @@ int64_t commandFlagsFromString(char *s) {
* * **"no-auth"**: This command can be run by an un-authenticated client.
* Normally this is used by a command that is used
* to authenticate a client.
* * **"may-replicate"**: This command may generate replication traffic, even
* though it's not a write command.
*/
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
@ -2042,7 +2045,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
* periodically in timer callbacks or other periodic callbacks.
*/
int RM_AvoidReplicaTraffic() {
return clientsArePaused();
return checkClientPauseTimeoutAndReturnIfPaused();
}
/* Change the currently selected DB. Returns an error if the id

View File

@ -178,6 +178,7 @@ client *createClient(connection *conn) {
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
c->paused_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->client_cron_last_memory_usage = 0;
@ -1960,7 +1961,7 @@ void commandProcessed(client *c) {
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
(c->btype != BLOCKED_MODULE && c->btype != BLOCKED_PAUSE))
{
resetClient(c);
}
@ -2003,6 +2004,20 @@ int processCommandAndResetClient(client *c) {
return deadclient ? C_ERR : C_OK;
}
/* This function will execute any fully parsed commands pending on
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
int processPendingCommandsAndResetClient(client *c) {
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
return C_OK;
}
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
@ -2010,9 +2025,6 @@ int processCommandAndResetClient(client *c) {
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
@ -2436,8 +2448,10 @@ void clientCommand(client *c) {
" Return information about client connections. Options:",
" * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)",
" Return clients of specified type.",
"PAUSE <timeout>",
" Suspend all clients for <timout> milliseconds.",
"UNPAUSE",
" Stop the current client pause, resuming traffic.",
"PAUSE <timeout> [WRITE|ALL]",
" Suspend all, or just write, clients for <timout> milliseconds.",
"REPLY (ON|OFF|SKIP)",
" Control the replies sent to the current connection.",
"SETNAME <name>",
@ -2653,13 +2667,31 @@ NULL
addReplyBulk(c,c->name);
else
addReplyNull(c);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
/* CLIENT PAUSE */
} else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
/* CLIENT UNPAUSE */
unpauseClients();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
c->argc == 4))
{
/* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
long long duration;
int type = CLIENT_PAUSE_ALL;
if (c->argc == 4) {
if (!strcasecmp(c->argv[3]->ptr,"write")) {
type = CLIENT_PAUSE_WRITE;
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
type = CLIENT_PAUSE_ALL;
} else {
addReplyError(c,
"CLIENT PAUSE mode must be WRITE or ALL");
return;
}
}
if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration);
UNIT_MILLISECONDS) != C_OK) return;
pauseClients(duration, type);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
@ -3209,54 +3241,66 @@ void flushSlavesOutputBuffers(void) {
}
}
/* Pause clients up to the specified unixtime (in ms). While clients
* are paused no command is processed from clients, so the data set can't
* change during that time.
*
* However while this function pauses normal and Pub/Sub clients, slaves are
* still served, so this function can be used on server upgrades where it is
* required that slaves process the latest bytes from the replication stream
* before being turned to masters.
/* Pause clients up to the specified unixtime (in ms) for a given type of
* commands.
*
* A main use case of this function is to allow pausing replication traffic
* so that a failover without data loss to occur. Replicas will continue to receive
* traffic to faciliate this functionality.
*
* This function is also internally used by Redis Cluster for the manual
* failover procedure implemented by CLUSTER FAILOVER.
*
* The function always succeed, even if there is already a pause in progress.
* In such a case, the pause is extended if the duration is more than the
* time left for the previous duration. However if the duration is smaller
* than the time left for the previous pause, no change is made to the
* left duration. */
void pauseClients(mstime_t end) {
if (!server.clients_paused || end > server.clients_pause_end_time)
server.clients_pause_end_time = end;
server.clients_paused = 1;
* In such a case, the duration is set to the maximum and new end time and the
* type is set to the more restrictive type of pause. */
void pauseClients(mstime_t end, pause_type type) {
if (type > server.client_pause_type) {
server.client_pause_type = type;
}
if (end > server.client_pause_end_time) {
server.client_pause_end_time = end;
}
/* We allow write commands that were queued
* up before and after to execute. We need
* to track this state so that we don't assert
* in propagate(). */
if (server.in_exec) {
server.client_pause_in_transaction = 1;
}
}
/* Return non-zero if clients are currently paused. As a side effect the
* function checks if the pause time was reached and clear it. */
int clientsArePaused(void) {
if (server.clients_paused &&
server.clients_pause_end_time < server.mstime)
{
listNode *ln;
listIter li;
client *c;
/* Unpause clients and queue them for reprocessing. */
void unpauseClients(void) {
listNode *ln;
listIter li;
client *c;
server.client_pause_type = CLIENT_PAUSE_OFF;
server.clients_paused = 0;
/* Put all the clients in the unblocked clients queue in order to
* force the re-processing of the input buffer if any. */
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
/* Don't touch slaves and blocked clients.
* The latter pending requests will be processed when unblocked. */
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
queueClientForReprocessing(c);
}
/* Unblock all of the clients so they are reprocessed. */
listRewind(server.paused_clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
unblockClient(c);
}
return server.clients_paused;
}
/* Returns true if clients are paused and false otherwise. */
int areClientsPaused(void) {
return server.client_pause_type != CLIENT_PAUSE_OFF;
}
/* Checks if the current client pause has elapsed and unpause clients
* if it has. Also returns true if clients are now paused and false
* otherwise. */
int checkClientPauseTimeoutAndReturnIfPaused(void) {
if (server.client_pause_end_time < server.mstime) {
unpauseClients();
}
return areClientsPaused();
}
/* This function is called by Redis in order to process a few events from
@ -3634,15 +3678,13 @@ int handleClientsWithPendingReadsUsingThreads(void) {
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
processInputBuffer(c);
/* We may have pending replies if a thread readQueryFromClient() produced

View File

@ -3237,7 +3237,7 @@ void replicationCron(void) {
int manual_failover_in_progress =
server.cluster_enabled &&
server.cluster->mf_end &&
clientsArePaused();
checkClientPauseTimeoutAndReturnIfPaused();
if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING",4);

View File

@ -163,6 +163,13 @@ struct redisServer server; /* Server global state */
* delay its execution as long as the kernel scheduler is giving
* us time. Note that commands that may trigger a DEL as a side
* effect (like SET) are not fast commands.
*
* may-replicate: Command may produce replication traffic, but should be
* allowed under circumstances where write commands are disallowed.
* Examples include PUBLISH, which replicates pubsub messages,and
* EVAL, which may execute write commands, which are replicated,
* or may just execute read commands. A command can not be marked
* both "write" and "may-replicate"
*
* The following additional flags are only used in order to put commands
* in a specific ACL category. Commands can have multiple ACL categories.
@ -818,7 +825,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"publish",publishCommand,3,
"pub-sub ok-loading ok-stale fast",
"pub-sub ok-loading ok-stale fast may-replicate",
0,NULL,0,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,
@ -884,11 +891,11 @@ struct redisCommand redisCommandTable[] = {
/* EVAL can modify the dataset, however it is not flagged as a write
* command since we do the check while running commands from Lua. */
{"eval",evalCommand,-3,
"no-script @scripting",
"no-script may-replicate @scripting",
0,evalGetKeys,0,0,0,0,0,0},
{"evalsha",evalShaCommand,-3,
"no-script @scripting",
"no-script may-replicate @scripting",
0,evalGetKeys,0,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,
@ -896,7 +903,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"script",scriptCommand,-2,
"no-script @scripting",
"no-script may-replicate @scripting",
0,NULL,0,0,0,0,0,0},
{"time",timeCommand,1,
@ -977,7 +984,7 @@ struct redisCommand redisCommandTable[] = {
* we claim that the representation, even if accessible, is an internal
* affair, and the command is semantically read only. */
{"pfcount",pfcountCommand,-2,
"read-only @hyperloglog",
"read-only may-replicate @hyperloglog",
0,NULL,1,-1,1,0,0,0},
{"pfmerge",pfmergeCommand,-2,
@ -2163,8 +2170,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/
/* Clear the paused clients state if needed. */
checkClientPauseTimeoutAndReturnIfPaused();
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
@ -2361,8 +2368,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* during the previous event loop iteration. Note that we do this after
* processUnblockedClients(), so if there are multiple pipelined WAITs
* and the just unblocked WAIT gets blocked again, we don't have to wait
* a server cron cycle in absence of other event loop events. See #6623. */
if (server.get_ack_from_slaves) {
* a server cron cycle in absence of other event loop events. See #6623.
*
* We also don't send the ACKs while clients are paused, since it can
* increment the replication backlog, they'll be sent after the pause
* if we are still the master. */
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
@ -3040,7 +3051,8 @@ void initServer(void) {
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
server.client_pause_type = 0;
server.paused_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
server.blocked_last_cron = 0;
@ -3114,6 +3126,7 @@ void initServer(void) {
server.in_eval = 0;
server.in_exec = 0;
server.propagate_in_transaction = 0;
server.client_pause_in_transaction = 0;
server.child_pid = -1;
server.child_type = CHILD_TYPE_NONE;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
@ -3280,6 +3293,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) {
c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
} else if (!strcasecmp(flag,"no-auth")) {
c->flags |= CMD_NO_AUTH;
} else if (!strcasecmp(flag,"may-replicate")) {
c->flags |= CMD_MAY_REPLICATE;
} else {
/* Parse ACL categories here if the flag name starts with @. */
uint64_t catflag;
@ -3439,6 +3454,10 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
if (server.in_exec && !server.propagate_in_transaction)
execCommandPropagateMulti(dbid);
/* This needs to be unreachable since the dataset should be fixed during
* client pause, otherwise data may be lossed during a failover. */
serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
@ -3477,6 +3496,7 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
* Redis command implementation in order to to force the propagation of a
* specific command execution into AOF / Replication. */
void forceCommandPropagation(client *c, int flags) {
serverAssert(c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE));
if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
}
@ -3701,6 +3721,12 @@ void call(client *c, int flags) {
}
server.also_propagate = prev_also_propagate;
/* Client pause takes effect after a transaction has finished. This needs
* to be located after everything is propagated. */
if (!server.in_exec && server.client_pause_in_transaction) {
server.client_pause_in_transaction = 0;
}
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) {
@ -3814,6 +3840,8 @@ int processCommand(client *c) {
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
int is_may_replicate_command = (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
@ -4012,6 +4040,17 @@ int processCommand(client *c) {
return C_OK;
}
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
((server.client_pause_type & CLIENT_PAUSE_ALL) ||
(server.client_pause_type & CLIENT_PAUSE_WRITE && is_may_replicate_command)))
{
c->bpop.timeout = 0;
blockClient(c,BLOCKED_PAUSE);
return C_OK;
}
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
@ -4266,6 +4305,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) {
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth");
flagcount += addReplyCommandFlag(c,cmd,CMD_MAY_REPLICATE, "may_replicate");
if (cmdHasMovableKeys(cmd)) {
addReplyStatus(c, "movablekeys");
flagcount += 1;

View File

@ -181,33 +181,34 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CMD_ASKING (1ULL<<13) /* "cluster-asking" flag */
#define CMD_FAST (1ULL<<14) /* "fast" flag */
#define CMD_NO_AUTH (1ULL<<15) /* "no-auth" flag */
#define CMD_MAY_REPLICATE (1ULL<<16) /* "may-replicate" flag */
/* Command flags used by the module system. */
#define CMD_MODULE_GETKEYS (1ULL<<16) /* Use the modules getkeys interface. */
#define CMD_MODULE_NO_CLUSTER (1ULL<<17) /* Deny on Redis Cluster. */
#define CMD_MODULE_GETKEYS (1ULL<<17) /* Use the modules getkeys interface. */
#define CMD_MODULE_NO_CLUSTER (1ULL<<18) /* Deny on Redis Cluster. */
/* Command flags that describe ACLs categories. */
#define CMD_CATEGORY_KEYSPACE (1ULL<<18)
#define CMD_CATEGORY_READ (1ULL<<19)
#define CMD_CATEGORY_WRITE (1ULL<<20)
#define CMD_CATEGORY_SET (1ULL<<21)
#define CMD_CATEGORY_SORTEDSET (1ULL<<22)
#define CMD_CATEGORY_LIST (1ULL<<23)
#define CMD_CATEGORY_HASH (1ULL<<24)
#define CMD_CATEGORY_STRING (1ULL<<25)
#define CMD_CATEGORY_BITMAP (1ULL<<26)
#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<27)
#define CMD_CATEGORY_GEO (1ULL<<28)
#define CMD_CATEGORY_STREAM (1ULL<<29)
#define CMD_CATEGORY_PUBSUB (1ULL<<30)
#define CMD_CATEGORY_ADMIN (1ULL<<31)
#define CMD_CATEGORY_FAST (1ULL<<32)
#define CMD_CATEGORY_SLOW (1ULL<<33)
#define CMD_CATEGORY_BLOCKING (1ULL<<34)
#define CMD_CATEGORY_DANGEROUS (1ULL<<35)
#define CMD_CATEGORY_CONNECTION (1ULL<<36)
#define CMD_CATEGORY_TRANSACTION (1ULL<<37)
#define CMD_CATEGORY_SCRIPTING (1ULL<<38)
#define CMD_CATEGORY_KEYSPACE (1ULL<<19)
#define CMD_CATEGORY_READ (1ULL<<20)
#define CMD_CATEGORY_WRITE (1ULL<<21)
#define CMD_CATEGORY_SET (1ULL<<22)
#define CMD_CATEGORY_SORTEDSET (1ULL<<23)
#define CMD_CATEGORY_LIST (1ULL<<24)
#define CMD_CATEGORY_HASH (1ULL<<25)
#define CMD_CATEGORY_STRING (1ULL<<26)
#define CMD_CATEGORY_BITMAP (1ULL<<27)
#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<28)
#define CMD_CATEGORY_GEO (1ULL<<29)
#define CMD_CATEGORY_STREAM (1ULL<<30)
#define CMD_CATEGORY_PUBSUB (1ULL<<31)
#define CMD_CATEGORY_ADMIN (1ULL<<32)
#define CMD_CATEGORY_FAST (1ULL<<33)
#define CMD_CATEGORY_SLOW (1ULL<<34)
#define CMD_CATEGORY_BLOCKING (1ULL<<35)
#define CMD_CATEGORY_DANGEROUS (1ULL<<36)
#define CMD_CATEGORY_CONNECTION (1ULL<<37)
#define CMD_CATEGORY_TRANSACTION (1ULL<<38)
#define CMD_CATEGORY_SCRIPTING (1ULL<<39)
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
@ -250,10 +251,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
in the list of clients we can read
from. */
#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after
we return single threaded that the
client has already pending commands
to be executed. */
#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully
* parsed command ready for execution. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
@ -280,7 +279,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_ZSET 5 /* BZPOP et al. */
#define BLOCKED_NUM 6 /* Number of blocked states. */
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
#define BLOCKED_NUM 7 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@ -436,6 +436,14 @@ typedef enum {
#define PROPAGATE_AOF 1
#define PROPAGATE_REPL 2
/* Client pause types, larger types are more restrictive
* pause types than smaller pause types. */
typedef enum {
CLIENT_PAUSE_OFF = 0, /* Pause no commands */
CLIENT_PAUSE_WRITE, /* Pause write commands */
CLIENT_PAUSE_ALL /* Pause all commands */
} pause_type;
/* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
@ -893,6 +901,7 @@ typedef struct client {
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *paused_list_node; /* list node within the pause list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
@ -1137,6 +1146,7 @@ struct redisServer {
int in_exec; /* Are we inside EXEC? */
int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */
char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
/* Modules */
dict *moduleapi; /* Exported core APIs dictionary for modules. */
dict *sharedapi; /* Like moduleapi but containing the APIs that
@ -1171,8 +1181,9 @@ struct redisServer {
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
pause_type client_pause_type; /* True if clients are currently paused */
list *paused_clients; /* List of pause clients */
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
@ -1794,8 +1805,10 @@ char *getClientTypeName(int class);
void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count);
void pauseClients(mstime_t duration);
int clientsArePaused(void);
void pauseClients(mstime_t duration, pause_type type);
void unpauseClients(void);
int areClientsPaused(void);
int checkClientPauseTimeoutAndReturnIfPaused(void);
void processEventsWhileBlocked(void);
void loadingCron(void);
void whileBlockedCron();
@ -2111,6 +2124,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
size_t freeMemoryGetNotCountedMemory();
int overMaxmemoryAfterAlloc(size_t moremem);
int processCommand(client *c);
int processPendingCommandsAndResetClient(client *c);
void setupSignalHandlers(void);
void removeSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);

200
tests/unit/pause.tcl Normal file
View File

@ -0,0 +1,200 @@
start_server {tags {"pause"}} {
test "Test read commands are not blocked by client pause" {
r client PAUSE 100000000 WRITE
set rd [redis_deferring_client]
$rd GET FOO
$rd PING
$rd INFO
assert_equal [s 0 blocked_clients] 0
r client unpause
$rd close
}
test "Test write commands are paused by RO" {
r client PAUSE 100000000 WRITE
set rd [redis_deferring_client]
$rd SET FOO BAR
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
r client unpause
assert_match "OK" [$rd read]
$rd close
}
test "Test special commands are paused by RO" {
r PFADD pause-hll test
r client PAUSE 100000000 WRITE
# Test that pfcount, which can replicate, is also blocked
set rd [redis_deferring_client]
$rd PFCOUNT pause-hll
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
# Test that publish, which adds the message to the replication
# stream is blocked.
set rd2 [redis_deferring_client]
$rd2 publish foo bar
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {2}
} else {
fail "Clients are not blocked"
}
# Test that SCRIPT LOAD, which is replicated.
set rd3 [redis_deferring_client]
$rd3 script load "return 1"
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {3}
} else {
fail "Clients are not blocked"
}
r client unpause
assert_match "1" [$rd read]
assert_match "0" [$rd2 read]
assert_match "*" [$rd3 read]
$rd close
$rd2 close
$rd3 close
}
test "Test read/admin mutli-execs are not blocked by pause RO" {
r SET FOO BAR
r client PAUSE 100000000 WRITE
set rd [redis_deferring_client]
$rd MULTI
assert_equal [$rd read] "OK"
$rd PING
assert_equal [$rd read] "QUEUED"
$rd GET FOO
assert_equal [$rd read] "QUEUED"
$rd EXEC
assert_equal [s 0 blocked_clients] 0
r client unpause
assert_match "PONG BAR" [$rd read]
$rd close
}
test "Test write mutli-execs are blocked by pause RO" {
set rd [redis_deferring_client]
$rd MULTI
assert_equal [$rd read] "OK"
$rd SET FOO BAR
r client PAUSE 100000000 WRITE
assert_equal [$rd read] "QUEUED"
$rd EXEC
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
r client unpause
assert_match "OK" [$rd read]
$rd close
}
test "Test scripts are blocked by pause RO" {
r client PAUSE 100000000 WRITE
set rd [redis_deferring_client]
$rd EVAL "return 1" 0
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
r client unpause
assert_match "1" [$rd read]
$rd close
}
test "Test multiple clients can be queued up and unblocked" {
r client PAUSE 100000000 WRITE
set clients [list [redis_deferring_client] [redis_deferring_client] [redis_deferring_client]]
foreach client $clients {
$client SET FOO BAR
}
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {3}
} else {
fail "Clients are not blocked"
}
r client unpause
foreach client $clients {
assert_match "OK" [$client read]
$client close
}
}
test "Test clients with syntax errors will get responses immediately" {
r client PAUSE 100000000 WRITE
catch {r set FOO} err
assert_match "ERR wrong number of arguments for *" $err
r client unpause
}
test "Test both active and passive expiries are skipped during client pause" {
set expired_keys [s 0 expired_keys]
r multi
r set foo bar PX 10
r set bar foo PX 10
r client PAUSE 100000000 WRITE
r exec
wait_for_condition 10 100 {
[r get foo] == {} && [r get bar] == {}
} else {
fail "Keys were never logically expired"
}
# No keys should actually have been expired
assert_match $expired_keys [s 0 expired_keys]
r client unpause
# Force the keys to expire
r get foo
r get bar
# Now that clients have been unpaused, expires should go through
assert_match [expr $expired_keys + 2] [s 0 expired_keys]
}
test "Test that client pause starts at the end of a transaction" {
r MULTI
r SET FOO1 BAR
r client PAUSE 100000000 WRITE
r SET FOO2 BAR
r exec
set rd [redis_deferring_client]
$rd SET FOO3 BAR
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
assert_match "BAR" [r GET FOO1]
assert_match "BAR" [r GET FOO2]
assert_match "" [r GET FOO3]
r client unpause
assert_match "OK" [$rd read]
$rd close
}
# Make sure we unpause at the end
r client unpause
}

View File

@ -42,4 +42,14 @@ start_server {} {
$master incr foo
assert {[$master wait 1 3000] == 0}
}
test {WAIT implicitly blocks on client pause since ACKs aren't sent} {
$master multi
$master incr foo
$master client pause 10000 write
$master exec
assert {[$master wait 1 1000] == 0}
$master client unpause
assert {[$master wait 1 1000] == 1}
}
}}