diff --git a/src/adlist.c b/src/adlist.c index 359262521..7b7b012ce 100644 --- a/src/adlist.c +++ b/src/adlist.c @@ -327,12 +327,11 @@ listNode *listIndex(list *list, long index) { } /* Rotate the list removing the tail node and inserting it to the head. */ -void listRotate(list *list) { - listNode *tail = list->tail; - +void listRotateTailToHead(list *list) { if (listLength(list) <= 1) return; /* Detach current tail */ + listNode *tail = list->tail; list->tail = tail->prev; list->tail->next = NULL; /* Move it as head */ @@ -342,6 +341,21 @@ void listRotate(list *list) { list->head = tail; } +/* Rotate the list removing the head node and inserting it to the tail. */ +void listRotateHeadToTail(list *list) { + if (listLength(list) <= 1) return; + + listNode *head = list->head; + /* Detach current head */ + list->head = head->next; + list->head->prev = NULL; + /* Move it as tail */ + list->tail->next = head; + head->next = NULL; + head->prev = list->tail; + list->tail = head; +} + /* Add all the elements of the list 'o' at the end of the * list 'l'. The list 'other' remains empty but otherwise valid. */ void listJoin(list *l, list *o) { diff --git a/src/adlist.h b/src/adlist.h index 996a2d83f..d1d34a10f 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -89,7 +89,8 @@ listNode *listSearchKey(list *list, void *key); listNode *listIndex(list *list, long index); void listRewind(list *list, listIter *li); void listRewindTail(list *list, listIter *li); -void listRotate(list *list); +void listRotateTailToHead(list *list); +void listRotateHeadToTail(list *list); void listJoin(list *l, list *o); /* Directions for iterators */ diff --git a/src/blocked.cpp b/src/blocked.cpp index 6d2d99687..5dd0c4c3e 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -65,6 +65,21 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); +/* This structure represents the blocked key information that we store + * in the client structure. Each client blocked on keys, has a + * client->bpop.keys hash table. The keys of the hash table are Redis + * keys pointers to 'robj' structures. The value is this structure. + * The structure has two goals: firstly we store the list node that this + * client uses to be listed in the database "blocked clients for this key" + * list, so we can later unblock in O(1) without a list scan. + * Secondly for certain blocking types, we have additional info. Right now + * the only use for additional info we have is when clients are blocked + * on streams, as we have to remember the ID it blocked for. */ +typedef struct bkinfo { + listNode *listnode; /* List node for db->blocking_keys[key] list. */ + streamID stream_id; /* Stream ID if we blocked in a stream. */ +} bkinfo; + /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ @@ -230,8 +245,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -293,8 +307,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_ZSET) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -341,8 +354,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { client *receiver = (client*)listNodeValue(ln); if (receiver->btype != BLOCKED_STREAM) continue; std::unique_locklock)> lock(receiver->lock); - streamID *gt = (streamID*)dictFetchValue(receiver->bpop.keys, - rl->key); + bkinfo *bki = (bkinfo*)dictFetchValue(receiver->bpop.keys,rl->key); + streamID *gt = &bki->stream_id; /* If we blocked in the context of a consumer * group, we need to resolve the group and update the @@ -440,8 +453,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * ready to be served, so they'll remain in the list * sometimes. We want also be able to skip clients that are * not blocked for the MODULE type safely. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); if (receiver->btype != BLOCKED_MODULE) continue; @@ -573,17 +585,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { - /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists and sorted sets, or the stream ID for streams. */ - void *key_data = NULL; - if (btype == BLOCKED_STREAM) { - key_data = zmalloc(sizeof(streamID), MALLOC_SHARED); - memcpy(key_data,ids+j,sizeof(streamID)); - } + /* Allocate our bkinfo structure, associated to each key the client + * is blocked for. */ + bkinfo *bki = (bkinfo*)zmalloc(sizeof(*bki)); + if (btype == BLOCKED_STREAM) + bki->stream_id = ids[j]; /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { - zfree(key_data); + if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { + zfree(bki); continue; } incrRefCount(keys[j]); @@ -602,6 +612,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo l = (list*)dictGetVal(de); } listAddNodeTail(l,c); + bki->listnode = listLast(l); } blockClient(c,btype); } @@ -618,11 +629,12 @@ void unblockClientWaitingData(client *c) { /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { robj *key = (robj*)dictGetKey(de); + bkinfo *bki = (bkinfo*)dictGetVal(de); /* Remove this client from the list of clients waiting for this key. */ l = (list*)dictFetchValue(c->db->blocking_keys,key); serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,listSearchKey(l,c)); + listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) dictDelete(c->db->blocking_keys,key); diff --git a/src/server.cpp b/src/server.cpp index 3fe980b2b..89bbdf567 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1705,7 +1705,7 @@ void clientsCron(int iel) { /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ - listRotate(g_pserver->clients); + listRotateTailToHead(g_pserver->clients); head = (listNode*)listFirst(g_pserver->clients); c = (client*)listNodeValue(head); if (c->iel == iel)