diff --git a/src/aof.cpp b/src/aof.cpp index 5c6385c84..637b2ce34 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -97,6 +97,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { aofrwblock *block; ssize_t nwritten; serverAssert(GlobalLocksAcquired()); + serverAssert(el == g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el); // MUST run on main thread UNUSED(el); UNUSED(fd); @@ -164,10 +165,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (aeGetFileEvents(serverTL->el,g_pserver->aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(serverTL->el, g_pserver->aof_pipe_write_data_to_child, - AE_WRITABLE, aofChildWriteDiffData, NULL); - } + aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL, FALSE); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -1519,7 +1517,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Remove the handler since this can be called only one time during a * rewrite. */ - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); } /* Create the pipes used for parent - child process IPC during rewrite. @@ -1557,12 +1555,20 @@ error: } void aofClosePipes(void) { - aeDeleteFileEventAsync(g_pserver->el_alf_pip_read_ack_from_child,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_write_data_to_child,AE_WRITABLE); - close(g_pserver->aof_pipe_write_data_to_child); + int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; + aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); + close (fdAofAckPipe); + }); + + int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); + close(fdAofWritePipe); + }); + close(g_pserver->aof_pipe_read_data_from_parent); close(g_pserver->aof_pipe_write_ack_to_parent); - close(g_pserver->aof_pipe_read_ack_from_child); close(g_pserver->aof_pipe_write_ack_to_child); close(g_pserver->aof_pipe_read_ack_from_parent); }