ae.c initial refactoring for epoll implementation
This commit is contained in:
parent
a96ffc6641
commit
5b2a1c292a
9
TODO
9
TODO
@ -6,8 +6,11 @@ Most of the features already implemented for this release. The following is a li
|
||||
* For now only the last argument gets integer encoded, so make sure that: 1) every multi bulk commands implemented will have the last arg that is indeed a value, and not used otherwise. 2) to explicitly call the function to encode the object in MSET and other commands where there are multiple "values".
|
||||
* Man pages for MSET MSETNX and SRANDMEMBER, missing Z-commands, ...
|
||||
* Use strcoll() to compare objects in sorted sets, like it already happens for SORT.
|
||||
* Write docs for the "STORE" operaiton of SORT, and GET "#" option.
|
||||
* Write docs for the "STORE" operaiton of SORT. Link to the article about SORT by written by defunkt.
|
||||
* Append only mode: testing and a command to rebuild the log from scratch.
|
||||
* ZRANGEBYSCORE test, ZRANGEBYSCORE LIMIT option.
|
||||
* Sorted sets infinity tests.
|
||||
* Support for epool in ae.c.
|
||||
|
||||
VERSION 1.2 TODO (Hash type)
|
||||
============================
|
||||
@ -25,6 +28,10 @@ VERSION 1.4 TODO (Fault tollerant sharding)
|
||||
|
||||
* Redis-cluster, a fast intermediate layer (proxy) that implements consistent hashing and fault tollerant nodes handling.
|
||||
|
||||
Interesting readings about this:
|
||||
|
||||
- http://ayende.com/Blog/archive/2009/04/06/designing-rhino-dht-a-fault-tolerant-dynamically-distributed-hash.aspx
|
||||
|
||||
VERSION 1.5 TODO (Optimizations and latency)
|
||||
============================================
|
||||
|
||||
|
97
ae.c
97
ae.c
@ -167,7 +167,12 @@ int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
||||
* put in sleep without to delay any event.
|
||||
* If there are no timers NULL is returned.
|
||||
*
|
||||
* Note that's O(N) since time events are unsorted. */
|
||||
* Note that's O(N) since time events are unsorted.
|
||||
* Possible optimizations (not needed by Redis so far, but...):
|
||||
* 1) Insert the event in order, so that the nearest is just the head.
|
||||
* Much better but still insertion or deletion of timers is O(N).
|
||||
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
|
||||
*/
|
||||
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
|
||||
{
|
||||
aeTimeEvent *te = eventLoop->timeEventHead;
|
||||
@ -183,6 +188,57 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
|
||||
return nearest;
|
||||
}
|
||||
|
||||
/* Process time events */
|
||||
static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||
int processed = 0;
|
||||
aeTimeEvent *te;
|
||||
long long maxId;
|
||||
|
||||
te = eventLoop->timeEventHead;
|
||||
maxId = eventLoop->timeEventNextId-1;
|
||||
while(te) {
|
||||
long now_sec, now_ms;
|
||||
long long id;
|
||||
|
||||
if (te->id > maxId) {
|
||||
te = te->next;
|
||||
continue;
|
||||
}
|
||||
aeGetTime(&now_sec, &now_ms);
|
||||
if (now_sec > te->when_sec ||
|
||||
(now_sec == te->when_sec && now_ms >= te->when_ms))
|
||||
{
|
||||
int retval;
|
||||
|
||||
id = te->id;
|
||||
retval = te->timeProc(eventLoop, id, te->clientData);
|
||||
processed++;
|
||||
/* After an event is processed our time event list may
|
||||
* no longer be the same, so we restart from head.
|
||||
* Still we make sure to don't process events registered
|
||||
* by event handlers itself in order to don't loop forever.
|
||||
* To do so we saved the max ID we want to handle.
|
||||
*
|
||||
* FUTURE OPTIMIZATIONS:
|
||||
* Note that this is NOT great algorithmically. Redis uses
|
||||
* a single time event so it's not a problem but the right
|
||||
* way to do this is to add the new elements on head, and
|
||||
* to flag deleted elements in a special way for later
|
||||
* deletion (putting references to the nodes to delete into
|
||||
* another linked list). */
|
||||
if (retval != AE_NOMORE) {
|
||||
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
|
||||
} else {
|
||||
aeDeleteTimeEvent(eventLoop, id);
|
||||
}
|
||||
te = eventLoop->timeEventHead;
|
||||
} else {
|
||||
te = te->next;
|
||||
}
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
/* Process every pending time event, then every pending file event
|
||||
* (that may be registered by time event callbacks just processed).
|
||||
* Without special flags the function sleeps until some file event
|
||||
@ -201,9 +257,6 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
int maxfd = 0, numfd = 0, processed = 0;
|
||||
fd_set rfds, wfds, efds;
|
||||
aeFileEvent *fe = eventLoop->fileEventHead;
|
||||
aeTimeEvent *te;
|
||||
long long maxId;
|
||||
AE_NOTUSED(flags);
|
||||
|
||||
/* Nothing to do? return ASAP */
|
||||
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
|
||||
@ -296,41 +349,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||
}
|
||||
}
|
||||
/* Check time events */
|
||||
if (flags & AE_TIME_EVENTS) {
|
||||
te = eventLoop->timeEventHead;
|
||||
maxId = eventLoop->timeEventNextId-1;
|
||||
while(te) {
|
||||
long now_sec, now_ms;
|
||||
long long id;
|
||||
if (flags & AE_TIME_EVENTS)
|
||||
processed += processTimeEvents(eventLoop);
|
||||
|
||||
if (te->id > maxId) {
|
||||
te = te->next;
|
||||
continue;
|
||||
}
|
||||
aeGetTime(&now_sec, &now_ms);
|
||||
if (now_sec > te->when_sec ||
|
||||
(now_sec == te->when_sec && now_ms >= te->when_ms))
|
||||
{
|
||||
int retval;
|
||||
|
||||
id = te->id;
|
||||
retval = te->timeProc(eventLoop, id, te->clientData);
|
||||
/* After an event is processed our time event list may
|
||||
* no longer be the same, so we restart from head.
|
||||
* Still we make sure to don't process events registered
|
||||
* by event handlers itself in order to don't loop forever.
|
||||
* To do so we saved the max ID we want to handle. */
|
||||
if (retval != AE_NOMORE) {
|
||||
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
|
||||
} else {
|
||||
aeDeleteTimeEvent(eventLoop, id);
|
||||
}
|
||||
te = eventLoop->timeEventHead;
|
||||
} else {
|
||||
te = te->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
return processed; /* return the number of processed file/time events */
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user