Most tests failing, except some memory and number of PSYNC syncs

This commit is contained in:
John Sully 2019-02-19 01:11:00 -05:00
parent acbad0c04e
commit bf41d3916e
6 changed files with 77 additions and 10 deletions

View File

@ -144,8 +144,17 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostCppFunction: case AE_ASYNC_OP::PostCppFunction:
{ {
if (cmd.pctl != nullptr)
cmd.pctl->mutexcv.lock();
std::unique_lock<decltype(g_lock)> ulock(g_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock);
(*cmd.pfn)(); (*cmd.pfn)();
if (cmd.pctl != nullptr)
{
cmd.pctl->cv.notify_all();
cmd.pctl->mutexcv.unlock();
}
delete cmd.pfn; delete cmd.pfn;
} }
break; break;
@ -186,6 +195,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
{ {
if (eventLoop == g_eventLoopThisThread)
{
proc(arg);
return AE_OK;
}
aeCommand cmd; aeCommand cmd;
cmd.op = AE_ASYNC_OP::PostFunction; cmd.op = AE_ASYNC_OP::PostFunction;
cmd.proc = proc; cmd.proc = proc;
@ -195,14 +209,33 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK; return AE_OK;
} }
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn) int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous)
{ {
if (eventLoop == g_eventLoopThisThread)
{
fn();
return AE_OK;
}
aeCommand cmd; aeCommand cmd;
cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.op = AE_ASYNC_OP::PostCppFunction;
cmd.pfn = new std::function<void()>(fn); cmd.pfn = new std::function<void()>(fn);
cmd.pctl = nullptr;
if (fSynchronous)
cmd.pctl = new aeCommandControl();
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
if (fSynchronous)
cmd.pctl->mutexcv.lock();
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(size == sizeof(cmd)); AE_ASSERT(size == sizeof(cmd));
return AE_OK; int ret = AE_OK;
if (fSynchronous)
{
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
delete cmd.pctl;
}
return ret;
} }
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *aeCreateEventLoop(int setsize) {
@ -232,7 +265,8 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (pipe(rgfd) < 0) if (pipe(rgfd) < 0)
goto err; goto err;
eventLoop->fdCmdRead = rgfd[0]; eventLoop->fdCmdRead = rgfd[0];
eventLoop->fdCmdWrite = rgfd[1]; eventLoop->fdCmdWrite = rgfd[1];;
fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK);
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
eventLoop->cevents = 0; eventLoop->cevents = 0;
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL); aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL);
@ -325,8 +359,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
cmd.fd = fd; cmd.fd = fd;
cmd.mask = mask; cmd.mask = mask;
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (cb != sizeof(cmd)) AE_ASSERT(cb == sizeof(cmd));
fprintf(stderr, "Failed to write to pipe.\n");
} }
extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)

View File

@ -131,7 +131,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} // EXTERN C } // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn); int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false);
extern "C" { extern "C" {
#endif #endif
void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -1154,6 +1154,7 @@ void disconnectSlaves(void) {
* This is used by freeClient() and replicationCacheMaster(). */ * This is used by freeClient() and replicationCacheMaster(). */
void unlinkClient(client *c) { void unlinkClient(client *c) {
listNode *ln; listNode *ln;
AssertCorrectThread(c);
/* If this is marked as current client unset it. */ /* If this is marked as current client unset it. */
if (server.current_client == c) server.current_client = NULL; if (server.current_client == c) server.current_client = NULL;
@ -1196,6 +1197,13 @@ void unlinkClient(client *c) {
listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln); listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED; c->flags &= ~CLIENT_UNBLOCKED;
} }
if (c->flags & CLIENT_PENDING_ASYNCWRITE) {
ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_asyncwrite,c);
serverAssert(ln != NULL);
listDelNode(server.rgthreadvar[c->iel].clients_pending_asyncwrite,ln);
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
}
} }
void freeClient(client *c) { void freeClient(client *c) {

View File

@ -298,6 +298,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyProtoAsync(slave,buf,buflen); addReplyProtoAsync(slave,buf,buflen);
} }
if (listLength(slaves))
ProcessPendingAsyncWrites(); // flush them to their respective threads
} }
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@ -2039,7 +2042,10 @@ void replicationHandleMasterDisconnection(void) {
* the slaves only if we'll have to do a full resync with our master. */ * the slaves only if we'll have to do a full resync with our master. */
} }
void replicaofCommandCore(client *c);
void replicaofCommand(client *c) { void replicaofCommand(client *c) {
// Changing the master needs to be done on the main thread.
/* SLAVEOF is not allowed in cluster mode as replication is automatically /* SLAVEOF is not allowed in cluster mode as replication is automatically
* configured using the current address of the master node. */ * configured using the current address of the master node. */
if (server.cluster_enabled) { if (server.cluster_enabled) {
@ -2047,6 +2053,23 @@ void replicaofCommand(client *c) {
return; return;
} }
if ((serverTL - server.rgthreadvar) == IDX_EVENT_LOOP_MAIN)
{
replicaofCommandCore(c);
}
else
{
aeReleaseLock();
aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [=]{
replicaofCommandCore(c);
}, true /*fSync*/);
aeAcquireLock();
}
}
void replicaofCommandCore(client *c) {
/* The special host/port combination "NO" "ONE" turns the instance /* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */ * into a master. Otherwise the new master address is set. */
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") &&
@ -2068,7 +2091,7 @@ void replicaofCommand(client *c) {
if (server.masterhost && !strcasecmp(server.masterhost,(const char*)ptrFromObj(c->argv[1])) if (server.masterhost && !strcasecmp(server.masterhost,(const char*)ptrFromObj(c->argv[1]))
&& server.masterport == port) { && server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed."); serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); addReplySdsAsync(c,sdsnew("+OK Already connected to specified master\r\n"));
return; return;
} }
/* There was no previous master or the user specified a different one, /* There was no previous master or the user specified a different one,
@ -2079,7 +2102,7 @@ void replicaofCommand(client *c) {
server.masterhost, server.masterport, client); server.masterhost, server.masterport, client);
sdsfree(client); sdsfree(client);
} }
addReply(c,shared.ok); addReplyAsync(c,shared.ok);
} }
/* ROLE command: provide information about the role of the instance /* ROLE command: provide information about the role of the instance

View File

@ -3181,7 +3181,7 @@ void preventCommandReplication(client *c) {
c->flags |= CLIENT_PREVENT_REPL_PROP; c->flags |= CLIENT_PREVENT_REPL_PROP;
} }
static void ProcessPendingAsyncWrites() void ProcessPendingAsyncWrites()
{ {
while(listLength(serverTL->clients_pending_asyncwrite)) { while(listLength(serverTL->clients_pending_asyncwrite)) {
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
@ -5046,8 +5046,9 @@ int main(int argc, char **argv) {
initServer(); initServer();
server.cthreads = 2; //testing server.cthreads = 1; //testing
initNetworking(1 /* fReusePort */); initNetworking(1 /* fReusePort */);
serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN];
if (background || server.pidfile) createPidFile(); if (background || server.pidfile) createPidFile();
redisSetProcTitle(argv[0]); redisSetProcTitle(argv[0]);

View File

@ -1641,6 +1641,8 @@ void addReplySdsAsync(client *c, sds s);
void addReplyBulkSdsAsync(client *c, sds s); void addReplyBulkSdsAsync(client *c, sds s);
void addReplyPushLenAsync(client *c, long length); void addReplyPushLenAsync(client *c, long length);
void ProcessPendingAsyncWrites(void);
#ifdef __GNUC__ #ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...) void addReplyErrorFormat(client *c, const char *fmt, ...)
__attribute__((format(printf, 2, 3))); __attribute__((format(printf, 2, 3)));