
The metric tracks cpu time in micro-seconds, sharing the same value as `INFO COMMANDSTATS`, aggregated under per-slot context. --------- Signed-off-by: Kyle Kim <kimkyle@amazon.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
731 lines
33 KiB
C
731 lines
33 KiB
C
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
|
|
*
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
* ---------------------------------------------------------------------------
|
|
*
|
|
* API:
|
|
*
|
|
* blockClient() set the CLIENT_BLOCKED flag in the client, and set the
|
|
* specified block type 'btype' field to one of the BLOCKED_* macros.
|
|
*
|
|
* unblockClient() unblocks the client doing the following:
|
|
* 1) It calls the btype-specific function to cleanup the state.
|
|
* 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
|
|
* 3) It puts the client into a list of just unblocked clients that are
|
|
* processed ASAP in the beforeSleep() event loop callback, so that
|
|
* if there is some query buffer to process, we do it. This is also
|
|
* required because otherwise there is no 'readable' event fired, we
|
|
* already read the pending commands. We also set the CLIENT_UNBLOCKED
|
|
* flag to remember the client is in the unblocked_clients list.
|
|
*
|
|
* processUnblockedClients() is called inside the beforeSleep() function
|
|
* to process the query buffer from unblocked clients and remove the clients
|
|
* from the blocked_clients queue.
|
|
*
|
|
* replyToBlockedClientTimedOut() is called by the cron function when
|
|
* a client blocked reaches the specified timeout (if the timeout is set
|
|
* to 0, no timeout is processed).
|
|
* It usually just needs to send a reply to the client.
|
|
*
|
|
* When implementing a new type of blocking operation, the implementation
|
|
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
|
|
* to handle the btype-specific behavior of these two functions.
|
|
* If the blocking operation waits for certain keys to change state, the
|
|
* clusterRedirectBlockedClientIfNeeded() function should also be updated.
|
|
*/
|
|
|
|
#include "server.h"
|
|
#include "slowlog.h"
|
|
#include "latency.h"
|
|
#include "monotonic.h"
|
|
#include "cluster_slot_stats.h"
|
|
|
|
/* forward declarations */
|
|
static void unblockClientWaitingData(client *c);
|
|
static void handleClientsBlockedOnKey(readyList *rl);
|
|
static void unblockClientOnKey(client *c, robj *key);
|
|
static void moduleUnblockClientOnKey(client *c, robj *key);
|
|
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
|
|
|
|
/* Reset the blocking state of client 'c' to its "not blocked" defaults.
 *
 * A new, empty dictionary is created for the keys the client may block on
 * and every other bstate field written by this file is cleared. That
 * includes 'numlocal', which is otherwise only assigned by
 * blockClientForReplicaAck() and would be left uninitialized here. */
void initClientBlockingState(client *c) {
    c->bstate.btype = BLOCKED_NONE;
    c->bstate.timeout = 0;
    c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
    c->bstate.numreplicas = 0;
    c->bstate.numlocal = 0;
    c->bstate.reploffset = 0;
    c->bstate.unblock_on_nokey = 0;
    c->bstate.async_rm_call_handle = NULL;
}
|
|
|
|
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
|
|
* flag is set client query buffer is not longer processed, but accumulated,
|
|
* and will be processed when the client is unblocked. */
|
|
void blockClient(client *c, int btype) {
|
|
/* Primary client should never be blocked unless pause or module */
|
|
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
|
|
|
|
c->flag.blocked = 1;
|
|
c->bstate.btype = btype;
|
|
if (!c->flag.module)
|
|
server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
|
|
server.blocked_clients_by_type[btype]++;
|
|
addClientToTimeoutTable(c);
|
|
}
|
|
|
|
/* Usually when a client is unblocked due to being blocked while processing some command
|
|
* he will attempt to reprocess the command which will update the statistics.
|
|
* However in case the client was timed out or in case of module blocked client is being unblocked
|
|
* the command will not be reprocessed and we need to make stats update.
|
|
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.*/
|
|
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
|
|
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
|
|
c->lastcmd->microseconds += total_cmd_duration;
|
|
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
|
|
c->lastcmd->calls++;
|
|
c->commands_processed++;
|
|
server.stat_numcommands++;
|
|
if (had_errors) c->lastcmd->failed_calls++;
|
|
if (server.latency_tracking_enabled)
|
|
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration * 1000);
|
|
/* Log the command into the Slow log if needed. */
|
|
slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
|
|
c->duration = 0;
|
|
/* Log the reply duration event. */
|
|
latencyAddSampleIfNeeded("command-unblocking", reply_us / 1000);
|
|
}
|
|
|
|
/* This function is called in the beforeSleep() function of the event loop
|
|
* in order to process the pending input buffer of clients that were
|
|
* unblocked after a blocking operation. */
|
|
void processUnblockedClients(void) {
|
|
listNode *ln;
|
|
client *c;
|
|
|
|
while (listLength(server.unblocked_clients)) {
|
|
ln = listFirst(server.unblocked_clients);
|
|
serverAssert(ln != NULL);
|
|
c = ln->value;
|
|
listDelNode(server.unblocked_clients, ln);
|
|
c->flag.unblocked = 0;
|
|
|
|
if (c->flag.module) {
|
|
if (!c->flag.blocked) {
|
|
moduleCallCommandUnblockedHandler(c);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
/* Process remaining data in the input buffer, unless the client
|
|
* is blocked again. Actually processInputBuffer() checks that the
|
|
* client is not blocked before to proceed, but things may change and
|
|
* the code is conceptually more correct this way. */
|
|
if (!c->flag.blocked) {
|
|
/* If we have a queued command, execute it now. */
|
|
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
|
|
continue;
|
|
}
|
|
}
|
|
beforeNextClient(c);
|
|
}
|
|
}
|
|
|
|
/* This function will schedule the client for reprocessing at a safe time.
|
|
*
|
|
* This is useful when a client was blocked for some reason (blocking operation,
|
|
* CLIENT PAUSE, or whatever), because it may end with some accumulated query
|
|
* buffer that needs to be processed ASAP:
|
|
*
|
|
* 1. When a client is blocked, its readable handler is still active.
|
|
* 2. However in this case it only gets data into the query buffer, but the
|
|
* query is not parsed or executed once there is enough to proceed as
|
|
* usually (because the client is blocked... so we can't execute commands).
|
|
* 3. When the client is unblocked, without this function, the client would
|
|
* have to write some query in order for the readable handler to finally
|
|
* call processQueryBuffer*() on it.
|
|
* 4. With this function instead we can put the client in a queue that will
|
|
* process it for queries ready to be executed at a safe time.
|
|
*/
|
|
void queueClientForReprocessing(client *c) {
|
|
/* The client may already be into the unblocked list because of a previous
|
|
* blocking operation, don't add back it into the list multiple times. */
|
|
if (!c->flag.unblocked) {
|
|
c->flag.unblocked = 1;
|
|
listAddNodeTail(server.unblocked_clients, c);
|
|
}
|
|
}
|
|
|
|
/* Unblock a client calling the right function depending on the kind
|
|
* of operation the client is blocking for. */
|
|
void unblockClient(client *c, int queue_for_reprocessing) {
|
|
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
|
|
unblockClientWaitingData(c);
|
|
} else if (c->bstate.btype == BLOCKED_WAIT) {
|
|
unblockClientWaitingReplicas(c);
|
|
} else if (c->bstate.btype == BLOCKED_MODULE) {
|
|
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
|
|
unblockClientFromModule(c);
|
|
} else if (c->bstate.btype == BLOCKED_POSTPONE) {
|
|
listDelNode(server.postponed_clients, c->postponed_list_node);
|
|
c->postponed_list_node = NULL;
|
|
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
|
|
/* No special cleanup. */
|
|
} else {
|
|
serverPanic("Unknown btype in unblockClient().");
|
|
}
|
|
|
|
/* Reset the client for a new query, unless the client has pending command to process
|
|
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
|
|
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
|
|
freeClientOriginalArgv(c);
|
|
/* Clients that are not blocked on keys are not reprocessed so we must
|
|
* call reqresAppendResponse here (for clients blocked on key,
|
|
* unblockClientOnKey is called, which eventually calls processCommand,
|
|
* which calls reqresAppendResponse) */
|
|
reqresAppendResponse(c);
|
|
resetClient(c);
|
|
}
|
|
|
|
/* We count blocked client stats on regular clients and not on module clients */
|
|
if (!c->flag.module) server.blocked_clients--;
|
|
server.blocked_clients_by_type[c->bstate.btype]--;
|
|
/* Clear the flags, and put the client in the unblocked list so that
|
|
* we'll process new commands in its query buffer ASAP. */
|
|
c->flag.blocked = 0;
|
|
c->bstate.btype = BLOCKED_NONE;
|
|
c->bstate.unblock_on_nokey = 0;
|
|
removeClientFromTimeoutTable(c);
|
|
if (queue_for_reprocessing) queueClientForReprocessing(c);
|
|
}
|
|
|
|
/* This function gets called when a blocked client timed out in order to
|
|
* send it a reply of some kind. After this function is called,
|
|
* unblockClient() will be called with the same client as argument. */
|
|
void replyToBlockedClientTimedOut(client *c) {
|
|
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
|
|
addReplyNullArray(c);
|
|
updateStatsOnUnblock(c, 0, 0, 0);
|
|
} else if (c->bstate.btype == BLOCKED_WAIT) {
|
|
if (c->cmd->proc == waitCommand) {
|
|
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
|
|
} else if (c->cmd->proc == waitaofCommand) {
|
|
addReplyArrayLen(c, 2);
|
|
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
|
|
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
|
|
} else if (c->cmd->proc == clusterCommand) {
|
|
addReplyErrorObject(c, shared.noreplicaserr);
|
|
} else {
|
|
serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name);
|
|
}
|
|
} else if (c->bstate.btype == BLOCKED_MODULE) {
|
|
moduleBlockedClientTimedOut(c, 0);
|
|
} else {
|
|
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
|
}
|
|
}
|
|
|
|
/* If one or more clients are blocked on the SHUTDOWN command, this function
|
|
* sends them an error reply and unblocks them. */
|
|
void replyToClientsBlockedOnShutdown(void) {
|
|
if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return;
|
|
listNode *ln;
|
|
listIter li;
|
|
listRewind(server.clients, &li);
|
|
while ((ln = listNext(&li))) {
|
|
client *c = listNodeValue(ln);
|
|
if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) {
|
|
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
|
|
unblockClient(c, 1);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Mass-unblock clients because something changed in the instance that makes
|
|
* blocking no longer safe. For example clients blocked in list operations
|
|
* in an instance which turns from primary to replica is unsafe, so this function
|
|
* is called when a primary turns into a replica.
|
|
*
|
|
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
|
|
* it at the same time. */
|
|
void disconnectAllBlockedClients(void) {
|
|
listNode *ln;
|
|
listIter li;
|
|
|
|
listRewind(server.clients, &li);
|
|
while ((ln = listNext(&li))) {
|
|
client *c = listNodeValue(ln);
|
|
|
|
if (c->flag.blocked) {
|
|
/* POSTPONEd clients are an exception, when they'll be unblocked, the
|
|
* command processing will start from scratch, and the command will
|
|
* be either executed or rejected. (unlike LIST blocked clients for
|
|
* which the command is already in progress in a way. */
|
|
if (c->bstate.btype == BLOCKED_POSTPONE) continue;
|
|
|
|
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
|
|
"instance state changed (master -> replica?)");
|
|
c->flag.close_after_reply = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
/* This function should be called by the server every time a single command,
|
|
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
|
* being called by a client. It handles serving clients blocked in all scenarios
|
|
* where a specific key access requires to block until that key is available.
|
|
*
|
|
* All the keys with at least one client blocked that are signaled as ready
|
|
* are accumulated into the server.ready_keys list. This function will run
|
|
* the list and will serve clients accordingly.
|
|
* Note that the function will iterate again and again (for example as a result of serving BLMOVE
|
|
* we can have new blocking clients to serve because of the PUSH side of BLMOVE.)
|
|
*
|
|
* This function is normally "fair", that is, it will serve clients
|
|
* using a FIFO behavior. However this fairness is violated in certain
|
|
* edge cases, that is, when we have clients blocked at the same time
|
|
* in a sorted set and in a list, for the same key (a very odd thing to
|
|
* do client side, indeed!). Because mismatching clients (blocking for
|
|
* a different type compared to the current key type) are moved in the
|
|
* other side of the linked list. However as long as the key starts to
|
|
* be used only for a single type, like virtually any application will
|
|
* do, the function is already fair. */
|
|
void handleClientsBlockedOnKeys(void) {
|
|
/* In case we are already in the process of unblocking clients we should
|
|
* not make a recursive call, in order to prevent breaking fairness. */
|
|
static int in_handling_blocked_clients = 0;
|
|
if (in_handling_blocked_clients) return;
|
|
in_handling_blocked_clients = 1;
|
|
|
|
/* This function is called only when also_propagate is in its basic state
|
|
* (i.e. not from call(), module context, etc.) */
|
|
serverAssert(server.also_propagate.numops == 0);
|
|
|
|
/* If a command being unblocked causes another command to get unblocked,
|
|
* like a BLMOVE would do, then the new unblocked command will get processed
|
|
* right away rather than wait for later. */
|
|
while (listLength(server.ready_keys) != 0) {
|
|
list *l;
|
|
|
|
/* Point server.ready_keys to a fresh list and save the current one
|
|
* locally. This way as we run the old list we are free to call
|
|
* signalKeyAsReady() that may push new elements in server.ready_keys
|
|
* when handling clients blocked into BLMOVE. */
|
|
l = server.ready_keys;
|
|
server.ready_keys = listCreate();
|
|
|
|
while (listLength(l) != 0) {
|
|
listNode *ln = listFirst(l);
|
|
readyList *rl = ln->value;
|
|
|
|
/* First of all remove this key from db->ready_keys so that
|
|
* we can safely call signalKeyAsReady() against this key. */
|
|
dictDelete(rl->db->ready_keys, rl->key);
|
|
|
|
handleClientsBlockedOnKey(rl);
|
|
|
|
/* Free this item. */
|
|
decrRefCount(rl->key);
|
|
zfree(rl);
|
|
listDelNode(l, ln);
|
|
}
|
|
listRelease(l); /* We have the new list on place at this point. */
|
|
}
|
|
in_handling_blocked_clients = 0;
|
|
}
|
|
|
|
/* Set a client in blocking mode for the specified key, with the specified timeout.
|
|
* The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
|
|
* waiting for an empty key in order to awake the client. The client is blocked
|
|
* for all the 'numkeys' keys as in the 'keys' argument.
|
|
* The client will unblocked as soon as one of the keys in 'keys' value was updated.
|
|
* the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key
|
|
* is updated to become unavailable, either by type change (override), deletion or swapdb */
|
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) {
|
|
dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry;
|
|
list *l;
|
|
int j;
|
|
|
|
if (!c->flag.reprocessing_command) {
|
|
/* If the client is re-processing the command, we do not set the timeout
|
|
* because we need to retain the client's original timeout. */
|
|
c->bstate.timeout = timeout;
|
|
}
|
|
|
|
for (j = 0; j < numkeys; j++) {
|
|
/* If the key already exists in the dictionary ignore it. */
|
|
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys, keys[j], NULL))) {
|
|
continue;
|
|
}
|
|
incrRefCount(keys[j]);
|
|
|
|
/* And in the other "side", to map keys -> clients */
|
|
db_blocked_entry = dictAddRaw(c->db->blocking_keys, keys[j], &db_blocked_existing_entry);
|
|
|
|
/* In case key[j] did not have blocking clients yet, we need to create a new list */
|
|
if (db_blocked_entry != NULL) {
|
|
l = listCreate();
|
|
dictSetVal(c->db->blocking_keys, db_blocked_entry, l);
|
|
incrRefCount(keys[j]);
|
|
} else {
|
|
l = dictGetVal(db_blocked_existing_entry);
|
|
}
|
|
listAddNodeTail(l, c);
|
|
dictSetVal(c->bstate.keys, client_blocked_entry, listLast(l));
|
|
|
|
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
|
|
* wants to be awakened if key is deleted (like XREADGROUP) */
|
|
if (unblock_on_nokey) {
|
|
db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry);
|
|
if (db_blocked_entry) {
|
|
incrRefCount(keys[j]);
|
|
dictSetUnsignedIntegerVal(db_blocked_entry, 1);
|
|
} else {
|
|
dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1);
|
|
}
|
|
}
|
|
}
|
|
c->bstate.unblock_on_nokey = unblock_on_nokey;
|
|
/* Currently we assume key blocking will require reprocessing the command.
|
|
* However in case of modules, they have a different way to handle the reprocessing
|
|
* which does not require setting the pending command flag */
|
|
if (btype != BLOCKED_MODULE) c->flag.pending_command = 1;
|
|
blockClient(c, btype);
|
|
}
|
|
|
|
/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP.
|
|
* Internal function for unblockClient() */
|
|
static void unblockClientWaitingData(client *c) {
|
|
dictEntry *de;
|
|
dictIterator *di;
|
|
|
|
if (dictSize(c->bstate.keys) == 0) return;
|
|
|
|
di = dictGetIterator(c->bstate.keys);
|
|
/* The client may wait for multiple keys, so unblock it for every key. */
|
|
while ((de = dictNext(di)) != NULL) {
|
|
releaseBlockedEntry(c, de, 0);
|
|
}
|
|
dictReleaseIterator(di);
|
|
dictEmpty(c->bstate.keys, NULL);
|
|
}
|
|
|
|
static blocking_type getBlockedTypeByType(int type) {
|
|
switch (type) {
|
|
case OBJ_LIST: return BLOCKED_LIST;
|
|
case OBJ_ZSET: return BLOCKED_ZSET;
|
|
case OBJ_MODULE: return BLOCKED_MODULE;
|
|
case OBJ_STREAM: return BLOCKED_STREAM;
|
|
default: return BLOCKED_NONE;
|
|
}
|
|
}
|
|
|
|
/* If the specified key has clients blocked waiting for list pushes, this
|
|
* function will put the key reference into the server.ready_keys list.
|
|
* Note that db->ready_keys is a hash table that allows us to avoid putting
|
|
* the same key again and again in the list in case of multiple pushes
|
|
* made by a script or in the context of MULTI/EXEC.
|
|
*
|
|
* The list will be finally processed by handleClientsBlockedOnKeys() */
|
|
static void signalKeyAsReadyLogic(serverDb *db, robj *key, int type, int deleted) {
|
|
readyList *rl;
|
|
|
|
/* Quick returns. */
|
|
int btype = getBlockedTypeByType(type);
|
|
if (btype == BLOCKED_NONE) {
|
|
/* The type can never block. */
|
|
return;
|
|
}
|
|
if (!server.blocked_clients_by_type[btype] && !server.blocked_clients_by_type[BLOCKED_MODULE]) {
|
|
/* No clients block on this type. Note: Blocked modules are represented
|
|
* by BLOCKED_MODULE, even if the intention is to wake up by normal
|
|
* types (list, zset, stream), so we need to check that there are no
|
|
* blocked modules before we do a quick return here. */
|
|
return;
|
|
}
|
|
|
|
if (deleted) {
|
|
/* Key deleted and no clients blocking for this key? No need to queue it. */
|
|
if (dictFind(db->blocking_keys_unblock_on_nokey, key) == NULL) return;
|
|
/* Note: if we made it here it means the key is also present in db->blocking_keys */
|
|
} else {
|
|
/* No clients blocking for this key? No need to queue it. */
|
|
if (dictFind(db->blocking_keys, key) == NULL) return;
|
|
}
|
|
|
|
dictEntry *de, *existing;
|
|
de = dictAddRaw(db->ready_keys, key, &existing);
|
|
if (de) {
|
|
/* We add the key in the db->ready_keys dictionary in order
|
|
* to avoid adding it multiple times into a list with a simple O(1)
|
|
* check. */
|
|
incrRefCount(key);
|
|
} else {
|
|
/* Key was already signaled? No need to queue it again. */
|
|
return;
|
|
}
|
|
|
|
/* Ok, we need to queue this key into server.ready_keys. */
|
|
rl = zmalloc(sizeof(*rl));
|
|
rl->key = key;
|
|
rl->db = db;
|
|
incrRefCount(key);
|
|
listAddNodeTail(server.ready_keys, rl);
|
|
}
|
|
|
|
/* Helper function to wrap the logic of removing a client blocked key entry
|
|
* In this case we would like to do the following:
|
|
* 1. unlink the client from the global DB locked client list
|
|
* 2. remove the entry from the global db blocking list in case the list is empty
|
|
* 3. in case the global list is empty, also remove the key from the global dict of keys
|
|
* which should trigger unblock on key deletion
|
|
* 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys,
|
|
* but unblocked when only one of them is triggered, we would like to avoid deleting each key separately
|
|
* and instead clear the dictionary in one-shot. this is why the remove_key argument is provided
|
|
* to support this logic in unblockClientWaitingData
|
|
*/
|
|
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
|
|
list *l;
|
|
listNode *pos;
|
|
void *key;
|
|
dictEntry *unblock_on_nokey_entry;
|
|
|
|
key = dictGetKey(de);
|
|
pos = dictGetVal(de);
|
|
/* Remove this client from the list of clients waiting for this key. */
|
|
l = dictFetchValue(c->db->blocking_keys, key);
|
|
serverAssertWithInfo(c, key, l != NULL);
|
|
listUnlinkNode(l, pos);
|
|
/* If the list is empty we need to remove it to avoid wasting memory
|
|
* We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict.
|
|
* However, in case the list is not empty, we will have to still perform reference accounting
|
|
* on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference.
|
|
* Why? because it is possible that some more clients are blocked on the same key but without
|
|
* require to be triggered on key deletion, we do not want these to be later triggered by the
|
|
* signalDeletedKeyAsReady. */
|
|
if (listLength(l) == 0) {
|
|
dictDelete(c->db->blocking_keys, key);
|
|
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
|
|
} else if (c->bstate.unblock_on_nokey) {
|
|
unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey, key);
|
|
/* it is not possible to have a client blocked on nokey with no matching entry */
|
|
serverAssertWithInfo(c, key, unblock_on_nokey_entry != NULL);
|
|
if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) {
|
|
/* in case the count is zero, we can delete the entry */
|
|
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
|
|
}
|
|
}
|
|
if (remove_key) dictDelete(c->bstate.keys, key);
|
|
}
|
|
|
|
/* Signal that 'key' in 'db', whose value has object type 'type', may now be
 * able to serve the clients blocked on it (non-deletion case). */
void signalKeyAsReady(serverDb *db, robj *key, int type) {
    const int deleted = 0;
    signalKeyAsReadyLogic(db, key, type, deleted);
}
|
|
|
|
/* Signal that 'key' in 'db', whose value had object type 'type', was
 * deleted, so that clients blocked with unblock_on_nokey can be served. */
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type) {
    const int deleted = 1;
    signalKeyAsReadyLogic(db, key, type, deleted);
}
|
|
|
|
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
|
* whenever a key is ready. we iterate over all the clients blocked on this key
|
|
* and try to re-execute the command (in case the key is still available). */
|
|
static void handleClientsBlockedOnKey(readyList *rl) {
|
|
/* We serve clients in the same order they blocked for
|
|
* this key, from the first blocked to the last. */
|
|
dictEntry *de = dictFind(rl->db->blocking_keys, rl->key);
|
|
|
|
if (de) {
|
|
list *clients = dictGetVal(de);
|
|
listNode *ln;
|
|
listIter li;
|
|
listRewind(clients, &li);
|
|
|
|
/* Avoid processing more than the initial count so that we're not stuck
|
|
* in an endless loop in case the reprocessing of the command blocks again. */
|
|
long count = listLength(clients);
|
|
while ((ln = listNext(&li)) && count--) {
|
|
client *receiver = listNodeValue(ln);
|
|
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS);
|
|
/* 1. In case new key was added/touched we need to verify it satisfy the
|
|
* blocked type, since we might process the wrong key type.
|
|
* 2. We want to serve clients blocked on module keys
|
|
* regardless of the object type: we don't know what the
|
|
* module is trying to accomplish right now.
|
|
* 3. In case of XREADGROUP call we will want to unblock on any change in object type
|
|
* or in case the key was deleted, since the group is no longer valid. */
|
|
if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
|
|
(o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || (receiver->bstate.unblock_on_nokey)) {
|
|
if (receiver->bstate.btype != BLOCKED_MODULE)
|
|
unblockClientOnKey(receiver, rl->key);
|
|
else
|
|
moduleUnblockClientOnKey(receiver, rl->key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* block a client for replica acknowledgement */
|
|
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
|
|
c->bstate.timeout = timeout;
|
|
c->bstate.reploffset = offset;
|
|
c->bstate.numreplicas = numreplicas;
|
|
c->bstate.numlocal = numlocal;
|
|
listAddNodeHead(server.clients_waiting_acks, c);
|
|
blockClient(c, BLOCKED_WAIT);
|
|
}
|
|
|
|
/* Postpone client from executing a command. For example the server might be busy
|
|
* requesting to avoid processing clients commands which will be processed later
|
|
* when the it is ready to accept them. */
|
|
void blockPostponeClient(client *c) {
|
|
c->bstate.timeout = 0;
|
|
blockClient(c, BLOCKED_POSTPONE);
|
|
listAddNodeTail(server.postponed_clients, c);
|
|
c->postponed_list_node = listLast(server.postponed_clients);
|
|
/* Mark this client to execute its command */
|
|
c->flag.pending_command = 1;
|
|
}
|
|
|
|
/* Block client due to shutdown command */
|
|
void blockClientShutdown(client *c) {
|
|
blockClient(c, BLOCKED_SHUTDOWN);
|
|
}
|
|
|
|
/* Unblock a client once a specific key became available for it.
|
|
* This function will remove the client from the list of clients blocked on this key
|
|
* and also remove the key from the dictionary of keys this client is blocked on.
|
|
* in case the client has a command pending it will process it immediately. */
|
|
static void unblockClientOnKey(client *c, robj *key) {
|
|
dictEntry *de;
|
|
|
|
de = dictFind(c->bstate.keys, key);
|
|
releaseBlockedEntry(c, de, 1);
|
|
|
|
/* Only in case of blocking API calls, we might be blocked on several keys.
|
|
however we should force unblock the entire blocking keys */
|
|
serverAssert(c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_LIST ||
|
|
c->bstate.btype == BLOCKED_ZSET);
|
|
|
|
/* We need to unblock the client before calling processCommandAndResetClient
|
|
* because it checks the CLIENT_BLOCKED flag */
|
|
unblockClient(c, 0);
|
|
/* In case this client was blocked on keys during command
|
|
* we need to re process the command again */
|
|
if (c->flag.pending_command) {
|
|
c->flag.pending_command = 0;
|
|
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
|
|
* to run atomically, this is why we must enter the execution unit here before
|
|
* running the command, and exit the execution unit after calling the unblock handler (if exists).
|
|
* Notice that we also must set the current client so it will be available
|
|
* when we will try to send the client side caching notification (done on 'afterCommand'). */
|
|
client *old_client = server.current_client;
|
|
server.current_client = c;
|
|
enterExecutionUnit(1, 0);
|
|
processCommandAndResetClient(c);
|
|
if (!c->flag.blocked) {
|
|
if (c->flag.module) {
|
|
moduleCallCommandUnblockedHandler(c);
|
|
} else {
|
|
queueClientForReprocessing(c);
|
|
}
|
|
}
|
|
exitExecutionUnit();
|
|
afterCommand(c);
|
|
server.current_client = old_client;
|
|
}
|
|
}
|
|
|
|
/* Unblock a client blocked on the specific key from module context.
|
|
* This function will try to serve the module call, and in case it succeeds,
|
|
* it will add the client to the list of module unblocked clients which will
|
|
* be processed in moduleHandleBlockedClients. */
|
|
static void moduleUnblockClientOnKey(client *c, robj *key) {
|
|
long long prev_error_replies = server.stat_total_error_replies;
|
|
client *old_client = server.current_client;
|
|
server.current_client = c;
|
|
monotime replyTimer;
|
|
elapsedStart(&replyTimer);
|
|
|
|
if (moduleTryServeClientBlockedOnKey(c, key)) {
|
|
updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
|
|
moduleUnblockClient(c);
|
|
}
|
|
/* We need to call afterCommand even if the client was not unblocked
|
|
* in order to propagate any changes that could have been done inside
|
|
* moduleTryServeClientBlockedOnKey */
|
|
afterCommand(c);
|
|
server.current_client = old_client;
|
|
}
|
|
|
|
/* Unblock a client which is currently Blocked on and provided a timeout.
|
|
* The implementation will first reply to the blocked client with null response
|
|
* or, in case of module blocked client the timeout callback will be used.
|
|
* In this case since we might have a command pending
|
|
* we want to remove the pending flag to indicate we already responded to the
|
|
* command with timeout reply. */
|
|
void unblockClientOnTimeout(client *c) {
|
|
/* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
|
|
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
|
|
|
|
replyToBlockedClientTimedOut(c);
|
|
if (c->flag.pending_command) c->flag.pending_command = 0;
|
|
unblockClient(c, 1);
|
|
}
|
|
|
|
/* Unblock a client which is currently Blocked with error.
|
|
* If err_str is provided it will be used to reply to the blocked client */
|
|
void unblockClientOnError(client *c, const char *err_str) {
|
|
if (err_str) addReplyError(c, err_str);
|
|
updateStatsOnUnblock(c, 0, 0, 1);
|
|
if (c->flag.pending_command) c->flag.pending_command = 0;
|
|
unblockClient(c, 1);
|
|
}
|
|
|
|
void blockedBeforeSleep(void) {
|
|
/* Handle precise timeouts of blocked clients. */
|
|
handleBlockedClientsTimeout();
|
|
|
|
/* Unblock all the clients blocked for synchronous replication
|
|
* in WAIT or WAITAOF. */
|
|
if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas();
|
|
|
|
/* Try to process blocked clients every once in while.
|
|
*
|
|
* Example: A module calls RM_SignalKeyAsReady from within a timer callback
|
|
* (So we don't visit processCommand() at all).
|
|
*
|
|
* This may unblock clients, so must be done before processUnblockedClients */
|
|
handleClientsBlockedOnKeys();
|
|
|
|
/* Check if there are clients unblocked by modules that implement
|
|
* blocking commands. */
|
|
if (moduleCount()) moduleHandleBlockedClients();
|
|
|
|
/* Try to process pending commands for clients that were just unblocked. */
|
|
if (listLength(server.unblocked_clients)) processUnblockedClients();
|
|
}
|