diff --git a/src/blocked.c b/src/blocked.c index b935ab9e0..ff38f70b3 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -65,23 +65,21 @@ #include "latency.h" #include "monotonic.h" -void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted); -int getListPositionFromObjectOrReply(client *c, robj *arg, int *position); +/* forward declarations */ +static void unblockClientWaitingData(client *c); +static void handleClientsBlockedOnKey(readyList *rl); +static void unblockClientOnKey(client *c, robj *key); +static void moduleUnblockClientOnKey(client *c, robj *key); +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key); -/* This structure represents the blocked key information that we store - * in the client structure. Each client blocked on keys, has a - * client->bpop.keys hash table. The keys of the hash table are Redis - * keys pointers to 'robj' structures. The value is this structure. - * The structure has two goals: firstly we store the list node that this - * client uses to be listed in the database "blocked clients for this key" - * list, so we can later unblock in O(1) without a list scan. - * Secondly for certain blocking types, we have additional info. Right now - * the only use for additional info we have is when clients are blocked - * on streams, as we have to remember the ID it blocked for. */ -typedef struct bkinfo { - listNode *listnode; /* List node for db->blocking_keys[key] list. */ - streamID stream_id; /* Stream ID if we blocked in a stream. */ -} bkinfo; +void initClientBlockingState(client *c) { + c->bstate.btype = BLOCKED_NONE; + c->bstate.timeout = 0; + c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); + c->bstate.numreplicas = 0; + c->bstate.reploffset = 0; + c->bstate.unblock_on_nokey = 0; +} /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, @@ -93,24 +91,22 @@ void blockClient(client *c, int btype) { btype != BLOCKED_POSTPONE)); c->flags |= CLIENT_BLOCKED; - c->btype = btype; + c->bstate.btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); - if (btype == BLOCKED_POSTPONE) { - listAddNodeTail(server.postponed_clients, c); - c->postponed_list_node = listLast(server.postponed_clients); - /* Mark this client to execute its command */ - c->flags |= CLIENT_PENDING_COMMAND; - } } -/* This function is called after a client has finished a blocking operation - * in order to update the total command duration, log the command into - * the Slow log if needed, and log the reply duration event if needed. */ +/* Usually when a client is unblocked due to being blocked while processing some command + * he will attempt to reprocess the command which will update the statistics. + * However in case the client was timed out or in case of module blocked client is being unblocked + * the command will not be reprocessed and we need to make stats update. + * This function will make updates to the commandstats, slowlog and monitors.*/ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; c->lastcmd->microseconds += total_cmd_duration; + c->lastcmd->calls++; + server.stat_numcommands++; if (had_errors) c->lastcmd->failed_calls++; if (server.latency_tracking_enabled) @@ -177,29 +173,27 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->btype == BLOCKED_WAIT) { + } else if (c->bstate.btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); - } else if (c->btype == BLOCKED_MODULE) { + } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); - } else if (c->btype == BLOCKED_POSTPONE) { + } else if (c->bstate.btype == BLOCKED_POSTPONE) { listDelNode(server.postponed_clients,c->postponed_list_node); c->postponed_list_node = NULL; - } else if (c->btype == BLOCKED_SHUTDOWN) { + } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ } else { serverPanic("Unknown btype in unblockClient()."); } - /* Reset the client for a new query since, for blocking commands - * we do not do it immediately after the command returns (when the - * client got blocked) in order to be still able to access the argument - * vector from module callbacks and updateStatsOnUnblock. */ - if (c->btype != BLOCKED_POSTPONE && c->btype != BLOCKED_SHUTDOWN) { + /* Reset the client for a new query, unless the client has pending command to process + * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { freeClientOriginalArgv(c); resetClient(c); } @@ -207,9 +201,10 @@ void unblockClient(client *c) { /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ server.blocked_clients--; - server.blocked_clients_by_type[c->btype]--; + server.blocked_clients_by_type[c->bstate.btype]--; c->flags &= ~CLIENT_BLOCKED; - c->btype = BLOCKED_NONE; + c->bstate.btype = BLOCKED_NONE; + c->bstate.unblock_on_nokey = 0; removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } @@ -218,13 +213,14 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { addReplyNullArray(c); - } else if (c->btype == BLOCKED_WAIT) { - addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); - } else if (c->btype == BLOCKED_MODULE) { + updateStatsOnUnblock(c, 0, 0, 0); + } else if (c->bstate.btype == BLOCKED_WAIT) { + addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); @@ -240,7 +236,7 @@ void replyToClientsBlockedOnShutdown(void) { listRewind(server.clients, &li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) { + if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); unblockClient(c); } @@ -267,335 +263,29 @@ void disconnectAllBlockedClients(void) { * command processing will start from scratch, and the command will * be either executed or rejected. (unlike LIST blocked clients for * which the command is already in progress in a way. */ - if (c->btype == BLOCKED_POSTPONE) + if (c->bstate.btype == BLOCKED_POSTPONE) continue; - addReplyError(c, + unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> replica?)"); - unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } } -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a list key, and there may be new - * data to fetch (the key is ready). */ -void serveClientsBlockedOnListKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_LIST, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_LIST]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_LIST) continue; - - int deleted = 0; - robj *dstkey = receiver->bpop.target; - int wherefrom = receiver->bpop.blockpos.wherefrom; - int whereto = receiver->bpop.blockpos.whereto; - - /* Protect receiver->bpop.target, that will be - * freed by the next unblockClient() - * call. */ - if (dstkey) incrRefCount(dstkey); - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - serveClientBlockedOnList(receiver, o, - rl->key, dstkey, rl->db, - wherefrom, whereto, - &deleted); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - - if (dstkey) decrRefCount(dstkey); - - /* The list is empty and has been deleted. */ - if (deleted) break; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a sorted set key, and there may be new - * data to fetch (the key is ready). */ -void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_ZSET, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_ZSET) continue; - - int deleted = 0; - long llen = zsetLength(o); - long count = receiver->bpop.count; - int where = receiver->bpop.blockpos.wherefrom; - int use_nested_array = (receiver->lastcmd && - receiver->lastcmd->proc == bzmpopCommand) - ? 1 : 0; - int reply_nil_when_empty = use_nested_array; - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); - - /* Replicate the command. */ - int argc = 2; - robj *argv[3]; - argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax; - argv[1] = rl->key; - incrRefCount(rl->key); - if (count != -1) { - /* Replicate it as command with COUNT. */ - robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); - argv[2] = count_obj; - argc++; - } - alsoPropagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(argv[1]); - if (count != -1) decrRefCount(argv[2]); - - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - - /* The zset is empty and has been deleted. */ - if (deleted) break; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a stream key, and there may be new - * data to fetch (the key is ready). - * This function also handles the case where there may be clients blocked, - * via XREADGROUP, on an existing stream which was deleted. - * We need to unblock the clients in that case. - * The idea is that a client that is blocked via XREADGROUP is different from - * any other blocking type in the sense that it depends on the existence of both - * the key and the group. Even if the key is deleted and then revived with XADD - * it won't help any clients blocked on XREADGROUP because the group no longer - * exist, so they would fail with -NOGROUP anyway. - * The conclusion is that it's better to unblock these client (with error) upon - * the deletion of the key, rather than waiting for the first XADD.*/ -void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - /* This function may be called with o=NULL (in order to unblock - * XREADGROUP clients whose key was deleted) */ - stream *s = o? o->ptr : NULL; - - /* We need to provide the new data arrived on the stream - * to all the clients that are waiting for an offset smaller - * than the current top item. */ - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_STREAM) continue; - bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); - streamID *gt = &bki->stream_id; - - if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) { - /* If it's a blocking XREAD and the stream was either deleted - * or replaced with another key, we don't do anything (it's ok - * the the client blocks on a non-existing key). */ - continue; - } - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - - /* If we blocked in the context of a consumer - * group, we need to resolve the group and update the - * last ID the client is blocked for: this is needed - * because serving other clients in the same consumer - * group will alter the "last ID" of the consumer - * group, and clients blocked in a consumer group are - * always blocked for the ">" ID: we need to deliver - * only new messages and avoid unblocking the client - * otherwise. */ - streamCG *group = NULL; - if (receiver->bpop.xread_group) { - /* If it's a blocking XREADGROUP and the stream was either deleted - * or replaced with another key, we unblock the client */ - if (!o || o->type != OBJ_STREAM) { - addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); - goto unblock_receiver; - } - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* If the group was not found, send an error - * to the consumer. */ - if (!group) { - addReplyError(receiver, - "-NOGROUP the consumer group this client " - "was blocked on no longer exists"); - goto unblock_receiver; - } else { - *gt = group->last_id; - } - } - - if (streamCompareID(&s->last_id, gt) > 0) { - streamID start = *gt; - streamIncrID(&start); - - /* Lookup the consumer for the group, if any. */ - streamConsumer *consumer = NULL; - int noack = 0; - - if (group) { - noack = receiver->bpop.xread_group_noack; - sds name = receiver->bpop.xread_consumer->ptr; - consumer = streamLookupConsumer(group,name); - if (consumer == NULL) { - consumer = streamCreateConsumer(group,name,rl->key, - rl->db->id,SCC_DEFAULT); - if (noack) { - streamPropagateConsumerCreation(receiver,rl->key, - receiver->bpop.xread_group, - consumer->name); - } - } - } - - /* Emit the two elements sub-array consisting of - * the name of the stream and the data we - * extracted from it. Wrapped in a single-item - * array, since we have just one key. */ - if (receiver->resp == 2) { - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); - } else { - addReplyMapLen(receiver,1); - } - addReplyBulk(receiver,rl->key); - - streamPropInfo pi = { - rl->key, - receiver->bpop.xread_group - }; - streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count, - 0, group, consumer, noack, &pi); - /* Note that after we unblock the client, 'gt' - * and other receiver->bpop stuff are no longer - * valid, so we must do the setup above before - * the unblockClient call. */ - -unblock_receiver: - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - } - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * in order to check if we can serve clients blocked by modules using - * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: - * our goal here is to call the RedisModuleBlockedClient reply() callback to - * see if the key is really able to serve the client, and in that case, - * unblock it. */ -void serveClientsBlockedOnKeyByModule(readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_MODULE, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_MODULE) continue; - - /* Note that if *this* client cannot be served by this key, - * it does not mean that another client that is next into the - * list cannot be served as well: they may be blocked by - * different modules with different triggers to consider if a key - * is ready or not. This means we can't exit the loop but need - * to continue after the first failure. */ - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) { - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - moduleUnblockClient(receiver); - } - /* We need to call afterCommand even if the client was not unblocked - * in order to propagate any changes that could have been done inside - * moduleTryServeClientBlockedOnKey */ - afterCommand(receiver); - server.current_client = old_client; - } - } -} - /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after - * being called by a client. It handles serving clients blocked in - * lists, streams, and sorted sets, via a blocking commands. + * being called by a client. It handles serving clients blocked in all scenarios + * where a specific key access requires to block until that key is available. * - * All the keys with at least one client blocked that received at least - * one new element via some write operation are accumulated into - * the server.ready_keys list. This function will run the list and will - * serve clients accordingly. Note that the function will iterate again and - * again as a result of serving BLMOVE we can have new blocking clients - * to serve because of the PUSH side of BLMOVE. + * All the keys with at least one client blocked that are signaled as ready + * are accumulated into the server.ready_keys list. This function will run + * the list and will serve clients accordingly. + * Note that the function will iterate again and again (for example as a result of serving BLMOVE + * we can have new blocking clients to serve because of the PUSH side of BLMOVE.) * - * This function is normally "fair", that is, it will server clients + * This function is normally "fair", that is, it will serve clients * using a FIFO behavior. However this fairness is violated in certain * edge cases, that is, when we have clients blocked at the same time * in a sorted set and in a list, for the same key (a very odd thing to @@ -605,6 +295,14 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * be used only for a single type, like virtually any Redis application will * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { + + /* In case we are already in the process of unblocking clients we should + * not make a recursive call, in order to prevent breaking fairness. */ + static int in_handling_blocked_clients = 0; + if (in_handling_blocked_clients) + return; + in_handling_blocked_clients = 1; + /* This function is called only when also_propagate is in its basic state * (i.e. not from call(), module context, etc.) */ serverAssert(server.also_propagate.numops == 0); @@ -627,39 +325,7 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); - updateCachedTime(0); - - /* Serve clients blocked on the key. */ - robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS); - if (!o) { - /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to - * take care of the propagation here, because afterCommand wasn't called */ - postExecutionUnitOperations(); - } else { - if (o->type == OBJ_LIST) - serveClientsBlockedOnListKey(o,rl); - else if (o->type == OBJ_ZSET) - serveClientsBlockedOnSortedSetKey(o,rl); - } - /* We need to try to serve stream clients even if the key no longer exists because - * XREADGROUP clients need to be unblocked in case the key is missing, either deleted - * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}. - * In this case we need to unblock all these clients. */ - serveClientsBlockedOnStreamKey(o,rl); - /* We want to serve clients blocked on module keys regardless of the object type, or - * whether the object exists or not: we don't know what the module is trying to - * accomplish right now. - * Please note that this function must be called only after handling non-module - * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o` - * to be stale. - * The scenario is that we have one client blocked on BLPOP while another module - * client is blocked by MODULE.SAME-AS-BLPOP on the same key. - * Of course we can call lookupKeyReadWithFlags again, but: - * 1) It takes CPU - * 2) It makes more sense to give priority to "native" blocking clients rather - * than module blocking clients - * */ - serveClientsBlockedOnKeyByModule(rl); + handleClientsBlockedOnKey(rl); /* Free this item. */ decrRefCount(rl->key); @@ -668,131 +334,80 @@ void handleClientsBlockedOnKeys(void) { } listRelease(l); /* We have the new list on place at this point. */ } + in_handling_blocked_clients = 0; } -/* This is how the current blocking lists/sorted sets/streams work, we use - * BLPOP as example, but the concept is the same for other list ops, sorted - * sets and XREAD. - * - If the user calls BLPOP and the key exists and contains a non empty list - * then LPOP is called instead. So BLPOP is semantically the same as LPOP - * if blocking is not required. - * - If instead BLPOP is called and the key does not exists or the list is - * empty we need to block. In order to do so we remove the notification for - * new data to read in the client socket (so that we'll not serve new - * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blocking_keys) mapping keys to a list of clients - * blocking for this keys. - * - If a PUSH operation against a key with blocked clients waiting is - * performed, we mark this key as "ready", and after the current command, - * MULTI/EXEC block, or script, is executed, we serve all the clients waiting - * for this list, from the one that blocked first, to the last, accordingly - * to the number of elements we have in the ready list. - */ - -/* Set a client in blocking mode for the specified key (list, zset or stream), - * with the specified timeout. The 'type' argument is BLOCKED_LIST, - * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are +/* Set a client in blocking mode for the specified key, with the specified timeout. + * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are * waiting for an empty key in order to awake the client. The client is blocked - * for all the 'numkeys' keys as in the 'keys' argument. When we block for - * stream keys, we also provide an array of streamID structures: clients will - * be unblocked only when items with an ID greater or equal to the specified - * one is appended to the stream. - * - * 'count' for those commands that support the optional count argument. - * Otherwise the value is 0. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) { + * for all the 'numkeys' keys as in the 'keys' argument. + * The client will unblocked as soon as one of the keys in 'keys' value was updated. + * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key + * is updated to become unavailable, either by type change (override), deletion or swapdb */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) { + dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry; list *l; int j; - c->bpop.count = count; - c->bpop.timeout = timeout; - c->bpop.target = target; - - if (blockpos != NULL) c->bpop.blockpos = *blockpos; - - if (target != NULL) incrRefCount(target); - + c->bstate.timeout = timeout; for (j = 0; j < numkeys; j++) { - /* Allocate our bkinfo structure, associated to each key the client - * is blocked for. */ - bkinfo *bki = zmalloc(sizeof(*bki)); - if (btype == BLOCKED_STREAM) - bki->stream_id = ids[j]; - /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { - zfree(bki); + if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { continue; } incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - dictEntry *de, *existing; - de = dictAddRaw(c->db->blocking_keys, keys[j], &existing); - if (de) { - incrRefCount(keys[j]); - /* For every key we take a list of clients blocked for it */ + db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry); + + /* In case key[j] did not have blocking clients yet, we need to create a new list */ + if (db_blocked_entry != NULL) { l = listCreate(); - dictSetVal(c->db->blocking_keys, de, l); + dictSetVal(c->db->blocking_keys, db_blocked_entry, l); + incrRefCount(keys[j]); } else { - l = dictGetVal(existing); + l = dictGetVal(db_blocked_existing_entry); } listAddNodeTail(l,c); - bki->listnode = listLast(l); + dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); + /* We need to add the key to blocking_keys_unblock_on_nokey, if the client * wants to be awakened if key is deleted (like XREADGROUP) */ if (unblock_on_nokey) { - de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL); - if (de) { + db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry); + if (db_blocked_entry) { incrRefCount(keys[j]); + dictSetUnsignedIntegerVal(db_blocked_entry, 1); + } else { + dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1); } } } + c->bstate.unblock_on_nokey = unblock_on_nokey; + c->flags |= CLIENT_PENDING_COMMAND; blockClient(c,btype); } -/* Unblock a client that's waiting in a blocking operation such as BLPOP. - * You should never call this function directly, but unblockClient() instead. */ -void unblockClientWaitingData(client *c) { +/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP. + * Internal function for unblockClient() */ +static void unblockClientWaitingData(client *c) { dictEntry *de; dictIterator *di; - list *l; - serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); - di = dictGetIterator(c->bpop.keys); + if (dictSize(c->bstate.keys) == 0) + return; + + di = dictGetIterator(c->bstate.keys); /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetKey(de); - bkinfo *bki = dictGetVal(de); - - /* Remove this client from the list of clients waiting for this key. */ - l = dictFetchValue(c->db->blocking_keys,key); - serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,bki->listnode); - /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) { - dictDelete(c->db->blocking_keys,key); - dictDelete(c->db->blocking_keys_unblock_on_nokey,key); - } + releaseBlockedEntry(c, de, 0); } dictReleaseIterator(di); - - /* Cleanup the client structure */ - dictEmpty(c->bpop.keys,NULL); - if (c->bpop.target) { - decrRefCount(c->bpop.target); - c->bpop.target = NULL; - } - if (c->bpop.xread_group) { - decrRefCount(c->bpop.xread_group); - decrRefCount(c->bpop.xread_consumer); - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; - } + dictEmpty(c->bstate.keys, NULL); } -static int getBlockedTypeByType(int type) { +static blocking_type getBlockedTypeByType(int type) { switch (type) { case OBJ_LIST: return BLOCKED_LIST; case OBJ_ZSET: return BLOCKED_ZSET; @@ -858,6 +473,52 @@ static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) listAddNodeTail(server.ready_keys,rl); } +/* Helper function to wrap the logic of removing a client blocked key entry + * In this case we would like to do the following: + * 1. unlink the client from the global DB locked client list + * 2. remove the entry from the global db blocking list in case the list is empty + * 3. in case the global list is empty, also remove the key from the global dict of keys + * which should trigger unblock on key deletion + * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys, + * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately + * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided + * to support this logic in unblockClientWaitingData + */ +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { + list *l; + listNode *pos; + void *key; + dictEntry *unblock_on_nokey_entry; + + key = dictGetKey(de); + pos = dictGetVal(de); + /* Remove this client from the list of clients waiting for this key. */ + l = dictFetchValue(c->db->blocking_keys, key); + serverAssertWithInfo(c,key,l != NULL); + listUnlinkNode(l,pos); + /* If the list is empty we need to remove it to avoid wasting memory + * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict. + * However, in case the list is not empty, we will have to still perform reference accounting + * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference. + * Why? because it is possible that some more clients are blocked on the same key but without + * require to be triggered on key deletion, we do not want these to be later triggered by the + * signalDeletedKeyAsReady. */ + if (listLength(l) == 0) { + dictDelete(c->db->blocking_keys, key); + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } else if (c->bstate.unblock_on_nokey) { + unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key); + /* it is not possible to have a client blocked on nokey with no matching entry */ + serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL); + if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) { + /* in case the count is zero, we can delete the entry */ + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } + } + if (remove_key) + dictDelete(c->bstate.keys, key); +} + void signalKeyAsReady(redisDb *db, robj *key, int type) { signalKeyAsReadyLogic(db, key, type, 0); } @@ -865,3 +526,156 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { signalKeyAsReadyLogic(db, key, type, 1); } + +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * whenever a key is ready. we iterate over all the clients blocked on this key + * and try to re-execute the command (in case the key is still available). */ +static void handleClientsBlockedOnKey(readyList *rl) { + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOKKUP_NOEFFECTS); + /* 1. In case new key was added/touched we need to verify it satisfy the + * blocked type, since we might process the wrong key type. + * 2. We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. + * 3. In case of XREADGROUP call we will want to unblock on any change in object type + * or in case the key was deleted, since the group is no longer valid. */ + if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) || + (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || + (receiver->bstate.unblock_on_nokey)) + { + if (receiver->bstate.btype != BLOCKED_MODULE) + unblockClientOnKey(receiver, rl->key); + else + moduleUnblockClientOnKey(receiver, rl->key); + } + } + } +} + +/* block a client due to wait command */ +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { + c->bstate.timeout = timeout; + c->bstate.reploffset = offset; + c->bstate.numreplicas = numreplicas; + listAddNodeHead(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAIT); +} + +/* Postpone client from executing a command. For example the server might be busy + * requesting to avoid processing clients commands which will be processed later + * when the it is ready to accept them. */ +void blockPostponeClient(client *c) { + c->bstate.timeout = 0; + blockClient(c,BLOCKED_POSTPONE); + listAddNodeTail(server.postponed_clients, c); + c->postponed_list_node = listLast(server.postponed_clients); + /* Mark this client to execute its command */ + c->flags |= CLIENT_PENDING_COMMAND; +} + +/* Block client due to shutdown command */ +void blockClientShutdown(client *c) { + blockClient(c, BLOCKED_SHUTDOWN); + /* Mark this client to execute its command */ +} + +/* Unblock a client once a specific key became available for it. + * This function will remove the client from the list of clients blocked on this key + * and also remove the key from the dictionary of keys this client is blocked on. + * in case the client has a command pending it will process it immediately. */ +static void unblockClientOnKey(client *c, robj *key) { + dictEntry *de; + + de = dictFind(c->bstate.keys, key); + releaseBlockedEntry(c, de, 1); + + /* Only in case of blocking API calls, we might be blocked on several keys. + however we should force unblock the entire blocking keys */ + serverAssert(c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET); + + unblockClient(c); + /* In case this client was blocked on keys during command + * we need to re process the command again */ + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~CLIENT_PENDING_COMMAND; + processCommandAndResetClient(c); + } +} + +/* Unblock a client blocked on the specific key from module context. + * This function will try to serve the module call, and in case it succeeds, + * it will add the client to the list of module unblocked clients which will + * be processed in moduleHandleBlockedClients. */ +static void moduleUnblockClientOnKey(client *c, robj *key) { + long long prev_error_replies = server.stat_total_error_replies; + client *old_client = server.current_client; + server.current_client = c; + monotime replyTimer; + elapsedStart(&replyTimer); + + if (moduleTryServeClientBlockedOnKey(c, key)) { + updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + moduleUnblockClient(c); + } + /* We need to call afterCommand even if the client was not unblocked + * in order to propagate any changes that could have been done inside + * moduleTryServeClientBlockedOnKey */ + afterCommand(c); + server.current_client = old_client; +} + +/* Unblock a client which is currently Blocked on and provided a timeout. + * The implementation will first reply to the blocked client with null response + * or, in case of module blocked client the timeout callback will be used. + * In this case since we might have a command pending + * we want to remove the pending flag to indicate we already responded to the + * command with timeout reply. */ +void unblockClientOnTimeout(client *c) { + replyToBlockedClientTimedOut(c); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c); +} + +/* Unblock a client which is currently Blocked with error. + * If err_str is provided it will be used to reply to the blocked client */ +void unblockClientOnError(client *c, const char *err_str) { + if (err_str) + addReplyError(c, err_str); + updateStatsOnUnblock(c, 0, 0, 1); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c); +} + +/* sets blocking_keys to the total number of keys which has at least one client blocked on them + * sets blocking_keys_on_nokey to the total number of keys which has at least one client + * blocked on them to be written or deleted */ +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) { + unsigned long bkeys=0, bkeys_on_nokey=0; + for (int j = 0; j < server.dbnum; j++) { + bkeys += dictSize(server.db[j].blocking_keys); + bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); + } + if (blocking_keys) + *blocking_keys = bkeys; + if (bloking_keys_on_nokey) + *bloking_keys_on_nokey = bkeys_on_nokey; +} diff --git a/src/cluster.c b/src/cluster.c index 7414f830f..10be81930 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -7215,10 +7215,10 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { if (c->flags & CLIENT_BLOCKED && - (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM || - c->btype == BLOCKED_MODULE)) + (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_MODULE)) { dictEntry *de; dictIterator *di; @@ -7234,11 +7234,11 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { /* If the client is blocked on module, but not on a specific key, * don't unblock it (except for the CLUSTER_FAIL case above). */ - if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) + if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0; /* All keys must belong to the same slot, so check first key only. */ - di = dictGetIterator(c->bpop.keys); + di = dictGetIterator(c->bstate.keys); if ((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); diff --git a/src/db.c b/src/db.c index e82402125..aad822630 100644 --- a/src/db.c +++ b/src/db.c @@ -1147,7 +1147,7 @@ void shutdownCommand(client *c) { return; } - blockClient(c, BLOCKED_SHUTDOWN); + blockClientShutdown(c); if (prepareForShutdown(flags) == C_OK) exit(0); /* If we're here, then shutdown is ongoing (the client is still blocked) or * failed (the client has received an error). */ diff --git a/src/dict.h b/src/dict.h index c818d6d4d..16f576ffd 100644 --- a/src/dict.h +++ b/src/dict.h @@ -131,6 +131,15 @@ typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref); #define dictSetDoubleVal(entry, _val_) \ do { (entry)->v.d = _val_; } while(0) +#define dictIncrSignedIntegerVal(entry, _val_) \ + ((entry)->v.s64 += _val_) + +#define dictIncrUnsignedIntegerVal(entry, _val_) \ + ((entry)->v.u64 += _val_) + +#define dictIncrDoubleVal(entry, _val_) \ + ((entry)->v.d += _val_) + #define dictFreeKey(d, entry) \ if ((d)->type->keyDestructor) \ (d)->type->keyDestructor((d), (entry)->key) diff --git a/src/module.c b/src/module.c index 7fbc1d52b..034105d3e 100644 --- a/src/module.c +++ b/src/module.c @@ -6158,7 +6158,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch server.replication_allowed = replicate && server.replication_allowed; /* Run the command */ - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_FROM_MODULE; + int call_flags = CMD_CALL_FROM_MODULE; if (replicate) { if (!(flags & REDISMODULE_ARGV_NO_AOF)) call_flags |= CMD_CALL_PROPAGATE_AOF; @@ -7282,7 +7282,7 @@ void RM_LatencyAddSample(const char *event, mstime_t latency) { * The structure RedisModuleBlockedClient will be always deallocated when * running the list of clients blocked by a module that need to be unblocked. */ void unblockClientFromModule(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Call the disconnection callback if any. Note that * bc->disconnect_callback is set to NULL if the client gets disconnected @@ -7346,8 +7346,8 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int islua = scriptIsRunning(); int ismulti = server.in_exec; - c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; ctx->module->blocked_clients++; /* We need to handle the invalid operation of calling modules blocking @@ -7371,16 +7371,16 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->unblocked = 0; bc->background_timer = 0; bc->background_duration = 0; - c->bpop.timeout = timeout; + c->bstate.timeout = timeout; if (islua || ismulti) { - c->bpop.module_blocked_handle = NULL; + c->bstate.module_blocked_handle = NULL; addReplyError(c, islua ? "Blocking module command called from Lua script" : "Blocking module command called from transaction"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { blockClient(c,BLOCKED_MODULE); } @@ -7397,7 +7397,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() @@ -7566,14 +7566,14 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { /* This API is used by the Redis core to unblock a client that was blocked * by a module. */ void moduleUnblockClient(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; moduleUnblockClientByHandle(bc,NULL); } /* Return true if the client 'c' was blocked by a module using * RM_BlockClientOnKeys(). */ int moduleClientIsBlockedOnKeys(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; return bc->blocked_on_keys; } @@ -7740,10 +7740,10 @@ void moduleHandleBlockedClients(void) { * moduleBlockedClientTimedOut(). */ int moduleBlockedClientMayTimeout(client *c) { - if (c->btype != BLOCKED_MODULE) + if (c->bstate.btype != BLOCKED_MODULE) return 1; - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; return (bc && bc->timeout_callback != NULL); } @@ -7752,7 +7752,7 @@ int moduleBlockedClientMayTimeout(client *c) { * does not need to do any cleanup. Eventually the module will call the * API to unblock the client and the memory will be released. */ void moduleBlockedClientTimedOut(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() @@ -7767,9 +7767,8 @@ void moduleBlockedClientTimedOut(client *c) { long long prev_error_replies = server.stat_total_error_replies; bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); - if (!bc->blocked_on_keys) { - updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); - } + updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); + /* For timeout events, we do not want to call the disconnect callback, * because the blocked client will be automatically disconnected in * this case, and the user can still hook using the timeout callback. */ diff --git a/src/networking.c b/src/networking.c index 054bca6a0..ed4f88581 100644 --- a/src/networking.c +++ b/src/networking.c @@ -165,6 +165,7 @@ client *createClient(connection *conn) { c->flags = 0; c->slot = -1; c->ctime = c->lastinteraction = server.unixtime; + c->duration = 0; clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; @@ -184,15 +185,7 @@ client *createClient(connection *conn) { c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); - c->btype = BLOCKED_NONE; - c->bpop.timeout = 0; - c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType); - c->bpop.target = NULL; - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; - c->bpop.xread_group_noack = 0; - c->bpop.numreplicas = 0; - c->bpop.reploffset = 0; + initClientBlockingState(c); c->woff = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType); @@ -1588,7 +1581,7 @@ void freeClient(client *c) { /* Deallocate structures used to block on blocking ops. */ if (c->flags & CLIENT_BLOCKED) unblockClient(c); - dictRelease(c->bpop.keys); + dictRelease(c->bstate.keys); /* UNWATCH all the keys */ unwatchAllKeys(c); @@ -2033,6 +2026,8 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; c->slot = -1; + c->duration = 0; + c->flags &= ~CLIENT_EXECUTING_COMMAND; if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -3142,12 +3137,11 @@ NULL * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) { if (unblock_error) - addReplyError(target, + unblockClientOnError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else - replyToBlockedClientTimedOut(target); - unblockClient(target); - updateStatsOnUnblock(target, 0, 0, 1); + unblockClientOnTimeout(target); + addReply(c,shared.cone); } else { addReply(c,shared.czero); diff --git a/src/replication.c b/src/replication.c index 1b2ef6731..55636fd77 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3480,11 +3480,7 @@ void waitCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - c->bpop.timeout = timeout; - c->bpop.reploffset = offset; - c->bpop.numreplicas = numreplicas; - listAddNodeHead(server.clients_waiting_acks,c); - blockClient(c,BLOCKED_WAIT); + blockForReplication(c,timeout,offset,numreplicas); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3518,16 +3514,16 @@ void processClientsWaitingReplicas(void) { * offset and number of replicas, we remember it so the next client * may be unblocked without calling replicationCountAcksByOffset() * if the requested offset / replicas were equal or less. */ - if (last_offset && last_offset >= c->bpop.reploffset && - last_numreplicas >= c->bpop.numreplicas) + if (last_offset && last_offset >= c->bstate.reploffset && + last_numreplicas >= c->bstate.numreplicas) { unblockClient(c); addReplyLongLong(c,last_numreplicas); } else { - int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); + int numreplicas = replicationCountAcksByOffset(c->bstate.reploffset); - if (numreplicas >= c->bpop.numreplicas) { - last_offset = c->bpop.reploffset; + if (numreplicas >= c->bstate.numreplicas) { + last_offset = c->bstate.reploffset; last_numreplicas = numreplicas; unblockClient(c); addReplyLongLong(c,numreplicas); diff --git a/src/script.c b/src/script.c index 2a770fa61..fd47e390e 100644 --- a/src/script.c +++ b/src/script.c @@ -559,7 +559,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) { goto error; } - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; + int call_flags = CMD_CALL_NONE; if (run_ctx->repl_flags & PROPAGATE_AOF) { call_flags |= CMD_CALL_PROPAGATE_AOF; } diff --git a/src/script_lua.c b/src/script_lua.c index b22f9e5b9..c86cdc2d1 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -981,7 +981,7 @@ cleanup: c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - freeClientArgv(c); + resetClient(c); inuse--; if (raise_error) { diff --git a/src/server.c b/src/server.c index 380b20d32..eaad37c07 100644 --- a/src/server.c +++ b/src/server.c @@ -92,6 +92,10 @@ const char *replstateToString(int replstate); /*============================ Utility functions ============================ */ +/* This macro tells if we are in the context of loading an AOF. */ +#define isAOFLoadingContext() \ + ((server.current_client && server.current_client->id == CLIENT_ID_AOF) ? 1 : 0) + /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); @@ -3374,8 +3378,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) { * * The following flags can be passed: * CMD_CALL_NONE No flags. - * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. - * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset @@ -3411,6 +3413,15 @@ void call(client *c, int flags) { long long dirty; uint64_t client_old_flags = c->flags; struct redisCommand *real_cmd = c->realcmd; + /* When call() is issued during loading the AOF we don't want commands called + * from module, exec or LUA to go into the slowlog or to populate statistics. */ + int update_command_stats = !isAOFLoadingContext(); + + /* We want to be aware of a client which is making a first time attempt to execute this command + * and a client which is reprocessing command again (after being unblocked). + * Blocked clients can be blocked in different places and not always it means the call() function has been + * called. For example this is required for avoiding double logging to monitors.*/ + int reprocessing_command = ((!server.execution_nesting) && (c->flags & CLIENT_EXECUTING_COMMAND)) ? 1 : 0; /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ @@ -3429,10 +3440,11 @@ void call(client *c, int flags) { const long long call_timer = ustime(); - /* Update cache time, in case we have nested calls we want to - * update only on the first call */ + /* Update cache time, and indicate we are starting command execution. + * in case we have nested calls we want to update only on the first call */ if (server.execution_nesting++ == 0) { updateCachedTimeWithUs(0,call_timer); + c->flags |= CLIENT_EXECUTING_COMMAND; } monotime monotonic_start = 0; @@ -3440,7 +3452,13 @@ void call(client *c, int flags) { monotonic_start = getMonotonicUs(); c->cmd->proc(c); - server.execution_nesting--; + + if (--server.execution_nesting == 0) { + /* In case client is blocked after trying to execute the command, + * it means the execution is not yet completed and we MIGHT reprocess the command in the future. */ + if (!(c->flags & CLIENT_BLOCKED)) + c->flags &= ~(CLIENT_EXECUTING_COMMAND); + } /* In order to avoid performance implication due to querying the clock using a system call 3 times, * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */ @@ -3450,7 +3468,7 @@ void call(client *c, int flags) { else duration = ustime() - call_timer; - c->duration = duration; + c->duration += duration; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; @@ -3471,11 +3489,6 @@ void call(client *c, int flags) { c->flags |= CLIENT_CLOSE_AFTER_REPLY; } - /* When EVAL is called loading the AOF we don't want commands called - * from Lua to go into the slowlog or to populate statistics. */ - if (server.loading && c->flags & CLIENT_SCRIPT) - flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); - /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ @@ -3493,7 +3506,7 @@ void call(client *c, int flags) { /* Record the latency this command induced on the main thread. * unless instructed by the caller not to log. (happens when processing * a MULTI-EXEC from inside an AOF). */ - if (flags & CMD_CALL_SLOWLOG) { + if (update_command_stats) { char *latency_event = (real_cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); @@ -3501,12 +3514,15 @@ void call(client *c, int flags) { /* Log the command into the Slow log if needed. * If the client is blocked we will handle slowlog when it is unblocked. */ - if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) - slowlogPushCurrentCommand(c, real_cmd, duration); + if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) + slowlogPushCurrentCommand(c, real_cmd, c->duration); - /* Send the command to clients in MONITOR mode if applicable. - * Administrative commands are considered too dangerous to be shown. */ - if (!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { + /* Send the command to clients in MONITOR mode if applicable, + * since some administrative commands are considered too dangerous to be shown. + * Other exceptions is a client which is unblocked and retring to process the command + * or we are currently in the process of loading AOF. */ + if (update_command_stats && !reprocessing_command && + !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { robj **argv = c->original_argv ? c->original_argv : c->argv; int argc = c->original_argv ? c->original_argc : c->argc; replicationFeedMonitors(c,server.monitors,c->db->id,argv,argc); @@ -3517,13 +3533,13 @@ void call(client *c, int flags) { if (!(c->flags & CLIENT_BLOCKED)) freeClientOriginalArgv(c); - /* populate the per-command statistics that we show in INFO commandstats. */ - if (flags & CMD_CALL_STATS) { - real_cmd->microseconds += duration; + /* populate the per-command statistics that we show in INFO commandstats. + * If the client is blocked we will handle latency stats and duration when it is unblocked. */ + if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) { real_cmd->calls++; - /* If the client is blocked we will handle latency stats when it is unblocked. */ + real_cmd->microseconds += c->duration; if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED)) - updateCommandLatencyHistogram(&(real_cmd->latency_histogram), duration*1000); + updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration*1000); } /* Propagate the command into the AOF and replication link. @@ -3584,7 +3600,8 @@ void call(client *c, int flags) { } } - server.stat_numcommands++; + if (!(c->flags & CLIENT_BLOCKED)) + server.stat_numcommands++; /* Record peak memory after each command and before the eviction that runs * before the next command. */ @@ -3735,6 +3752,10 @@ int processCommand(client *c) { moduleCallCommandFilters(c); + /* in case we are starting to ProcessCommand and we already have a command we assume + * this is a reprocessing of this command, so we do not want to perform some of the actions again. */ + int client_reprocessing_command = c->cmd ? 1 : 0; + /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { securityWarningCommand(c); @@ -3746,36 +3767,40 @@ int processCommand(client *c) { if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && !(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS)) { - c->bpop.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockPostponeClient(c); return C_OK; } /* Now lookup the command and check ASAP about trivial error conditions - * such as wrong arity, bad command name and so forth. */ - c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); - sds err; - if (!commandCheckExistence(c, &err)) { - rejectCommandSds(c, err); - return C_OK; - } - if (!commandCheckArity(c, &err)) { - rejectCommandSds(c, err); - return C_OK; - } - - /* Check if the command is marked as protected and the relevant configuration allows it */ - if (c->cmd->flags & CMD_PROTECTED) { - if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) || - (c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) - { - rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", " - "you can run it from a local connection, otherwise you need to set this option " - "in the configuration file, and then restart the server.", - c->cmd->proc == debugCommand ? "DEBUG" : "MODULE", - c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command"); + * such as wrong arity, bad command name and so forth. + * In case we are reprocessing a command after it was blocked, + * we do not have to repeat the same checks */ + if (!client_reprocessing_command) { + c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); + sds err; + if (!commandCheckExistence(c, &err)) { + rejectCommandSds(c, err); return C_OK; + } + if (!commandCheckArity(c, &err)) { + rejectCommandSds(c, err); + return C_OK; + } + + /* Check if the command is marked as protected and the relevant configuration allows it */ + if (c->cmd->flags & CMD_PROTECTED) { + if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) || + (c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) + { + rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", " + "you can run it from a local connection, otherwise you need to set this option " + "in the configuration file, and then restart the server.", + c->cmd->proc == debugCommand ? "DEBUG" : "MODULE", + c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command"); + return C_OK; + + } } } @@ -4028,8 +4053,7 @@ int processCommand(client *c) { ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { - c->bpop.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockPostponeClient(c); return C_OK; } @@ -5444,7 +5468,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { /* Clients */ if (all_sections || (dictFind(section_dict,"clients") != NULL)) { size_t maxin, maxout; + unsigned long blocking_keys, blocking_keys_on_nokey; getExpansiveClientsInfo(&maxin,&maxout); + totalNumberOfBlockingKeys(&blocking_keys, &blocking_keys_on_nokey); if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Clients\r\n" @@ -5455,14 +5481,18 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" "tracking_clients:%d\r\n" - "clients_in_timeout_table:%llu\r\n", + "clients_in_timeout_table:%llu\r\n" + "total_blocking_keys:%lu\r\n" + "total_blocking_keys_on_nokey:%lu\r\n", listLength(server.clients)-listLength(server.slaves), getClusterConnectionsCount(), server.maxclients, maxin, maxout, server.blocked_clients, server.tracking_clients, - (unsigned long long) raxSize(server.clients_timeout_table)); + (unsigned long long) raxSize(server.clients_timeout_table), + blocking_keys, + blocking_keys_on_nokey); } /* Memory */ diff --git a/src/server.h b/src/server.h index 765b67d62..1201cab3b 100644 --- a/src/server.h +++ b/src/server.h @@ -359,7 +359,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */ +#define CLIENT_EXECUTING_COMMAND (1<<29) /* Indicates that the client is currently in the process of handling + a command. usually this will be marked only during call() + however, blocked clients might have this flag kept until they + will try to reprocess the command. */ + #define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully * parsed command ready for execution. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to @@ -388,15 +392,18 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ -#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ -#define BLOCKED_LIST 1 /* BLPOP & co. */ -#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ -#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ -#define BLOCKED_STREAM 4 /* XREAD. */ -#define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */ -#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */ -#define BLOCKED_NUM 8 /* Number of blocked states. */ +typedef enum blocking_type { + BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ + BLOCKED_LIST, /* BLPOP & co. */ + BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_MODULE, /* Blocked by a loadable module. */ + BLOCKED_STREAM, /* XREAD. */ + BLOCKED_ZSET, /* BZPOP et al. */ + BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ + BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_NUM, /* Number of blocked states. */ + BLOCKED_END /* End of enumeration */ +} blocking_type; /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -569,13 +576,11 @@ typedef enum { /* Command call flags, see call() function */ #define CMD_CALL_NONE 0 -#define CMD_CALL_SLOWLOG (1<<0) -#define CMD_CALL_STATS (1<<1) -#define CMD_CALL_PROPAGATE_AOF (1<<2) -#define CMD_CALL_PROPAGATE_REPL (1<<3) +#define CMD_CALL_PROPAGATE_AOF (1<<0) +#define CMD_CALL_PROPAGATE_REPL (1<<1) #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL) -#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) -#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */ +#define CMD_CALL_FULL (CMD_CALL_PROPAGATE) +#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */ /* Command propagation flags, see propagateNow() function */ #define PROPAGATE_NONE 0 @@ -992,27 +997,13 @@ typedef struct multiState { * The fields used depend on client->btype. */ typedef struct blockingState { /* Generic fields. */ - long count; /* Elements to pop if count was specified (BLMPOP/BZMPOP), -1 otherwise. */ - mstime_t timeout; /* Blocking operation timeout. If UNIX current time - * is > timeout then the operation timed out. */ - - /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */ - dict *keys; /* The keys we are waiting to terminate a blocking - * operation such as BLPOP or XREAD. Or NULL. */ - robj *target; /* The key that should receive the element, - * for BLMOVE. */ - struct blockPos { - int wherefrom; /* Where to pop from */ - int whereto; /* Where to push to */ - } blockpos; /* The positions in the src/dst lists/zsets - * where we want to pop/push an element - * for BLPOP, BRPOP, BLMOVE and BZMPOP. */ - - /* BLOCK_STREAM */ - size_t xread_count; /* XREAD COUNT option. */ - robj *xread_group; /* XREADGROUP group name. */ - robj *xread_consumer; /* XREADGROUP consumer name. */ - int xread_group_noack; + blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */ + mstime_t timeout; /* Blocking operation timeout. If UNIX current time + * is > timeout then the operation timed out. */ + int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys + is deleted or does not exist anymore */ + /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */ + dict *keys; /* The keys we are blocked on */ /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ @@ -1029,7 +1020,7 @@ typedef struct blockingState { * operation such as B[LR]POP, but received new data in the context of the * last executed command. * - * After the execution of every command or script, we run this list to check + * After the execution of every command or script, we iterate over this list to check * if as a result we should serve data to clients blocked, unblocking them. * Note that server.ready_keys will not have duplicates as there dictionary * also called ready_keys in every structure representing a Redis database, @@ -1167,8 +1158,7 @@ typedef struct client { int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ int slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ - int btype; /* Type of blocking op if CLIENT_BLOCKED. */ - blockingState bpop; /* blocking state */ + blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ @@ -2635,7 +2625,6 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); robj *listTypeDup(robj *o); void listTypeDelRange(robj *o, long start, long stop); -void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted); typedef enum { @@ -2938,6 +2927,7 @@ int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); int processCommand(client *c); int processPendingCommandAndInputBuffer(client *c); +int processCommandAndResetClient(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); @@ -3166,6 +3156,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, #define LOOKUP_NOSTATS (1<<2) /* Don't update keyspace hits/misses counters. */ #define LOOKUP_WRITE (1<<3) /* Delete expired keys even in replicas. */ #define LOOKUP_NOEXPIRE (1<<4) /* Avoid deleting lazy expired keys. */ +#define LOKKUP_NOEFFECTS (LOOKUP_NONOTIFY | LOOKUP_NOSTATS | LOOKUP_NOTOUCH | LOOKUP_NOEXPIRE) /* Avoid any effects from fetching the key */ void dbAdd(redisDb *db, robj *key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val); @@ -3281,20 +3272,28 @@ typedef struct luaScript { robj *body; } luaScript; -/* Blocked clients */ +/* Blocked clients API */ void processUnblockedClients(void); +void initClientBlockingState(client *c); void blockClient(client *c, int btype); void unblockClient(client *c); +void unblockClientOnTimeout(client *c); +void unblockClientOnError(client *c, const char *err_str); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); +void blockClientShutdown(client *c); +void blockPostponeClient(client *c); +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey); + /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); diff --git a/src/t_list.c b/src/t_list.c index d7906d391..ee5f6fd91 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1200,103 +1200,6 @@ void rpoplpushCommand(client *c) { lmoveGenericCommand(c, LIST_TAIL, LIST_HEAD); } -/*----------------------------------------------------------------------------- - * Blocking POP operations - *----------------------------------------------------------------------------*/ - -/* This is a helper function for handleClientsBlockedOnKeys(). Its work - * is to serve a specific client (receiver) that is blocked on 'key' - * in the context of the specified 'db', doing the following: - * - * 1) Provide the client with the 'value' element or a range of elements. - * We will do the pop in here and caller does not need to bother the return. - * 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the - * 'value' element on the destination list (the "push" side of the command). - * 3) Propagate the resulting BRPOP, BLPOP, BLMPOP and additional xPUSH if any into - * the AOF and replication channel. - * - * The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that - * we can propagate the command properly. - * - * The argument 'whereto' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element is to be pushed to the head or tail so that we can - * propagate the command properly. - * - * 'deleted' is an optional output argument to get an indication - * if the key got deleted by this function. */ -void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted) -{ - robj *argv[5]; - robj *value = NULL; - - if (deleted) *deleted = 0; - - if (dstkey == NULL) { - /* Propagate the [LR]POP operation. */ - argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop : - shared.rpop; - argv[1] = key; - - if (receiver->lastcmd->proc == blmpopCommand) { - /* Propagate the [LR]POP COUNT operation. */ - long count = receiver->bpop.count; - serverAssert(count > 0); - long llen = listTypeLength(o); - serverAssert(llen > 0); - - argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count); - alsoPropagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(argv[2]); - - /* Pop a range of elements in a nested arrays way. */ - listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, 0, deleted); - return; - } - - alsoPropagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL); - - /* BRPOP/BLPOP */ - value = listTypePop(o, wherefrom); - serverAssert(value != NULL); - - addReplyArrayLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,value); - - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted); - } else { - /* BLMOVE */ - robj *dstobj = - lookupKeyWrite(receiver->db,dstkey); - if (!(dstobj && - checkType(receiver,dstobj,OBJ_LIST))) - { - value = listTypePop(o, wherefrom); - serverAssert(value != NULL); - - /* "lpush" or "rpush" notify event will be notified by lmoveHandlePush. */ - lmoveHandlePush(receiver,dstkey,dstobj,value,whereto); - /* Propagate the LMOVE/RPOPLPUSH operation. */ - int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand); - argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove; - argv[1] = key; - argv[2] = dstkey; - argv[3] = getStringObjectFromListPosition(wherefrom); - argv[4] = getStringObjectFromListPosition(whereto); - alsoPropagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL); - - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted); - } - } - - if (value) decrRefCount(value); -} - /* Blocking RPOP/LPOP/LMPOP * * 'numkeys' is the number of keys. @@ -1368,8 +1271,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i } /* If the keys do not exist we must block */ - struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL,0); + blockForKeys(c,BLOCKED_LIST,keys,numkeys,timeout,0); } /* BLPOP [ ...] */ @@ -1393,8 +1295,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou addReplyNull(c); } else { /* The list is empty and the client blocks. */ - struct blockPos pos = {wherefrom, whereto}; - blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL,0); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,0); } } else { /* The list exists and has elements, so diff --git a/src/t_stream.c b/src/t_stream.c index b2e58dc15..16ad044a2 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2409,27 +2409,19 @@ void xreadCommand(client *c) { addReplyNullArray(c); goto cleanup; } - blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, - -1, timeout, NULL, NULL, ids, xreadgroup); - /* If no COUNT is given and we block, set a relatively small count: - * in case the ID provided is too low, we do not want the server to - * block just to serve this client a huge stream of messages. */ - c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; - - /* If this is a XREADGROUP + GROUP we need to remember for which - * group and consumer name we are blocking, so later when one of the - * keys receive more data, we can call streamReplyWithRange() passing - * the right arguments. */ - if (groupname) { - incrRefCount(groupname); - incrRefCount(consumername); - c->bpop.xread_group = groupname; - c->bpop.xread_consumer = consumername; - c->bpop.xread_group_noack = noack; - } else { - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; + /* We change the '$' to the current last ID for this stream. this is + * Since later on when we unblock on arriving data - we would like to + * re-process the command and in case '$' stays we will spin-block forever. + */ + for (int id_idx = 0; id_idx < streams_count; id_idx++) { + int arg_idx = id_idx + streams_arg + streams_count; + if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) { + robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]); + rewriteClientCommandArgument(c, arg_idx, argv_streamid); + decrRefCount(argv_streamid); + } } + blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup); goto cleanup; } diff --git a/src/t_zset.c b/src/t_zset.c index eb87c5d7d..048759d1f 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -4071,8 +4071,7 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where, } /* If the keys do not exist we must block */ - struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL,0); + blockForKeys(c,BLOCKED_ZSET,keys,numkeys,timeout,0); } // BZPOPMIN key [key ...] timeout diff --git a/src/timeout.c b/src/timeout.c index 678a24857..b62423a9e 100644 --- a/src/timeout.c +++ b/src/timeout.c @@ -36,12 +36,11 @@ * Otherwise 0 is returned and no operation is performed. */ int checkBlockedClientTimeout(client *c, mstime_t now) { if (c->flags & CLIENT_BLOCKED && - c->bpop.timeout != 0 - && c->bpop.timeout < now) + c->bstate.timeout != 0 + && c->bstate.timeout < now) { /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); + unblockClientOnTimeout(c); return 1; } else { return 0; @@ -71,7 +70,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * into keys no longer served by this server. */ if (server.cluster_enabled) { if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); + unblockClientOnError(c, NULL); } } return 0; @@ -112,8 +111,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) { * to handle blocked clients timeouts. The client is not added to the list * if its timeout is zero (block forever). */ void addClientToTimeoutTable(client *c) { - if (c->bpop.timeout == 0) return; - uint64_t timeout = c->bpop.timeout; + if (c->bstate.timeout == 0) return; + uint64_t timeout = c->bstate.timeout; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,c); if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) @@ -125,7 +124,7 @@ void addClientToTimeoutTable(client *c) { void removeClientFromTimeoutTable(client *c) { if (!(c->flags & CLIENT_IN_TO_TABLE)) return; c->flags &= ~CLIENT_IN_TO_TABLE; - uint64_t timeout = c->bpop.timeout; + uint64_t timeout = c->bstate.timeout; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,c); raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 435e8fcde..175506c17 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -182,7 +182,8 @@ start_server {tags {"repl external:skip"}} { [$B lrange foo 0 -1] eq {a b c} } else { fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]" - } + } + assert_match {*calls=1,*,rejected_calls=0,failed_calls=1*} [cmdrstat blpop $B] } } } diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 64b456904..cc159a6ce 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -359,6 +359,14 @@ int test_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ return REDISMODULE_OK; } +/* wrapper for RM_Call which also replicates the module command */ +int test_rm_call_replicate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + test_rm_call(ctx, argv, argc); + RedisModule_ReplicateVerbatim(ctx); + + return REDISMODULE_OK; +} + /* wrapper for RM_Call with flags */ int test_rm_call_flags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ if(argc < 3){ @@ -497,6 +505,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx, "test.rm_call_flags", test_rm_call_flags,"allow-stale", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "test.rm_call_replicate", test_rm_call_replicate,"allow-stale", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index aceec3cca..4617334e7 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -173,6 +173,49 @@ start_server {tags {"introspection"}} { $rd close } + + test {MONITOR log blocked command only once} { + + # need to reconnect in order to reset the clients state + reconnect + + set rd [redis_deferring_client] + set bc [redis_deferring_client] + r del mylist + + $rd monitor + $rd read ; # Discard the OK + + $bc blpop mylist 0 + wait_for_blocked_clients_count 1 + r lpush mylist 1 + wait_for_blocked_clients_count 0 + r lpush mylist 2 + + # we expect to see the blpop on the monitor first + assert_match {*"blpop"*"mylist"*"0"*} [$rd read] + + # we scan out all the info commands on the monitor + set monitor_output [$rd read] + while { [string match {*"info"*} $monitor_output] } { + set monitor_output [$rd read] + } + + # we expect to locate the lpush right when the client was unblocked + assert_match {*"lpush"*"mylist"*"1"*} $monitor_output + + # we scan out all the info commands + set monitor_output [$rd read] + while { [string match {*"info"*} $monitor_output] } { + set monitor_output [$rd read] + } + + # we expect to see the next lpush and not duplicate blpop command + assert_match {*"lpush"*"mylist"*"2"*} $monitor_output + + $rd close + $bc close + } test {CLIENT GETNAME should return NIL if name is not assigned} { r client getname diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index ce4aaae44..90a369da2 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -1,4 +1,5 @@ set testmodule [file normalize tests/modules/propagate.so] +set miscmodule [file normalize tests/modules/misc.so] set keyspace_events [file normalize tests/modules/keyspace_events.so] tags "modules" { @@ -721,5 +722,42 @@ tags "modules aof" { assert_equal [s 0 unexpected_error_replies] 0 } } + test "Modules RM_Call does not update stats during aof load: AOF-load type $aofload_type" { + start_server [list overrides [list loadmodule "$miscmodule"]] { + # Enable the AOF + r config set appendonly yes + r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite. + waitForBgrewriteaof r + + r config resetstat + r set foo bar + r EVAL {return redis.call('SET', KEYS[1], ARGV[1])} 1 foo bar2 + r test.rm_call_replicate set foo bar3 + r EVAL {return redis.call('test.rm_call_replicate',ARGV[1],KEYS[1],ARGV[2])} 1 foo set bar4 + + r multi + r set foo bar5 + r EVAL {return redis.call('SET', KEYS[1], ARGV[1])} 1 foo bar6 + r test.rm_call_replicate set foo bar7 + r EVAL {return redis.call('test.rm_call_replicate',ARGV[1],KEYS[1],ARGV[2])} 1 foo set bar8 + r exec + + assert_match {*calls=8,*,rejected_calls=0,failed_calls=0} [cmdrstat set r] + + + # Load the AOF + if {$aofload_type == "debug_cmd"} { + r config resetstat + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + assert_no_match {*calls=*} [cmdrstat set r] + + } + } } } diff --git a/tests/unit/slowlog.tcl b/tests/unit/slowlog.tcl index 8ce1b1c27..bc15c6411 100644 --- a/tests/unit/slowlog.tcl +++ b/tests/unit/slowlog.tcl @@ -200,4 +200,26 @@ start_server {tags {"slowlog"} overrides {slowlog-log-slower-than 1000000}} { assert_equal 3 [llength [r slowlog get -1]] assert_equal 3 [llength [r slowlog get 3]] } + + test {SLOWLOG - blocking command is reported only after unblocked} { + # Cleanup first + r del mylist + # create a test client + set rd [redis_deferring_client] + + # config the slowlog and reset + r config set slowlog-log-slower-than 0 + r config set slowlog-max-len 110 + r slowlog reset + + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 50 20 + assert_equal 0 [llength [regexp -all -inline (?=BLPOP) [r slowlog get]]] + + r LPUSH mylist 1 + wait_for_blocked_clients_count 0 50 20 + assert_equal 1 [llength [regexp -all -inline (?=BLPOP) [r slowlog get]]] + + $rd close + } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 0d00fcedb..f7e043f99 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -1101,10 +1101,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { r rpush k hello r pexpire k 100 set rd [redis_deferring_client] + $rd deferred 0 $rd select 9 - assert_equal {OK} [$rd read] - $rd client id - set id [$rd read] + set id [$rd client id] + $rd deferred 1 $rd brpop k 1 wait_for_blocked_clients_count 1 after 101 @@ -1115,6 +1115,13 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { assert_match "*flags=b*" [r client list id $id] r client unblock $id assert_equal {} [$rd read] + $rd deferred 0 + # We want to force key deletion to be propagated to the replica + # in order to verify it was expiered on the replication stream. + $rd set somekey1 someval1 + $rd exists k + r set somekey2 someval2 + assert_replication_stream $repl { {select *} {flushall} @@ -1123,11 +1130,14 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { {pexpireat k *} {swapdb 1 9} {select 9} + {set somekey1 someval1} {del k} + {select 1} + {set somekey2 someval2} } close_replication_stream $repl - # Restore server and client state r debug set-active-expire 1 + # Restore server and client state r select 9 } {OK} {singledb:skip needs:debug} @@ -1154,6 +1164,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { assert_match "*flags=b*" [r client list id $id] r client unblock $id assert_equal {} [$rd read] + # We want to force key deletion to be propagated to the replica + # in order to verify it was expiered on the replication stream. + $rd exists k + assert_equal {0} [$rd read] assert_replication_stream $repl { {select *} {flushall} @@ -2161,4 +2175,120 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} { assert_equal [lpop k] [string repeat x 31] set _ $k } {12 0 9223372036854775808 2147483647 32767 127} + + test "Unblock fairness is kept while pipelining" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + # delete the list in case already exists + r del mylist + + # block a client on the list + $rd1 BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # pipline on other client a list push and a blocking pop + # we should expect the fainess to be kept and have $rd1 + # being unblocked + set buf "" + append buf "LPUSH mylist 1\r\n" + append buf "BLPOP mylist 0\r\n" + $rd2 write $buf + $rd2 flush + + # we check that we still have 1 blocked client + # and that the first blocked client has been served + assert_equal [$rd1 read] {mylist 1} + assert_equal [$rd2 read] {1} + wait_for_blocked_clients_count 1 + + # We no unblock the last client and verify it was served last + r LPUSH mylist 2 + wait_for_blocked_clients_count 0 + assert_equal [$rd2 read] {mylist 2} + + $rd1 close + $rd2 close + } + + test "Unblock fairness is kept during nested unblock" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + + # delete the list in case already exists + r del l1{t} l2{t} l3{t} + + # block a client on the list + $rd1 BRPOPLPUSH l1{t} l3{t} 0 + wait_for_blocked_clients_count 1 + + $rd2 BLPOP l2{t} 0 + wait_for_blocked_clients_count 2 + + $rd3 BLMPOP 0 2 l2{t} l3{t} LEFT COUNT 1 + wait_for_blocked_clients_count 3 + + r multi + r lpush l1{t} 1 + r lpush l2{t} 2 + r exec + + wait_for_blocked_clients_count 0 + + assert_equal [$rd1 read] {1} + assert_equal [$rd2 read] {l2{t} 2} + assert_equal [$rd3 read] {l3{t} 1} + + $rd1 close + $rd2 close + $rd3 close + } + + test "Blocking command acounted only once in commandstats" { + # cleanup first + r del mylist + + # create a test client + set rd [redis_deferring_client] + + # reset the server stats + r config resetstat + + # block a client on the list + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # unblock the list + r LPUSH mylist 1 + wait_for_blocked_clients_count 0 + + assert_match {*calls=1,*,rejected_calls=0,failed_calls=0} [cmdrstat blpop r] + + $rd close + } + + test "Blocking command acounted only once in commandstats after timeout" { + # cleanup first + r del mylist + + # create a test client + set rd [redis_deferring_client] + $rd client id + set id [$rd read] + + # reset the server stats + r config resetstat + + # block a client on the list + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # unblock the client on timeout + r client unblock $id timeout + + assert_match {*calls=1,*,rejected_calls=0,failed_calls=0} [cmdrstat blpop r] + + $rd close + } } ;# stop servers diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index bae275b4e..5405d1b0d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -223,7 +223,7 @@ start_server { r XDEL mystream 667 set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">" - after 20 + wait_for_blocked_clients_count 0 assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} $rd close } @@ -234,8 +234,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r DEL mystream - assert_error "*no longer exists*" {$rd read} + assert_error "NOGROUP*" {$rd read} $rd close } @@ -245,8 +246,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SET mystream val1 - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } @@ -256,11 +258,12 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r MULTI r DEL mystream r SADD mystream e1 r EXEC - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } @@ -270,8 +273,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r FLUSHALL - assert_error "*no longer exists*" {$rd read} + assert_error "*NOGROUP*" {$rd read} $rd close } @@ -286,8 +290,9 @@ start_server { $rd SELECT 9 $rd read $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SWAPDB 4 9 - assert_error "*no longer exists*" {$rd read} + assert_error "*NOGROUP*" {$rd read} $rd close } {0} {external:skip} @@ -303,8 +308,9 @@ start_server { $rd SELECT 9 $rd read $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SWAPDB 4 9 - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } {0} {external:skip} @@ -313,6 +319,7 @@ start_server { r XADD mystream 666 f v set rd [redis_deferring_client] $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 r DEL mystream r XADD mystream 667 f v @@ -326,6 +333,7 @@ start_server { r XADD mystream 666 f v set rd [redis_deferring_client] $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 r SET mystream val1 r DEL mystream @@ -410,6 +418,55 @@ start_server { $rd close } + test {Blocking XREADGROUP for stream key that has clients blocked on list} { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + + # First delete the stream + r DEL mystream + + # now place a client blocked on non-existing key as list + $rd2 BLPOP mystream 0 + + # wait until we verify the client is blocked + wait_for_blocked_clients_count 1 + + # verify we only have 1 regular blocking key + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now write mystream as stream + r XADD mystream 666 key value + r XGROUP CREATE mystream mygroup $ MKSTREAM + + # block another client on xreadgroup + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream ">" + + # wait until we verify we have 2 blocked clients (one for the list and one for the stream) + wait_for_blocked_clients_count 2 + + # verify we have 1 blocking key which also have clients blocked on nokey condition + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now delete the key and verify we have no clients blocked on nokey condition + r DEL mystream + assert_error "NOGROUP*" {$rd read} + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # close the only left client and make sure we have no more blocking keys + $rd2 close + + # wait until we verify we have no more blocked clients + wait_for_blocked_clients_count 0 + + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + $rd close + } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { r config resetstat r del mystream