diff --git a/.gitignore b/.gitignore index 8ed98aa32..e745f76a0 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ Makefile.dep compile_commands.json redis.code-workspace .cache +.cscope* +.swp diff --git a/src/blocked.c b/src/blocked.c index 0291505cb..85ef9170a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -183,8 +183,7 @@ void queueClientForReprocessing(client *c) { void unblockClient(client *c, int queue_for_reprocessing) { if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF || - c->bstate.btype == BLOCKED_WAIT_PREREPL) { + } else if (c->bstate.btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); @@ -200,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { /* Reset the client for a new query, unless the client has pending command to process * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ - if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN && - c->bstate.btype != BLOCKED_WAIT_PREREPL) { + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { freeClientOriginalArgv(c); /* Clients that are not blocked on keys are not reprocessed so we must * call reqresAppendResponse here (for clients blocked on key, @@ -211,11 +209,11 @@ void unblockClient(client *c, int queue_for_reprocessing) { resetClient(c); } + /* We count blocked client stats on regular clients and not on module clients */ + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; + server.blocked_clients_by_type[c->bstate.btype]--; /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ - if (!(c->flags & CLIENT_MODULE)) - server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */ - server.blocked_clients_by_type[c->bstate.btype]--; c->flags &= ~CLIENT_BLOCKED; c->bstate.btype = BLOCKED_NONE; c->bstate.unblock_on_nokey = 0; @@ -231,15 +229,19 @@ void replyToBlockedClientTimedOut(client *c) { addReplyNullArray(c); updateStatsOnUnblock(c, 0, 0, 0); } else if (c->bstate.btype == BLOCKED_WAIT) { - addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset)); - } else if (c->bstate.btype == BLOCKED_WAITAOF) { - addReplyArrayLen(c, 2); - addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset); - addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset)); + if (c->cmd->proc == waitCommand) { + addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->cmd->proc == waitaofCommand) { + addReplyArrayLen(c, 2); + addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset); + addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset)); + } else if (c->cmd->proc == clusterCommand) { + addReplyErrorObject(c, shared.noreplicaserr); + } else { + serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name); + } } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); - } else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) { - addReplyErrorObject(c, shared.noreplicaserr); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } @@ -585,29 +587,13 @@ static void handleClientsBlockedOnKey(readyList *rl) { } /* block a client for replica acknowledgement */ -void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) { +void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) { c->bstate.timeout = timeout; c->bstate.reploffset = offset; c->bstate.numreplicas = numreplicas; c->bstate.numlocal = numlocal; listAddNodeHead(server.clients_waiting_acks, c); - blockClient(c, btype); -} - -/* block a client due to pre-replication */ -void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0); - c->flags |= CLIENT_PENDING_COMMAND; -} - -/* block a client due to wait command */ -void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0); -} - -/* block a client due to waitaof command */ -void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal); + blockClient(c, BLOCKED_WAIT); } /* Postpone client from executing a command. For example the server might be busy diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 22fdb20cf..0de6351e9 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6018,7 +6018,7 @@ void clusterCommandSetSlot(client *c) { * This ensures that all replicas have the latest topology information, enabling * a reliable slot ownership transfer even if the primary node went down during * the process. */ - if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) { + if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) { forceCommandPropagation(c, PROPAGATE_REPL); /* We are a primary and this is the first time we see this `SETSLOT` * command. Force-replicate the command to all of our replicas @@ -6028,7 +6028,9 @@ void clusterCommandSetSlot(client *c) { * 2. The repl offset target is set to the master's current repl offset + 1. * There is no concern of partial replication because replicas always * ack the repl offset at the command boundary. */ - blockForPreReplication(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves); + blockClientForReplicaAck(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves, 0); + /* Mark client as pending command for execution after replication to replicas. */ + c->flags |= CLIENT_PENDING_COMMAND; replicationRequestAckFromSlaves(); return; } diff --git a/src/networking.c b/src/networking.c index e062bc3ab..9274f21c0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2068,7 +2068,7 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; c->slot = -1; - c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE); + c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE); /* Make sure the duration has been recorded to some command. */ serverAssert(c->duration == 0); diff --git a/src/replication.c b/src/replication.c index 1d5e0fe29..375b637f6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3457,7 +3457,7 @@ void waitCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - blockForReplication(c, timeout, offset, numreplicas); + blockClientForReplicaAck(c, timeout, offset, numreplicas, 0); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3497,7 +3497,7 @@ void waitaofCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - blockForAofFsync(c, timeout, c->woff, numlocal, numreplicas); + blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3532,8 +3532,7 @@ void processClientsWaitingReplicas(void) { int numreplicas = 0; client *c = ln->value; - int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF; - int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL; + int is_wait_aof = c->cmd->proc == waitaofCommand; if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) { addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled."); @@ -3580,8 +3579,8 @@ void processClientsWaitingReplicas(void) { addReplyArrayLen(c, 2); addReplyLongLong(c, numlocal); addReplyLongLong(c, numreplicas); - } else if (is_wait_prerepl) { - c->flags |= CLIENT_PREREPL_DONE; + } else if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags |= CLIENT_REPLICATION_DONE; } else { addReplyLongLong(c, numreplicas); } diff --git a/src/server.h b/src/server.h index 249d896d3..2bacc991e 100644 --- a/src/server.h +++ b/src/server.h @@ -426,23 +426,21 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */ -#define CLIENT_PREREPL_DONE (1ULL << 51) /* Indicate that pre-replication has been done on the client */ +#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ typedef enum blocking_type { - BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ - BLOCKED_LIST, /* BLPOP & co. */ - BLOCKED_WAIT, /* WAIT for synchronous replication. */ - BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */ - BLOCKED_MODULE, /* Blocked by a loadable module. */ - BLOCKED_STREAM, /* XREAD. */ - BLOCKED_ZSET, /* BZPOP et al. */ - BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ - BLOCKED_SHUTDOWN, /* SHUTDOWN. */ - BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */ - BLOCKED_NUM, /* Number of blocked states. */ - BLOCKED_END /* End of enumeration */ + BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ + BLOCKED_LIST, /* BLPOP & co. */ + BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_MODULE, /* Blocked by a loadable module. */ + BLOCKED_STREAM, /* XREAD. */ + BLOCKED_ZSET, /* BZPOP et al. */ + BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ + BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_NUM, /* Number of blocked states. */ + BLOCKED_END /* End of enumeration */ } blocking_type; /* Client request types */ @@ -3498,9 +3496,7 @@ void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); void blockClientShutdown(client *c); void blockPostponeClient(client *c); -void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); -void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas); -void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas); +void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal); void replicationRequestAckFromSlaves(void); void signalDeletedKeyAsReady(serverDb *db, robj *key, int type); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);