Diskless replication handler needs to run on the main thread

Former-commit-id: 091af6400348b7716f7fe0bc845bf7f16d922815
This commit is contained in:
John Sully 2020-05-23 15:15:42 -04:00
parent 646cc81c96
commit 3c0e1a1baf
2 changed files with 12 additions and 8 deletions

View File

@ -2429,7 +2429,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
val = nullptr; val = nullptr;
} }
} }
if (g_pserver->key_load_delay) if (g_pserver->key_load_delay)
usleep(g_pserver->key_load_delay); usleep(g_pserver->key_load_delay);
@ -2701,9 +2701,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdb_child_pid = childpid; g_pserver->rdb_child_pid = childpid;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
} serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
});
} }
return (childpid == -1) ? C_ERR : C_OK; return (childpid == -1) ? C_ERR : C_OK;
} }

View File

@ -1345,9 +1345,11 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
g_pserver->rdb_pipe_numconns_writing--; g_pserver->rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */ /* if there are no more writes for now for this conn, or write error: */
if (g_pserver->rdb_pipe_numconns_writing == 0) { if (g_pserver->rdb_pipe_numconns_writing == 0) {
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event."); if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
} serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event.");
}
});
} }
} }
@ -1398,11 +1400,11 @@ void RdbPipeCleanup() {
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) { void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
UNUSED(mask); UNUSED(mask);
UNUSED(clientData); UNUSED(clientData);
UNUSED(eventLoop);
int i; int i;
if (!g_pserver->rdb_pipe_buff) if (!g_pserver->rdb_pipe_buff)
g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN); g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN);
serverAssert(g_pserver->rdb_pipe_numconns_writing==0); serverAssert(g_pserver->rdb_pipe_numconns_writing==0);
serverAssert(eventLoop == g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el);
while (1) { while (1) {
g_pserver->rdb_pipe_bufflen = read(fd, g_pserver->rdb_pipe_buff, PROTO_IOBUF_LEN); g_pserver->rdb_pipe_bufflen = read(fd, g_pserver->rdb_pipe_buff, PROTO_IOBUF_LEN);