Process async client checks like client timeouts and BLPOP timeouts incrementally using a circular list.
This commit is contained in:
parent
bbaeda402c
commit
d19015be12
16
src/adlist.c
16
src/adlist.c
@ -323,3 +323,19 @@ listNode *listIndex(list *list, long index) {
|
|||||||
}
|
}
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Rotate the list removing the tail node and inserting it to the head. */
|
||||||
|
void listRotate(list *list) {
|
||||||
|
listNode *tail = list->tail;
|
||||||
|
|
||||||
|
if (listLength(list) <= 1) return;
|
||||||
|
|
||||||
|
/* Detatch current tail */
|
||||||
|
list->tail = tail->prev;
|
||||||
|
list->tail->next = NULL;
|
||||||
|
/* Move it as head */
|
||||||
|
list->head->prev = tail;
|
||||||
|
tail->prev = NULL;
|
||||||
|
tail->next = list->head;
|
||||||
|
list->head = tail;
|
||||||
|
}
|
||||||
|
@ -84,6 +84,7 @@ listNode *listSearchKey(list *list, void *key);
|
|||||||
listNode *listIndex(list *list, long index);
|
listNode *listIndex(list *list, long index);
|
||||||
void listRewind(list *list, listIter *li);
|
void listRewind(list *list, listIter *li);
|
||||||
void listRewindTail(list *list, listIter *li);
|
void listRewindTail(list *list, listIter *li);
|
||||||
|
void listRotate(list *list);
|
||||||
|
|
||||||
/* Directions for iterators */
|
/* Directions for iterators */
|
||||||
#define AL_START_HEAD 0
|
#define AL_START_HEAD 0
|
||||||
|
@ -751,34 +751,6 @@ void resetClient(redisClient *c) {
|
|||||||
if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
|
if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeTimedoutClients(void) {
|
|
||||||
redisClient *c;
|
|
||||||
listNode *ln;
|
|
||||||
time_t now = time(NULL);
|
|
||||||
listIter li;
|
|
||||||
|
|
||||||
listRewind(server.clients,&li);
|
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
|
||||||
c = listNodeValue(ln);
|
|
||||||
if (server.maxidletime &&
|
|
||||||
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
|
|
||||||
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
|
|
||||||
!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
|
|
||||||
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
|
|
||||||
listLength(c->pubsub_patterns) == 0 &&
|
|
||||||
(now - c->lastinteraction > server.maxidletime))
|
|
||||||
{
|
|
||||||
redisLog(REDIS_VERBOSE,"Closing idle client");
|
|
||||||
freeClient(c);
|
|
||||||
} else if (c->flags & REDIS_BLOCKED) {
|
|
||||||
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
|
|
||||||
addReply(c,shared.nullmultibulk);
|
|
||||||
unblockClientWaitingData(c);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int processInlineBuffer(redisClient *c) {
|
int processInlineBuffer(redisClient *c) {
|
||||||
char *newline = strstr(c->querybuf,"\r\n");
|
char *newline = strstr(c->querybuf,"\r\n");
|
||||||
int argc, j;
|
int argc, j;
|
||||||
|
49
src/redis.c
49
src/redis.c
@ -641,6 +641,50 @@ long long getOperationsPerSecond(void) {
|
|||||||
return sum / REDIS_OPS_SEC_SAMPLES;
|
return sum / REDIS_OPS_SEC_SAMPLES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void closeTimedoutClient(redisClient *c) {
|
||||||
|
time_t now = time(NULL);
|
||||||
|
|
||||||
|
if (server.maxidletime &&
|
||||||
|
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
|
||||||
|
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
|
||||||
|
!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
|
||||||
|
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
|
||||||
|
listLength(c->pubsub_patterns) == 0 &&
|
||||||
|
(now - c->lastinteraction > server.maxidletime))
|
||||||
|
{
|
||||||
|
redisLog(REDIS_VERBOSE,"Closing idle client");
|
||||||
|
freeClient(c);
|
||||||
|
} else if (c->flags & REDIS_BLOCKED) {
|
||||||
|
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
|
||||||
|
addReply(c,shared.nullmultibulk);
|
||||||
|
unblockClientWaitingData(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void clientsCron(void) {
|
||||||
|
/* Make sure to process at least 1/100 of clients per call.
|
||||||
|
* Since this function is called 10 times per second we are sure that
|
||||||
|
* in the worst case we process all the clients in 10 seconds.
|
||||||
|
* In normal conditions (a reasonable number of clients) we process
|
||||||
|
* all the clients in a shorter time. */
|
||||||
|
int iterations = listLength(server.clients)/100;
|
||||||
|
if (iterations < 50) iterations = 50;
|
||||||
|
|
||||||
|
while(listLength(server.clients) && iterations--) {
|
||||||
|
redisClient *c;
|
||||||
|
listNode *head;
|
||||||
|
|
||||||
|
/* Rotate the list, take the current head, process.
|
||||||
|
* This way if the client must be removed from the list it's the
|
||||||
|
* first element and we don't incur into O(N) computation. */
|
||||||
|
listRotate(server.clients);
|
||||||
|
head = listFirst(server.clients);
|
||||||
|
c = listNodeValue(head);
|
||||||
|
closeTimedoutClient(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||||
int j, loops = server.cronloops;
|
int j, loops = server.cronloops;
|
||||||
REDIS_NOTUSED(eventLoop);
|
REDIS_NOTUSED(eventLoop);
|
||||||
@ -712,9 +756,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
zmalloc_used_memory());
|
zmalloc_used_memory());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Close connections of timedout clients */
|
/* We need to do a few operations on clients asynchronously. */
|
||||||
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
|
clientsCron();
|
||||||
closeTimedoutClients();
|
|
||||||
|
|
||||||
/* Start a scheduled AOF rewrite if this was requested by the user while
|
/* Start a scheduled AOF rewrite if this was requested by the user while
|
||||||
* a BGSAVE was in progress. */
|
* a BGSAVE was in progress. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user