Merge branch 'unstable' into keydbpro
Former-commit-id: e59c3945d3ed0b05a3e42713b091b1b5c96a9db2
This commit is contained in:
commit
fb5b1e8e7d
40
src/ae.cpp
40
src/ae.cpp
@ -48,6 +48,7 @@
|
|||||||
#include "fastlock.h"
|
#include "fastlock.h"
|
||||||
#include "zmalloc.h"
|
#include "zmalloc.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
#include "serverassert.h"
|
||||||
|
|
||||||
#ifdef USE_MUTEX
|
#ifdef USE_MUTEX
|
||||||
thread_local int cOwnLock = 0;
|
thread_local int cOwnLock = 0;
|
||||||
@ -84,8 +85,6 @@ fastlock g_lock("AE (global)");
|
|||||||
#endif
|
#endif
|
||||||
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
||||||
|
|
||||||
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)1) = 1; } while(0)
|
|
||||||
|
|
||||||
/* Include the best multiplexing layer supported by this system.
|
/* Include the best multiplexing layer supported by this system.
|
||||||
* The following should be ordered by performances, descending. */
|
* The following should be ordered by performances, descending. */
|
||||||
#ifdef HAVE_EVPORT
|
#ifdef HAVE_EVPORT
|
||||||
@ -143,7 +142,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
|||||||
auto cb = read(fd, &cmd, sizeof(aeCommand));
|
auto cb = read(fd, &cmd, sizeof(aeCommand));
|
||||||
if (cb != sizeof(cmd))
|
if (cb != sizeof(cmd))
|
||||||
{
|
{
|
||||||
AE_ASSERT(errno == EAGAIN);
|
serverAssert(errno == EAGAIN);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
switch (cmd.op)
|
switch (cmd.op)
|
||||||
@ -254,8 +253,8 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
if (size != sizeof(cmd))
|
if (size != sizeof(cmd))
|
||||||
{
|
{
|
||||||
AE_ASSERT(size == sizeof(cmd) || size <= 0);
|
serverAssert(size == sizeof(cmd) || size <= 0);
|
||||||
AE_ASSERT(errno == EAGAIN);
|
serverAssert(errno == EAGAIN);
|
||||||
ret = AE_ERR;
|
ret = AE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,10 +309,13 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
AE_ASSERT(!size || size == sizeof(cmd));
|
if (!(!size || size == sizeof(cmd))) {
|
||||||
|
printf("Last error: %d\n", errno);
|
||||||
|
}
|
||||||
|
serverAssert(!size || size == sizeof(cmd));
|
||||||
|
|
||||||
if (size == 0)
|
if (size == 0)
|
||||||
return AE_ERR;
|
return AE_ERR;
|
||||||
|
|
||||||
int ret = AE_OK;
|
int ret = AE_OK;
|
||||||
if (fSynchronous)
|
if (fSynchronous)
|
||||||
{
|
{
|
||||||
@ -356,7 +358,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
|
|||||||
goto err;
|
goto err;
|
||||||
eventLoop->fdCmdRead = rgfd[0];
|
eventLoop->fdCmdRead = rgfd[0];
|
||||||
eventLoop->fdCmdWrite = rgfd[1];
|
eventLoop->fdCmdWrite = rgfd[1];
|
||||||
fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK);
|
//fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK);
|
||||||
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
|
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
|
||||||
eventLoop->cevents = 0;
|
eventLoop->cevents = 0;
|
||||||
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL);
|
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL);
|
||||||
@ -393,7 +395,7 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
|
|||||||
*
|
*
|
||||||
* Otherwise AE_OK is returned and the operation is successful. */
|
* Otherwise AE_OK is returned and the operation is successful. */
|
||||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
|
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
if (setsize == eventLoop->setsize) return AE_OK;
|
if (setsize == eventLoop->setsize) return AE_OK;
|
||||||
@ -431,14 +433,14 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
extern "C" void aeStop(aeEventLoop *eventLoop) {
|
extern "C" void aeStop(aeEventLoop *eventLoop) {
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
eventLoop->stop = 1;
|
eventLoop->stop = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||||
aeFileProc *proc, void *clientData)
|
aeFileProc *proc, void *clientData)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
if (fd >= eventLoop->setsize) {
|
if (fd >= eventLoop->setsize) {
|
||||||
errno = ERANGE;
|
errno = ERANGE;
|
||||||
return AE_ERR;
|
return AE_ERR;
|
||||||
@ -467,12 +469,12 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
|
|||||||
cmd.mask = mask;
|
cmd.mask = mask;
|
||||||
cmd.fLock = true;
|
cmd.fLock = true;
|
||||||
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
AE_ASSERT(cb == sizeof(cmd));
|
serverAssert(cb == sizeof(cmd));
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
if (fd >= eventLoop->setsize) return;
|
if (fd >= eventLoop->setsize) return;
|
||||||
aeFileEvent *fe = &eventLoop->events[fd];
|
aeFileEvent *fe = &eventLoop->events[fd];
|
||||||
if (fe->mask == AE_NONE) return;
|
if (fe->mask == AE_NONE) return;
|
||||||
@ -530,7 +532,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise
|
|||||||
aeTimeProc *proc, void *clientData,
|
aeTimeProc *proc, void *clientData,
|
||||||
aeEventFinalizerProc *finalizerProc)
|
aeEventFinalizerProc *finalizerProc)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
long long id = eventLoop->timeEventNextId++;
|
long long id = eventLoop->timeEventNextId++;
|
||||||
aeTimeEvent *te;
|
aeTimeEvent *te;
|
||||||
|
|
||||||
@ -552,7 +554,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise
|
|||||||
|
|
||||||
extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
aeTimeEvent *te = eventLoop->timeEventHead;
|
aeTimeEvent *te = eventLoop->timeEventHead;
|
||||||
while(te) {
|
while(te) {
|
||||||
if (te->id == id) {
|
if (te->id == id) {
|
||||||
@ -577,7 +579,7 @@ extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
|
|||||||
*/
|
*/
|
||||||
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
|
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
aeTimeEvent *te = eventLoop->timeEventHead;
|
aeTimeEvent *te = eventLoop->timeEventHead;
|
||||||
aeTimeEvent *nearest = NULL;
|
aeTimeEvent *nearest = NULL;
|
||||||
|
|
||||||
@ -753,7 +755,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
|
|||||||
* The function returns the number of events processed. */
|
* The function returns the number of events processed. */
|
||||||
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||||
{
|
{
|
||||||
AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
serverAssert(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop);
|
||||||
int processed = 0, numevents;
|
int processed = 0, numevents;
|
||||||
|
|
||||||
/* Nothing to do? return ASAP */
|
/* Nothing to do? return ASAP */
|
||||||
@ -877,9 +879,9 @@ void aeMain(aeEventLoop *eventLoop) {
|
|||||||
ulock.lock();
|
ulock.lock();
|
||||||
eventLoop->beforesleep(eventLoop);
|
eventLoop->beforesleep(eventLoop);
|
||||||
}
|
}
|
||||||
AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing
|
serverAssert(!aeThreadOwnsLock()); // we should have relinquished it after processing
|
||||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
||||||
AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing
|
serverAssert(!aeThreadOwnsLock()); // we should have relinquished it after processing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,8 +310,10 @@ void freeStringObject(robj_roptr o) {
|
|||||||
void freeListObject(robj_roptr o) {
|
void freeListObject(robj_roptr o) {
|
||||||
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
|
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||||
quicklistRelease((quicklist*)ptrFromObj(o));
|
quicklistRelease((quicklist*)ptrFromObj(o));
|
||||||
|
} else if (o->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||||
|
zfree(ptrFromObj(o));
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown list encoding type");
|
serverPanic("Unknown list encoding type: %d", o->encoding);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,4 +48,7 @@ start_server {tags {"nested_hash"}} {
|
|||||||
r keydb.nhset a.b.c val1
|
r keydb.nhset a.b.c val1
|
||||||
assert_equal [r expire a.b 100] 0
|
assert_equal [r expire a.b 100] 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Save not implemented so ensure we don't crash saving the RDB on shutdown
|
||||||
|
r flushall
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user