Fix most failing tests

This commit is contained in:
John Sully 2019-02-15 10:53:32 -05:00
parent d62178ec8c
commit 48f6d0d800
6 changed files with 79 additions and 97 deletions

View File

@ -152,7 +152,8 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->fdCmdRead = rgfd[0]; eventLoop->fdCmdRead = rgfd[0];
eventLoop->fdCmdWrite = rgfd[1]; eventLoop->fdCmdWrite = rgfd[1];
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_THREADSAFE, aeProcessCmd, NULL); eventLoop->cevents = 0;
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL);
return eventLoop; return eventLoop;
@ -257,6 +258,9 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
* is removed. */ * is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER; if (mask & AE_WRITABLE) mask |= AE_BARRIER;
if (mask & AE_WRITABLE) mask |= AE_WRITE_THREADSAFE;
if (mask & AE_READABLE) mask |= AE_READ_THREADSAFE;
aeApiDelEvent(eventLoop, fd, mask); aeApiDelEvent(eventLoop, fd, mask);
fe->mask = fe->mask & (~mask); fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
@ -441,9 +445,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int mask, int fd) extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int mask, int fd)
{ {
#define LOCK_IF_NECESSARY(fe) \ #define LOCK_IF_NECESSARY(fe, tsmask) \
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \ std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
if (!(fe->mask & AE_THREADSAFE)) \ if (!(fe->mask & tsmask)) \
ulock.lock() ulock.lock()
int fired = 0; /* Number of events fired for current fd. */ int fired = 0; /* Number of events fired for current fd. */
@ -468,7 +472,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
* Fire the readable event if the call sequence is not * Fire the readable event if the call sequence is not
* inverted. */ * inverted. */
if (!invert && fe->mask & mask & AE_READABLE) { if (!invert && fe->mask & mask & AE_READABLE) {
LOCK_IF_NECESSARY(fe); LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE);
fe->rfileProc(eventLoop,fd,fe->clientData,mask); fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++; fired++;
} }
@ -476,7 +480,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
/* Fire the writable event. */ /* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) { if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) { if (!fired || fe->wfileProc != fe->rfileProc) {
LOCK_IF_NECESSARY(fe); LOCK_IF_NECESSARY(fe, AE_WRITE_THREADSAFE);
fe->wfileProc(eventLoop,fd,fe->clientData,mask); fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++; fired++;
} }
@ -486,7 +490,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
* after the writable one. */ * after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) { if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) { if (!fired || fe->wfileProc != fe->rfileProc) {
LOCK_IF_NECESSARY(fe); LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE);
fe->rfileProc(eventLoop,fd,fe->clientData,mask); fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++; fired++;
} }
@ -567,7 +571,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* After sleep callback. */ /* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE))
ulock.lock(); ulock.lock();
eventLoop->aftersleep(eventLoop); eventLoop->aftersleep(eventLoop);
} }
@ -586,6 +590,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (flags & AE_TIME_EVENTS) if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop); processed += processTimeEvents(eventLoop);
eventLoop->cevents += processed;
return processed; /* return the number of processed file/time events */ return processed; /* return the number of processed file/time events */
} }
@ -617,7 +622,7 @@ void aeMain(aeEventLoop *eventLoop) {
while (!eventLoop->stop) { while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL) { if (eventLoop->beforesleep != NULL) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE))
ulock.lock(); ulock.lock();
eventLoop->beforesleep(eventLoop); eventLoop->beforesleep(eventLoop);
} }

View File

@ -51,7 +51,9 @@ extern "C" {
loop iteration. Useful when you want to persist loop iteration. Useful when you want to persist
things to disk before sending replies, and want things to disk before sending replies, and want
to do that in a group fashion. */ to do that in a group fashion. */
#define AE_THREADSAFE 8 /* Ok to run concurrently */ #define AE_READ_THREADSAFE 8
#define AE_WRITE_THREADSAFE 16
#define AE_SLEEP_THREADSAFE 32
#define AE_FILE_EVENTS 1 #define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2 #define AE_TIME_EVENTS 2
@ -118,6 +120,7 @@ typedef struct aeEventLoop {
struct fastlock flock; struct fastlock flock;
int fdCmdWrite; int fdCmdWrite;
int fdCmdRead; int fdCmdRead;
int cevents;
} aeEventLoop; } aeEventLoop;
/* Prototypes */ /* Prototypes */

View File

@ -971,7 +971,7 @@ void configSetCommand(client *c) {
if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) < if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) <
server.maxclients + CONFIG_FDSET_INCR) server.maxclients + CONFIG_FDSET_INCR)
{ {
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) for (int iel = 0; iel < server.cel; ++iel)
{ {
if (aeResizeSetSize(server.rgel[iel], if (aeResizeSetSize(server.rgel[iel],
server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) server.maxclients + CONFIG_FDSET_INCR) == AE_ERR)

View File

@ -94,7 +94,7 @@ client *createClient(int fd, int iel) {
anetEnableTcpNoDelay(NULL,fd); anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive) if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive); anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_THREADSAFE, if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_READ_THREADSAFE,
readQueryFromClient, c) == AE_ERR) readQueryFromClient, c) == AE_ERR)
{ {
close(fd); close(fd);
@ -850,36 +850,9 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
c->flags |= flags; c->flags |= flags;
} }
struct AcceptCommonHandlerAsyncArgs
{
int fd;
int flags;
char cip[NET_IP_STR_LEN];
int fUseCip;
int iel;
};
static void AcceptCommonHandlerAsync(void *args)
{
struct AcceptCommonHandlerAsyncArgs *aargs = args;
acceptCommonHandler(aargs->fd, aargs->flags, aargs->cip, aargs->iel);
zfree(args);
}
static void EnqueueAcceptCommonHandler(int fd, int flags, char *ip, int iel)
{
struct AcceptCommonHandlerAsyncArgs *args = zmalloc(sizeof(struct AcceptCommonHandlerAsyncArgs), MALLOC_LOCAL);
args->fd = fd;
args->flags = flags;
if (ip != NULL)
memcpy(args->cip, ip, NET_IP_STR_LEN);
args->fUseCip = (ip != NULL);
args->iel = iel;
aePostFunction(server.rgel[iel], AcceptCommonHandlerAsync, args);
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL; int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN]; char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask); UNUSED(mask);
UNUSED(privdata); UNUSED(privdata);
@ -892,24 +865,12 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} }
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
int ielCur = 0; int ielCur = ielFromEventLoop(el);
for (; ielCur < MAX_EVENT_LOOPS; ++ielCur)
{ // We always accept on the same thread
if (el == server.rgel[ielCur]) aeAcquireLock();
break; acceptCommonHandler(cfd,0,cip, ielCur);
} aeReleaseLock();
serverAssert(ielCur < MAX_EVENT_LOOPS);
int iel = rand() % MAX_EVENT_LOOPS;
if (iel == ielCur)
{
aeAcquireLock();
acceptCommonHandler(cfd,0,cip, iel);
aeReleaseLock();
}
else
{
EnqueueAcceptCommonHandler(cfd, 0, cip, iel);
}
} }
} }
@ -927,8 +888,13 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
"Accepting client connection: %s", server.neterr); "Accepting client connection: %s", server.neterr);
return; return;
} }
int ielCur = ielFromEventLoop(el);
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
EnqueueAcceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, rand() % MAX_EVENT_LOOPS);
aeAcquireLock();
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur);
aeReleaseLock();
} }
} }
@ -1324,7 +1290,7 @@ void protectClient(client *c) {
void unprotectClient(client *c) { void unprotectClient(client *c) {
if (c->flags & CLIENT_PROTECTED) { if (c->flags & CLIENT_PROTECTED) {
c->flags &= ~CLIENT_PROTECTED; c->flags &= ~CLIENT_PROTECTED;
aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_THREADSAFE,readQueryFromClient,c); aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c);
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
} }
} }
@ -1678,14 +1644,6 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el); UNUSED(el);
UNUSED(mask); UNUSED(mask);
int iel = 0;
for (; iel < MAX_EVENT_LOOPS; ++iel)
{
if (server.rgel[iel] == el)
break;
}
serverAssert(iel == c->iel);
readlen = PROTO_IOBUF_LEN; readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply /* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query * that is large enough, try to maximize the probability that the query

View File

@ -2111,13 +2111,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
void beforeSleepLite(struct aeEventLoop *eventLoop) void beforeSleepLite(struct aeEventLoop *eventLoop)
{ {
int iel = 0; int iel = ielFromEventLoop(eventLoop);
for (; iel < MAX_EVENT_LOOPS; ++iel)
{
if (server.rgel[iel] == eventLoop)
break;
}
serverAssert(iel < MAX_EVENT_LOOPS);
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.rgunblocked_clients[iel])) if (listLength(server.rgunblocked_clients[iel]))
@ -2451,6 +2445,9 @@ void initServerConfig(void) {
* script to the slave / AOF. This is the new way starting from * script to the slave / AOF. This is the new way starting from
* Redis 5. However it is possible to revert it via redis.conf. */ * Redis 5. However it is possible to revert it via redis.conf. */
server.lua_always_replicate_commands = 1; server.lua_always_replicate_commands = 1;
/* Multithreading */
server.cel = CONFIG_DEFAULT_THREADS;
} }
extern char **environ; extern char **environ;
@ -2737,9 +2734,24 @@ void resetServerStats(void) {
server.aof_delayed_fsync = 0; server.aof_delayed_fsync = 0;
} }
void initServer(void) { void initServerAcceptHandlers(void)
int j; {
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (int j = 0; j < server.ipfd_count; j++) {
int iel = j % server.cel;
if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_READ_THREADSAFE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
}
void initServer(void) {
signal(SIGHUP, SIG_IGN); signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
setupSignalHandlers(); setupSignalHandlers();
@ -2809,7 +2821,7 @@ void initServer(void) {
} }
/* Create the Redis databases, and initialize other internal state. */ /* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) { for (int j = 0; j < server.dbnum; j++) {
server.db[j].pdict = dictCreate(&dbDictType,NULL); server.db[j].pdict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
@ -2863,23 +2875,6 @@ void initServer(void) {
exit(1); exit(1);
} }
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
{
if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_THREADSAFE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop /* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */ * when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE, if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE,
@ -4794,9 +4789,10 @@ int redisIsSupervised(int mode) {
void *workerThreadMain(void *parg) void *workerThreadMain(void *parg)
{ {
int iel = (int)((int64_t)parg); int iel = (int)((int64_t)parg);
serverLog(LOG_INFO, "Thread %d alive.", iel);
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_THREADSAFE); aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0); aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0);
aeMain(server.rgel[iel]); aeMain(server.rgel[iel]);
aeDeleteEventLoop(server.rgel[iel]); aeDeleteEventLoop(server.rgel[iel]);
@ -5004,8 +5000,14 @@ int main(int argc, char **argv) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
} }
server.cel = 4; //testing
initServerAcceptHandlers();
serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS]; pthread_t rgthread[MAX_EVENT_LOOPS];
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) for (int iel = 0; iel < server.cel; ++iel)
{ {
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
} }

View File

@ -169,6 +169,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
#define CONFIG_DEFAULT_THREADS 1
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
@ -1008,7 +1010,7 @@ struct clusterState;
#define CHILD_INFO_TYPE_RDB 0 #define CHILD_INFO_TYPE_RDB 0
#define CHILD_INFO_TYPE_AOF 1 #define CHILD_INFO_TYPE_AOF 1
#define MAX_EVENT_LOOPS 2 #define MAX_EVENT_LOOPS 16
#define IDX_EVENT_LOOP_MAIN 0 #define IDX_EVENT_LOOP_MAIN 0
struct redisServer { struct redisServer {
@ -2279,6 +2281,18 @@ int memtest_preserving_test(unsigned long *m, size_t bytes, int passes);
void mixDigest(unsigned char *digest, void *ptr, size_t len); void mixDigest(unsigned char *digest, void *ptr, size_t len);
void xorDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len);
inline int ielFromEventLoop(const aeEventLoop *eventLoop)
{
int iel = 0;
for (; iel < server.cel; ++iel)
{
if (server.rgel[iel] == eventLoop)
break;
}
serverAssert(iel < server.cel);
return iel;
}
#define redisDebug(fmt, ...) \ #define redisDebug(fmt, ...) \
printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
#define redisDebugMark() \ #define redisDebugMark() \