Refactor and (internally) rebrand from pause-clients to pause-actions (#11098)
Renamed from "Pause Clients" to "Pause Actions" since the mechanism can pause several actions in redis, not just clients (e.g. eviction, expiration). Previously each pause purpose (which has a timeout that's tracked separately from others purposes), also implicitly dictated what it pauses (reads, writes, eviction, etc). Now it is explicit, and the actions that are paused (bit flags) are defined separately from the purpose. - Previously, when using feature pause-client it also implicitly means to make the server static: - Pause replica traffic - Pauses eviction processing - Pauses expire processing Making the server static is used also for failover and shutdown. This PR internally rebrand pause-client API to become pause-action API. It also Simplifies pauseClients structure by replacing pointers array with static array. The context of this PR is to add another trigger to pause-client which will activated in case of OOM as throttling mechanism ([see here](https://github.com/redis/redis/issues/10907)). In this case we want only to pause client, and eviction actions.
This commit is contained in:
parent
38028dab8d
commit
c0d7226274
@ -2626,9 +2626,9 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
resetManualFailover();
|
||||
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
|
||||
server.cluster->mf_slave = sender;
|
||||
pauseClients(PAUSE_DURING_FAILOVER,
|
||||
pauseActions(PAUSE_DURING_FAILOVER,
|
||||
now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
|
||||
CLIENT_PAUSE_WRITE);
|
||||
PAUSE_ACTIONS_CLIENT_WRITE_SET);
|
||||
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
|
||||
sender->name);
|
||||
/* We need to send a ping message to the replica, as it would carry
|
||||
@ -3932,9 +3932,9 @@ void clusterHandleSlaveMigration(int max_slaves) {
|
||||
* startup or to abort a manual failover in progress. */
|
||||
void resetManualFailover(void) {
|
||||
if (server.cluster->mf_slave) {
|
||||
/* We were a master failing over, so we paused clients. Regardless
|
||||
* of the outcome we unpause now to allow traffic again. */
|
||||
unpauseClients(PAUSE_DURING_FAILOVER);
|
||||
/* We were a master failing over, so we paused clients and related actions.
|
||||
* Regardless of the outcome we unpause now to allow traffic again. */
|
||||
unpauseActions(PAUSE_DURING_FAILOVER);
|
||||
}
|
||||
server.cluster->mf_end = 0; /* No manual failover in progress. */
|
||||
server.cluster->mf_can_start = 0;
|
||||
|
9
src/db.c
9
src/db.c
@ -1680,11 +1680,10 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) {
|
||||
if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
|
||||
return 1;
|
||||
|
||||
/* If clients are paused, we keep the current dataset constant,
|
||||
* but return to the client what we believe is the right state. Typically,
|
||||
* at the end of the pause we will properly expire the key OR we will
|
||||
* have failed over and the new primary will send us the expire. */
|
||||
if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
|
||||
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
|
||||
* Typically, at the end of the pause we will properly expire the key OR we
|
||||
* will have failed over and the new primary will send us the expire. */
|
||||
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return 1;
|
||||
|
||||
/* Delete the key */
|
||||
deleteExpiredKeyAndPropagate(db,key);
|
||||
|
@ -487,10 +487,8 @@ static int isSafeToPerformEvictions(void) {
|
||||
* and just be masters exact copies. */
|
||||
if (server.masterhost && server.repl_slave_ignore_maxmemory) return 0;
|
||||
|
||||
/* When clients are paused the dataset should be static not just from the
|
||||
* POV of clients not being able to write, but also from the POV of
|
||||
* expires and evictions of keys not being performed. */
|
||||
if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
|
||||
/* If 'evict' action is paused, for whatever reason, then return false */
|
||||
if (isPausedActionsWithUpdate(PAUSE_ACTION_EVICT)) return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
@ -135,10 +135,10 @@ void activeExpireCycle(int type) {
|
||||
int dbs_per_call = CRON_DBS_PER_CALL;
|
||||
long long start = ustime(), timelimit, elapsed;
|
||||
|
||||
/* When clients are paused the dataset should be static not just from the
|
||||
* POV of clients not being able to write, but also from the POV of
|
||||
* expires and evictions of keys not being performed. */
|
||||
if (checkClientPauseTimeoutAndReturnIfPaused()) return;
|
||||
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
|
||||
* Typically, at the end of the pause we will properly expire the key OR we
|
||||
* will have failed over and the new primary will send us the expire. */
|
||||
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return;
|
||||
|
||||
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
|
||||
/* Don't start a fast cycle if the previous cycle did not exit
|
||||
|
@ -3654,7 +3654,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
* periodically in timer callbacks or other periodic callbacks.
|
||||
*/
|
||||
int RM_AvoidReplicaTraffic() {
|
||||
return checkClientPauseTimeoutAndReturnIfPaused();
|
||||
return !!(isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA));
|
||||
}
|
||||
|
||||
/* Change the currently selected DB. Returns an error if the id
|
||||
|
127
src/networking.c
127
src/networking.c
@ -38,6 +38,7 @@
|
||||
#include <ctype.h>
|
||||
|
||||
static void setProtocolError(const char *errstr, client *c);
|
||||
static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
|
||||
int postponeClientRead(client *c);
|
||||
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
|
||||
|
||||
@ -3148,20 +3149,18 @@ NULL
|
||||
addReplyNull(c);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
|
||||
/* CLIENT UNPAUSE */
|
||||
unpauseClients(PAUSE_BY_CLIENT_COMMAND);
|
||||
unpauseActions(PAUSE_BY_CLIENT_COMMAND);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
|
||||
c->argc == 4))
|
||||
{
|
||||
/* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
|
||||
mstime_t end;
|
||||
int type = CLIENT_PAUSE_ALL;
|
||||
int isPauseClientAll = 1;
|
||||
if (c->argc == 4) {
|
||||
if (!strcasecmp(c->argv[3]->ptr,"write")) {
|
||||
type = CLIENT_PAUSE_WRITE;
|
||||
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
|
||||
type = CLIENT_PAUSE_ALL;
|
||||
} else {
|
||||
isPauseClientAll = 0;
|
||||
} else if (strcasecmp(c->argv[3]->ptr,"all")) {
|
||||
addReplyError(c,
|
||||
"CLIENT PAUSE mode must be WRITE or ALL");
|
||||
return;
|
||||
@ -3170,7 +3169,7 @@ NULL
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
|
||||
UNIT_MILLISECONDS) != C_OK) return;
|
||||
pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type);
|
||||
pauseClientsByClient(end, isPauseClientAll);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||
@ -3816,38 +3815,26 @@ void flushSlavesOutputBuffers(void) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Compute current most restrictive pause type and its end time, aggregated for
|
||||
/* Compute current paused actions and its end time, aggregated for
|
||||
* all pause purposes. */
|
||||
static void updateClientPauseTypeAndEndTime(void) {
|
||||
pause_type old_type = server.client_pause_type;
|
||||
pause_type type = CLIENT_PAUSE_OFF;
|
||||
mstime_t end = 0;
|
||||
void updatePausedActions(void) {
|
||||
uint32_t prev_paused_actions = server.paused_actions;
|
||||
server.paused_actions = 0;
|
||||
|
||||
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
|
||||
pause_event *p = server.client_pause_per_purpose[i];
|
||||
if (p == NULL) {
|
||||
/* Nothing to do. */
|
||||
} else if (p->end < server.mstime) {
|
||||
/* This one expired. */
|
||||
zfree(p);
|
||||
server.client_pause_per_purpose[i] = NULL;
|
||||
} else if (p->type > type) {
|
||||
/* This type is the most restrictive so far. */
|
||||
type = p->type;
|
||||
pause_event *p = &(server.client_pause_per_purpose[i]);
|
||||
if (p->end > server.mstime)
|
||||
server.paused_actions |= p->paused_actions;
|
||||
else {
|
||||
p->paused_actions = 0;
|
||||
p->end = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Find the furthest end time among the pause purposes of the most
|
||||
* restrictive type */
|
||||
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
|
||||
pause_event *p = server.client_pause_per_purpose[i];
|
||||
if (p != NULL && p->type == type && p->end > end) end = p->end;
|
||||
}
|
||||
server.client_pause_type = type;
|
||||
server.client_pause_end_time = end;
|
||||
|
||||
/* If the pause type is less restrictive than before, we unblock all clients
|
||||
* so they are reprocessed (may get re-paused). */
|
||||
if (type < old_type) {
|
||||
uint32_t mask_cli = (PAUSE_ACTION_CLIENT_WRITE|PAUSE_ACTION_CLIENT_ALL);
|
||||
if ((server.paused_actions & mask_cli) < (prev_paused_actions & mask_cli)) {
|
||||
unblockPostponedClients();
|
||||
}
|
||||
}
|
||||
@ -3864,7 +3851,26 @@ void unblockPostponedClients() {
|
||||
}
|
||||
}
|
||||
|
||||
/* Pause clients up to the specified unixtime (in ms) for a given type of
|
||||
/* Set pause-client end-time and restricted action. If already paused, then:
|
||||
* 1. Keep higher end-time value between configured and the new one
|
||||
* 2. Keep most restrictive action between configured and the new one */
|
||||
static void pauseClientsByClient(mstime_t endTime, int isPauseClientAll) {
|
||||
uint32_t actions;
|
||||
pause_event *p = &server.client_pause_per_purpose[PAUSE_BY_CLIENT_COMMAND];
|
||||
|
||||
if (isPauseClientAll)
|
||||
actions = PAUSE_ACTIONS_CLIENT_ALL_SET;
|
||||
else {
|
||||
actions = PAUSE_ACTIONS_CLIENT_WRITE_SET;
|
||||
/* If currently configured most restrictive client pause, then keep it */
|
||||
if (p->paused_actions & PAUSE_ACTION_CLIENT_ALL)
|
||||
actions = PAUSE_ACTIONS_CLIENT_ALL_SET;
|
||||
}
|
||||
|
||||
pauseActions(PAUSE_BY_CLIENT_COMMAND, endTime, actions);
|
||||
}
|
||||
|
||||
/* Pause actions up to the specified unixtime (in ms) for a given type of
|
||||
* commands.
|
||||
*
|
||||
* A main use case of this function is to allow pausing replication traffic
|
||||
@ -3875,20 +3881,17 @@ void unblockPostponedClients() {
|
||||
* failover procedure implemented by CLUSTER FAILOVER.
|
||||
*
|
||||
* The function always succeed, even if there is already a pause in progress.
|
||||
* In such a case, the duration is set to the maximum and new end time and the
|
||||
* type is set to the more restrictive type of pause. */
|
||||
void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) {
|
||||
* The new paused_actions of a given 'purpose' will override the old ones and
|
||||
* end time will be updated if new end time is bigger than currently configured */
|
||||
void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions) {
|
||||
/* Manage pause type and end time per pause purpose. */
|
||||
if (server.client_pause_per_purpose[purpose] == NULL) {
|
||||
server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event));
|
||||
server.client_pause_per_purpose[purpose]->type = type;
|
||||
server.client_pause_per_purpose[purpose]->end = end;
|
||||
} else {
|
||||
pause_event *p = server.client_pause_per_purpose[purpose];
|
||||
p->type = max(p->type, type);
|
||||
p->end = max(p->end, end);
|
||||
}
|
||||
updateClientPauseTypeAndEndTime();
|
||||
server.client_pause_per_purpose[purpose].paused_actions = actions;
|
||||
|
||||
/* If currently configured end time bigger than new one, then keep it */
|
||||
if (server.client_pause_per_purpose[purpose].end < end)
|
||||
server.client_pause_per_purpose[purpose].end = end;
|
||||
|
||||
updatePausedActions();
|
||||
|
||||
/* We allow write commands that were queued
|
||||
* up before and after to execute. We need
|
||||
@ -3899,29 +3902,23 @@ void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Unpause clients and queue them for reprocessing. */
|
||||
void unpauseClients(pause_purpose purpose) {
|
||||
if (server.client_pause_per_purpose[purpose] == NULL) return;
|
||||
zfree(server.client_pause_per_purpose[purpose]);
|
||||
server.client_pause_per_purpose[purpose] = NULL;
|
||||
updateClientPauseTypeAndEndTime();
|
||||
/* Unpause actions and queue them for reprocessing. */
|
||||
void unpauseActions(pause_purpose purpose) {
|
||||
server.client_pause_per_purpose[purpose].end = 0;
|
||||
server.client_pause_per_purpose[purpose].paused_actions = 0;
|
||||
updatePausedActions();
|
||||
}
|
||||
|
||||
/* Returns true if clients are paused and false otherwise. */
|
||||
int areClientsPaused(void) {
|
||||
return server.client_pause_type != CLIENT_PAUSE_OFF;
|
||||
/* Returns bitmask of paused actions */
|
||||
uint32_t isPausedActions(uint32_t actions_bitmask) {
|
||||
return (server.paused_actions & actions_bitmask);
|
||||
}
|
||||
|
||||
/* Checks if the current client pause has elapsed and unpause clients
|
||||
* if it has. Also returns true if clients are now paused and false
|
||||
* otherwise. */
|
||||
int checkClientPauseTimeoutAndReturnIfPaused(void) {
|
||||
if (!areClientsPaused())
|
||||
return 0;
|
||||
if (server.client_pause_end_time < server.mstime) {
|
||||
updateClientPauseTypeAndEndTime();
|
||||
}
|
||||
return areClientsPaused();
|
||||
/* Returns bitmask of paused actions */
|
||||
uint32_t isPausedActionsWithUpdate(uint32_t actions_bitmask) {
|
||||
if (!(server.paused_actions & actions_bitmask)) return 0;
|
||||
updatePausedActions();
|
||||
return (server.paused_actions & actions_bitmask);
|
||||
}
|
||||
|
||||
/* This function is called by Redis in order to process a few events from
|
||||
|
@ -3627,7 +3627,7 @@ void replicationCron(void) {
|
||||
((server.cluster_enabled &&
|
||||
server.cluster->mf_end) ||
|
||||
server.failover_end_time) &&
|
||||
checkClientPauseTimeoutAndReturnIfPaused();
|
||||
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
|
||||
|
||||
if (!manual_failover_in_progress) {
|
||||
ping_argv[0] = shared.ping;
|
||||
@ -3865,7 +3865,7 @@ void clearFailoverState() {
|
||||
server.target_replica_host = NULL;
|
||||
server.target_replica_port = 0;
|
||||
server.failover_state = NO_FAILOVER;
|
||||
unpauseClients(PAUSE_DURING_FAILOVER);
|
||||
unpauseActions(PAUSE_DURING_FAILOVER);
|
||||
}
|
||||
|
||||
/* Abort an ongoing failover if one is going on. */
|
||||
@ -4014,7 +4014,9 @@ void failoverCommand(client *c) {
|
||||
server.force_failover = force_flag;
|
||||
server.failover_state = FAILOVER_WAIT_FOR_SYNC;
|
||||
/* Cluster failover will unpause eventually */
|
||||
pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE);
|
||||
pauseActions(PAUSE_DURING_FAILOVER,
|
||||
LLONG_MAX,
|
||||
PAUSE_ACTIONS_CLIENT_WRITE_SET);
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
|
||||
|
28
src/server.c
28
src/server.c
@ -1377,8 +1377,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Clear the paused clients state if needed. */
|
||||
checkClientPauseTimeoutAndReturnIfPaused();
|
||||
/* Clear the paused actions state if needed. */
|
||||
updatePausedActions();
|
||||
|
||||
/* Replication cron function -- used to reconnect to master,
|
||||
* detect transfer failures, start background RDB transfers and so forth.
|
||||
@ -1615,7 +1615,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
* We also don't send the ACKs while clients are paused, since it can
|
||||
* increment the replication backlog, they'll be sent after the pause
|
||||
* if we are still the master. */
|
||||
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
|
||||
if (server.get_ack_from_slaves && !isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA)) {
|
||||
sendGetackToReplicas();
|
||||
server.get_ack_from_slaves = 0;
|
||||
}
|
||||
@ -2448,8 +2448,7 @@ void initServer(void) {
|
||||
server.tracking_pending_keys = listCreate();
|
||||
server.clients_waiting_acks = listCreate();
|
||||
server.get_ack_from_slaves = 0;
|
||||
server.client_pause_type = CLIENT_PAUSE_OFF;
|
||||
server.client_pause_end_time = 0;
|
||||
server.paused_actions = 0;
|
||||
memset(server.client_pause_per_purpose, 0,
|
||||
sizeof(server.client_pause_per_purpose));
|
||||
server.postponed_clients = listCreate();
|
||||
@ -3135,9 +3134,10 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
|
||||
if (!shouldPropagate(target))
|
||||
return;
|
||||
|
||||
/* This needs to be unreachable since the dataset should be fixed during
|
||||
* client pause, otherwise data may be lost during a failover. */
|
||||
serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
|
||||
/* This needs to be unreachable since the dataset should be fixed during
|
||||
* replica pause (otherwise data may be lost during a failover) */
|
||||
serverAssert(!(isPausedActions(PAUSE_ACTION_REPLICA) &&
|
||||
(!server.client_pause_in_transaction)));
|
||||
|
||||
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
|
||||
feedAppendOnlyFile(dbid,argv,argc);
|
||||
@ -3959,8 +3959,8 @@ int processCommand(client *c) {
|
||||
/* If the server is paused, block the client until
|
||||
* the pause has ended. Replicas are never paused. */
|
||||
if (!(c->flags & CLIENT_SLAVE) &&
|
||||
((server.client_pause_type == CLIENT_PAUSE_ALL) ||
|
||||
(server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
|
||||
((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
|
||||
((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command)))
|
||||
{
|
||||
c->bpop.timeout = 0;
|
||||
blockClient(c,BLOCKED_POSTPONE);
|
||||
@ -4072,8 +4072,10 @@ int prepareForShutdown(int flags) {
|
||||
!isReadyToShutdown())
|
||||
{
|
||||
server.shutdown_mstime = server.mstime + server.shutdown_timeout * 1000;
|
||||
if (!areClientsPaused()) sendGetackToReplicas();
|
||||
pauseClients(PAUSE_DURING_SHUTDOWN, LLONG_MAX, CLIENT_PAUSE_WRITE);
|
||||
if (!isPausedActions(PAUSE_ACTION_REPLICA)) sendGetackToReplicas();
|
||||
pauseActions(PAUSE_DURING_SHUTDOWN,
|
||||
LLONG_MAX,
|
||||
PAUSE_ACTIONS_CLIENT_WRITE_SET);
|
||||
serverLog(LL_NOTICE, "Waiting for replicas before shutting down.");
|
||||
return C_ERR;
|
||||
}
|
||||
@ -4107,7 +4109,7 @@ static void cancelShutdown(void) {
|
||||
server.shutdown_mstime = 0;
|
||||
server.last_sig_received = 0;
|
||||
replyToClientsBlockedOnShutdown();
|
||||
unpauseClients(PAUSE_DURING_SHUTDOWN);
|
||||
unpauseActions(PAUSE_DURING_SHUTDOWN);
|
||||
}
|
||||
|
||||
/* Returns C_OK if shutdown was aborted and C_ERR if shutdown wasn't ongoing. */
|
||||
|
39
src/server.h
39
src/server.h
@ -568,13 +568,22 @@ typedef enum {
|
||||
#define PROPAGATE_AOF 1
|
||||
#define PROPAGATE_REPL 2
|
||||
|
||||
/* Client pause types, larger types are more restrictive
|
||||
* pause types than smaller pause types. */
|
||||
typedef enum {
|
||||
CLIENT_PAUSE_OFF = 0, /* Pause no commands */
|
||||
CLIENT_PAUSE_WRITE, /* Pause write commands */
|
||||
CLIENT_PAUSE_ALL /* Pause all commands */
|
||||
} pause_type;
|
||||
/* Actions pause types */
|
||||
#define PAUSE_ACTION_CLIENT_WRITE (1<<0)
|
||||
#define PAUSE_ACTION_CLIENT_ALL (1<<1) /* must be bigger than PAUSE_ACTION_CLIENT_WRITE */
|
||||
#define PAUSE_ACTION_EXPIRE (1<<2)
|
||||
#define PAUSE_ACTION_EVICT (1<<3)
|
||||
#define PAUSE_ACTION_REPLICA (1<<4) /* pause replica traffic */
|
||||
|
||||
/* common sets of actions to pause/unpause */
|
||||
#define PAUSE_ACTIONS_CLIENT_WRITE_SET (PAUSE_ACTION_CLIENT_WRITE|\
|
||||
PAUSE_ACTION_EXPIRE|\
|
||||
PAUSE_ACTION_EVICT|\
|
||||
PAUSE_ACTION_REPLICA)
|
||||
#define PAUSE_ACTIONS_CLIENT_ALL_SET (PAUSE_ACTION_CLIENT_ALL|\
|
||||
PAUSE_ACTION_EXPIRE|\
|
||||
PAUSE_ACTION_EVICT|\
|
||||
PAUSE_ACTION_REPLICA)
|
||||
|
||||
/* Client pause purposes. Each purpose has its own end time and pause type. */
|
||||
typedef enum {
|
||||
@ -585,7 +594,7 @@ typedef enum {
|
||||
} pause_purpose;
|
||||
|
||||
typedef struct {
|
||||
pause_type type;
|
||||
uint32_t paused_actions; /* Bitmask of actions */
|
||||
mstime_t end;
|
||||
} pause_event;
|
||||
|
||||
@ -1531,10 +1540,9 @@ struct redisServer {
|
||||
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
|
||||
int in_nested_call; /* If > 0, in a nested call of a call */
|
||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||
pause_type client_pause_type; /* True if clients are currently paused */
|
||||
uint32_t paused_actions; /* Bitmask of actions that are currently paused */
|
||||
list *postponed_clients; /* List of postponed clients */
|
||||
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
|
||||
pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES];
|
||||
pause_event client_pause_per_purpose[NUM_PAUSE_PURPOSES];
|
||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
|
||||
@ -2528,10 +2536,11 @@ void flushSlavesOutputBuffers(void);
|
||||
void disconnectSlaves(void);
|
||||
void evictClients(void);
|
||||
int listenToPort(connListener *fds);
|
||||
void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
|
||||
void unpauseClients(pause_purpose purpose);
|
||||
int areClientsPaused(void);
|
||||
int checkClientPauseTimeoutAndReturnIfPaused(void);
|
||||
void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions_bitmask);
|
||||
void unpauseActions(pause_purpose purpose);
|
||||
uint32_t isPausedActions(uint32_t action_bitmask);
|
||||
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
|
||||
void updatePausedActions(void);
|
||||
void unblockPostponedClients();
|
||||
void processEventsWhileBlocked(void);
|
||||
void whileBlockedCron();
|
||||
|
@ -10,6 +10,45 @@ start_server {tags {"pause network"}} {
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Test old pause-all takes precedence over new pause-write (less restrictive)" {
|
||||
# Scenario:
|
||||
# 1. Run 'PAUSE ALL' for 200msec
|
||||
# 2. Run 'PAUSE WRITE' for 10 msec
|
||||
# 3. Wait 50msec
|
||||
# 4. 'GET FOO'.
|
||||
# Expected that:
|
||||
# - While the time of the second 'PAUSE' is shorter than first 'PAUSE',
|
||||
# pause-client feature will stick to the longer one, i.e, will be paused
|
||||
# up to 200msec.
|
||||
# - The GET command will be postponed ~200msec, even though last command
|
||||
# paused only WRITE. This is because the first 'PAUSE ALL' command is
|
||||
# more restrictive than the second 'PAUSE WRITE' and pause-client feature
|
||||
# preserve most restrictive configuration among multiple settings.
|
||||
set rd [redis_deferring_client]
|
||||
$rd SET FOO BAR
|
||||
|
||||
set test_start_time [clock milliseconds]
|
||||
r client PAUSE 200 ALL
|
||||
r client PAUSE 20 WRITE
|
||||
after 50
|
||||
$rd get FOO
|
||||
set elapsed [expr {[clock milliseconds]-$test_start_time}]
|
||||
assert_lessthan 200 $elapsed
|
||||
}
|
||||
|
||||
test "Test new pause time is smaller than old one, then old time preserved" {
|
||||
r client PAUSE 60000 WRITE
|
||||
r client PAUSE 10 WRITE
|
||||
after 100
|
||||
set rd [redis_deferring_client]
|
||||
$rd SET FOO BAR
|
||||
wait_for_blocked_clients_count 1 100 10
|
||||
|
||||
r client unpause
|
||||
assert_match "OK" [$rd read]
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Test write commands are paused by RO" {
|
||||
r client PAUSE 60000 WRITE
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user