Fix failover command test failures

Former-commit-id: d3c37c7159a92319759a33851669862a82cf1b28
This commit is contained in:
John Sully 2021-05-29 01:19:12 +00:00
parent 0cdb32f3e4
commit 97d6875862
4 changed files with 11 additions and 9 deletions

View File

@ -3742,8 +3742,8 @@ void flushSlavesOutputBuffers(void) {
* In such a case, the duration is set to the maximum and new end time and the * In such a case, the duration is set to the maximum and new end time and the
* type is set to the more restrictive type of pause. */ * type is set to the more restrictive type of pause. */
void pauseClients(mstime_t end, pause_type type) { void pauseClients(mstime_t end, pause_type type) {
if (type > serverTL->client_pause_type) { if (type > g_pserver->client_pause_type) {
serverTL->client_pause_type = type; g_pserver->client_pause_type = type;
} }
if (end > g_pserver->client_pause_end_time) { if (end > g_pserver->client_pause_end_time) {
@ -3766,7 +3766,7 @@ void unpauseClients(void) {
listIter li; listIter li;
client *c; client *c;
serverTL->client_pause_type = CLIENT_PAUSE_OFF; g_pserver->client_pause_type = CLIENT_PAUSE_OFF;
/* Unblock all of the clients so they are reprocessed. */ /* Unblock all of the clients so they are reprocessed. */
listRewind(g_pserver->paused_clients,&li); listRewind(g_pserver->paused_clients,&li);
@ -3779,7 +3779,7 @@ void unpauseClients(void) {
/* Returns true if clients are paused and false otherwise. */ /* Returns true if clients are paused and false otherwise. */
int areClientsPaused(void) { int areClientsPaused(void) {
return serverTL->client_pause_type != CLIENT_PAUSE_OFF; return g_pserver->client_pause_type != CLIENT_PAUSE_OFF;
} }
/* Checks if the current client pause has elapsed and unpause clients /* Checks if the current client pause has elapsed and unpause clients

View File

@ -2576,6 +2576,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* We may have recieved updates from clients about their current offset. NOTE: /* We may have recieved updates from clients about their current offset. NOTE:
* this can't be done where the ACK is recieved since failover will disconnect * this can't be done where the ACK is recieved since failover will disconnect
* our clients. */ * our clients. */
if (iel == IDX_EVENT_LOOP_MAIN)
updateFailoverStatus(); updateFailoverStatus();
/* Send the invalidation messages to clients participating to the /* Send the invalidation messages to clients participating to the
@ -3426,7 +3427,6 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->cclients = 0; pvar->cclients = 0;
pvar->in_eval = 0; pvar->in_eval = 0;
pvar->in_exec = 0; pvar->in_exec = 0;
pvar->client_pause_type = CLIENT_PAUSE_OFF;
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE); aeSetBeforeSleepProc(pvar->el, beforeSleep, AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
@ -3518,6 +3518,7 @@ void initServer(void) {
g_pserver->blocked_last_cron = 0; g_pserver->blocked_last_cron = 0;
g_pserver->replication_allowed = 1; g_pserver->replication_allowed = 1;
g_pserver->blocking_op_nesting = 0; g_pserver->blocking_op_nesting = 0;
g_pserver->client_pause_type = CLIENT_PAUSE_OFF;
if ((g_pserver->tls_port || g_pserver->tls_replication || g_pserver->tls_cluster) if ((g_pserver->tls_port || g_pserver->tls_replication || g_pserver->tls_cluster)
@ -4529,8 +4530,8 @@ int processCommand(client *c, int callFlags) {
/* If the server is paused, block the client until /* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */ * the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) && if (!(c->flags & CLIENT_SLAVE) &&
((serverTL->client_pause_type == CLIENT_PAUSE_ALL) || ((g_pserver->client_pause_type == CLIENT_PAUSE_ALL) ||
(serverTL->client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command))) (g_pserver->client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
{ {
c->bpop.timeout = 0; c->bpop.timeout = 0;
blockClient(c,BLOCKED_PAUSE); blockClient(c,BLOCKED_PAUSE);

View File

@ -1480,7 +1480,6 @@ struct redisServerThreadVars {
aeEventLoop *el; aeEventLoop *el;
socketFds ipfd; /* TCP socket file descriptors */ socketFds ipfd; /* TCP socket file descriptors */
socketFds tlsfd; /* TLS socket file descriptors */ socketFds tlsfd; /* TLS socket file descriptors */
pause_type client_pause_type; /* True if clients are currently paused */
int in_eval; /* Are we inside EVAL? */ int in_eval; /* Are we inside EVAL? */
int in_exec; /* Are we inside EXEC? */ int in_exec; /* Are we inside EXEC? */
std::vector<client*> clients_pending_write; /* There is to write or install handler. */ std::vector<client*> clients_pending_write; /* There is to write or install handler. */
@ -1620,6 +1619,7 @@ struct redisServer {
int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */
char *ignore_warnings; /* Config: warnings that should be ignored. */ char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
pause_type client_pause_type; /* True if clients are currently paused */
/* Modules */ /* Modules */
::dict *moduleapi; /* Exported core APIs dictionary for modules. */ ::dict *moduleapi; /* Exported core APIs dictionary for modules. */
::dict *sharedapi; /* Like moduleapi but containing the APIs that ::dict *sharedapi; /* Like moduleapi but containing the APIs that

View File

@ -279,6 +279,7 @@ start_server {} {
assert_match *slave* [$node_2 role] assert_match *slave* [$node_2 role]
# We will cycle all of our replicas here and force a psync. # We will cycle all of our replicas here and force a psync.
after 100
assert_equal [expr [s 0 sync_partial_ok] - $initial_psyncs] 2 assert_equal [expr [s 0 sync_partial_ok] - $initial_psyncs] 2
assert_equal [expr [s 0 sync_full] - $initial_syncs] 0 assert_equal [expr [s 0 sync_full] - $initial_syncs] 0