restore fork impl of save to replica sockets (#622)

* restore fork impl of save to slave sockets

* default to fork bgsave
This commit is contained in:
Malavan Sotheeswaran 2023-04-06 19:13:12 -07:00 committed by GitHub
parent 77bc0afad4
commit f54e3ea367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 49 deletions

View File

@ -2806,7 +2806,7 @@ standardConfig configs[] = {
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL), createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL), createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 1, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL), createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL), createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL),
createBoolConfig("prefetch-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->prefetch_enabled, 1, NULL, NULL), createBoolConfig("prefetch-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->prefetch_enabled, 1, NULL, NULL),

View File

@ -1334,7 +1334,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < cserver.dbnum; j++) { for (j = 0; j < cserver.dbnum; j++) {
const redisDbPersistentDataSnapshot *db = rgpdb[j]; const redisDbPersistentDataSnapshot *db = rgpdb != nullptr ? rgpdb[j] : g_pserver->db[j];
if (db->size() == 0) continue; if (db->size() == 0) continue;
/* Write the SELECT DB opcode */ /* Write the SELECT DB opcode */
@ -1715,7 +1715,7 @@ void getTempFileName(char tmpfile[], int tmpfileNum) {
char tmpfileNumString[214]; char tmpfileNumString[214];
/* Generate temp rdb file name using aync-signal safe functions. */ /* Generate temp rdb file name using aync-signal safe functions. */
int pid_len = ll2string(pid, sizeof(pid), getpid()); int pid_len = ll2string(pid, sizeof(pid), g_pserver->in_fork_child ? getppid() : getpid());
int tmpfileNum_len = ll2string(tmpfileNumString, sizeof(tmpfileNumString), tmpfileNum); int tmpfileNum_len = ll2string(tmpfileNumString, sizeof(tmpfileNumString), tmpfileNum);
strcpy(tmpfile, "temp-"); strcpy(tmpfile, "temp-");
strncpy(tmpfile+5, pid, pid_len); strncpy(tmpfile+5, pid, pid_len);
@ -3832,57 +3832,130 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
} }
/* Create the child process. */ /* Create the child process. */
openChildInfoPipe(); if (cserver.fForkBgSave) {
pid_t childpid;
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;
for (int idb = 0; idb < cserver.dbnum; ++idb) rioInitWithFd(&rdb,args->rdb_pipe_write);
args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /*fOptional*/);
g_pserver->rdbThreadVars.tmpfileNum++; redisSetProcTitle("keydb-rdb-to-slaves");
g_pserver->rdbThreadVars.fRdbThreadCancel = false; redisSetCpuAffinity(g_pserver->bgsave_cpulist);
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
/* Undo the state change. The caller will perform cleanup on retval = rdbSaveRioWithEOFMark(&rdb,nullptr,nullptr,rsi);
* all the slaves in BGSAVE_START state, but an early call to if (retval == C_OK && rioFlush(&rdb) == 0)
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ retval = C_ERR;
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { if (retval == C_OK) {
client *replica = (client*)ln->value; sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
} }
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
close(args->safe_to_exit_pipe);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
closeChildInfoPipe();
return C_ERR;
}
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;
serverLog(LL_NOTICE,"Background RDB transfer started"); rioFreeFd(&rdb);
g_pserver->rdb_save_time_start = time(NULL); /* wake up the reader, tell it we're done. */
serverAssert(!g_pserver->rdbThreadVars.fRdbThreadActive); close(args->rdb_pipe_write);
g_pserver->rdbThreadVars.rdb_child_thread = child; close(g_pserver->rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
g_pserver->rdbThreadVars.fRdbThreadActive = true; /* hold exit until the parent tells us it's safe. we're not expecting
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; * to read anything, just get the error when the pipe is closed. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ dummy = read(args->safe_to_exit_pipe, pipefds, 1);
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { UNUSED(dummy);
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
close(args->safe_to_exit_pipe);
if (childpid == -1) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
(long)childpid);
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
updateDictResizePolicy();
close(args->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, nullptr) == AE_ERR) {
serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event.");
}
});
}
return (childpid == -1) ? C_ERR : C_OK;
} }
}); }
else {
openChildInfoPipe();
for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /*fOptional*/);
g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
close(args->safe_to_exit_pipe);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
closeChildInfoPipe();
return C_ERR;
}
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;
serverLog(LL_NOTICE,"Background RDB transfer started");
g_pserver->rdb_save_time_start = time(NULL);
serverAssert(!g_pserver->rdbThreadVars.fRdbThreadActive);
g_pserver->rdbThreadVars.rdb_child_thread = child;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, nullptr) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
});
}
return C_OK; /* Unreached. */ return C_OK; /* Unreached. */
} }

View File

@ -5172,7 +5172,7 @@ int prepareForShutdown(int flags) {
* to unlink file actully) in background thread. * to unlink file actully) in background thread.
* The temp rdb file fd may won't be closed when redis exits quickly, * The temp rdb file fd may won't be closed when redis exits quickly,
* but OS will close this fd when process exits. */ * but OS will close this fd when process exits. */
rdbRemoveTempFile(g_pserver->child_pid, 0); rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum, 0);
} }
/* Kill module child if there is one. */ /* Kill module child if there is one. */