parent
96688aa646
commit
96a54866ab
20
src/adlist.c
20
src/adlist.c
@ -327,12 +327,11 @@ listNode *listIndex(list *list, long index) {
|
||||
}
|
||||
|
||||
/* Rotate the list removing the tail node and inserting it to the head. */
|
||||
void listRotate(list *list) {
|
||||
listNode *tail = list->tail;
|
||||
|
||||
void listRotateTailToHead(list *list) {
|
||||
if (listLength(list) <= 1) return;
|
||||
|
||||
/* Detach current tail */
|
||||
listNode *tail = list->tail;
|
||||
list->tail = tail->prev;
|
||||
list->tail->next = NULL;
|
||||
/* Move it as head */
|
||||
@ -342,6 +341,21 @@ void listRotate(list *list) {
|
||||
list->head = tail;
|
||||
}
|
||||
|
||||
/* Rotate the list removing the head node and inserting it to the tail. */
|
||||
void listRotateHeadToTail(list *list) {
|
||||
if (listLength(list) <= 1) return;
|
||||
|
||||
listNode *head = list->head;
|
||||
/* Detach current head */
|
||||
list->head = head->next;
|
||||
list->head->prev = NULL;
|
||||
/* Move it as tail */
|
||||
list->tail->next = head;
|
||||
head->next = NULL;
|
||||
head->prev = list->tail;
|
||||
list->tail = head;
|
||||
}
|
||||
|
||||
/* Add all the elements of the list 'o' at the end of the
|
||||
* list 'l'. The list 'other' remains empty but otherwise valid. */
|
||||
void listJoin(list *l, list *o) {
|
||||
|
@ -85,7 +85,8 @@ listNode *listSearchKey(list *list, void *key);
|
||||
listNode *listIndex(list *list, long index);
|
||||
void listRewind(list *list, listIter *li);
|
||||
void listRewindTail(list *list, listIter *li);
|
||||
void listRotate(list *list);
|
||||
void listRotateTailToHead(list *list);
|
||||
void listRotateHeadToTail(list *list);
|
||||
void listJoin(list *l, list *o);
|
||||
|
||||
/* Directions for iterators */
|
||||
|
@ -64,6 +64,21 @@
|
||||
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
||||
|
||||
/* This structure represents the blocked key information that we store
|
||||
* in the client structure. Each client blocked on keys, has a
|
||||
* client->bpop.keys hash table. The keys of the hash table are Redis
|
||||
* keys pointers to 'robj' structures. The value is this structure.
|
||||
* The structure has two goals: firstly we store the list node that this
|
||||
* client uses to be listed in the database "blocked clients for this key"
|
||||
* list, so we can later unblock in O(1) without a list scan.
|
||||
* Secondly for certain blocking types, we have additional info. Right now
|
||||
* the only use for additional info we have is when clients are blocked
|
||||
* on streams, as we have to remember the ID it blocked for. */
|
||||
typedef struct bkinfo {
|
||||
listNode *listnode; /* List node for db->blocking_keys[key] list. */
|
||||
streamID stream_id; /* Stream ID if we blocked in a stream. */
|
||||
} bkinfo;
|
||||
|
||||
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
|
||||
* flag is set client query buffer is not longer processed, but accumulated,
|
||||
* and will be processed when the client is unblocked. */
|
||||
@ -211,8 +226,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
if (receiver->btype != BLOCKED_LIST) {
|
||||
/* Put at the tail, so that at the next call
|
||||
* we'll not run into it again. */
|
||||
listDelNode(clients,clientnode);
|
||||
listAddNodeTail(clients,receiver);
|
||||
listRotateHeadToTail(clients);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -273,8 +287,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||
if (receiver->btype != BLOCKED_ZSET) {
|
||||
/* Put at the tail, so that at the next call
|
||||
* we'll not run into it again. */
|
||||
listDelNode(clients,clientnode);
|
||||
listAddNodeTail(clients,receiver);
|
||||
listRotateHeadToTail(clients);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -320,8 +333,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
while((ln = listNext(&li))) {
|
||||
client *receiver = listNodeValue(ln);
|
||||
if (receiver->btype != BLOCKED_STREAM) continue;
|
||||
streamID *gt = dictFetchValue(receiver->bpop.keys,
|
||||
rl->key);
|
||||
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
|
||||
streamID *gt = &bki->stream_id;
|
||||
|
||||
/* If we blocked in the context of a consumer
|
||||
* group, we need to resolve the group and update the
|
||||
@ -419,8 +432,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
||||
* ready to be served, so they'll remain in the list
|
||||
* sometimes. We want also be able to skip clients that are
|
||||
* not blocked for the MODULE type safely. */
|
||||
listDelNode(clients,clientnode);
|
||||
listAddNodeTail(clients,receiver);
|
||||
listRotateHeadToTail(clients);
|
||||
|
||||
if (receiver->btype != BLOCKED_MODULE) continue;
|
||||
|
||||
@ -551,17 +563,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
if (target != NULL) incrRefCount(target);
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
/* The value associated with the key name in the bpop.keys dictionary
|
||||
* is NULL for lists and sorted sets, or the stream ID for streams. */
|
||||
void *key_data = NULL;
|
||||
if (btype == BLOCKED_STREAM) {
|
||||
key_data = zmalloc(sizeof(streamID));
|
||||
memcpy(key_data,ids+j,sizeof(streamID));
|
||||
}
|
||||
/* Allocate our bkinfo structure, associated to each key the client
|
||||
* is blocked for. */
|
||||
bkinfo *bki = zmalloc(sizeof(*bki));
|
||||
if (btype == BLOCKED_STREAM)
|
||||
bki->stream_id = ids[j];
|
||||
|
||||
/* If the key already exists in the dictionary ignore it. */
|
||||
if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) {
|
||||
zfree(key_data);
|
||||
if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
|
||||
zfree(bki);
|
||||
continue;
|
||||
}
|
||||
incrRefCount(keys[j]);
|
||||
@ -580,6 +590,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
l = dictGetVal(de);
|
||||
}
|
||||
listAddNodeTail(l,c);
|
||||
bki->listnode = listLast(l);
|
||||
}
|
||||
blockClient(c,btype);
|
||||
}
|
||||
@ -596,11 +607,12 @@ void unblockClientWaitingData(client *c) {
|
||||
/* The client may wait for multiple keys, so unblock it for every key. */
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *key = dictGetKey(de);
|
||||
bkinfo *bki = dictGetVal(de);
|
||||
|
||||
/* Remove this client from the list of clients waiting for this key. */
|
||||
l = dictFetchValue(c->db->blocking_keys,key);
|
||||
serverAssertWithInfo(c,key,l != NULL);
|
||||
listDelNode(l,listSearchKey(l,c));
|
||||
listDelNode(l,bki->listnode);
|
||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||
if (listLength(l) == 0)
|
||||
dictDelete(c->db->blocking_keys,key);
|
||||
|
@ -1666,7 +1666,7 @@ void clientsCron(void) {
|
||||
/* Rotate the list, take the current head, process.
|
||||
* This way if the client must be removed from the list it's the
|
||||
* first element and we don't incur into O(N) computation. */
|
||||
listRotate(server.clients);
|
||||
listRotateTailToHead(server.clients);
|
||||
head = listFirst(server.clients);
|
||||
c = listNodeValue(head);
|
||||
/* The following functions do different service checks on the client.
|
||||
|
Loading…
x
Reference in New Issue
Block a user