Initial work of multithreaded key-db. Note: Fails tests
This commit is contained in:
parent
988ea40101
commit
d62178ec8c
223
src/ae.cpp
223
src/ae.cpp
@ -30,7 +30,9 @@
|
|||||||
* POSSIBILITY OF SUCH DAMAGE.
|
* POSSIBILITY OF SUCH DAMAGE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <fcntl.h>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@ -41,11 +43,17 @@
|
|||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
|
||||||
#include "ae.h"
|
#include "ae.h"
|
||||||
|
#include "fastlock.h"
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include "zmalloc.h"
|
#include "zmalloc.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fastlock g_lock;
|
||||||
|
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
||||||
|
|
||||||
|
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0)
|
||||||
|
|
||||||
/* Include the best multiplexing layer supported by this system.
|
/* Include the best multiplexing layer supported by this system.
|
||||||
* The following should be ordered by performances, descending. */
|
* The following should be ordered by performances, descending. */
|
||||||
#ifdef HAVE_EVPORT
|
#ifdef HAVE_EVPORT
|
||||||
@ -62,6 +70,59 @@ extern "C" {
|
|||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
enum class AE_ASYNC_OP
|
||||||
|
{
|
||||||
|
PostFunction,
|
||||||
|
DeleteFileEvent,
|
||||||
|
};
|
||||||
|
typedef struct aeCommand
|
||||||
|
{
|
||||||
|
AE_ASYNC_OP op;
|
||||||
|
int fd;
|
||||||
|
int mask;
|
||||||
|
aePostFunctionProc *proc;
|
||||||
|
void *clientData;
|
||||||
|
} aeCommand;
|
||||||
|
|
||||||
|
void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||||
|
{
|
||||||
|
aeCommand cmd;
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
auto cb = read(fd, &cmd, sizeof(aeCommand));
|
||||||
|
if (cb != sizeof(cmd))
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
break;
|
||||||
|
fprintf(stderr, "Failed to read pipe.\n");
|
||||||
|
}
|
||||||
|
switch (cmd.op)
|
||||||
|
{
|
||||||
|
case AE_ASYNC_OP::DeleteFileEvent:
|
||||||
|
aeDeleteFileEvent(eventLoop, cmd.fd, cmd.mask);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case AE_ASYNC_OP::PostFunction:
|
||||||
|
{
|
||||||
|
std::unique_lock<decltype(g_lock)> ulock(g_lock);
|
||||||
|
((aePostFunctionProc*)cmd.proc)(cmd.clientData);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
|
||||||
|
{
|
||||||
|
aeCommand cmd;
|
||||||
|
cmd.op = AE_ASYNC_OP::PostFunction;
|
||||||
|
cmd.proc = proc;
|
||||||
|
cmd.clientData = arg;
|
||||||
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
|
AE_ASSERT(size == sizeof(cmd));
|
||||||
|
return AE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
aeEventLoop *aeCreateEventLoop(int setsize) {
|
aeEventLoop *aeCreateEventLoop(int setsize) {
|
||||||
aeEventLoop *eventLoop;
|
aeEventLoop *eventLoop;
|
||||||
int i;
|
int i;
|
||||||
@ -83,6 +144,16 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
|
|||||||
* vector with it. */
|
* vector with it. */
|
||||||
for (i = 0; i < setsize; i++)
|
for (i = 0; i < setsize; i++)
|
||||||
eventLoop->events[i].mask = AE_NONE;
|
eventLoop->events[i].mask = AE_NONE;
|
||||||
|
|
||||||
|
fastlock_init(&eventLoop->flock);
|
||||||
|
int rgfd[2];
|
||||||
|
if (pipe(rgfd) < 0)
|
||||||
|
goto err;
|
||||||
|
eventLoop->fdCmdRead = rgfd[0];
|
||||||
|
eventLoop->fdCmdWrite = rgfd[1];
|
||||||
|
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
|
||||||
|
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_THREADSAFE, aeProcessCmd, NULL);
|
||||||
|
|
||||||
return eventLoop;
|
return eventLoop;
|
||||||
|
|
||||||
err:
|
err:
|
||||||
@ -107,6 +178,7 @@ int aeGetSetSize(aeEventLoop *eventLoop) {
|
|||||||
*
|
*
|
||||||
* Otherwise AE_OK is returned and the operation is successful. */
|
* Otherwise AE_OK is returned and the operation is successful. */
|
||||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
|
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
if (setsize == eventLoop->setsize) return AE_OK;
|
if (setsize == eventLoop->setsize) return AE_OK;
|
||||||
@ -129,19 +201,25 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) {
|
|||||||
zfree(eventLoop->events);
|
zfree(eventLoop->events);
|
||||||
zfree(eventLoop->fired);
|
zfree(eventLoop->fired);
|
||||||
zfree(eventLoop);
|
zfree(eventLoop);
|
||||||
|
fastlock_free(&eventLoop->flock);
|
||||||
|
close(eventLoop->fdCmdRead);
|
||||||
|
close(eventLoop->fdCmdWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" void aeStop(aeEventLoop *eventLoop) {
|
extern "C" void aeStop(aeEventLoop *eventLoop) {
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
eventLoop->stop = 1;
|
eventLoop->stop = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||||
aeFileProc *proc, void *clientData)
|
aeFileProc *proc, void *clientData)
|
||||||
{
|
{
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
if (fd >= eventLoop->setsize) {
|
if (fd >= eventLoop->setsize) {
|
||||||
errno = ERANGE;
|
errno = ERANGE;
|
||||||
return AE_ERR;
|
return AE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
aeFileEvent *fe = &eventLoop->events[fd];
|
aeFileEvent *fe = &eventLoop->events[fd];
|
||||||
|
|
||||||
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
|
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
|
||||||
@ -155,8 +233,22 @@ extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
return AE_OK;
|
return AE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
|
||||||
|
{
|
||||||
|
if (eventLoop == g_eventLoopThisThread)
|
||||||
|
return aeDeleteFileEvent(eventLoop, fd, mask);
|
||||||
|
aeCommand cmd;
|
||||||
|
cmd.op = AE_ASYNC_OP::DeleteFileEvent;
|
||||||
|
cmd.fd = fd;
|
||||||
|
cmd.mask = mask;
|
||||||
|
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
|
if (cb != sizeof(cmd))
|
||||||
|
fprintf(stderr, "Failed to write to pipe.\n");
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
||||||
{
|
{
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
if (fd >= eventLoop->setsize) return;
|
if (fd >= eventLoop->setsize) return;
|
||||||
aeFileEvent *fe = &eventLoop->events[fd];
|
aeFileEvent *fe = &eventLoop->events[fd];
|
||||||
if (fe->mask == AE_NONE) return;
|
if (fe->mask == AE_NONE) return;
|
||||||
@ -178,6 +270,7 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
|||||||
}
|
}
|
||||||
|
|
||||||
extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
|
extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
if (fd >= eventLoop->setsize) return 0;
|
if (fd >= eventLoop->setsize) return 0;
|
||||||
aeFileEvent *fe = &eventLoop->events[fd];
|
aeFileEvent *fe = &eventLoop->events[fd];
|
||||||
|
|
||||||
@ -211,6 +304,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise
|
|||||||
aeTimeProc *proc, void *clientData,
|
aeTimeProc *proc, void *clientData,
|
||||||
aeEventFinalizerProc *finalizerProc)
|
aeEventFinalizerProc *finalizerProc)
|
||||||
{
|
{
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
long long id = eventLoop->timeEventNextId++;
|
long long id = eventLoop->timeEventNextId++;
|
||||||
aeTimeEvent *te;
|
aeTimeEvent *te;
|
||||||
|
|
||||||
@ -231,6 +325,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise
|
|||||||
|
|
||||||
extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
||||||
{
|
{
|
||||||
|
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
aeTimeEvent *te = eventLoop->timeEventHead;
|
aeTimeEvent *te = eventLoop->timeEventHead;
|
||||||
while(te) {
|
while(te) {
|
||||||
if (te->id == id) {
|
if (te->id == id) {
|
||||||
@ -270,6 +365,7 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
|
|||||||
|
|
||||||
/* Process time events */
|
/* Process time events */
|
||||||
static int processTimeEvents(aeEventLoop *eventLoop) {
|
static int processTimeEvents(aeEventLoop *eventLoop) {
|
||||||
|
std::unique_lock<decltype(g_lock)> ulock(g_lock);
|
||||||
int processed = 0;
|
int processed = 0;
|
||||||
aeTimeEvent *te;
|
aeTimeEvent *te;
|
||||||
long long maxId;
|
long long maxId;
|
||||||
@ -343,6 +439,62 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
|
|||||||
return processed;
|
return processed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int mask, int fd)
|
||||||
|
{
|
||||||
|
#define LOCK_IF_NECESSARY(fe) \
|
||||||
|
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
|
||||||
|
if (!(fe->mask & AE_THREADSAFE)) \
|
||||||
|
ulock.lock()
|
||||||
|
|
||||||
|
int fired = 0; /* Number of events fired for current fd. */
|
||||||
|
|
||||||
|
/* Normally we execute the readable event first, and the writable
|
||||||
|
* event laster. This is useful as sometimes we may be able
|
||||||
|
* to serve the reply of a query immediately after processing the
|
||||||
|
* query.
|
||||||
|
*
|
||||||
|
* However if AE_BARRIER is set in the mask, our application is
|
||||||
|
* asking us to do the reverse: never fire the writable event
|
||||||
|
* after the readable. In such a case, we invert the calls.
|
||||||
|
* This is useful when, for instance, we want to do things
|
||||||
|
* in the beforeSleep() hook, like fsynching a file to disk,
|
||||||
|
* before replying to a client. */
|
||||||
|
int invert = fe->mask & AE_BARRIER;
|
||||||
|
|
||||||
|
/* Note the "fe->mask & mask & ..." code: maybe an already
|
||||||
|
* processed event removed an element that fired and we still
|
||||||
|
* didn't processed, so we check if the event is still valid.
|
||||||
|
*
|
||||||
|
* Fire the readable event if the call sequence is not
|
||||||
|
* inverted. */
|
||||||
|
if (!invert && fe->mask & mask & AE_READABLE) {
|
||||||
|
LOCK_IF_NECESSARY(fe);
|
||||||
|
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
||||||
|
fired++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Fire the writable event. */
|
||||||
|
if (fe->mask & mask & AE_WRITABLE) {
|
||||||
|
if (!fired || fe->wfileProc != fe->rfileProc) {
|
||||||
|
LOCK_IF_NECESSARY(fe);
|
||||||
|
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
|
||||||
|
fired++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If we have to invert the call, fire the readable event now
|
||||||
|
* after the writable one. */
|
||||||
|
if (invert && fe->mask & mask & AE_READABLE) {
|
||||||
|
if (!fired || fe->wfileProc != fe->rfileProc) {
|
||||||
|
LOCK_IF_NECESSARY(fe);
|
||||||
|
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
||||||
|
fired++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#undef LOCK_IF_NECESSARY
|
||||||
|
}
|
||||||
|
|
||||||
/* Process every pending time event, then every pending file event
|
/* Process every pending time event, then every pending file event
|
||||||
* (that may be registered by time event callbacks just processed).
|
* (that may be registered by time event callbacks just processed).
|
||||||
* Without special flags the function sleeps until some file event
|
* Without special flags the function sleeps until some file event
|
||||||
@ -413,55 +565,19 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
|||||||
numevents = aeApiPoll(eventLoop, tvp);
|
numevents = aeApiPoll(eventLoop, tvp);
|
||||||
|
|
||||||
/* After sleep callback. */
|
/* After sleep callback. */
|
||||||
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
|
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
|
||||||
|
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||||
|
if (!(eventLoop->beforesleepFlags & AE_THREADSAFE))
|
||||||
|
ulock.lock();
|
||||||
eventLoop->aftersleep(eventLoop);
|
eventLoop->aftersleep(eventLoop);
|
||||||
|
}
|
||||||
|
|
||||||
for (j = 0; j < numevents; j++) {
|
for (j = 0; j < numevents; j++) {
|
||||||
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
|
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
|
||||||
int mask = eventLoop->fired[j].mask;
|
int mask = eventLoop->fired[j].mask;
|
||||||
int fd = eventLoop->fired[j].fd;
|
int fd = eventLoop->fired[j].fd;
|
||||||
int fired = 0; /* Number of events fired for current fd. */
|
|
||||||
|
|
||||||
/* Normally we execute the readable event first, and the writable
|
ProcessEventCore(eventLoop, fe, mask, fd);
|
||||||
* event laster. This is useful as sometimes we may be able
|
|
||||||
* to serve the reply of a query immediately after processing the
|
|
||||||
* query.
|
|
||||||
*
|
|
||||||
* However if AE_BARRIER is set in the mask, our application is
|
|
||||||
* asking us to do the reverse: never fire the writable event
|
|
||||||
* after the readable. In such a case, we invert the calls.
|
|
||||||
* This is useful when, for instance, we want to do things
|
|
||||||
* in the beforeSleep() hook, like fsynching a file to disk,
|
|
||||||
* before replying to a client. */
|
|
||||||
int invert = fe->mask & AE_BARRIER;
|
|
||||||
|
|
||||||
/* Note the "fe->mask & mask & ..." code: maybe an already
|
|
||||||
* processed event removed an element that fired and we still
|
|
||||||
* didn't processed, so we check if the event is still valid.
|
|
||||||
*
|
|
||||||
* Fire the readable event if the call sequence is not
|
|
||||||
* inverted. */
|
|
||||||
if (!invert && fe->mask & mask & AE_READABLE) {
|
|
||||||
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
|
||||||
fired++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Fire the writable event. */
|
|
||||||
if (fe->mask & mask & AE_WRITABLE) {
|
|
||||||
if (!fired || fe->wfileProc != fe->rfileProc) {
|
|
||||||
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
|
|
||||||
fired++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If we have to invert the call, fire the readable event now
|
|
||||||
* after the writable one. */
|
|
||||||
if (invert && fe->mask & mask & AE_READABLE) {
|
|
||||||
if (!fired || fe->wfileProc != fe->rfileProc) {
|
|
||||||
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
|
|
||||||
fired++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processed++;
|
processed++;
|
||||||
}
|
}
|
||||||
@ -497,9 +613,14 @@ int aeWait(int fd, int mask, long long milliseconds) {
|
|||||||
|
|
||||||
void aeMain(aeEventLoop *eventLoop) {
|
void aeMain(aeEventLoop *eventLoop) {
|
||||||
eventLoop->stop = 0;
|
eventLoop->stop = 0;
|
||||||
|
g_eventLoopThisThread = eventLoop;
|
||||||
while (!eventLoop->stop) {
|
while (!eventLoop->stop) {
|
||||||
if (eventLoop->beforesleep != NULL)
|
if (eventLoop->beforesleep != NULL) {
|
||||||
|
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||||
|
if (!(eventLoop->beforesleepFlags & AE_THREADSAFE))
|
||||||
|
ulock.lock();
|
||||||
eventLoop->beforesleep(eventLoop);
|
eventLoop->beforesleep(eventLoop);
|
||||||
|
}
|
||||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -508,10 +629,22 @@ const char *aeGetApiName(void) {
|
|||||||
return aeApiName();
|
return aeApiName();
|
||||||
}
|
}
|
||||||
|
|
||||||
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
|
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags) {
|
||||||
eventLoop->beforesleep = beforesleep;
|
eventLoop->beforesleep = beforesleep;
|
||||||
|
eventLoop->beforesleepFlags = flags;
|
||||||
}
|
}
|
||||||
|
|
||||||
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
|
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags) {
|
||||||
eventLoop->aftersleep = aftersleep;
|
eventLoop->aftersleep = aftersleep;
|
||||||
|
eventLoop->aftersleepFlags = flags;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void aeAcquireLock()
|
||||||
|
{
|
||||||
|
g_lock.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void aeReleaseLock()
|
||||||
|
{
|
||||||
|
g_lock.unlock();
|
||||||
|
}
|
16
src/ae.h
16
src/ae.h
@ -34,6 +34,7 @@
|
|||||||
#define __AE_H__
|
#define __AE_H__
|
||||||
|
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
#include "fastlock.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
@ -71,6 +72,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData,
|
|||||||
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
|
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
|
||||||
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
|
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
|
||||||
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
|
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
|
||||||
|
typedef void aePostFunctionProc(void *pvArgs);
|
||||||
|
|
||||||
/* File event structure */
|
/* File event structure */
|
||||||
typedef struct aeFileEvent {
|
typedef struct aeFileEvent {
|
||||||
@ -110,16 +112,23 @@ typedef struct aeEventLoop {
|
|||||||
int stop;
|
int stop;
|
||||||
void *apidata; /* This is used for polling API specific data */
|
void *apidata; /* This is used for polling API specific data */
|
||||||
aeBeforeSleepProc *beforesleep;
|
aeBeforeSleepProc *beforesleep;
|
||||||
|
int beforesleepFlags;
|
||||||
aeBeforeSleepProc *aftersleep;
|
aeBeforeSleepProc *aftersleep;
|
||||||
|
int aftersleepFlags;
|
||||||
|
struct fastlock flock;
|
||||||
|
int fdCmdWrite;
|
||||||
|
int fdCmdRead;
|
||||||
} aeEventLoop;
|
} aeEventLoop;
|
||||||
|
|
||||||
/* Prototypes */
|
/* Prototypes */
|
||||||
aeEventLoop *aeCreateEventLoop(int setsize);
|
aeEventLoop *aeCreateEventLoop(int setsize);
|
||||||
|
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
|
||||||
void aeDeleteEventLoop(aeEventLoop *eventLoop);
|
void aeDeleteEventLoop(aeEventLoop *eventLoop);
|
||||||
void aeStop(aeEventLoop *eventLoop);
|
void aeStop(aeEventLoop *eventLoop);
|
||||||
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||||
aeFileProc *proc, void *clientData);
|
aeFileProc *proc, void *clientData);
|
||||||
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
|
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
|
||||||
|
void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
|
||||||
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
|
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
|
||||||
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
|
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
|
||||||
aeTimeProc *proc, void *clientData,
|
aeTimeProc *proc, void *clientData,
|
||||||
@ -129,11 +138,14 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags);
|
|||||||
int aeWait(int fd, int mask, long long milliseconds);
|
int aeWait(int fd, int mask, long long milliseconds);
|
||||||
void aeMain(aeEventLoop *eventLoop);
|
void aeMain(aeEventLoop *eventLoop);
|
||||||
const char *aeGetApiName(void);
|
const char *aeGetApiName(void);
|
||||||
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
|
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags);
|
||||||
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
|
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags);
|
||||||
int aeGetSetSize(aeEventLoop *eventLoop);
|
int aeGetSetSize(aeEventLoop *eventLoop);
|
||||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
||||||
|
|
||||||
|
void aeAcquireLock();
|
||||||
|
void aeReleaseLock();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -83,7 +83,11 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
|
|||||||
if (mask & AE_READABLE) ee.events |= EPOLLIN;
|
if (mask & AE_READABLE) ee.events |= EPOLLIN;
|
||||||
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
|
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
|
||||||
ee.data.fd = fd;
|
ee.data.fd = fd;
|
||||||
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
|
if (epoll_ctl(state->epfd,op,fd,&ee) == -1)
|
||||||
|
{
|
||||||
|
perror("epoll_ctl failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
16
src/aof.c
16
src/aof.c
@ -105,7 +105,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
ln = listFirst(server.aof_rewrite_buf_blocks);
|
ln = listFirst(server.aof_rewrite_buf_blocks);
|
||||||
block = ln ? ln->value : NULL;
|
block = ln ? ln->value : NULL;
|
||||||
if (server.aof_stop_sending_diff || !block) {
|
if (server.aof_stop_sending_diff || !block) {
|
||||||
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,
|
||||||
AE_WRITABLE);
|
AE_WRITABLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -162,8 +162,8 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
|||||||
|
|
||||||
/* Install a file event to send data to the rewrite child if there is
|
/* Install a file event to send data to the rewrite child if there is
|
||||||
* not one already. */
|
* not one already. */
|
||||||
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
|
if (aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child) == 0) {
|
||||||
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
|
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.aof_pipe_write_data_to_child,
|
||||||
AE_WRITABLE, aofChildWriteDiffData, NULL);
|
AE_WRITABLE, aofChildWriteDiffData, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -738,7 +738,7 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
/* Serve the clients from time to time */
|
/* Serve the clients from time to time */
|
||||||
if (!(loops++ % 1000)) {
|
if (!(loops++ % 1000)) {
|
||||||
loadingProgress(ftello(fp));
|
loadingProgress(ftello(fp));
|
||||||
processEventsWhileBlocked();
|
processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fgets(buf,sizeof(buf),fp) == NULL) {
|
if (fgets(buf,sizeof(buf),fp) == NULL) {
|
||||||
@ -1470,7 +1470,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
/* Remove the handler since this can be called only one time during a
|
/* Remove the handler since this can be called only one time during a
|
||||||
* rewrite. */
|
* rewrite. */
|
||||||
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Create the pipes used for parent - child process IPC during rewrite.
|
/* Create the pipes used for parent - child process IPC during rewrite.
|
||||||
@ -1488,7 +1488,7 @@ int aofCreatePipes(void) {
|
|||||||
/* Parent -> children data is non blocking. */
|
/* Parent -> children data is non blocking. */
|
||||||
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
|
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
|
||||||
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
|
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
|
||||||
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
|
||||||
|
|
||||||
server.aof_pipe_write_data_to_child = fds[1];
|
server.aof_pipe_write_data_to_child = fds[1];
|
||||||
server.aof_pipe_read_data_from_parent = fds[0];
|
server.aof_pipe_read_data_from_parent = fds[0];
|
||||||
@ -1507,8 +1507,8 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void aofClosePipes(void) {
|
void aofClosePipes(void) {
|
||||||
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE);
|
||||||
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,AE_WRITABLE);
|
||||||
close(server.aof_pipe_write_data_to_child);
|
close(server.aof_pipe_write_data_to_child);
|
||||||
close(server.aof_pipe_read_data_from_parent);
|
close(server.aof_pipe_read_data_from_parent);
|
||||||
close(server.aof_pipe_write_ack_to_parent);
|
close(server.aof_pipe_write_ack_to_parent);
|
||||||
|
@ -109,15 +109,15 @@ void blockClient(client *c, int btype) {
|
|||||||
/* This function is called in the beforeSleep() function of the event loop
|
/* This function is called in the beforeSleep() function of the event loop
|
||||||
* in order to process the pending input buffer of clients that were
|
* in order to process the pending input buffer of clients that were
|
||||||
* unblocked after a blocking operation. */
|
* unblocked after a blocking operation. */
|
||||||
void processUnblockedClients(void) {
|
void processUnblockedClients(int iel) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
client *c;
|
client *c;
|
||||||
|
|
||||||
while (listLength(server.unblocked_clients)) {
|
while (listLength(server.rgunblocked_clients[iel])) {
|
||||||
ln = listFirst(server.unblocked_clients);
|
ln = listFirst(server.rgunblocked_clients[iel]);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
c = ln->value;
|
c = ln->value;
|
||||||
listDelNode(server.unblocked_clients,ln);
|
listDelNode(server.rgunblocked_clients[iel],ln);
|
||||||
c->flags &= ~CLIENT_UNBLOCKED;
|
c->flags &= ~CLIENT_UNBLOCKED;
|
||||||
|
|
||||||
/* Process remaining data in the input buffer, unless the client
|
/* Process remaining data in the input buffer, unless the client
|
||||||
@ -153,7 +153,7 @@ void queueClientForReprocessing(client *c) {
|
|||||||
* blocking operation, don't add back it into the list multiple times. */
|
* blocking operation, don't add back it into the list multiple times. */
|
||||||
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
||||||
c->flags |= CLIENT_UNBLOCKED;
|
c->flags |= CLIENT_UNBLOCKED;
|
||||||
listAddNodeTail(server.unblocked_clients,c);
|
listAddNodeTail(server.rgunblocked_clients[c->iel],c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,7 +493,7 @@ void clusterInit(void) {
|
|||||||
int j;
|
int j;
|
||||||
|
|
||||||
for (j = 0; j < server.cfd_count; j++) {
|
for (j = 0; j < server.cfd_count; j++) {
|
||||||
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.cfd[j], AE_READABLE,
|
||||||
clusterAcceptHandler, NULL) == AE_ERR)
|
clusterAcceptHandler, NULL) == AE_ERR)
|
||||||
serverPanic("Unrecoverable error creating Redis Cluster "
|
serverPanic("Unrecoverable error creating Redis Cluster "
|
||||||
"file event.");
|
"file event.");
|
||||||
@ -601,7 +601,7 @@ clusterLink *createClusterLink(clusterNode *node) {
|
|||||||
* with this link will have the 'link' field set to NULL. */
|
* with this link will have the 'link' field set to NULL. */
|
||||||
void freeClusterLink(clusterLink *link) {
|
void freeClusterLink(clusterLink *link) {
|
||||||
if (link->fd != -1) {
|
if (link->fd != -1) {
|
||||||
aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_READABLE|AE_WRITABLE);
|
||||||
}
|
}
|
||||||
sdsfree(link->sndbuf);
|
sdsfree(link->sndbuf);
|
||||||
sdsfree(link->rcvbuf);
|
sdsfree(link->rcvbuf);
|
||||||
@ -645,7 +645,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* node identity. */
|
* node identity. */
|
||||||
link = createClusterLink(NULL);
|
link = createClusterLink(NULL);
|
||||||
link->fd = cfd;
|
link->fd = cfd;
|
||||||
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
|
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],cfd,AE_READABLE,clusterReadHandler,link);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2132,7 +2132,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
sdsrange(link->sndbuf,nwritten,-1);
|
sdsrange(link->sndbuf,nwritten,-1);
|
||||||
if (sdslen(link->sndbuf) == 0)
|
if (sdslen(link->sndbuf) == 0)
|
||||||
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Read data. Try to read the first field of the header first to check the
|
/* Read data. Try to read the first field of the header first to check the
|
||||||
@ -2208,7 +2208,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* from event handlers that will do stuff with the same link later. */
|
* from event handlers that will do stuff with the same link later. */
|
||||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||||
aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
|
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_WRITABLE|AE_BARRIER,
|
||||||
clusterWriteHandler,link);
|
clusterWriteHandler,link);
|
||||||
|
|
||||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||||
@ -3402,7 +3402,7 @@ void clusterCron(void) {
|
|||||||
link = createClusterLink(node);
|
link = createClusterLink(node);
|
||||||
link->fd = fd;
|
link->fd = fd;
|
||||||
node->link = link;
|
node->link = link;
|
||||||
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
|
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_READABLE,
|
||||||
clusterReadHandler,link);
|
clusterReadHandler,link);
|
||||||
/* Queue a PING in the new connection ASAP: this is crucial
|
/* Queue a PING in the new connection ASAP: this is crucial
|
||||||
* to avoid false positives in failure detection.
|
* to avoid false positives in failure detection.
|
||||||
|
15
src/config.c
15
src/config.c
@ -968,15 +968,18 @@ void configSetCommand(client *c) {
|
|||||||
server.maxclients = orig_value;
|
server.maxclients = orig_value;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ((unsigned int) aeGetSetSize(server.el) <
|
if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) <
|
||||||
server.maxclients + CONFIG_FDSET_INCR)
|
server.maxclients + CONFIG_FDSET_INCR)
|
||||||
{
|
{
|
||||||
if (aeResizeSetSize(server.el,
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
server.maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
|
||||||
{
|
{
|
||||||
addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients");
|
if (aeResizeSetSize(server.rgel[iel],
|
||||||
server.maxclients = orig_value;
|
server.maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
||||||
return;
|
{
|
||||||
|
addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients");
|
||||||
|
server.maxclients = orig_value;
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,11 @@ struct fastlock
|
|||||||
int m_lock;
|
int m_lock;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
fastlock()
|
||||||
|
{
|
||||||
|
fastlock_init(this);
|
||||||
|
}
|
||||||
|
|
||||||
void lock()
|
void lock()
|
||||||
{
|
{
|
||||||
fastlock_lock(this);
|
fastlock_lock(this);
|
||||||
|
14
src/module.c
14
src/module.c
@ -2696,7 +2696,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
|||||||
|
|
||||||
/* Create the client and dispatch the command. */
|
/* Create the client and dispatch the command. */
|
||||||
va_start(ap, fmt);
|
va_start(ap, fmt);
|
||||||
c = createClient(-1);
|
c = createClient(-1, IDX_EVENT_LOOP_MAIN);
|
||||||
c->puser = NULL; /* Root user. */
|
c->puser = NULL; /* Root user. */
|
||||||
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
|
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
|
||||||
replicate = flags & REDISMODULE_ARGV_REPLICATE;
|
replicate = flags & REDISMODULE_ARGV_REPLICATE;
|
||||||
@ -3546,7 +3546,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
|||||||
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||||
bc->free_privdata = free_privdata;
|
bc->free_privdata = free_privdata;
|
||||||
bc->privdata = NULL;
|
bc->privdata = NULL;
|
||||||
bc->reply_client = createClient(-1);
|
bc->reply_client = createClient(-1, IDX_EVENT_LOOP_MAIN);
|
||||||
bc->reply_client->flags |= CLIENT_MODULE;
|
bc->reply_client->flags |= CLIENT_MODULE;
|
||||||
bc->dbid = c->db->id;
|
bc->dbid = c->db->id;
|
||||||
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||||
@ -3692,7 +3692,7 @@ void moduleHandleBlockedClients(void) {
|
|||||||
!(c->flags & CLIENT_PENDING_WRITE))
|
!(c->flags & CLIENT_PENDING_WRITE))
|
||||||
{
|
{
|
||||||
c->flags |= CLIENT_PENDING_WRITE;
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
listAddNodeHead(server.clients_pending_write,c);
|
listAddNodeHead(server.rgclients_pending_write[IDX_EVENT_LOOP_MAIN],c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3794,7 +3794,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
|||||||
* access it safely from another thread, so we create a fake client here
|
* access it safely from another thread, so we create a fake client here
|
||||||
* in order to keep things like the currently selected database and similar
|
* in order to keep things like the currently selected database and similar
|
||||||
* things. */
|
* things. */
|
||||||
ctx->client = createClient(-1);
|
ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN);
|
||||||
if (bc) selectDb(ctx->client,bc->dbid);
|
if (bc) selectDb(ctx->client,bc->dbid);
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
@ -4300,7 +4300,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
|
|||||||
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
|
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
|
||||||
/* This is the first key, we need to re-install the timer according
|
/* This is the first key, we need to re-install the timer according
|
||||||
* to the just added event. */
|
* to the just added event. */
|
||||||
aeDeleteTimeEvent(server.el,aeTimer);
|
aeDeleteTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],aeTimer);
|
||||||
aeTimer = -1;
|
aeTimer = -1;
|
||||||
}
|
}
|
||||||
raxStop(&ri);
|
raxStop(&ri);
|
||||||
@ -4309,7 +4309,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
|
|||||||
/* If we have no main timer (the old one was invalidated, or this is the
|
/* If we have no main timer (the old one was invalidated, or this is the
|
||||||
* first module timer we have), install one. */
|
* first module timer we have), install one. */
|
||||||
if (aeTimer == -1)
|
if (aeTimer == -1)
|
||||||
aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL);
|
aeTimer = aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],period,moduleTimerHandler,NULL,NULL);
|
||||||
|
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
@ -4659,7 +4659,7 @@ void moduleInitModulesSystem(void) {
|
|||||||
|
|
||||||
/* Set up the keyspace notification susbscriber list and static client */
|
/* Set up the keyspace notification susbscriber list and static client */
|
||||||
moduleKeyspaceSubscribers = listCreate();
|
moduleKeyspaceSubscribers = listCreate();
|
||||||
moduleFreeContextReusedClient = createClient(-1);
|
moduleFreeContextReusedClient = createClient(-1, IDX_EVENT_LOOP_MAIN);
|
||||||
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
|
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
|
||||||
moduleFreeContextReusedClient->puser = NULL; /* root user. */
|
moduleFreeContextReusedClient->puser = NULL; /* root user. */
|
||||||
|
|
||||||
|
122
src/networking.c
122
src/networking.c
@ -82,7 +82,7 @@ void linkClient(client *c) {
|
|||||||
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
|
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
client *createClient(int fd) {
|
client *createClient(int fd, int iel) {
|
||||||
client *c = zmalloc(sizeof(client), MALLOC_LOCAL);
|
client *c = zmalloc(sizeof(client), MALLOC_LOCAL);
|
||||||
|
|
||||||
/* passing -1 as fd it is possible to create a non connected client.
|
/* passing -1 as fd it is possible to create a non connected client.
|
||||||
@ -94,7 +94,7 @@ client *createClient(int fd) {
|
|||||||
anetEnableTcpNoDelay(NULL,fd);
|
anetEnableTcpNoDelay(NULL,fd);
|
||||||
if (server.tcpkeepalive)
|
if (server.tcpkeepalive)
|
||||||
anetKeepAlive(NULL,fd,server.tcpkeepalive);
|
anetKeepAlive(NULL,fd,server.tcpkeepalive);
|
||||||
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
|
if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_THREADSAFE,
|
||||||
readQueryFromClient, c) == AE_ERR)
|
readQueryFromClient, c) == AE_ERR)
|
||||||
{
|
{
|
||||||
close(fd);
|
close(fd);
|
||||||
@ -106,6 +106,7 @@ client *createClient(int fd) {
|
|||||||
selectDb(c,0);
|
selectDb(c,0);
|
||||||
uint64_t client_id;
|
uint64_t client_id;
|
||||||
atomicGetIncr(server.next_client_id,client_id,1);
|
atomicGetIncr(server.next_client_id,client_id,1);
|
||||||
|
c->iel = iel;
|
||||||
c->id = client_id;
|
c->id = client_id;
|
||||||
c->resp = 2;
|
c->resp = 2;
|
||||||
c->fd = fd;
|
c->fd = fd;
|
||||||
@ -186,7 +187,7 @@ void clientInstallWriteHandler(client *c) {
|
|||||||
* a system call. We'll only really install the write handler if
|
* a system call. We'll only really install the write handler if
|
||||||
* we'll not be able to write the whole reply at once. */
|
* we'll not be able to write the whole reply at once. */
|
||||||
c->flags |= CLIENT_PENDING_WRITE;
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
listAddNodeHead(server.clients_pending_write,c);
|
listAddNodeHead(server.rgclients_pending_write[c->iel],c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -779,9 +780,9 @@ int clientHasPendingReplies(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define MAX_ACCEPTS_PER_CALL 1000
|
#define MAX_ACCEPTS_PER_CALL 1000
|
||||||
static void acceptCommonHandler(int fd, int flags, char *ip) {
|
static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
|
||||||
client *c;
|
client *c;
|
||||||
if ((c = createClient(fd)) == NULL) {
|
if ((c = createClient(fd, iel)) == NULL) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Error registering fd event for the new client: %s (fd=%d)",
|
"Error registering fd event for the new client: %s (fd=%d)",
|
||||||
strerror(errno),fd);
|
strerror(errno),fd);
|
||||||
@ -849,6 +850,32 @@ static void acceptCommonHandler(int fd, int flags, char *ip) {
|
|||||||
c->flags |= flags;
|
c->flags |= flags;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct AcceptCommonHandlerAsyncArgs
|
||||||
|
{
|
||||||
|
int fd;
|
||||||
|
int flags;
|
||||||
|
char cip[NET_IP_STR_LEN];
|
||||||
|
int fUseCip;
|
||||||
|
int iel;
|
||||||
|
};
|
||||||
|
static void AcceptCommonHandlerAsync(void *args)
|
||||||
|
{
|
||||||
|
struct AcceptCommonHandlerAsyncArgs *aargs = args;
|
||||||
|
acceptCommonHandler(aargs->fd, aargs->flags, aargs->cip, aargs->iel);
|
||||||
|
zfree(args);
|
||||||
|
}
|
||||||
|
static void EnqueueAcceptCommonHandler(int fd, int flags, char *ip, int iel)
|
||||||
|
{
|
||||||
|
struct AcceptCommonHandlerAsyncArgs *args = zmalloc(sizeof(struct AcceptCommonHandlerAsyncArgs), MALLOC_LOCAL);
|
||||||
|
args->fd = fd;
|
||||||
|
args->flags = flags;
|
||||||
|
if (ip != NULL)
|
||||||
|
memcpy(args->cip, ip, NET_IP_STR_LEN);
|
||||||
|
args->fUseCip = (ip != NULL);
|
||||||
|
args->iel = iel;
|
||||||
|
aePostFunction(server.rgel[iel], AcceptCommonHandlerAsync, args);
|
||||||
|
}
|
||||||
|
|
||||||
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
|
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
|
||||||
char cip[NET_IP_STR_LEN];
|
char cip[NET_IP_STR_LEN];
|
||||||
@ -865,7 +892,24 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
|
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
|
||||||
acceptCommonHandler(cfd,0,cip);
|
int ielCur = 0;
|
||||||
|
for (; ielCur < MAX_EVENT_LOOPS; ++ielCur)
|
||||||
|
{
|
||||||
|
if (el == server.rgel[ielCur])
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
serverAssert(ielCur < MAX_EVENT_LOOPS);
|
||||||
|
int iel = rand() % MAX_EVENT_LOOPS;
|
||||||
|
if (iel == ielCur)
|
||||||
|
{
|
||||||
|
aeAcquireLock();
|
||||||
|
acceptCommonHandler(cfd,0,cip, iel);
|
||||||
|
aeReleaseLock();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
EnqueueAcceptCommonHandler(cfd, 0, cip, iel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -884,7 +928,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
|
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
|
||||||
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL);
|
EnqueueAcceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, rand() % MAX_EVENT_LOOPS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -928,26 +972,26 @@ void unlinkClient(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Unregister async I/O handlers and close the socket. */
|
/* Unregister async I/O handlers and close the socket. */
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_READABLE);
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
||||||
close(c->fd);
|
close(c->fd);
|
||||||
c->fd = -1;
|
c->fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove from the list of pending writes if needed. */
|
/* Remove from the list of pending writes if needed. */
|
||||||
if (c->flags & CLIENT_PENDING_WRITE) {
|
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||||
ln = listSearchKey(server.clients_pending_write,c);
|
ln = listSearchKey(server.rgclients_pending_write[c->iel],c);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
listDelNode(server.clients_pending_write,ln);
|
listDelNode(server.rgclients_pending_write[c->iel],ln);
|
||||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* When client was just unblocked because of a blocking operation,
|
/* When client was just unblocked because of a blocking operation,
|
||||||
* remove it from the list of unblocked clients. */
|
* remove it from the list of unblocked clients. */
|
||||||
if (c->flags & CLIENT_UNBLOCKED) {
|
if (c->flags & CLIENT_UNBLOCKED) {
|
||||||
ln = listSearchKey(server.unblocked_clients,c);
|
ln = listSearchKey(server.rgunblocked_clients[c->iel],c);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
listDelNode(server.unblocked_clients,ln);
|
listDelNode(server.rgunblocked_clients[c->iel],ln);
|
||||||
c->flags &= ~CLIENT_UNBLOCKED;
|
c->flags &= ~CLIENT_UNBLOCKED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1092,6 +1136,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
while(clientHasPendingReplies(c)) {
|
while(clientHasPendingReplies(c)) {
|
||||||
if (c->bufpos > 0) {
|
if (c->bufpos > 0) {
|
||||||
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
||||||
|
|
||||||
if (nwritten <= 0) break;
|
if (nwritten <= 0) break;
|
||||||
c->sentlen += nwritten;
|
c->sentlen += nwritten;
|
||||||
totwritten += nwritten;
|
totwritten += nwritten;
|
||||||
@ -1113,6 +1158,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
|
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
|
||||||
|
|
||||||
if (nwritten <= 0) break;
|
if (nwritten <= 0) break;
|
||||||
c->sentlen += nwritten;
|
c->sentlen += nwritten;
|
||||||
totwritten += nwritten;
|
totwritten += nwritten;
|
||||||
@ -1145,7 +1191,8 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
zmalloc_used_memory() < server.maxmemory) &&
|
zmalloc_used_memory() < server.maxmemory) &&
|
||||||
!(c->flags & CLIENT_SLAVE)) break;
|
!(c->flags & CLIENT_SLAVE)) break;
|
||||||
}
|
}
|
||||||
server.stat_net_output_bytes += totwritten;
|
|
||||||
|
__atomic_fetch_add(&server.stat_net_output_bytes, totwritten, __ATOMIC_RELAXED);
|
||||||
if (nwritten == -1) {
|
if (nwritten == -1) {
|
||||||
if (errno == EAGAIN) {
|
if (errno == EAGAIN) {
|
||||||
nwritten = 0;
|
nwritten = 0;
|
||||||
@ -1165,7 +1212,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
}
|
}
|
||||||
if (!clientHasPendingReplies(c)) {
|
if (!clientHasPendingReplies(c)) {
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
if (handler_installed) aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
||||||
|
|
||||||
/* Close connection after entire reply has been sent. */
|
/* Close connection after entire reply has been sent. */
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||||
@ -1187,16 +1234,17 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* we can just write the replies to the client output buffer without any
|
* we can just write the replies to the client output buffer without any
|
||||||
* need to use a syscall in order to install the writable event handler,
|
* need to use a syscall in order to install the writable event handler,
|
||||||
* get it called, and so forth. */
|
* get it called, and so forth. */
|
||||||
int handleClientsWithPendingWrites(void) {
|
int handleClientsWithPendingWrites(int iel) {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
int processed = listLength(server.clients_pending_write);
|
list *pending_writes = server.rgclients_pending_write[iel];
|
||||||
|
int processed = listLength(pending_writes);
|
||||||
|
|
||||||
listRewind(server.clients_pending_write,&li);
|
listRewind(pending_writes,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *c = listNodeValue(ln);
|
client *c = listNodeValue(ln);
|
||||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
listDelNode(server.clients_pending_write,ln);
|
listDelNode(pending_writes,ln);
|
||||||
|
|
||||||
/* If a client is protected, don't do anything,
|
/* If a client is protected, don't do anything,
|
||||||
* that may trigger write error or recreate handler. */
|
* that may trigger write error or recreate handler. */
|
||||||
@ -1219,7 +1267,7 @@ int handleClientsWithPendingWrites(void) {
|
|||||||
{
|
{
|
||||||
ae_flags |= AE_BARRIER;
|
ae_flags |= AE_BARRIER;
|
||||||
}
|
}
|
||||||
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
|
if (aeCreateFileEvent(server.rgel[c->iel], c->fd, ae_flags,
|
||||||
sendReplyToClient, c) == AE_ERR)
|
sendReplyToClient, c) == AE_ERR)
|
||||||
{
|
{
|
||||||
freeClientAsync(c);
|
freeClientAsync(c);
|
||||||
@ -1268,15 +1316,15 @@ void resetClient(client *c) {
|
|||||||
* path, it is not really released, but only marked for later release. */
|
* path, it is not really released, but only marked for later release. */
|
||||||
void protectClient(client *c) {
|
void protectClient(client *c) {
|
||||||
c->flags |= CLIENT_PROTECTED;
|
c->flags |= CLIENT_PROTECTED;
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_READABLE);
|
||||||
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This will undo the client protection done by protectClient() */
|
/* This will undo the client protection done by protectClient() */
|
||||||
void unprotectClient(client *c) {
|
void unprotectClient(client *c) {
|
||||||
if (c->flags & CLIENT_PROTECTED) {
|
if (c->flags & CLIENT_PROTECTED) {
|
||||||
c->flags &= ~CLIENT_PROTECTED;
|
c->flags &= ~CLIENT_PROTECTED;
|
||||||
aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c);
|
aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_THREADSAFE,readQueryFromClient,c);
|
||||||
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1630,6 +1678,14 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(mask);
|
UNUSED(mask);
|
||||||
|
|
||||||
|
int iel = 0;
|
||||||
|
for (; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
|
{
|
||||||
|
if (server.rgel[iel] == el)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
serverAssert(iel == c->iel);
|
||||||
|
|
||||||
readlen = PROTO_IOBUF_LEN;
|
readlen = PROTO_IOBUF_LEN;
|
||||||
/* If this is a multi bulk request, and we are processing a bulk reply
|
/* If this is a multi bulk request, and we are processing a bulk reply
|
||||||
* that is large enough, try to maximize the probability that the query
|
* that is large enough, try to maximize the probability that the query
|
||||||
@ -1650,18 +1706,24 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
qblen = sdslen(c->querybuf);
|
qblen = sdslen(c->querybuf);
|
||||||
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
|
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
|
||||||
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
|
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
|
||||||
|
|
||||||
nread = read(fd, c->querybuf+qblen, readlen);
|
nread = read(fd, c->querybuf+qblen, readlen);
|
||||||
|
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
if (errno == EAGAIN) {
|
if (errno == EAGAIN) {
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
|
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
|
||||||
|
aeAcquireLock();
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (nread == 0) {
|
} else if (nread == 0) {
|
||||||
serverLog(LL_VERBOSE, "Client closed connection");
|
serverLog(LL_VERBOSE, "Client closed connection");
|
||||||
|
aeAcquireLock();
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
} else if (c->flags & CLIENT_MASTER) {
|
} else if (c->flags & CLIENT_MASTER) {
|
||||||
/* Append the query buffer to the pending (not applied) buffer
|
/* Append the query buffer to the pending (not applied) buffer
|
||||||
@ -1682,7 +1744,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
||||||
sdsfree(ci);
|
sdsfree(ci);
|
||||||
sdsfree(bytes);
|
sdsfree(bytes);
|
||||||
|
aeAcquireLock();
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1692,7 +1756,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* was actually applied to the master state: this quantity, and its
|
* was actually applied to the master state: this quantity, and its
|
||||||
* corresponding part of the replication stream, will be propagated to
|
* corresponding part of the replication stream, will be propagated to
|
||||||
* the sub-slaves and to the replication backlog. */
|
* the sub-slaves and to the replication backlog. */
|
||||||
|
aeAcquireLock();
|
||||||
processInputBufferAndReplicate(c);
|
processInputBufferAndReplicate(c);
|
||||||
|
aeReleaseLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||||
@ -1775,7 +1841,7 @@ sds catClientInfoString(sds s, client *client) {
|
|||||||
if (p == flags) *p++ = 'N';
|
if (p == flags) *p++ = 'N';
|
||||||
*p++ = '\0';
|
*p++ = '\0';
|
||||||
|
|
||||||
emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
|
emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgel[client->iel],client->fd);
|
||||||
p = events;
|
p = events;
|
||||||
if (emask & AE_READABLE) *p++ = 'r';
|
if (emask & AE_READABLE) *p++ = 'r';
|
||||||
if (emask & AE_WRITABLE) *p++ = 'w';
|
if (emask & AE_WRITABLE) *p++ = 'w';
|
||||||
@ -2323,7 +2389,7 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
* of put_online_on_ack is to postpone the moment it is installed.
|
* of put_online_on_ack is to postpone the moment it is installed.
|
||||||
* This is what we want since slaves in this state should not receive
|
* This is what we want since slaves in this state should not receive
|
||||||
* writes before the first ACK. */
|
* writes before the first ACK. */
|
||||||
events = aeGetFileEvents(server.el,slave->fd);
|
events = aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd);
|
||||||
if (events & AE_WRITABLE &&
|
if (events & AE_WRITABLE &&
|
||||||
slave->replstate == SLAVE_STATE_ONLINE &&
|
slave->replstate == SLAVE_STATE_ONLINE &&
|
||||||
clientHasPendingReplies(slave))
|
clientHasPendingReplies(slave))
|
||||||
@ -2395,13 +2461,13 @@ int clientsArePaused(void) {
|
|||||||
* write, close sequence needed to serve a client.
|
* write, close sequence needed to serve a client.
|
||||||
*
|
*
|
||||||
* The function returns the total number of events processed. */
|
* The function returns the total number of events processed. */
|
||||||
int processEventsWhileBlocked(void) {
|
int processEventsWhileBlocked(int iel) {
|
||||||
int iterations = 4; /* See the function top-comment. */
|
int iterations = 4; /* See the function top-comment. */
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (iterations--) {
|
while (iterations--) {
|
||||||
int events = 0;
|
int events = 0;
|
||||||
events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
events += aeProcessEvents(server.rgel[iel], AE_FILE_EVENTS|AE_DONT_WAIT);
|
||||||
events += handleClientsWithPendingWrites();
|
events += handleClientsWithPendingWrites(iel);
|
||||||
if (!events) break;
|
if (!events) break;
|
||||||
count += events;
|
count += events;
|
||||||
}
|
}
|
||||||
|
@ -1862,7 +1862,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
|
|||||||
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
|
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
|
||||||
replicationSendNewlineToMaster();
|
replicationSendNewlineToMaster();
|
||||||
loadingProgress(r->processed_bytes);
|
loadingProgress(r->processed_bytes);
|
||||||
processEventsWhileBlocked();
|
processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -856,7 +856,7 @@ void putSlaveOnline(client *slave) {
|
|||||||
slave->replstate = SLAVE_STATE_ONLINE;
|
slave->replstate = SLAVE_STATE_ONLINE;
|
||||||
slave->repl_put_online_on_ack = 0;
|
slave->repl_put_online_on_ack = 0;
|
||||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||||
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], slave->fd, AE_WRITABLE,
|
||||||
sendReplyToClient, slave) == AE_ERR) {
|
sendReplyToClient, slave) == AE_ERR) {
|
||||||
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
||||||
freeClient(slave);
|
freeClient(slave);
|
||||||
@ -918,7 +918,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
if (slave->repldboff == slave->repldbsize) {
|
if (slave->repldboff == slave->repldbsize) {
|
||||||
close(slave->repldbfd);
|
close(slave->repldbfd);
|
||||||
slave->repldbfd = -1;
|
slave->repldbfd = -1;
|
||||||
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd,AE_WRITABLE);
|
||||||
putSlaveOnline(slave);
|
putSlaveOnline(slave);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -989,8 +989,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
|||||||
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
|
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
|
||||||
(unsigned long long) slave->repldbsize);
|
(unsigned long long) slave->repldbsize);
|
||||||
|
|
||||||
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd,AE_WRITABLE);
|
||||||
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
|
||||||
freeClient(slave);
|
freeClient(slave);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -1075,7 +1075,7 @@ void replicationEmptyDbCallback(void *privdata) {
|
|||||||
* performed, this function materializes the master client we store
|
* performed, this function materializes the master client we store
|
||||||
* at server.master, starting from the specified file descriptor. */
|
* at server.master, starting from the specified file descriptor. */
|
||||||
void replicationCreateMasterClient(int fd, int dbid) {
|
void replicationCreateMasterClient(int fd, int dbid) {
|
||||||
server.master = createClient(fd);
|
server.master = createClient(fd, IDX_EVENT_LOOP_MAIN);
|
||||||
server.master->flags |= CLIENT_MASTER;
|
server.master->flags |= CLIENT_MASTER;
|
||||||
server.master->authenticated = 1;
|
server.master->authenticated = 1;
|
||||||
server.master->reploff = server.master_initial_offset;
|
server.master->reploff = server.master_initial_offset;
|
||||||
@ -1276,7 +1276,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* handler, otherwise it will get called recursively since
|
* handler, otherwise it will get called recursively since
|
||||||
* rdbLoad() will call the event loop to process events from time to
|
* rdbLoad() will call the event loop to process events from time to
|
||||||
* time for non blocking loading. */
|
* time for non blocking loading. */
|
||||||
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.repl_transfer_s,AE_READABLE);
|
||||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||||
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
|
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
|
||||||
@ -1464,7 +1464,7 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
|
|||||||
if (reply != NULL) {
|
if (reply != NULL) {
|
||||||
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
|
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
|
||||||
sdsfree(reply);
|
sdsfree(reply);
|
||||||
aeDeleteFileEvent(server.el,fd,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE);
|
||||||
return PSYNC_WRITE_ERROR;
|
return PSYNC_WRITE_ERROR;
|
||||||
}
|
}
|
||||||
return PSYNC_WAIT_REPLY;
|
return PSYNC_WAIT_REPLY;
|
||||||
@ -1479,7 +1479,7 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
|
|||||||
return PSYNC_WAIT_REPLY;
|
return PSYNC_WAIT_REPLY;
|
||||||
}
|
}
|
||||||
|
|
||||||
aeDeleteFileEvent(server.el,fd,AE_READABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE);
|
||||||
|
|
||||||
if (!strncmp(reply,"+FULLRESYNC",11)) {
|
if (!strncmp(reply,"+FULLRESYNC",11)) {
|
||||||
char *replid = NULL, *offset = NULL;
|
char *replid = NULL, *offset = NULL;
|
||||||
@ -1626,7 +1626,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
|
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
|
||||||
/* Delete the writable event so that the readable event remains
|
/* Delete the writable event so that the readable event remains
|
||||||
* registered and we can wait for the PONG reply. */
|
* registered and we can wait for the PONG reply. */
|
||||||
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_WRITABLE);
|
||||||
server.repl_state = REPL_STATE_RECEIVE_PONG;
|
server.repl_state = REPL_STATE_RECEIVE_PONG;
|
||||||
/* Send the PING, don't check for errors at all, we have the timeout
|
/* Send the PING, don't check for errors at all, we have the timeout
|
||||||
* that will take care about this. */
|
* that will take care about this. */
|
||||||
@ -1841,7 +1841,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Setup the non blocking download of the bulk file. */
|
/* Setup the non blocking download of the bulk file. */
|
||||||
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd, AE_READABLE,readSyncBulkPayload,NULL)
|
||||||
== AE_ERR)
|
== AE_ERR)
|
||||||
{
|
{
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -1860,7 +1860,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE);
|
||||||
if (dfd != -1) close(dfd);
|
if (dfd != -1) close(dfd);
|
||||||
close(fd);
|
close(fd);
|
||||||
server.repl_transfer_s = -1;
|
server.repl_transfer_s = -1;
|
||||||
@ -1884,7 +1884,7 @@ int connectWithMaster(void) {
|
|||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
|
||||||
AE_ERR)
|
AE_ERR)
|
||||||
{
|
{
|
||||||
close(fd);
|
close(fd);
|
||||||
@ -1905,7 +1905,7 @@ int connectWithMaster(void) {
|
|||||||
void undoConnectWithMaster(void) {
|
void undoConnectWithMaster(void) {
|
||||||
int fd = server.repl_transfer_s;
|
int fd = server.repl_transfer_s;
|
||||||
|
|
||||||
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE);
|
||||||
close(fd);
|
close(fd);
|
||||||
server.repl_transfer_s = -1;
|
server.repl_transfer_s = -1;
|
||||||
}
|
}
|
||||||
@ -2236,7 +2236,7 @@ void replicationResurrectCachedMaster(int newfd) {
|
|||||||
|
|
||||||
/* Re-add to the list of clients. */
|
/* Re-add to the list of clients. */
|
||||||
linkClient(server.master);
|
linkClient(server.master);
|
||||||
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], newfd, AE_READABLE,
|
||||||
readQueryFromClient, server.master)) {
|
readQueryFromClient, server.master)) {
|
||||||
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
|
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
|
||||||
freeClientAsync(server.master); /* Close ASAP. */
|
freeClientAsync(server.master); /* Close ASAP. */
|
||||||
@ -2245,7 +2245,7 @@ void replicationResurrectCachedMaster(int newfd) {
|
|||||||
/* We may also need to install the write handler as well if there is
|
/* We may also need to install the write handler as well if there is
|
||||||
* pending data in the write buffers. */
|
* pending data in the write buffers. */
|
||||||
if (clientHasPendingReplies(server.master)) {
|
if (clientHasPendingReplies(server.master)) {
|
||||||
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], newfd, AE_WRITABLE,
|
||||||
sendReplyToClient, server.master)) {
|
sendReplyToClient, server.master)) {
|
||||||
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
|
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
|
||||||
freeClientAsync(server.master); /* Close ASAP. */
|
freeClientAsync(server.master); /* Close ASAP. */
|
||||||
|
@ -1104,7 +1104,7 @@ void scriptingInit(int setup) {
|
|||||||
* Note: there is no need to create it again when this function is called
|
* Note: there is no need to create it again when this function is called
|
||||||
* by scriptingReset(). */
|
* by scriptingReset(). */
|
||||||
if (server.lua_client == NULL) {
|
if (server.lua_client == NULL) {
|
||||||
server.lua_client = createClient(-1);
|
server.lua_client = createClient(-1, IDX_EVENT_LOOP_MAIN);
|
||||||
server.lua_client->flags |= CLIENT_LUA;
|
server.lua_client->flags |= CLIENT_LUA;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1278,7 +1278,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
|
|||||||
* here when the EVAL command will return. */
|
* here when the EVAL command will return. */
|
||||||
protectClient(server.lua_caller);
|
protectClient(server.lua_caller);
|
||||||
}
|
}
|
||||||
if (server.lua_timedout) processEventsWhileBlocked();
|
if (server.lua_timedout) processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN);
|
||||||
if (server.lua_kill) {
|
if (server.lua_kill) {
|
||||||
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
|
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
|
||||||
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
|
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
|
||||||
|
@ -2013,7 +2013,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
link->pending_commands = 0;
|
link->pending_commands = 0;
|
||||||
link->cc_conn_time = mstime();
|
link->cc_conn_time = mstime();
|
||||||
link->cc->data = link;
|
link->cc->data = link;
|
||||||
redisAeAttach(server.el,link->cc);
|
redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->cc);
|
||||||
redisAsyncSetConnectCallback(link->cc,
|
redisAsyncSetConnectCallback(link->cc,
|
||||||
sentinelLinkEstablishedCallback);
|
sentinelLinkEstablishedCallback);
|
||||||
redisAsyncSetDisconnectCallback(link->cc,
|
redisAsyncSetDisconnectCallback(link->cc,
|
||||||
@ -2037,7 +2037,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
|
|
||||||
link->pc_conn_time = mstime();
|
link->pc_conn_time = mstime();
|
||||||
link->pc->data = link;
|
link->pc->data = link;
|
||||||
redisAeAttach(server.el,link->pc);
|
redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->pc);
|
||||||
redisAsyncSetConnectCallback(link->pc,
|
redisAsyncSetConnectCallback(link->pc,
|
||||||
sentinelLinkEstablishedCallback);
|
sentinelLinkEstablishedCallback);
|
||||||
redisAsyncSetDisconnectCallback(link->pc,
|
redisAsyncSetDisconnectCallback(link->pc,
|
||||||
|
127
src/server.c
127
src/server.c
@ -1619,6 +1619,16 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
|
|||||||
*out_usage = o;
|
*out_usage = o;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void AsyncClientCron(void *pv)
|
||||||
|
{
|
||||||
|
client *c = (client*)pv;
|
||||||
|
mstime_t now = mstime();
|
||||||
|
if (clientsCronHandleTimeout(c,now)) return;
|
||||||
|
if (clientsCronResizeQueryBuffer(c)) return;
|
||||||
|
if (clientsCronTrackExpansiveClients(c)) return;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function is called by serverCron() and is used in order to perform
|
/* This function is called by serverCron() and is used in order to perform
|
||||||
* operations on clients that are important to perform constantly. For instance
|
* operations on clients that are important to perform constantly. For instance
|
||||||
* we use this function in order to disconnect clients after a timeout, including
|
* we use this function in order to disconnect clients after a timeout, including
|
||||||
@ -1661,12 +1671,20 @@ void clientsCron(void) {
|
|||||||
listRotate(server.clients);
|
listRotate(server.clients);
|
||||||
head = listFirst(server.clients);
|
head = listFirst(server.clients);
|
||||||
c = listNodeValue(head);
|
c = listNodeValue(head);
|
||||||
/* The following functions do different service checks on the client.
|
if (c->iel == IDX_EVENT_LOOP_MAIN)
|
||||||
* The protocol is that they return non-zero if the client was
|
{
|
||||||
* terminated. */
|
/* The following functions do different service checks on the client.
|
||||||
if (clientsCronHandleTimeout(c,now)) continue;
|
* The protocol is that they return non-zero if the client was
|
||||||
if (clientsCronResizeQueryBuffer(c)) continue;
|
* terminated. */
|
||||||
if (clientsCronTrackExpansiveClients(c)) continue;
|
if (clientsCronHandleTimeout(c,now)) continue;
|
||||||
|
if (clientsCronResizeQueryBuffer(c)) continue;
|
||||||
|
if (clientsCronTrackExpansiveClients(c)) continue;
|
||||||
|
}
|
||||||
|
else if (IDX_EVENT_LOOP_MAIN > 1)
|
||||||
|
{
|
||||||
|
aePostFunction(server.rgel[c->iel], AsyncClientCron, c);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2070,14 +2088,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
moduleHandleBlockedClients();
|
moduleHandleBlockedClients();
|
||||||
|
|
||||||
/* Try to process pending commands for clients that were just unblocked. */
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
if (listLength(server.unblocked_clients))
|
if (listLength(server.rgunblocked_clients[IDX_EVENT_LOOP_MAIN]))
|
||||||
processUnblockedClients();
|
{
|
||||||
|
aeReleaseLock();
|
||||||
|
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
|
||||||
|
aeAcquireLock();
|
||||||
|
}
|
||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
|
|
||||||
/* Handle writes with pending output buffers. */
|
/* Handle writes with pending output buffers. */
|
||||||
handleClientsWithPendingWrites();
|
aeReleaseLock();
|
||||||
|
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
|
||||||
|
aeAcquireLock();
|
||||||
|
|
||||||
/* Before we are going to sleep, let the threads access the dataset by
|
/* Before we are going to sleep, let the threads access the dataset by
|
||||||
* releasing the GIL. Redis main thread will not touch anything at this
|
* releasing the GIL. Redis main thread will not touch anything at this
|
||||||
@ -2085,6 +2109,24 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
if (moduleCount()) moduleReleaseGIL();
|
if (moduleCount()) moduleReleaseGIL();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void beforeSleepLite(struct aeEventLoop *eventLoop)
|
||||||
|
{
|
||||||
|
int iel = 0;
|
||||||
|
for (; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
|
{
|
||||||
|
if (server.rgel[iel] == eventLoop)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
serverAssert(iel < MAX_EVENT_LOOPS);
|
||||||
|
|
||||||
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
|
if (listLength(server.rgunblocked_clients[iel]))
|
||||||
|
processUnblockedClients(iel);
|
||||||
|
|
||||||
|
/* Handle writes with pending output buffers. */
|
||||||
|
handleClientsWithPendingWrites(iel);
|
||||||
|
}
|
||||||
|
|
||||||
/* This function is called immadiately after the event loop multiplexing
|
/* This function is called immadiately after the event loop multiplexing
|
||||||
* API returned, and the control is going to soon return to Redis by invoking
|
* API returned, and the control is going to soon return to Redis by invoking
|
||||||
* the different events callbacks. */
|
* the different events callbacks. */
|
||||||
@ -2702,6 +2744,8 @@ void initServer(void) {
|
|||||||
signal(SIGPIPE, SIG_IGN);
|
signal(SIGPIPE, SIG_IGN);
|
||||||
setupSignalHandlers();
|
setupSignalHandlers();
|
||||||
|
|
||||||
|
fastlock_init(&server.flock);
|
||||||
|
|
||||||
if (server.syslog_enabled) {
|
if (server.syslog_enabled) {
|
||||||
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
|
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
|
||||||
server.syslog_facility);
|
server.syslog_facility);
|
||||||
@ -2715,9 +2759,12 @@ void initServer(void) {
|
|||||||
server.clients_to_close = listCreate();
|
server.clients_to_close = listCreate();
|
||||||
server.slaves = listCreate();
|
server.slaves = listCreate();
|
||||||
server.monitors = listCreate();
|
server.monitors = listCreate();
|
||||||
server.clients_pending_write = listCreate();
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
|
{
|
||||||
|
server.rgclients_pending_write[iel] = listCreate();
|
||||||
|
server.rgunblocked_clients[iel] = listCreate();
|
||||||
|
}
|
||||||
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
||||||
server.unblocked_clients = listCreate();
|
|
||||||
server.ready_keys = listCreate();
|
server.ready_keys = listCreate();
|
||||||
server.clients_waiting_acks = listCreate();
|
server.clients_waiting_acks = listCreate();
|
||||||
server.get_ack_from_slaves = 0;
|
server.get_ack_from_slaves = 0;
|
||||||
@ -2726,12 +2773,15 @@ void initServer(void) {
|
|||||||
|
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
adjustOpenFilesLimit();
|
adjustOpenFilesLimit();
|
||||||
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
|
for (int i = 0; i < MAX_EVENT_LOOPS; ++i)
|
||||||
if (server.el == NULL) {
|
{
|
||||||
serverLog(LL_WARNING,
|
server.rgel[i] = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
|
||||||
"Failed creating the event loop. Error message: '%s'",
|
if (server.rgel[i] == NULL) {
|
||||||
strerror(errno));
|
serverLog(LL_WARNING,
|
||||||
exit(1);
|
"Failed creating the event loop. Error message: '%s'",
|
||||||
|
strerror(errno));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
||||||
|
|
||||||
@ -2808,7 +2858,7 @@ void initServer(void) {
|
|||||||
/* Create the timer callback, this is our way to process many background
|
/* Create the timer callback, this is our way to process many background
|
||||||
* operations incrementally, like clients timeout, eviction of unaccessed
|
* operations incrementally, like clients timeout, eviction of unaccessed
|
||||||
* expired keys and so forth. */
|
* expired keys and so forth. */
|
||||||
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
if (aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN], 1, serverCron, NULL, NULL) == AE_ERR) {
|
||||||
serverPanic("Can't create event loop timers.");
|
serverPanic("Can't create event loop timers.");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
@ -2816,20 +2866,23 @@ void initServer(void) {
|
|||||||
/* Create an event handler for accepting new connections in TCP and Unix
|
/* Create an event handler for accepting new connections in TCP and Unix
|
||||||
* domain sockets. */
|
* domain sockets. */
|
||||||
for (j = 0; j < server.ipfd_count; j++) {
|
for (j = 0; j < server.ipfd_count; j++) {
|
||||||
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
acceptTcpHandler,NULL) == AE_ERR)
|
{
|
||||||
{
|
if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_THREADSAFE,
|
||||||
serverPanic(
|
acceptTcpHandler,NULL) == AE_ERR)
|
||||||
"Unrecoverable error creating server.ipfd file event.");
|
{
|
||||||
}
|
serverPanic(
|
||||||
|
"Unrecoverable error creating server.ipfd file event.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
|
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE,
|
||||||
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
||||||
|
|
||||||
|
|
||||||
/* Register a readable event for the pipe used to awake the event loop
|
/* Register a readable event for the pipe used to awake the event loop
|
||||||
* when a blocked client in a module needs attention. */
|
* when a blocked client in a module needs attention. */
|
||||||
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
|
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE,
|
||||||
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
||||||
serverPanic(
|
serverPanic(
|
||||||
"Error registering the readable event for the module "
|
"Error registering the readable event for the module "
|
||||||
@ -4738,6 +4791,17 @@ int redisIsSupervised(int mode) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *workerThreadMain(void *parg)
|
||||||
|
{
|
||||||
|
int iel = (int)((int64_t)parg);
|
||||||
|
|
||||||
|
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
|
||||||
|
aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_THREADSAFE);
|
||||||
|
aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0);
|
||||||
|
aeMain(server.rgel[iel]);
|
||||||
|
aeDeleteEventLoop(server.rgel[iel]);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
@ -4940,10 +5004,13 @@ int main(int argc, char **argv) {
|
|||||||
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
||||||
}
|
}
|
||||||
|
|
||||||
aeSetBeforeSleepProc(server.el,beforeSleep);
|
pthread_t rgthread[MAX_EVENT_LOOPS];
|
||||||
aeSetAfterSleepProc(server.el,afterSleep);
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
aeMain(server.el);
|
{
|
||||||
aeDeleteEventLoop(server.el);
|
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
|
||||||
|
}
|
||||||
|
void *pretT;
|
||||||
|
pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pretT);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
23
src/server.h
23
src/server.h
@ -51,6 +51,7 @@
|
|||||||
|
|
||||||
typedef long long mstime_t; /* millisecond time type. */
|
typedef long long mstime_t; /* millisecond time type. */
|
||||||
|
|
||||||
|
#include "fastlock.h"
|
||||||
#include "ae.h" /* Event driven programming library */
|
#include "ae.h" /* Event driven programming library */
|
||||||
#include "sds.h" /* Dynamic safe strings */
|
#include "sds.h" /* Dynamic safe strings */
|
||||||
#include "dict.h" /* Hash tables */
|
#include "dict.h" /* Hash tables */
|
||||||
@ -845,6 +846,8 @@ typedef struct client {
|
|||||||
/* Response buffer */
|
/* Response buffer */
|
||||||
int bufpos;
|
int bufpos;
|
||||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||||
|
|
||||||
|
int iel; /* the event loop index we're registered with */
|
||||||
} client;
|
} client;
|
||||||
|
|
||||||
struct saveparam {
|
struct saveparam {
|
||||||
@ -1005,6 +1008,9 @@ struct clusterState;
|
|||||||
#define CHILD_INFO_TYPE_RDB 0
|
#define CHILD_INFO_TYPE_RDB 0
|
||||||
#define CHILD_INFO_TYPE_AOF 1
|
#define CHILD_INFO_TYPE_AOF 1
|
||||||
|
|
||||||
|
#define MAX_EVENT_LOOPS 2
|
||||||
|
#define IDX_EVENT_LOOP_MAIN 0
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
/* General */
|
/* General */
|
||||||
pid_t pid; /* Main process pid. */
|
pid_t pid; /* Main process pid. */
|
||||||
@ -1019,7 +1025,8 @@ struct redisServer {
|
|||||||
redisDb *db;
|
redisDb *db;
|
||||||
dict *commands; /* Command table */
|
dict *commands; /* Command table */
|
||||||
dict *orig_commands; /* Command table before command renaming. */
|
dict *orig_commands; /* Command table before command renaming. */
|
||||||
aeEventLoop *el;
|
int cel;
|
||||||
|
aeEventLoop *rgel[MAX_EVENT_LOOPS];
|
||||||
unsigned int lruclock; /* Clock for LRU eviction */
|
unsigned int lruclock; /* Clock for LRU eviction */
|
||||||
int shutdown_asap; /* SHUTDOWN needed ASAP */
|
int shutdown_asap; /* SHUTDOWN needed ASAP */
|
||||||
int activerehashing; /* Incremental rehash in serverCron() */
|
int activerehashing; /* Incremental rehash in serverCron() */
|
||||||
@ -1051,7 +1058,7 @@ struct redisServer {
|
|||||||
int cfd_count; /* Used slots in cfd[] */
|
int cfd_count; /* Used slots in cfd[] */
|
||||||
list *clients; /* List of active clients */
|
list *clients; /* List of active clients */
|
||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
list *clients_pending_write; /* There is to write or install handler. */
|
list *rgclients_pending_write[MAX_EVENT_LOOPS]; /* There is to write or install handler. */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
client *current_client; /* Current client, only used on crash report */
|
client *current_client; /* Current client, only used on crash report */
|
||||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||||
@ -1272,7 +1279,7 @@ struct redisServer {
|
|||||||
/* Blocked clients */
|
/* Blocked clients */
|
||||||
unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
|
unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
|
||||||
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
||||||
list *unblocked_clients; /* list of clients to unblock before next loop */
|
list *rgunblocked_clients[MAX_EVENT_LOOPS]; /* list of clients to unblock before next loop */
|
||||||
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
||||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||||
* have to take this state global, in order to pass it to sortCompare() */
|
* have to take this state global, in order to pass it to sortCompare() */
|
||||||
@ -1362,6 +1369,8 @@ struct redisServer {
|
|||||||
pthread_mutex_t lruclock_mutex;
|
pthread_mutex_t lruclock_mutex;
|
||||||
pthread_mutex_t next_client_id_mutex;
|
pthread_mutex_t next_client_id_mutex;
|
||||||
pthread_mutex_t unixtime_mutex;
|
pthread_mutex_t unixtime_mutex;
|
||||||
|
|
||||||
|
struct fastlock flock;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct pubsubPattern {
|
typedef struct pubsubPattern {
|
||||||
@ -1504,7 +1513,7 @@ size_t redisPopcount(void *s, long count);
|
|||||||
void redisSetProcTitle(char *title);
|
void redisSetProcTitle(char *title);
|
||||||
|
|
||||||
/* networking.c -- Networking and Client related operations */
|
/* networking.c -- Networking and Client related operations */
|
||||||
client *createClient(int fd);
|
client *createClient(int fd, int iel);
|
||||||
void closeTimedoutClients(void);
|
void closeTimedoutClients(void);
|
||||||
void freeClient(client *c);
|
void freeClient(client *c);
|
||||||
void freeClientAsync(client *c);
|
void freeClientAsync(client *c);
|
||||||
@ -1571,8 +1580,8 @@ void disconnectSlaves(void);
|
|||||||
int listenToPort(int port, int *fds, int *count);
|
int listenToPort(int port, int *fds, int *count);
|
||||||
void pauseClients(mstime_t duration);
|
void pauseClients(mstime_t duration);
|
||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
int processEventsWhileBlocked(void);
|
int processEventsWhileBlocked(int iel);
|
||||||
int handleClientsWithPendingWrites(void);
|
int handleClientsWithPendingWrites(int iel);
|
||||||
int clientHasPendingReplies(client *c);
|
int clientHasPendingReplies(client *c);
|
||||||
void unlinkClient(client *c);
|
void unlinkClient(client *c);
|
||||||
int writeToClient(int fd, client *c, int handler_installed);
|
int writeToClient(int fd, client *c, int handler_installed);
|
||||||
@ -2014,7 +2023,7 @@ int ldbPendingChildren(void);
|
|||||||
sds luaCreateFunction(client *c, lua_State *lua, robj *body);
|
sds luaCreateFunction(client *c, lua_State *lua, robj *body);
|
||||||
|
|
||||||
/* Blocked clients */
|
/* Blocked clients */
|
||||||
void processUnblockedClients(void);
|
void processUnblockedClients(int iel);
|
||||||
void blockClient(client *c, int btype);
|
void blockClient(client *c, int btype);
|
||||||
void unblockClient(client *c);
|
void unblockClient(client *c);
|
||||||
void queueClientForReprocessing(client *c);
|
void queueClientForReprocessing(client *c);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user