Move to struct.
This commit is contained in:
parent
b2a7fd0cf7
commit
357a841714
@ -41,8 +41,10 @@ redisClient *createClient(int fd) {
|
||||
c->reply = listCreate();
|
||||
listSetFreeMethod(c->reply,decrRefCount);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
c->blocking_keys = NULL;
|
||||
c->blocking_keys_num = 0;
|
||||
c->bstate.keys = NULL;
|
||||
c->bstate.count = 0;
|
||||
c->bstate.timeout = 0;
|
||||
c->bstate.target = NULL;
|
||||
c->io_keys = listCreate();
|
||||
c->watched_keys = listCreate();
|
||||
listSetFreeMethod(c->io_keys,decrRefCount);
|
||||
@ -677,7 +679,7 @@ void closeTimedoutClients(void) {
|
||||
redisLog(REDIS_VERBOSE,"Closing idle client");
|
||||
freeClient(c);
|
||||
} else if (c->flags & REDIS_BLOCKED) {
|
||||
if (c->blockingto != 0 && c->blockingto < now) {
|
||||
if (c->bstate.timeout != 0 && c->bstate.timeout < now) {
|
||||
addReply(c,shared.nullmultibulk);
|
||||
unblockClientWaitingData(c);
|
||||
}
|
||||
|
17
src/redis.h
17
src/redis.h
@ -293,6 +293,16 @@ typedef struct multiState {
|
||||
int count; /* Total number of MULTI commands */
|
||||
} multiState;
|
||||
|
||||
typedef struct blockingState {
|
||||
robj **keys; /* The key we are waiting to terminate a blocking
|
||||
* operation such as BLPOP. Otherwise NULL. */
|
||||
int count; /* Number of blocking keys */
|
||||
time_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||
* is >= timeout then the operation timed out. */
|
||||
robj *target; /* The key that should receive the element,
|
||||
* for BRPOPLPUSH. */
|
||||
} blockingState;
|
||||
|
||||
/* With multiplexing we need to take per-clinet state.
|
||||
* Clients are taken in a liked list. */
|
||||
typedef struct redisClient {
|
||||
@ -316,12 +326,7 @@ typedef struct redisClient {
|
||||
long repldboff; /* replication DB file offset */
|
||||
off_t repldbsize; /* replication DB file size */
|
||||
multiState mstate; /* MULTI/EXEC state */
|
||||
robj **blocking_keys; /* The key we are waiting to terminate a blocking
|
||||
* operation such as BLPOP. Otherwise NULL. */
|
||||
int blocking_keys_num; /* Number of blocking keys */
|
||||
time_t blockingto; /* Blocking operation timeout. If UNIX current time
|
||||
* is >= blockingto then the operation timed out. */
|
||||
robj *blocking_target;
|
||||
blockingState bstate; /* blocking state */
|
||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||
* swap file in order to continue. */
|
||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||
|
36
src/t_list.c
36
src/t_list.c
@ -694,12 +694,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
|
||||
list *l;
|
||||
int j;
|
||||
|
||||
c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
|
||||
c->blocking_keys_num = numkeys;
|
||||
c->blockingto = timeout;
|
||||
c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
|
||||
c->bstate.count = numkeys;
|
||||
c->bstate.timeout = timeout;
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
/* Add the key in the client structure, to map clients -> keys */
|
||||
c->blocking_keys[j] = keys[j];
|
||||
c->bstate.keys[j] = keys[j];
|
||||
incrRefCount(keys[j]);
|
||||
|
||||
/* And in the other "side", to map keys -> clients */
|
||||
@ -728,22 +728,22 @@ void unblockClientWaitingData(redisClient *c) {
|
||||
list *l;
|
||||
int j;
|
||||
|
||||
redisAssert(c->blocking_keys != NULL);
|
||||
redisAssert(c->bstate.keys != NULL);
|
||||
/* The client may wait for multiple keys, so unblock it for every key. */
|
||||
for (j = 0; j < c->blocking_keys_num; j++) {
|
||||
for (j = 0; j < c->bstate.count; j++) {
|
||||
/* Remove this client from the list of clients waiting for this key. */
|
||||
de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
|
||||
de = dictFind(c->db->blocking_keys,c->bstate.keys[j]);
|
||||
redisAssert(de != NULL);
|
||||
l = dictGetEntryVal(de);
|
||||
listDelNode(l,listSearchKey(l,c));
|
||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||
if (listLength(l) == 0)
|
||||
dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
|
||||
decrRefCount(c->blocking_keys[j]);
|
||||
dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
|
||||
decrRefCount(c->bstate.keys[j]);
|
||||
}
|
||||
/* Cleanup the client structure */
|
||||
zfree(c->blocking_keys);
|
||||
c->blocking_keys = NULL;
|
||||
zfree(c->bstate.keys);
|
||||
c->bstate.keys = NULL;
|
||||
c->flags &= (~REDIS_BLOCKED);
|
||||
server.blpop_blocked_clients--;
|
||||
/* We want to process data if there is some command waiting
|
||||
@ -777,7 +777,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
||||
redisAssert(ln != NULL);
|
||||
receiver = ln->value;
|
||||
|
||||
if (receiver->blocking_target == NULL) {
|
||||
if (receiver->bstate.target == NULL) {
|
||||
addReplyMultiBulkLen(receiver,2);
|
||||
addReplyBulk(receiver,key);
|
||||
addReplyBulk(receiver,ele);
|
||||
@ -785,7 +785,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
||||
else {
|
||||
receiver->argc++;
|
||||
|
||||
robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target);
|
||||
robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
|
||||
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
|
||||
|
||||
addReplyBulk(receiver,ele);
|
||||
@ -793,7 +793,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
|
||||
/* Create the list if the key does not exist */
|
||||
if (!dobj) {
|
||||
dobj = createZiplistObject();
|
||||
dbAdd(receiver->db,receiver->blocking_target,dobj);
|
||||
dbAdd(receiver->db,receiver->bstate.target,dobj);
|
||||
}
|
||||
|
||||
listTypePush(dobj,ele,REDIS_HEAD);
|
||||
@ -833,7 +833,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
|
||||
robj *argv[2], **orig_argv;
|
||||
int orig_argc;
|
||||
|
||||
if (c->blocking_target == NULL) {
|
||||
if (c->bstate.target == NULL) {
|
||||
/* We need to alter the command arguments before to call
|
||||
* popGenericCommand() as the command takes a single key. */
|
||||
orig_argv = c->argv;
|
||||
@ -857,8 +857,8 @@ void blockingPopGenericCommand(redisClient *c, int where) {
|
||||
c->argc = orig_argc;
|
||||
}
|
||||
else {
|
||||
c->argv[2] = c->blocking_target;
|
||||
c->blocking_target = NULL;
|
||||
c->argv[2] = c->bstate.target;
|
||||
c->bstate.target = NULL;
|
||||
|
||||
rpoplpushCommand(c);
|
||||
}
|
||||
@ -891,7 +891,7 @@ void brpopCommand(redisClient *c) {
|
||||
}
|
||||
|
||||
void brpoplpushCommand(redisClient *c) {
|
||||
c->blocking_target = c->argv[2];
|
||||
c->bstate.target = c->argv[2];
|
||||
c->argv[2] = c->argv[3];
|
||||
c->argc--;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user