handleClientsBlockedOnKeys() refactoring.
This commit is contained in:
parent
3984b4fdd9
commit
c98af3a550
119
src/blocked.c
119
src/blocked.c
@ -229,54 +229,13 @@ void disconnectAllBlockedClients(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function should be called by Redis every time a single command,
|
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||||
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
* when there may be clients blocked on a list key, and there may be new
|
||||||
* being called by a client. It handles serving clients blocked in
|
* data to fetch (the key is ready). */
|
||||||
* lists, streams, and sorted sets, via a blocking commands.
|
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||||
*
|
|
||||||
* All the keys with at least one client blocked that received at least
|
|
||||||
* one new element via some write operation are accumulated into
|
|
||||||
* the server.ready_keys list. This function will run the list and will
|
|
||||||
* serve clients accordingly. Note that the function will iterate again and
|
|
||||||
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
|
||||||
* to serve because of the PUSH side of BRPOPLPUSH.
|
|
||||||
*
|
|
||||||
* This function is normally "fair", that is, it will server clients
|
|
||||||
* using a FIFO behavior. However this fairness is violated in certain
|
|
||||||
* edge cases, that is, when we have clients blocked at the same time
|
|
||||||
* in a sorted set and in a list, for the same key (a very odd thing to
|
|
||||||
* do client side, indeed!). Because mismatching clients (blocking for
|
|
||||||
* a different type compared to the current key type) are moved in the
|
|
||||||
* other side of the linked list. However as long as the key starts to
|
|
||||||
* be used only for a single type, like virtually any Redis application will
|
|
||||||
* do, the function is already fair. */
|
|
||||||
void handleClientsBlockedOnKeys(void) {
|
|
||||||
while(listLength(server.ready_keys) != 0) {
|
|
||||||
list *l;
|
|
||||||
|
|
||||||
/* Point server.ready_keys to a fresh list and save the current one
|
|
||||||
* locally. This way as we run the old list we are free to call
|
|
||||||
* signalKeyAsReady() that may push new elements in server.ready_keys
|
|
||||||
* when handling clients blocked into BRPOPLPUSH. */
|
|
||||||
l = server.ready_keys;
|
|
||||||
server.ready_keys = listCreate();
|
|
||||||
|
|
||||||
while(listLength(l) != 0) {
|
|
||||||
listNode *ln = listFirst(l);
|
|
||||||
readyList *rl = ln->value;
|
|
||||||
|
|
||||||
/* First of all remove this key from db->ready_keys so that
|
|
||||||
* we can safely call signalKeyAsReady() against this key. */
|
|
||||||
dictDelete(rl->db->ready_keys,rl->key);
|
|
||||||
|
|
||||||
/* Serve clients blocked on list key. */
|
|
||||||
robj *o = lookupKeyWrite(rl->db,rl->key);
|
|
||||||
if (o != NULL && o->type == OBJ_LIST) {
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
/* We serve clients in the same order they blocked for
|
/* We serve clients in the same order they blocked for
|
||||||
* this key, from the first blocked to the last. */
|
* this key, from the first blocked to the last. */
|
||||||
de = dictFind(rl->db->blocking_keys,rl->key);
|
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||||
if (de) {
|
if (de) {
|
||||||
list *clients = dictGetVal(de);
|
list *clients = dictGetVal(de);
|
||||||
int numclients = listLength(clients);
|
int numclients = listLength(clients);
|
||||||
@ -331,13 +290,13 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
* when an element was pushed on the list. */
|
* when an element was pushed on the list. */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Serve clients blocked on sorted set key. */
|
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||||
else if (o != NULL && o->type == OBJ_ZSET) {
|
* when there may be clients blocked on a sorted set key, and there may be new
|
||||||
dictEntry *de;
|
* data to fetch (the key is ready). */
|
||||||
|
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||||
/* We serve clients in the same order they blocked for
|
/* We serve clients in the same order they blocked for
|
||||||
* this key, from the first blocked to the last. */
|
* this key, from the first blocked to the last. */
|
||||||
de = dictFind(rl->db->blocking_keys,rl->key);
|
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||||
if (de) {
|
if (de) {
|
||||||
list *clients = dictGetVal(de);
|
list *clients = dictGetVal(de);
|
||||||
int numclients = listLength(clients);
|
int numclients = listLength(clients);
|
||||||
@ -378,8 +337,10 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Serve clients blocked on stream key. */
|
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||||
else if (o != NULL && o->type == OBJ_STREAM) {
|
* when there may be clients blocked on a stream key, and there may be new
|
||||||
|
* data to fetch (the key is ready). */
|
||||||
|
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||||
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||||
stream *s = o->ptr;
|
stream *s = o->ptr;
|
||||||
|
|
||||||
@ -469,6 +430,58 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function should be called by Redis every time a single command,
|
||||||
|
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
||||||
|
* being called by a client. It handles serving clients blocked in
|
||||||
|
* lists, streams, and sorted sets, via a blocking commands.
|
||||||
|
*
|
||||||
|
* All the keys with at least one client blocked that received at least
|
||||||
|
* one new element via some write operation are accumulated into
|
||||||
|
* the server.ready_keys list. This function will run the list and will
|
||||||
|
* serve clients accordingly. Note that the function will iterate again and
|
||||||
|
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
||||||
|
* to serve because of the PUSH side of BRPOPLPUSH.
|
||||||
|
*
|
||||||
|
* This function is normally "fair", that is, it will server clients
|
||||||
|
* using a FIFO behavior. However this fairness is violated in certain
|
||||||
|
* edge cases, that is, when we have clients blocked at the same time
|
||||||
|
* in a sorted set and in a list, for the same key (a very odd thing to
|
||||||
|
* do client side, indeed!). Because mismatching clients (blocking for
|
||||||
|
* a different type compared to the current key type) are moved in the
|
||||||
|
* other side of the linked list. However as long as the key starts to
|
||||||
|
* be used only for a single type, like virtually any Redis application will
|
||||||
|
* do, the function is already fair. */
|
||||||
|
void handleClientsBlockedOnKeys(void) {
|
||||||
|
while(listLength(server.ready_keys) != 0) {
|
||||||
|
list *l;
|
||||||
|
|
||||||
|
/* Point server.ready_keys to a fresh list and save the current one
|
||||||
|
* locally. This way as we run the old list we are free to call
|
||||||
|
* signalKeyAsReady() that may push new elements in server.ready_keys
|
||||||
|
* when handling clients blocked into BRPOPLPUSH. */
|
||||||
|
l = server.ready_keys;
|
||||||
|
server.ready_keys = listCreate();
|
||||||
|
|
||||||
|
while(listLength(l) != 0) {
|
||||||
|
listNode *ln = listFirst(l);
|
||||||
|
readyList *rl = ln->value;
|
||||||
|
|
||||||
|
/* First of all remove this key from db->ready_keys so that
|
||||||
|
* we can safely call signalKeyAsReady() against this key. */
|
||||||
|
dictDelete(rl->db->ready_keys,rl->key);
|
||||||
|
|
||||||
|
/* Serve clients blocked on list key. */
|
||||||
|
robj *o = lookupKeyWrite(rl->db,rl->key);
|
||||||
|
|
||||||
|
if (o != NULL) {
|
||||||
|
if (o->type == OBJ_LIST)
|
||||||
|
serveClientsBlockedOnListKey(o,rl);
|
||||||
|
else if (o->type == OBJ_ZSET)
|
||||||
|
serveClientsBlockedOnSortedSetKey(o,rl);
|
||||||
|
else if (o->type == OBJ_STREAM)
|
||||||
|
serveClientsBlockedOnStreamKey(o,rl);
|
||||||
|
}
|
||||||
|
|
||||||
/* Free this item. */
|
/* Free this item. */
|
||||||
decrRefCount(rl->key);
|
decrRefCount(rl->key);
|
||||||
zfree(rl);
|
zfree(rl);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user