From 47579bdf5cccfbac1cf2b6e7e52218c5b9faed86 Mon Sep 17 00:00:00 2001 From: Madelyn Olson <34459052+madolson@users.noreply.github.com> Date: Thu, 7 Jan 2021 23:36:54 -0800 Subject: [PATCH] Add support for client pause WRITE (#8170) Implementation of client pause WRITE and client unpause --- src/blocked.c | 14 +++ src/cluster.c | 7 +- src/db.c | 6 ++ src/evict.c | 2 +- src/expire.c | 2 +- src/module.c | 5 +- src/networking.c | 156 +++++++++++++++++++++------------ src/replication.c | 2 +- src/server.c | 60 ++++++++++--- src/server.h | 78 ++++++++++------- tests/unit/pause.tcl | 200 +++++++++++++++++++++++++++++++++++++++++++ tests/unit/wait.tcl | 10 +++ 12 files changed, 435 insertions(+), 107 deletions(-) create mode 100644 tests/unit/pause.tcl diff --git a/src/blocked.c b/src/blocked.c index e3f7b74d6..250e4bd98 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -89,6 +89,12 @@ void blockClient(client *c, int btype) { server.blocked_clients++; server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); + if (btype == BLOCKED_PAUSE) { + listAddNodeTail(server.paused_clients, c); + c->paused_list_node = listLast(server.paused_clients); + /* Mark this client to execute its command */ + c->flags |= CLIENT_PENDING_COMMAND; + } } /* This function is called in the beforeSleep() function of the event loop @@ -110,6 +116,11 @@ void processUnblockedClients(void) { * client is not blocked before to proceed, but things may change and * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { + /* If we have a queued command, execute it now. */ + if (processPendingCommandsAndResetClient(c) == C_ERR) { + continue; + } + /* Then process client if it has more data in it's buffer. */ if (c->querybuf && sdslen(c->querybuf) > 0) { processInputBuffer(c); } @@ -154,6 +165,9 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); + } else if (c->btype == BLOCKED_PAUSE) { + listDelNode(server.paused_clients,c->paused_list_node); + c->paused_list_node = NULL; } else { serverPanic("Unknown btype in unblockClient()."); } diff --git a/src/cluster.c b/src/cluster.c index d9f1a66d7..c327334ee 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2164,7 +2164,7 @@ int clusterProcessPacket(clusterLink *link) { resetManualFailover(); server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); + pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); /* We need to send a ping message to the replica, as it would carry @@ -3421,9 +3421,8 @@ void clusterHandleSlaveMigration(int max_slaves) { * The function can be used both to initialize the manual failover state at * startup or to abort a manual failover in progress. */ void resetManualFailover(void) { - if (server.cluster->mf_end && clientsArePaused()) { - server.clients_pause_end_time = 0; - clientsArePaused(); /* Just use the side effect of the function. */ + if (server.cluster->mf_end) { + checkClientPauseTimeoutAndReturnIfPaused(); } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; diff --git a/src/db.c b/src/db.c index c4b48fb0e..5d63566a7 100644 --- a/src/db.c +++ b/src/db.c @@ -1520,6 +1520,12 @@ int expireIfNeeded(redisDb *db, robj *key) { * we think the key is expired at this time. */ if (server.masterhost != NULL) return 1; + /* If clients are paused, we keep the current dataset constant, + * but return to the client what we believe is the right state. Typically, + * at the end of the pause we will properly expire the key OR we will + * have failed over and the new primary will send us the expire. */ + if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; + /* Delete the key */ server.stat_expiredkeys++; propagateExpire(db,key,server.lazyfree_lazy_expire); diff --git a/src/evict.c b/src/evict.c index 3642352ad..04513cd1a 100644 --- a/src/evict.c +++ b/src/evict.c @@ -462,7 +462,7 @@ static int isSafeToPerformEvictions(void) { /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ - if (clientsArePaused()) return 0; + if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; return 1; } diff --git a/src/expire.c b/src/expire.c index 5433f46ca..275a735a7 100644 --- a/src/expire.c +++ b/src/expire.c @@ -148,7 +148,7 @@ void activeExpireCycle(int type) { /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ - if (clientsArePaused()) return; + if (checkClientPauseTimeoutAndReturnIfPaused()) return; if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit diff --git a/src/module.c b/src/module.c index c9e0b8812..bf186f8b7 100644 --- a/src/module.c +++ b/src/module.c @@ -747,6 +747,7 @@ int64_t commandFlagsFromString(char *s) { else if (!strcasecmp(t,"no-slowlog")) flags |= CMD_SKIP_SLOWLOG; else if (!strcasecmp(t,"fast")) flags |= CMD_FAST; else if (!strcasecmp(t,"no-auth")) flags |= CMD_NO_AUTH; + else if (!strcasecmp(t,"may-replicate")) flags |= CMD_MAY_REPLICATE; else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS; else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER; else break; @@ -813,6 +814,8 @@ int64_t commandFlagsFromString(char *s) { * * **"no-auth"**: This command can be run by an un-authenticated client. * Normally this is used by a command that is used * to authenticate a client. + * * **"may-replicate"**: This command may generate replication traffic, even + * though it's not a write command. */ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) { int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0; @@ -2042,7 +2045,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { * periodically in timer callbacks or other periodic callbacks. */ int RM_AvoidReplicaTraffic() { - return clientsArePaused(); + return checkClientPauseTimeoutAndReturnIfPaused(); } /* Change the currently selected DB. Returns an error if the id diff --git a/src/networking.c b/src/networking.c index e71e39624..9a86e6085 100644 --- a/src/networking.c +++ b/src/networking.c @@ -178,6 +178,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->sockname = NULL; c->client_list_node = NULL; + c->paused_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; c->client_cron_last_memory_usage = 0; @@ -1960,7 +1961,7 @@ void commandProcessed(client *c) { * still be able to access the client argv and argc field. * The client will be reset in unblockClientFromModule(). */ if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) + (c->btype != BLOCKED_MODULE && c->btype != BLOCKED_PAUSE)) { resetClient(c); } @@ -2003,6 +2004,20 @@ int processCommandAndResetClient(client *c) { return deadclient ? C_ERR : C_OK; } + +/* This function will execute any fully parsed commands pending on + * the client. Returns C_ERR if the client is no longer valid after executing + * the command, and C_OK for all other cases. */ +int processPendingCommandsAndResetClient(client *c) { + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~CLIENT_PENDING_COMMAND; + if (processCommandAndResetClient(c) == C_ERR) { + return C_ERR; + } + } + return C_OK; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be @@ -2010,9 +2025,6 @@ int processCommandAndResetClient(client *c) { void processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { - /* Return if clients are paused. */ - if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; - /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2436,8 +2448,10 @@ void clientCommand(client *c) { " Return information about client connections. Options:", " * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)", " Return clients of specified type.", -"PAUSE ", -" Suspend all clients for milliseconds.", +"UNPAUSE", +" Stop the current client pause, resuming traffic.", +"PAUSE [WRITE|ALL]", +" Suspend all, or just write, clients for milliseconds.", "REPLY (ON|OFF|SKIP)", " Control the replies sent to the current connection.", "SETNAME ", @@ -2653,13 +2667,31 @@ NULL addReplyBulk(c,c->name); else addReplyNull(c); - } else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) { - /* CLIENT PAUSE */ + } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) { + /* CLIENT UNPAUSE */ + unpauseClients(); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 || + c->argc == 4)) + { + /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */ long long duration; + int type = CLIENT_PAUSE_ALL; + if (c->argc == 4) { + if (!strcasecmp(c->argv[3]->ptr,"write")) { + type = CLIENT_PAUSE_WRITE; + } else if (!strcasecmp(c->argv[3]->ptr,"all")) { + type = CLIENT_PAUSE_ALL; + } else { + addReplyError(c, + "CLIENT PAUSE mode must be WRITE or ALL"); + return; + } + } if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration, - UNIT_MILLISECONDS) != C_OK) return; - pauseClients(duration); + UNIT_MILLISECONDS) != C_OK) return; + pauseClients(duration, type); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] @@ -3209,54 +3241,66 @@ void flushSlavesOutputBuffers(void) { } } -/* Pause clients up to the specified unixtime (in ms). While clients - * are paused no command is processed from clients, so the data set can't - * change during that time. - * - * However while this function pauses normal and Pub/Sub clients, slaves are - * still served, so this function can be used on server upgrades where it is - * required that slaves process the latest bytes from the replication stream - * before being turned to masters. +/* Pause clients up to the specified unixtime (in ms) for a given type of + * commands. * + * A main use case of this function is to allow pausing replication traffic + * so that a failover without data loss to occur. Replicas will continue to receive + * traffic to faciliate this functionality. + * * This function is also internally used by Redis Cluster for the manual * failover procedure implemented by CLUSTER FAILOVER. * * The function always succeed, even if there is already a pause in progress. - * In such a case, the pause is extended if the duration is more than the - * time left for the previous duration. However if the duration is smaller - * than the time left for the previous pause, no change is made to the - * left duration. */ -void pauseClients(mstime_t end) { - if (!server.clients_paused || end > server.clients_pause_end_time) - server.clients_pause_end_time = end; - server.clients_paused = 1; + * In such a case, the duration is set to the maximum and new end time and the + * type is set to the more restrictive type of pause. */ +void pauseClients(mstime_t end, pause_type type) { + if (type > server.client_pause_type) { + server.client_pause_type = type; + } + + if (end > server.client_pause_end_time) { + server.client_pause_end_time = end; + } + + /* We allow write commands that were queued + * up before and after to execute. We need + * to track this state so that we don't assert + * in propagate(). */ + if (server.in_exec) { + server.client_pause_in_transaction = 1; + } } -/* Return non-zero if clients are currently paused. As a side effect the - * function checks if the pause time was reached and clear it. */ -int clientsArePaused(void) { - if (server.clients_paused && - server.clients_pause_end_time < server.mstime) - { - listNode *ln; - listIter li; - client *c; +/* Unpause clients and queue them for reprocessing. */ +void unpauseClients(void) { + listNode *ln; + listIter li; + client *c; + + server.client_pause_type = CLIENT_PAUSE_OFF; - server.clients_paused = 0; - - /* Put all the clients in the unblocked clients queue in order to - * force the re-processing of the input buffer if any. */ - listRewind(server.clients,&li); - while ((ln = listNext(&li)) != NULL) { - c = listNodeValue(ln); - - /* Don't touch slaves and blocked clients. - * The latter pending requests will be processed when unblocked. */ - if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue; - queueClientForReprocessing(c); - } + /* Unblock all of the clients so they are reprocessed. */ + listRewind(server.paused_clients,&li); + while ((ln = listNext(&li)) != NULL) { + c = listNodeValue(ln); + unblockClient(c); } - return server.clients_paused; +} + +/* Returns true if clients are paused and false otherwise. */ +int areClientsPaused(void) { + return server.client_pause_type != CLIENT_PAUSE_OFF; +} + +/* Checks if the current client pause has elapsed and unpause clients + * if it has. Also returns true if clients are now paused and false + * otherwise. */ +int checkClientPauseTimeoutAndReturnIfPaused(void) { + if (server.client_pause_end_time < server.mstime) { + unpauseClients(); + } + return areClientsPaused(); } /* This function is called by Redis in order to process a few events from @@ -3634,15 +3678,13 @@ int handleClientsWithPendingReadsUsingThreads(void) { c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); - if (c->flags & CLIENT_PENDING_COMMAND) { - c->flags &= ~CLIENT_PENDING_COMMAND; - if (processCommandAndResetClient(c) == C_ERR) { - /* If the client is no longer valid, we avoid - * processing the client later. So we just go - * to the next. */ - continue; - } + if (processPendingCommandsAndResetClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid + * processing the client later. So we just go + * to the next. */ + continue; } + processInputBuffer(c); /* We may have pending replies if a thread readQueryFromClient() produced diff --git a/src/replication.c b/src/replication.c index 361bdb626..9fb19eaca 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3237,7 +3237,7 @@ void replicationCron(void) { int manual_failover_in_progress = server.cluster_enabled && server.cluster->mf_end && - clientsArePaused(); + checkClientPauseTimeoutAndReturnIfPaused(); if (!manual_failover_in_progress) { ping_argv[0] = createStringObject("PING",4); diff --git a/src/server.c b/src/server.c index faa67fccb..2f9fa4f39 100644 --- a/src/server.c +++ b/src/server.c @@ -163,6 +163,13 @@ struct redisServer server; /* Server global state */ * delay its execution as long as the kernel scheduler is giving * us time. Note that commands that may trigger a DEL as a side * effect (like SET) are not fast commands. + * + * may-replicate: Command may produce replication traffic, but should be + * allowed under circumstances where write commands are disallowed. + * Examples include PUBLISH, which replicates pubsub messages,and + * EVAL, which may execute write commands, which are replicated, + * or may just execute read commands. A command can not be marked + * both "write" and "may-replicate" * * The following additional flags are only used in order to put commands * in a specific ACL category. Commands can have multiple ACL categories. @@ -818,7 +825,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"publish",publishCommand,3, - "pub-sub ok-loading ok-stale fast", + "pub-sub ok-loading ok-stale fast may-replicate", 0,NULL,0,0,0,0,0,0}, {"pubsub",pubsubCommand,-2, @@ -884,11 +891,11 @@ struct redisCommand redisCommandTable[] = { /* EVAL can modify the dataset, however it is not flagged as a write * command since we do the check while running commands from Lua. */ {"eval",evalCommand,-3, - "no-script @scripting", + "no-script may-replicate @scripting", 0,evalGetKeys,0,0,0,0,0,0}, {"evalsha",evalShaCommand,-3, - "no-script @scripting", + "no-script may-replicate @scripting", 0,evalGetKeys,0,0,0,0,0,0}, {"slowlog",slowlogCommand,-2, @@ -896,7 +903,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"script",scriptCommand,-2, - "no-script @scripting", + "no-script may-replicate @scripting", 0,NULL,0,0,0,0,0,0}, {"time",timeCommand,1, @@ -977,7 +984,7 @@ struct redisCommand redisCommandTable[] = { * we claim that the representation, even if accessible, is an internal * affair, and the command is semantically read only. */ {"pfcount",pfcountCommand,-2, - "read-only @hyperloglog", + "read-only may-replicate @hyperloglog", 0,NULL,1,-1,1,0,0,0}, {"pfmerge",pfmergeCommand,-2, @@ -2163,8 +2170,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Clear the paused clients flag if needed. */ - clientsArePaused(); /* Don't check return value, just use the side effect.*/ + /* Clear the paused clients state if needed. */ + checkClientPauseTimeoutAndReturnIfPaused(); /* Replication cron function -- used to reconnect to master, * detect transfer failures, start background RDB transfers and so forth. */ @@ -2361,8 +2368,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * during the previous event loop iteration. Note that we do this after * processUnblockedClients(), so if there are multiple pipelined WAITs * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { + * a server cron cycle in absence of other event loop events. See #6623. + * + * We also don't send the ACKs while clients are paused, since it can + * increment the replication backlog, they'll be sent after the pause + * if we are still the master. */ + if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) { robj *argv[3]; argv[0] = createStringObject("REPLCONF",8); @@ -3040,7 +3051,8 @@ void initServer(void) { server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; - server.clients_paused = 0; + server.client_pause_type = 0; + server.paused_clients = listCreate(); server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); server.blocked_last_cron = 0; @@ -3114,6 +3126,7 @@ void initServer(void) { server.in_eval = 0; server.in_exec = 0; server.propagate_in_transaction = 0; + server.client_pause_in_transaction = 0; server.child_pid = -1; server.child_type = CHILD_TYPE_NONE; server.rdb_child_type = RDB_CHILD_TYPE_NONE; @@ -3280,6 +3293,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) { c->flags |= CMD_FAST | CMD_CATEGORY_FAST; } else if (!strcasecmp(flag,"no-auth")) { c->flags |= CMD_NO_AUTH; + } else if (!strcasecmp(flag,"may-replicate")) { + c->flags |= CMD_MAY_REPLICATE; } else { /* Parse ACL categories here if the flag name starts with @. */ uint64_t catflag; @@ -3439,6 +3454,10 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, if (server.in_exec && !server.propagate_in_transaction) execCommandPropagateMulti(dbid); + /* This needs to be unreachable since the dataset should be fixed during + * client pause, otherwise data may be lossed during a failover. */ + serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction)); + if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) @@ -3477,6 +3496,7 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, * Redis command implementation in order to to force the propagation of a * specific command execution into AOF / Replication. */ void forceCommandPropagation(client *c, int flags) { + serverAssert(c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)); if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL; if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF; } @@ -3701,6 +3721,12 @@ void call(client *c, int flags) { } server.also_propagate = prev_also_propagate; + /* Client pause takes effect after a transaction has finished. This needs + * to be located after everything is propagated. */ + if (!server.in_exec && server.client_pause_in_transaction) { + server.client_pause_in_transaction = 0; + } + /* If the client has keys tracking enabled for client side caching, * make sure to remember the keys it fetched via this command. */ if (c->cmd->flags & CMD_READONLY) { @@ -3814,6 +3840,8 @@ int processCommand(client *c) { (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE)); int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) || (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING)); + int is_may_replicate_command = (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE))); /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ @@ -4012,6 +4040,17 @@ int processCommand(client *c) { return C_OK; } + /* If the server is paused, block the client until + * the pause has ended. Replicas are never paused. */ + if (!(c->flags & CLIENT_SLAVE) && + ((server.client_pause_type & CLIENT_PAUSE_ALL) || + (server.client_pause_type & CLIENT_PAUSE_WRITE && is_may_replicate_command))) + { + c->bpop.timeout = 0; + blockClient(c,BLOCKED_PAUSE); + return C_OK; + } + /* Exec the command */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && @@ -4266,6 +4305,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) { flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth"); + flagcount += addReplyCommandFlag(c,cmd,CMD_MAY_REPLICATE, "may_replicate"); if (cmdHasMovableKeys(cmd)) { addReplyStatus(c, "movablekeys"); flagcount += 1; diff --git a/src/server.h b/src/server.h index fb6d71c71..586e085e9 100644 --- a/src/server.h +++ b/src/server.h @@ -181,33 +181,34 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CMD_ASKING (1ULL<<13) /* "cluster-asking" flag */ #define CMD_FAST (1ULL<<14) /* "fast" flag */ #define CMD_NO_AUTH (1ULL<<15) /* "no-auth" flag */ +#define CMD_MAY_REPLICATE (1ULL<<16) /* "may-replicate" flag */ /* Command flags used by the module system. */ -#define CMD_MODULE_GETKEYS (1ULL<<16) /* Use the modules getkeys interface. */ -#define CMD_MODULE_NO_CLUSTER (1ULL<<17) /* Deny on Redis Cluster. */ +#define CMD_MODULE_GETKEYS (1ULL<<17) /* Use the modules getkeys interface. */ +#define CMD_MODULE_NO_CLUSTER (1ULL<<18) /* Deny on Redis Cluster. */ /* Command flags that describe ACLs categories. */ -#define CMD_CATEGORY_KEYSPACE (1ULL<<18) -#define CMD_CATEGORY_READ (1ULL<<19) -#define CMD_CATEGORY_WRITE (1ULL<<20) -#define CMD_CATEGORY_SET (1ULL<<21) -#define CMD_CATEGORY_SORTEDSET (1ULL<<22) -#define CMD_CATEGORY_LIST (1ULL<<23) -#define CMD_CATEGORY_HASH (1ULL<<24) -#define CMD_CATEGORY_STRING (1ULL<<25) -#define CMD_CATEGORY_BITMAP (1ULL<<26) -#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<27) -#define CMD_CATEGORY_GEO (1ULL<<28) -#define CMD_CATEGORY_STREAM (1ULL<<29) -#define CMD_CATEGORY_PUBSUB (1ULL<<30) -#define CMD_CATEGORY_ADMIN (1ULL<<31) -#define CMD_CATEGORY_FAST (1ULL<<32) -#define CMD_CATEGORY_SLOW (1ULL<<33) -#define CMD_CATEGORY_BLOCKING (1ULL<<34) -#define CMD_CATEGORY_DANGEROUS (1ULL<<35) -#define CMD_CATEGORY_CONNECTION (1ULL<<36) -#define CMD_CATEGORY_TRANSACTION (1ULL<<37) -#define CMD_CATEGORY_SCRIPTING (1ULL<<38) +#define CMD_CATEGORY_KEYSPACE (1ULL<<19) +#define CMD_CATEGORY_READ (1ULL<<20) +#define CMD_CATEGORY_WRITE (1ULL<<21) +#define CMD_CATEGORY_SET (1ULL<<22) +#define CMD_CATEGORY_SORTEDSET (1ULL<<23) +#define CMD_CATEGORY_LIST (1ULL<<24) +#define CMD_CATEGORY_HASH (1ULL<<25) +#define CMD_CATEGORY_STRING (1ULL<<26) +#define CMD_CATEGORY_BITMAP (1ULL<<27) +#define CMD_CATEGORY_HYPERLOGLOG (1ULL<<28) +#define CMD_CATEGORY_GEO (1ULL<<29) +#define CMD_CATEGORY_STREAM (1ULL<<30) +#define CMD_CATEGORY_PUBSUB (1ULL<<31) +#define CMD_CATEGORY_ADMIN (1ULL<<32) +#define CMD_CATEGORY_FAST (1ULL<<33) +#define CMD_CATEGORY_SLOW (1ULL<<34) +#define CMD_CATEGORY_BLOCKING (1ULL<<35) +#define CMD_CATEGORY_DANGEROUS (1ULL<<36) +#define CMD_CATEGORY_CONNECTION (1ULL<<37) +#define CMD_CATEGORY_TRANSACTION (1ULL<<38) +#define CMD_CATEGORY_SCRIPTING (1ULL<<39) /* AOF states */ #define AOF_OFF 0 /* AOF is off */ @@ -250,10 +251,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put in the list of clients we can read from. */ -#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after - we return single threaded that the - client has already pending commands - to be executed. */ +#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully + * parsed command ready for execution. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to perform client side caching. */ #define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */ @@ -280,7 +279,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ #define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_NUM 6 /* Number of blocked states. */ +#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */ +#define BLOCKED_NUM 7 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -436,6 +436,14 @@ typedef enum { #define PROPAGATE_AOF 1 #define PROPAGATE_REPL 2 +/* Client pause types, larger types are more restrictive + * pause types than smaller pause types. */ +typedef enum { + CLIENT_PAUSE_OFF = 0, /* Pause no commands */ + CLIENT_PAUSE_WRITE, /* Pause write commands */ + CLIENT_PAUSE_ALL /* Pause all commands */ +} pause_type; + /* RDB active child save type. */ #define RDB_CHILD_TYPE_NONE 0 #define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ @@ -893,6 +901,7 @@ typedef struct client { sds peerid; /* Cached peer ID. */ sds sockname; /* Cached connection target address. */ listNode *client_list_node; /* list node in client list */ + listNode *paused_list_node; /* list node within the pause list */ RedisModuleUserChangedFunc auth_callback; /* Module callback to execute * when the authenticated user * changes. */ @@ -1137,6 +1146,7 @@ struct redisServer { int in_exec; /* Are we inside EXEC? */ int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ char *ignore_warnings; /* Config: warnings that should be ignored. */ + int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ /* Modules */ dict *moduleapi; /* Exported core APIs dictionary for modules. */ dict *sharedapi; /* Like moduleapi but containing the APIs that @@ -1171,8 +1181,9 @@ struct redisServer { rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ - int clients_paused; /* True if clients are currently paused */ - mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ + pause_type client_pause_type; /* True if clients are currently paused */ + list *paused_clients; /* List of pause clients */ + mstime_t client_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ @@ -1794,8 +1805,10 @@ char *getClientTypeName(int class); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); -void pauseClients(mstime_t duration); -int clientsArePaused(void); +void pauseClients(mstime_t duration, pause_type type); +void unpauseClients(void); +int areClientsPaused(void); +int checkClientPauseTimeoutAndReturnIfPaused(void); void processEventsWhileBlocked(void); void loadingCron(void); void whileBlockedCron(); @@ -2111,6 +2124,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev size_t freeMemoryGetNotCountedMemory(); int overMaxmemoryAfterAlloc(size_t moremem); int processCommand(client *c); +int processPendingCommandsAndResetClient(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); struct redisCommand *lookupCommand(sds name); diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl new file mode 100644 index 000000000..9f5cfd607 --- /dev/null +++ b/tests/unit/pause.tcl @@ -0,0 +1,200 @@ +start_server {tags {"pause"}} { + test "Test read commands are not blocked by client pause" { + r client PAUSE 100000000 WRITE + set rd [redis_deferring_client] + $rd GET FOO + $rd PING + $rd INFO + assert_equal [s 0 blocked_clients] 0 + r client unpause + $rd close + } + + test "Test write commands are paused by RO" { + r client PAUSE 100000000 WRITE + + set rd [redis_deferring_client] + $rd SET FOO BAR + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + + r client unpause + assert_match "OK" [$rd read] + $rd close + } + + test "Test special commands are paused by RO" { + r PFADD pause-hll test + r client PAUSE 100000000 WRITE + + # Test that pfcount, which can replicate, is also blocked + set rd [redis_deferring_client] + $rd PFCOUNT pause-hll + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + + # Test that publish, which adds the message to the replication + # stream is blocked. + set rd2 [redis_deferring_client] + $rd2 publish foo bar + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {2} + } else { + fail "Clients are not blocked" + } + + # Test that SCRIPT LOAD, which is replicated. + set rd3 [redis_deferring_client] + $rd3 script load "return 1" + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {3} + } else { + fail "Clients are not blocked" + } + + r client unpause + assert_match "1" [$rd read] + assert_match "0" [$rd2 read] + assert_match "*" [$rd3 read] + $rd close + $rd2 close + $rd3 close + } + + test "Test read/admin mutli-execs are not blocked by pause RO" { + r SET FOO BAR + r client PAUSE 100000000 WRITE + set rd [redis_deferring_client] + $rd MULTI + assert_equal [$rd read] "OK" + $rd PING + assert_equal [$rd read] "QUEUED" + $rd GET FOO + assert_equal [$rd read] "QUEUED" + $rd EXEC + assert_equal [s 0 blocked_clients] 0 + r client unpause + assert_match "PONG BAR" [$rd read] + $rd close + } + + test "Test write mutli-execs are blocked by pause RO" { + set rd [redis_deferring_client] + $rd MULTI + assert_equal [$rd read] "OK" + $rd SET FOO BAR + r client PAUSE 100000000 WRITE + assert_equal [$rd read] "QUEUED" + $rd EXEC + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r client unpause + assert_match "OK" [$rd read] + $rd close + } + + test "Test scripts are blocked by pause RO" { + r client PAUSE 100000000 WRITE + set rd [redis_deferring_client] + $rd EVAL "return 1" 0 + + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r client unpause + assert_match "1" [$rd read] + $rd close + } + + test "Test multiple clients can be queued up and unblocked" { + r client PAUSE 100000000 WRITE + set clients [list [redis_deferring_client] [redis_deferring_client] [redis_deferring_client]] + foreach client $clients { + $client SET FOO BAR + } + + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {3} + } else { + fail "Clients are not blocked" + } + r client unpause + foreach client $clients { + assert_match "OK" [$client read] + $client close + } + } + + test "Test clients with syntax errors will get responses immediately" { + r client PAUSE 100000000 WRITE + catch {r set FOO} err + assert_match "ERR wrong number of arguments for *" $err + r client unpause + } + + test "Test both active and passive expiries are skipped during client pause" { + set expired_keys [s 0 expired_keys] + r multi + r set foo bar PX 10 + r set bar foo PX 10 + r client PAUSE 100000000 WRITE + r exec + + wait_for_condition 10 100 { + [r get foo] == {} && [r get bar] == {} + } else { + fail "Keys were never logically expired" + } + + # No keys should actually have been expired + assert_match $expired_keys [s 0 expired_keys] + + r client unpause + + # Force the keys to expire + r get foo + r get bar + + # Now that clients have been unpaused, expires should go through + assert_match [expr $expired_keys + 2] [s 0 expired_keys] + } + + test "Test that client pause starts at the end of a transaction" { + r MULTI + r SET FOO1 BAR + r client PAUSE 100000000 WRITE + r SET FOO2 BAR + r exec + + set rd [redis_deferring_client] + $rd SET FOO3 BAR + + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + + assert_match "BAR" [r GET FOO1] + assert_match "BAR" [r GET FOO2] + assert_match "" [r GET FOO3] + + r client unpause + assert_match "OK" [$rd read] + $rd close + } + + # Make sure we unpause at the end + r client unpause +} diff --git a/tests/unit/wait.tcl b/tests/unit/wait.tcl index b1500cff8..b20900c23 100644 --- a/tests/unit/wait.tcl +++ b/tests/unit/wait.tcl @@ -42,4 +42,14 @@ start_server {} { $master incr foo assert {[$master wait 1 3000] == 0} } + + test {WAIT implicitly blocks on client pause since ACKs aren't sent} { + $master multi + $master incr foo + $master client pause 10000 write + $master exec + assert {[$master wait 1 1000] == 0} + $master client unpause + assert {[$master wait 1 1000] == 1} + } }}