diff --git a/src/rdb.cpp b/src/rdb.cpp index abaa3d608..97f8aebc0 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2429,7 +2429,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { val = nullptr; } } - + if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); @@ -2701,9 +2701,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { g_pserver->rdb_child_pid = childpid; g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ - if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { - serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); - } + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); + } + }); } return (childpid == -1) ? C_ERR : C_OK; } diff --git a/src/replication.cpp b/src/replication.cpp index 766a9ff67..48b9f21f8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1345,9 +1345,11 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { g_pserver->rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (g_pserver->rdb_pipe_numconns_writing == 0) { - if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { - serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event."); - } + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { + serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event."); + } + }); } } @@ -1398,11 +1400,11 @@ void RdbPipeCleanup() { void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { UNUSED(mask); UNUSED(clientData); - UNUSED(eventLoop); int i; if (!g_pserver->rdb_pipe_buff) g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN); serverAssert(g_pserver->rdb_pipe_numconns_writing==0); + serverAssert(eventLoop == g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el); while (1) { g_pserver->rdb_pipe_bufflen = read(fd, g_pserver->rdb_pipe_buff, PROTO_IOBUF_LEN);