Refactor fork child related infra, unify child pid
This is a refactoring commit and isn't supposed to have any actual impact. It does the following:
- Keep just one server struct fork child pid variable instead of 3.
- Have one server struct variable indicating the purpose of the current fork child.
- redisFork is now responsible for updating the server struct with the pid, which means it can be the one that calls updateDictResizePolicy.
- Move child info pipe handling into redisFork instead of having it repeated outside.
- There are two classes of fork purposes: a mutually exclusive group (AOF, RDB, Module), and one that can create several forks that coexist in parallel (LDB, but maybe Modules some day too; the Module API allows for that).
- Minor fix to killRDBChild: unlike killAppendOnlyChild and TerminateModuleForkChild, killRDBChild doesn't clear the pid variable or call wait4, so checkChildrenDone does the cleanup for it. This commit removes the explicit calls to rdbRemoveTempFile, closeChildInfoPipe, and updateDictResizePolicy, which didn't do any harm but were unnecessary.
This commit is contained in:
parent
b5029dfdad
commit
f9dacf8aac
32
src/aof.c
32
src/aof.c
@ -213,22 +213,20 @@ void aof_background_fsync(int fd) {
|
|||||||
void killAppendOnlyChild(void) {
|
void killAppendOnlyChild(void) {
|
||||||
int statloc;
|
int statloc;
|
||||||
/* No AOFRW child? return. */
|
/* No AOFRW child? return. */
|
||||||
if (server.aof_child_pid == -1) return;
|
if (server.child_type != CHILD_TYPE_AOF) return;
|
||||||
/* Kill AOFRW child, wait for child exit. */
|
/* Kill AOFRW child, wait for child exit. */
|
||||||
serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
|
serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
|
||||||
(long) server.aof_child_pid);
|
(long) server.child_pid);
|
||||||
if (kill(server.aof_child_pid,SIGUSR1) != -1) {
|
if (kill(server.child_pid,SIGUSR1) != -1) {
|
||||||
while(wait3(&statloc,0,NULL) != server.aof_child_pid);
|
while(wait3(&statloc,0,NULL) != server.child_pid);
|
||||||
}
|
}
|
||||||
/* Reset the buffer accumulating changes while the child saves. */
|
/* Reset the buffer accumulating changes while the child saves. */
|
||||||
aofRewriteBufferReset();
|
aofRewriteBufferReset();
|
||||||
aofRemoveTempFile(server.aof_child_pid);
|
aofRemoveTempFile(server.child_pid);
|
||||||
server.aof_child_pid = -1;
|
resetChildState();
|
||||||
server.aof_rewrite_time_start = -1;
|
server.aof_rewrite_time_start = -1;
|
||||||
/* Close pipes used for IPC between the two processes. */
|
/* Close pipes used for IPC between the two processes. */
|
||||||
aofClosePipes();
|
aofClosePipes();
|
||||||
closeChildInfoPipe();
|
|
||||||
updateDictResizePolicy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Called when the user switches from "appendonly yes" to "appendonly no"
|
/* Called when the user switches from "appendonly yes" to "appendonly no"
|
||||||
@ -265,14 +263,14 @@ int startAppendOnly(void) {
|
|||||||
strerror(errno));
|
strerror(errno));
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
if (hasActiveChildProcess() && server.aof_child_pid == -1) {
|
if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) {
|
||||||
server.aof_rewrite_scheduled = 1;
|
server.aof_rewrite_scheduled = 1;
|
||||||
serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
|
serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
|
||||||
} else {
|
} else {
|
||||||
/* If there is a pending AOF rewrite, we need to switch it off and
|
/* If there is a pending AOF rewrite, we need to switch it off and
|
||||||
* start a new one: the old one cannot be reused because it is not
|
* start a new one: the old one cannot be reused because it is not
|
||||||
* accumulating the AOF buffer. */
|
* accumulating the AOF buffer. */
|
||||||
if (server.aof_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_AOF) {
|
||||||
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
|
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
|
||||||
killAppendOnlyChild();
|
killAppendOnlyChild();
|
||||||
}
|
}
|
||||||
@ -646,7 +644,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
|
|||||||
* accumulate the differences between the child DB and the current one
|
* accumulate the differences between the child DB and the current one
|
||||||
* in a buffer, so that when the child process will do its work we
|
* in a buffer, so that when the child process will do its work we
|
||||||
* can append the differences to the new append only file. */
|
* can append the differences to the new append only file. */
|
||||||
if (server.aof_child_pid != -1)
|
if (server.child_type == CHILD_TYPE_AOF)
|
||||||
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
|
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
|
||||||
|
|
||||||
sdsfree(buf);
|
sdsfree(buf);
|
||||||
@ -1703,7 +1701,6 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
|
|
||||||
if (hasActiveChildProcess()) return C_ERR;
|
if (hasActiveChildProcess()) return C_ERR;
|
||||||
if (aofCreatePipes() != C_OK) return C_ERR;
|
if (aofCreatePipes() != C_OK) return C_ERR;
|
||||||
openChildInfoPipe();
|
|
||||||
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
|
if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {
|
||||||
char tmpfile[256];
|
char tmpfile[256];
|
||||||
|
|
||||||
@ -1720,7 +1717,6 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
if (childpid == -1) {
|
if (childpid == -1) {
|
||||||
closeChildInfoPipe();
|
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Can't rewrite append only file in background: fork: %s",
|
"Can't rewrite append only file in background: fork: %s",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
@ -1731,8 +1727,7 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
"Background append only file rewriting started by pid %ld",(long) childpid);
|
"Background append only file rewriting started by pid %ld",(long) childpid);
|
||||||
server.aof_rewrite_scheduled = 0;
|
server.aof_rewrite_scheduled = 0;
|
||||||
server.aof_rewrite_time_start = time(NULL);
|
server.aof_rewrite_time_start = time(NULL);
|
||||||
server.aof_child_pid = childpid;
|
|
||||||
updateDictResizePolicy();
|
|
||||||
/* We set appendseldb to -1 in order to force the next call to the
|
/* We set appendseldb to -1 in order to force the next call to the
|
||||||
* feedAppendOnlyFile() to issue a SELECT command, so the differences
|
* feedAppendOnlyFile() to issue a SELECT command, so the differences
|
||||||
* accumulated by the parent into server.aof_rewrite_buf will start
|
* accumulated by the parent into server.aof_rewrite_buf will start
|
||||||
@ -1745,7 +1740,7 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void bgrewriteaofCommand(client *c) {
|
void bgrewriteaofCommand(client *c) {
|
||||||
if (server.aof_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_AOF) {
|
||||||
addReplyError(c,"Background append only file rewriting already in progress");
|
addReplyError(c,"Background append only file rewriting already in progress");
|
||||||
} else if (hasActiveChildProcess()) {
|
} else if (hasActiveChildProcess()) {
|
||||||
server.aof_rewrite_scheduled = 1;
|
server.aof_rewrite_scheduled = 1;
|
||||||
@ -1803,7 +1798,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
|
|||||||
* rewritten AOF. */
|
* rewritten AOF. */
|
||||||
latencyStartMonitor(latency);
|
latencyStartMonitor(latency);
|
||||||
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
|
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
|
||||||
(int)server.aof_child_pid);
|
(int)server.child_pid);
|
||||||
newfd = open(tmpfile,O_WRONLY|O_APPEND);
|
newfd = open(tmpfile,O_WRONLY|O_APPEND);
|
||||||
if (newfd == -1) {
|
if (newfd == -1) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -1931,8 +1926,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
|
|||||||
cleanup:
|
cleanup:
|
||||||
aofClosePipes();
|
aofClosePipes();
|
||||||
aofRewriteBufferReset();
|
aofRewriteBufferReset();
|
||||||
aofRemoveTempFile(server.aof_child_pid);
|
aofRemoveTempFile(server.child_pid);
|
||||||
server.aof_child_pid = -1;
|
|
||||||
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
|
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
|
||||||
server.aof_rewrite_time_start = -1;
|
server.aof_rewrite_time_start = -1;
|
||||||
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
|
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
|
||||||
|
2
src/db.c
2
src/db.c
@ -617,7 +617,7 @@ int getFlushCommandFlags(client *c, int *flags) {
|
|||||||
/* Flushes the whole server data set. */
|
/* Flushes the whole server data set. */
|
||||||
void flushAllDataAndResetRDB(int flags) {
|
void flushAllDataAndResetRDB(int flags) {
|
||||||
server.dirty += emptyDb(-1,flags,NULL);
|
server.dirty += emptyDb(-1,flags,NULL);
|
||||||
if (server.rdb_child_pid != -1) killRDBChild();
|
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
|
||||||
if (server.saveparamslen > 0) {
|
if (server.saveparamslen > 0) {
|
||||||
/* Normally rdbSave() will reset dirty, but we don't want this here
|
/* Normally rdbSave() will reset dirty, but we don't want this here
|
||||||
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
|
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
|
||||||
|
27
src/module.c
27
src/module.c
@ -7065,23 +7065,16 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc
|
|||||||
*/
|
*/
|
||||||
int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
|
int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
|
||||||
pid_t childpid;
|
pid_t childpid;
|
||||||
if (hasActiveChildProcess()) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
openChildInfoPipe();
|
|
||||||
if ((childpid = redisFork(CHILD_TYPE_MODULE)) == 0) {
|
if ((childpid = redisFork(CHILD_TYPE_MODULE)) == 0) {
|
||||||
/* Child */
|
/* Child */
|
||||||
redisSetProcTitle("redis-module-fork");
|
redisSetProcTitle("redis-module-fork");
|
||||||
} else if (childpid == -1) {
|
} else if (childpid == -1) {
|
||||||
closeChildInfoPipe();
|
|
||||||
serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno));
|
serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno));
|
||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
server.module_child_pid = childpid;
|
|
||||||
moduleForkInfo.done_handler = cb;
|
moduleForkInfo.done_handler = cb;
|
||||||
moduleForkInfo.done_handler_user_data = user_data;
|
moduleForkInfo.done_handler_user_data = user_data;
|
||||||
updateDictResizePolicy();
|
|
||||||
serverLog(LL_VERBOSE, "Module fork started pid: %ld ", (long) childpid);
|
serverLog(LL_VERBOSE, "Module fork started pid: %ld ", (long) childpid);
|
||||||
}
|
}
|
||||||
return childpid;
|
return childpid;
|
||||||
@ -7101,22 +7094,20 @@ int RM_ExitFromChild(int retcode) {
|
|||||||
* child or the pid does not match, return C_ERR without doing anything. */
|
* child or the pid does not match, return C_ERR without doing anything. */
|
||||||
int TerminateModuleForkChild(int child_pid, int wait) {
|
int TerminateModuleForkChild(int child_pid, int wait) {
|
||||||
/* Module child should be active and pid should match. */
|
/* Module child should be active and pid should match. */
|
||||||
if (server.module_child_pid == -1 ||
|
if (server.child_type != CHILD_TYPE_MODULE ||
|
||||||
server.module_child_pid != child_pid) return C_ERR;
|
server.child_pid != child_pid) return C_ERR;
|
||||||
|
|
||||||
int statloc;
|
int statloc;
|
||||||
serverLog(LL_VERBOSE,"Killing running module fork child: %ld",
|
serverLog(LL_VERBOSE,"Killing running module fork child: %ld",
|
||||||
(long) server.module_child_pid);
|
(long) server.child_pid);
|
||||||
if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
|
if (kill(server.child_pid,SIGUSR1) != -1 && wait) {
|
||||||
while(wait4(server.module_child_pid,&statloc,0,NULL) !=
|
while(wait4(server.child_pid,&statloc,0,NULL) !=
|
||||||
server.module_child_pid);
|
server.child_pid);
|
||||||
}
|
}
|
||||||
/* Reset the buffer accumulating changes while the child saves. */
|
/* Reset the buffer accumulating changes while the child saves. */
|
||||||
server.module_child_pid = -1;
|
resetChildState();
|
||||||
moduleForkInfo.done_handler = NULL;
|
moduleForkInfo.done_handler = NULL;
|
||||||
moduleForkInfo.done_handler_user_data = NULL;
|
moduleForkInfo.done_handler_user_data = NULL;
|
||||||
closeChildInfoPipe();
|
|
||||||
updateDictResizePolicy();
|
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -7133,12 +7124,12 @@ int RM_KillForkChild(int child_pid) {
|
|||||||
void ModuleForkDoneHandler(int exitcode, int bysignal) {
|
void ModuleForkDoneHandler(int exitcode, int bysignal) {
|
||||||
serverLog(LL_NOTICE,
|
serverLog(LL_NOTICE,
|
||||||
"Module fork exited pid: %ld, retcode: %d, bysignal: %d",
|
"Module fork exited pid: %ld, retcode: %d, bysignal: %d",
|
||||||
(long) server.module_child_pid, exitcode, bysignal);
|
(long) server.child_pid, exitcode, bysignal);
|
||||||
if (moduleForkInfo.done_handler) {
|
if (moduleForkInfo.done_handler) {
|
||||||
moduleForkInfo.done_handler(exitcode, bysignal,
|
moduleForkInfo.done_handler(exitcode, bysignal,
|
||||||
moduleForkInfo.done_handler_user_data);
|
moduleForkInfo.done_handler_user_data);
|
||||||
}
|
}
|
||||||
server.module_child_pid = -1;
|
|
||||||
moduleForkInfo.done_handler = NULL;
|
moduleForkInfo.done_handler = NULL;
|
||||||
moduleForkInfo.done_handler_user_data = NULL;
|
moduleForkInfo.done_handler_user_data = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1358,7 +1358,7 @@ void freeClient(client *c) {
|
|||||||
* to keep data safe and we may delay configured 'save' for full sync. */
|
* to keep data safe and we may delay configured 'save' for full sync. */
|
||||||
if (server.saveparamslen == 0 &&
|
if (server.saveparamslen == 0 &&
|
||||||
c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
|
c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
|
||||||
server.rdb_child_pid != -1 &&
|
server.child_type == CHILD_TYPE_RDB &&
|
||||||
server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
|
server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
|
||||||
anyOtherSlaveWaitRdb(c) == 0)
|
anyOtherSlaveWaitRdb(c) == 0)
|
||||||
{
|
{
|
||||||
|
26
src/rdb.c
26
src/rdb.c
@ -1410,7 +1410,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
|||||||
|
|
||||||
server.dirty_before_bgsave = server.dirty;
|
server.dirty_before_bgsave = server.dirty;
|
||||||
server.lastbgsave_try = time(NULL);
|
server.lastbgsave_try = time(NULL);
|
||||||
openChildInfoPipe();
|
|
||||||
|
|
||||||
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
|
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
|
||||||
int retval;
|
int retval;
|
||||||
@ -1426,7 +1425,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
|||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
if (childpid == -1) {
|
if (childpid == -1) {
|
||||||
closeChildInfoPipe();
|
|
||||||
server.lastbgsave_status = C_ERR;
|
server.lastbgsave_status = C_ERR;
|
||||||
serverLog(LL_WARNING,"Can't save in background: fork: %s",
|
serverLog(LL_WARNING,"Can't save in background: fork: %s",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
@ -1434,9 +1432,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
|||||||
}
|
}
|
||||||
serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
|
serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
|
||||||
server.rdb_save_time_start = time(NULL);
|
server.rdb_save_time_start = time(NULL);
|
||||||
server.rdb_child_pid = childpid;
|
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
|
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
|
||||||
updateDictResizePolicy();
|
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
return C_OK; /* unreached */
|
return C_OK; /* unreached */
|
||||||
@ -2654,7 +2650,7 @@ static void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
|
|||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Background saving terminated by signal %d", bysignal);
|
"Background saving terminated by signal %d", bysignal);
|
||||||
latencyStartMonitor(latency);
|
latencyStartMonitor(latency);
|
||||||
rdbRemoveTempFile(server.rdb_child_pid, 0);
|
rdbRemoveTempFile(server.child_pid, 0);
|
||||||
latencyEndMonitor(latency);
|
latencyEndMonitor(latency);
|
||||||
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
|
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
|
||||||
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
|
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
|
||||||
@ -2706,7 +2702,6 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
server.rdb_child_pid = -1;
|
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
||||||
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
|
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
|
||||||
server.rdb_save_time_start = -1;
|
server.rdb_save_time_start = -1;
|
||||||
@ -2719,10 +2714,13 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
|
|||||||
* the child did not exit for an error, but because we wanted), and performs
|
* the child did not exit for an error, but because we wanted), and performs
|
||||||
* the cleanup needed. */
|
* the cleanup needed. */
|
||||||
void killRDBChild(void) {
|
void killRDBChild(void) {
|
||||||
kill(server.rdb_child_pid,SIGUSR1);
|
kill(server.child_pid, SIGUSR1);
|
||||||
rdbRemoveTempFile(server.rdb_child_pid, 0);
|
/* Because we are not using here wait4 (like we have in killAppendOnlyChild
|
||||||
closeChildInfoPipe();
|
* and TerminateModuleForkChild), all the cleanup operations is done by
|
||||||
updateDictResizePolicy();
|
* checkChildrenDone, that later will find that the process killed.
|
||||||
|
* This includes:
|
||||||
|
* - resetChildState
|
||||||
|
* - rdbRemoveTempFile */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
|
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
|
||||||
@ -2773,7 +2771,6 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Create the child process. */
|
/* Create the child process. */
|
||||||
openChildInfoPipe();
|
|
||||||
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
|
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
|
||||||
/* Child */
|
/* Child */
|
||||||
int retval, dummy;
|
int retval, dummy;
|
||||||
@ -2824,14 +2821,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
server.rdb_pipe_conns = NULL;
|
server.rdb_pipe_conns = NULL;
|
||||||
server.rdb_pipe_numconns = 0;
|
server.rdb_pipe_numconns = 0;
|
||||||
server.rdb_pipe_numconns_writing = 0;
|
server.rdb_pipe_numconns_writing = 0;
|
||||||
closeChildInfoPipe();
|
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
|
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
|
||||||
(long) childpid);
|
(long) childpid);
|
||||||
server.rdb_save_time_start = time(NULL);
|
server.rdb_save_time_start = time(NULL);
|
||||||
server.rdb_child_pid = childpid;
|
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
|
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
|
||||||
updateDictResizePolicy();
|
|
||||||
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
|
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
|
||||||
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
|
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
|
||||||
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
|
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
|
||||||
@ -2843,7 +2837,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void saveCommand(client *c) {
|
void saveCommand(client *c) {
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_RDB) {
|
||||||
addReplyError(c,"Background save already in progress");
|
addReplyError(c,"Background save already in progress");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -2874,7 +2868,7 @@ void bgsaveCommand(client *c) {
|
|||||||
rdbSaveInfo rsi, *rsiptr;
|
rdbSaveInfo rsi, *rsiptr;
|
||||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||||
|
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_RDB) {
|
||||||
addReplyError(c,"Background save already in progress");
|
addReplyError(c,"Background save already in progress");
|
||||||
} else if (hasActiveChildProcess()) {
|
} else if (hasActiveChildProcess()) {
|
||||||
if (schedule) {
|
if (schedule) {
|
||||||
|
@ -786,7 +786,7 @@ void syncCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* CASE 1: BGSAVE is in progress, with disk target. */
|
/* CASE 1: BGSAVE is in progress, with disk target. */
|
||||||
if (server.rdb_child_pid != -1 &&
|
if (server.child_type == CHILD_TYPE_RDB &&
|
||||||
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
|
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
|
||||||
{
|
{
|
||||||
/* Ok a background save is in progress. Let's check if it is a good
|
/* Ok a background save is in progress. Let's check if it is a good
|
||||||
@ -816,7 +816,7 @@ void syncCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* CASE 2: BGSAVE is in progress, with socket target. */
|
/* CASE 2: BGSAVE is in progress, with socket target. */
|
||||||
} else if (server.rdb_child_pid != -1 &&
|
} else if (server.child_type == CHILD_TYPE_RDB &&
|
||||||
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
|
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
|
||||||
{
|
{
|
||||||
/* There is an RDB child process but it is writing directly to
|
/* There is an RDB child process but it is writing directly to
|
||||||
@ -914,7 +914,7 @@ void replconfCommand(client *c) {
|
|||||||
* There's a chance the ACK got to us before we detected that the
|
* There's a chance the ACK got to us before we detected that the
|
||||||
* bgsave is done (since that depends on cron ticks), so run a
|
* bgsave is done (since that depends on cron ticks), so run a
|
||||||
* quick check first (instead of waiting for the next ACK. */
|
* quick check first (instead of waiting for the next ACK. */
|
||||||
if (server.rdb_child_pid != -1 && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
|
if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
|
||||||
checkChildrenDone();
|
checkChildrenDone();
|
||||||
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
|
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
|
||||||
putSlaveOnline(c);
|
putSlaveOnline(c);
|
||||||
@ -1721,13 +1721,13 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
connRecvTimeout(conn,0);
|
connRecvTimeout(conn,0);
|
||||||
} else {
|
} else {
|
||||||
/* Ensure background save doesn't overwrite synced data */
|
/* Ensure background save doesn't overwrite synced data */
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_RDB) {
|
||||||
serverLog(LL_NOTICE,
|
serverLog(LL_NOTICE,
|
||||||
"Replica is about to load the RDB file received from the "
|
"Replica is about to load the RDB file received from the "
|
||||||
"master, but there is a pending RDB child running. "
|
"master, but there is a pending RDB child running. "
|
||||||
"Killing process %ld and removing its temp file to avoid "
|
"Killing process %ld and removing its temp file to avoid "
|
||||||
"any race",
|
"any race",
|
||||||
(long) server.rdb_child_pid);
|
(long) server.child_pid);
|
||||||
killRDBChild();
|
killRDBChild();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
103
src/server.c
103
src/server.c
@ -1559,12 +1559,32 @@ void updateDictResizePolicy(void) {
|
|||||||
dictDisableResize();
|
dictDisableResize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char *strChildType(int type) {
|
||||||
|
switch(type) {
|
||||||
|
case CHILD_TYPE_RDB: return "RDB";
|
||||||
|
case CHILD_TYPE_AOF: return "AOF";
|
||||||
|
case CHILD_TYPE_LDB: return "LDB";
|
||||||
|
case CHILD_TYPE_MODULE: return "MODULE";
|
||||||
|
default: return "Unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Return true if there are active children processes doing RDB saving,
|
/* Return true if there are active children processes doing RDB saving,
|
||||||
* AOF rewriting, or some side process spawned by a loaded module. */
|
* AOF rewriting, or some side process spawned by a loaded module. */
|
||||||
int hasActiveChildProcess() {
|
int hasActiveChildProcess() {
|
||||||
return server.rdb_child_pid != -1 ||
|
return server.child_pid != -1;
|
||||||
server.aof_child_pid != -1 ||
|
}
|
||||||
server.module_child_pid != -1;
|
|
||||||
|
void resetChildState() {
|
||||||
|
server.child_type = CHILD_TYPE_NONE;
|
||||||
|
server.child_pid = -1;
|
||||||
|
updateDictResizePolicy();
|
||||||
|
closeChildInfoPipe();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return if child type is mutual exclusive with other fork children */
|
||||||
|
int isMutuallyExclusiveChildType(int type) {
|
||||||
|
return type == CHILD_TYPE_RDB || type == CHILD_TYPE_AOF || type == CHILD_TYPE_MODULE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return true if this instance has persistence completely turned off:
|
/* Return true if this instance has persistence completely turned off:
|
||||||
@ -1886,20 +1906,23 @@ void checkChildrenDone(void) {
|
|||||||
|
|
||||||
if (pid == -1) {
|
if (pid == -1) {
|
||||||
serverLog(LL_WARNING,"wait3() returned an error: %s. "
|
serverLog(LL_WARNING,"wait3() returned an error: %s. "
|
||||||
"rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
|
"child_type: %s, child_pid = %d",
|
||||||
strerror(errno),
|
strerror(errno),
|
||||||
(int) server.rdb_child_pid,
|
strChildType(server.child_type),
|
||||||
(int) server.aof_child_pid,
|
(int) server.child_pid);
|
||||||
(int) server.module_child_pid);
|
} else if (pid == server.child_pid) {
|
||||||
} else if (pid == server.rdb_child_pid) {
|
if (server.child_type == CHILD_TYPE_RDB) {
|
||||||
backgroundSaveDoneHandler(exitcode, bysignal);
|
backgroundSaveDoneHandler(exitcode, bysignal);
|
||||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
} else if (server.child_type == CHILD_TYPE_AOF) {
|
||||||
} else if (pid == server.aof_child_pid) {
|
|
||||||
backgroundRewriteDoneHandler(exitcode, bysignal);
|
backgroundRewriteDoneHandler(exitcode, bysignal);
|
||||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
} else if (server.child_type == CHILD_TYPE_MODULE) {
|
||||||
} else if (pid == server.module_child_pid) {
|
|
||||||
ModuleForkDoneHandler(exitcode, bysignal);
|
ModuleForkDoneHandler(exitcode, bysignal);
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||||
|
resetChildState();
|
||||||
} else {
|
} else {
|
||||||
if (!ldbRemoveChild(pid)) {
|
if (!ldbRemoveChild(pid)) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -1907,8 +1930,6 @@ void checkChildrenDone(void) {
|
|||||||
(long) pid);
|
(long) pid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
updateDictResizePolicy();
|
|
||||||
closeChildInfoPipe();
|
|
||||||
|
|
||||||
/* start any pending forks immediately. */
|
/* start any pending forks immediately. */
|
||||||
replicationStartPendingFork();
|
replicationStartPendingFork();
|
||||||
@ -3087,9 +3108,8 @@ void initServer(void) {
|
|||||||
server.in_eval = 0;
|
server.in_eval = 0;
|
||||||
server.in_exec = 0;
|
server.in_exec = 0;
|
||||||
server.propagate_in_transaction = 0;
|
server.propagate_in_transaction = 0;
|
||||||
server.rdb_child_pid = -1;
|
server.child_pid = -1;
|
||||||
server.aof_child_pid = -1;
|
server.child_type = CHILD_TYPE_NONE;
|
||||||
server.module_child_pid = -1;
|
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
||||||
server.rdb_pipe_conns = NULL;
|
server.rdb_pipe_conns = NULL;
|
||||||
server.rdb_pipe_numconns = 0;
|
server.rdb_pipe_numconns = 0;
|
||||||
@ -4056,25 +4076,28 @@ int prepareForShutdown(int flags) {
|
|||||||
/* Kill the saving child if there is a background saving in progress.
|
/* Kill the saving child if there is a background saving in progress.
|
||||||
We want to avoid race conditions, for instance our saving child may
|
We want to avoid race conditions, for instance our saving child may
|
||||||
overwrite the synchronous saving did by SHUTDOWN. */
|
overwrite the synchronous saving did by SHUTDOWN. */
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_RDB) {
|
||||||
serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
|
serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
|
||||||
/* Note that, in killRDBChild, we call rdbRemoveTempFile that will
|
killRDBChild();
|
||||||
* do close fd(in order to unlink file actully) in background thread.
|
/* Note that, in killRDBChild normally has backgroundSaveDoneHandler
|
||||||
|
* doing it's cleanup, but in this case this code will not be reached,
|
||||||
|
* so we need to call rdbRemoveTempFile which will close fd(in order
|
||||||
|
* to unlink file actully) in background thread.
|
||||||
* The temp rdb file fd may won't be closed when redis exits quickly,
|
* The temp rdb file fd may won't be closed when redis exits quickly,
|
||||||
* but OS will close this fd when process exits. */
|
* but OS will close this fd when process exits. */
|
||||||
killRDBChild();
|
rdbRemoveTempFile(server.child_pid, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Kill module child if there is one. */
|
/* Kill module child if there is one. */
|
||||||
if (server.module_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_MODULE) {
|
||||||
serverLog(LL_WARNING,"There is a module fork child. Killing it!");
|
serverLog(LL_WARNING,"There is a module fork child. Killing it!");
|
||||||
TerminateModuleForkChild(server.module_child_pid,0);
|
TerminateModuleForkChild(server.child_pid,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.aof_state != AOF_OFF) {
|
if (server.aof_state != AOF_OFF) {
|
||||||
/* Kill the AOF saving child as the AOF we already have may be longer
|
/* Kill the AOF saving child as the AOF we already have may be longer
|
||||||
* but contains the full dataset anyway. */
|
* but contains the full dataset anyway. */
|
||||||
if (server.aof_child_pid != -1) {
|
if (server.child_type == CHILD_TYPE_AOF) {
|
||||||
/* If we have AOF enabled but haven't written the AOF yet, don't
|
/* If we have AOF enabled but haven't written the AOF yet, don't
|
||||||
* shutdown or else the dataset will be lost. */
|
* shutdown or else the dataset will be lost. */
|
||||||
if (server.aof_state == AOF_WAIT_REWRITE) {
|
if (server.aof_state == AOF_WAIT_REWRITE) {
|
||||||
@ -4613,23 +4636,23 @@ sds genRedisInfoString(const char *section) {
|
|||||||
"module_fork_last_cow_size:%zu\r\n",
|
"module_fork_last_cow_size:%zu\r\n",
|
||||||
server.loading,
|
server.loading,
|
||||||
server.dirty,
|
server.dirty,
|
||||||
server.rdb_child_pid != -1,
|
server.child_type == CHILD_TYPE_RDB,
|
||||||
(intmax_t)server.lastsave,
|
(intmax_t)server.lastsave,
|
||||||
(server.lastbgsave_status == C_OK) ? "ok" : "err",
|
(server.lastbgsave_status == C_OK) ? "ok" : "err",
|
||||||
(intmax_t)server.rdb_save_time_last,
|
(intmax_t)server.rdb_save_time_last,
|
||||||
(intmax_t)((server.rdb_child_pid == -1) ?
|
(intmax_t)((server.child_type != CHILD_TYPE_RDB) ?
|
||||||
-1 : time(NULL)-server.rdb_save_time_start),
|
-1 : time(NULL)-server.rdb_save_time_start),
|
||||||
server.stat_rdb_cow_bytes,
|
server.stat_rdb_cow_bytes,
|
||||||
server.aof_state != AOF_OFF,
|
server.aof_state != AOF_OFF,
|
||||||
server.aof_child_pid != -1,
|
server.child_type == CHILD_TYPE_AOF,
|
||||||
server.aof_rewrite_scheduled,
|
server.aof_rewrite_scheduled,
|
||||||
(intmax_t)server.aof_rewrite_time_last,
|
(intmax_t)server.aof_rewrite_time_last,
|
||||||
(intmax_t)((server.aof_child_pid == -1) ?
|
(intmax_t)((server.child_type != CHILD_TYPE_AOF) ?
|
||||||
-1 : time(NULL)-server.aof_rewrite_time_start),
|
-1 : time(NULL)-server.aof_rewrite_time_start),
|
||||||
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
|
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
|
||||||
(server.aof_last_write_status == C_OK) ? "ok" : "err",
|
(server.aof_last_write_status == C_OK) ? "ok" : "err",
|
||||||
server.stat_aof_cow_bytes,
|
server.stat_aof_cow_bytes,
|
||||||
server.module_child_pid != -1,
|
server.child_type == CHILD_TYPE_MODULE,
|
||||||
server.stat_module_cow_bytes);
|
server.stat_module_cow_bytes);
|
||||||
|
|
||||||
if (server.aof_enabled) {
|
if (server.aof_enabled) {
|
||||||
@ -5289,6 +5312,13 @@ void closeClildUnusedResourceAfterFork() {
|
|||||||
|
|
||||||
/* purpose is one of CHILD_TYPE_ types */
|
/* purpose is one of CHILD_TYPE_ types */
|
||||||
int redisFork(int purpose) {
|
int redisFork(int purpose) {
|
||||||
|
if (isMutuallyExclusiveChildType(purpose)) {
|
||||||
|
if (hasActiveChildProcess())
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
openChildInfoPipe();
|
||||||
|
}
|
||||||
|
|
||||||
int childpid;
|
int childpid;
|
||||||
long long start = ustime();
|
long long start = ustime();
|
||||||
if ((childpid = fork()) == 0) {
|
if ((childpid = fork()) == 0) {
|
||||||
@ -5304,8 +5334,23 @@ int redisFork(int purpose) {
|
|||||||
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
||||||
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
||||||
if (childpid == -1) {
|
if (childpid == -1) {
|
||||||
|
if (isMutuallyExclusiveChildType(purpose)) closeChildInfoPipe();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* The child_pid and child_type are only for mutually exclusive children.
|
||||||
|
* Other child types should handle and store their pids in dedicated variables.
|
||||||
|
*
|
||||||
|
* Today, we allow CHILD_TYPE_LDB to run in parallel with the other fork types:
|
||||||
|
* - it isn't used for production, so it will not make the server less efficient
|
||||||
|
* - used for debugging, and we don't want to block it from running while other
|
||||||
|
* forks are running (like RDB and AOF) */
|
||||||
|
if (isMutuallyExclusiveChildType(purpose)) {
|
||||||
|
server.child_pid = childpid;
|
||||||
|
server.child_type = purpose;
|
||||||
|
}
|
||||||
|
|
||||||
|
updateDictResizePolicy();
|
||||||
}
|
}
|
||||||
return childpid;
|
return childpid;
|
||||||
}
|
}
|
||||||
|
@ -1145,7 +1145,8 @@ struct redisServer {
|
|||||||
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
||||||
client blocked on a module command needs
|
client blocked on a module command needs
|
||||||
to be processed. */
|
to be processed. */
|
||||||
pid_t module_child_pid; /* PID of module child */
|
pid_t child_pid; /* PID of current child */
|
||||||
|
int child_type; /* Type of current child */
|
||||||
/* Networking */
|
/* Networking */
|
||||||
int port; /* TCP listening port */
|
int port; /* TCP listening port */
|
||||||
int tls_port; /* TLS listening port */
|
int tls_port; /* TLS listening port */
|
||||||
@ -1281,7 +1282,6 @@ struct redisServer {
|
|||||||
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
|
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
|
||||||
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
|
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
|
||||||
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
|
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
|
||||||
pid_t aof_child_pid; /* PID if rewriting process */
|
|
||||||
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
|
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
|
||||||
sds aof_buf; /* AOF buffer, written before entering the event loop */
|
sds aof_buf; /* AOF buffer, written before entering the event loop */
|
||||||
int aof_fd; /* File descriptor of currently selected AOF file */
|
int aof_fd; /* File descriptor of currently selected AOF file */
|
||||||
@ -1311,7 +1311,6 @@ struct redisServer {
|
|||||||
/* RDB persistence */
|
/* RDB persistence */
|
||||||
long long dirty; /* Changes to DB from the last save */
|
long long dirty; /* Changes to DB from the last save */
|
||||||
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
|
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
|
||||||
pid_t rdb_child_pid; /* PID of RDB saving child */
|
|
||||||
struct saveparam *saveparams; /* Save points array for RDB */
|
struct saveparam *saveparams; /* Save points array for RDB */
|
||||||
int saveparamslen; /* Number of saving points */
|
int saveparamslen; /* Number of saving points */
|
||||||
char *rdb_filename; /* Name of RDB file */
|
char *rdb_filename; /* Name of RDB file */
|
||||||
@ -2007,6 +2006,8 @@ void receiveChildInfo(void);
|
|||||||
/* Fork helpers */
|
/* Fork helpers */
|
||||||
int redisFork(int type);
|
int redisFork(int type);
|
||||||
int hasActiveChildProcess();
|
int hasActiveChildProcess();
|
||||||
|
void resetChildState();
|
||||||
|
int isMutuallyExclusiveChildType(int type);
|
||||||
void sendChildCOWInfo(int ptype, char *pname);
|
void sendChildCOWInfo(int ptype, char *pname);
|
||||||
|
|
||||||
/* acl.c -- Authentication related prototypes. */
|
/* acl.c -- Authentication related prototypes. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user