Implement load database dumps from S3. We already save.
Former-commit-id: a45f212693956a6fb1aacf465d88e940bbbfd56f
This commit is contained in:
parent
a4b2ed5861
commit
25c335488e
@ -362,7 +362,7 @@ NULL
|
||||
}
|
||||
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
protectClient(c);
|
||||
int ret = rdbLoad(server.rdb_filename,NULL);
|
||||
int ret = rdbLoad(NULL);
|
||||
unprotectClient(c);
|
||||
if (ret != C_OK) {
|
||||
addReplyError(c,"Error trying to load the RDB dump");
|
||||
|
@ -48,4 +48,64 @@ extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
|
||||
serverLog(LL_NOTICE,"DB saved on AWS S3");
|
||||
|
||||
return (status == EXIT_SUCCESS) ? C_OK : C_ERR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int rdbLoadS3Core(int fd, rdbSaveInfo *rsi)
|
||||
{
|
||||
FILE *fp;
|
||||
rio rdb;
|
||||
int retval;
|
||||
|
||||
if ((fp = fdopen(fd, "rb")) == NULL) return C_ERR;
|
||||
startLoading(fp);
|
||||
rioInitWithFile(&rdb,fileno(fp));
|
||||
retval = rdbLoadRio(&rdb,rsi,0);
|
||||
fclose(fp);
|
||||
stopLoading();
|
||||
return retval;
|
||||
}
|
||||
|
||||
int rdbLoadS3(char *s3bucket, rdbSaveInfo *rsi)
|
||||
{
|
||||
int status = EXIT_FAILURE;
|
||||
int fd[2];
|
||||
if (pipe(fd) != 0)
|
||||
return C_ERR;
|
||||
|
||||
pid_t pid = fork();
|
||||
if (pid < 0)
|
||||
{
|
||||
close(fd[0]);
|
||||
close(fd[1]);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (pid == 0)
|
||||
{
|
||||
// child process
|
||||
dup2(fd[1], STDOUT_FILENO);
|
||||
close(fd[1]);
|
||||
close(fd[0]);
|
||||
execlp("aws", "aws", "s3", "cp", s3bucket, "-", nullptr);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
else
|
||||
{
|
||||
close(fd[1]);
|
||||
if (rdbLoadS3Core(fd[0], rsi) != C_OK)
|
||||
{
|
||||
close(fd[0]);
|
||||
return C_ERR;
|
||||
}
|
||||
close(fd[0]);
|
||||
waitpid(pid, &status, 0);
|
||||
}
|
||||
|
||||
if (status != EXIT_SUCCESS)
|
||||
serverLog(LL_WARNING, "Failed to load DB from AWS S3");
|
||||
else
|
||||
serverLog(LL_NOTICE,"DB loaded from AWS S3");
|
||||
|
||||
return (status == EXIT_SUCCESS) ? C_OK : C_ERR;
|
||||
}
|
||||
|
15
src/rdb.c
15
src/rdb.c
@ -2097,6 +2097,19 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
return C_ERR; /* Just to avoid warning */
|
||||
}
|
||||
|
||||
int rdbLoadFile(char *filename, rdbSaveInfo *rsi);
|
||||
int rdbLoad(rdbSaveInfo *rsi)
|
||||
{
|
||||
int err = C_ERR;
|
||||
if (server.rdb_filename != NULL)
|
||||
err = rdbLoadFile(server.rdb_filename, rsi);
|
||||
|
||||
if ((err == C_ERR) && server.rdb_s3bucketpath != NULL)
|
||||
err = rdbLoadS3(server.rdb_s3bucketpath, rsi);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||
* filename is open for reading and a rio stream object created in order
|
||||
* to do the actual loading. Moreover the ETA displayed in the INFO
|
||||
@ -2104,7 +2117,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
*
|
||||
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
|
||||
* loading code will fiil the information fields in the structure. */
|
||||
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
|
||||
int rdbLoadFile(char *filename, rdbSaveInfo *rsi) {
|
||||
FILE *fp;
|
||||
rio rdb;
|
||||
int retval;
|
||||
|
@ -135,7 +135,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
|
||||
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
|
||||
int rdbSaveObjectType(rio *rdb, robj *o);
|
||||
int rdbLoadObjectType(rio *rdb);
|
||||
int rdbLoad(char *filename, rdbSaveInfo *rsi);
|
||||
int rdbLoad(rdbSaveInfo *rsi);
|
||||
int rdbSaveBackground(rdbSaveInfo *rsi);
|
||||
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
|
||||
void rdbRemoveTempFile(pid_t childpid);
|
||||
@ -143,6 +143,7 @@ int rdbSave(rdbSaveInfo *rsi);
|
||||
int rdbSaveFile(char *filename, rdbSaveInfo *rsi);
|
||||
int rdbSaveFd(int fd, rdbSaveInfo *rsi);
|
||||
int rdbSaveS3(char *path, rdbSaveInfo *rsi);
|
||||
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj *o);
|
||||
size_t rdbSavedObjectLen(robj *o);
|
||||
robj *rdbLoadObject(int type, rio *rdb);
|
||||
|
@ -1334,7 +1334,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
aeDeleteFileEvent(el,server.repl_transfer_s,AE_READABLE);
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
|
||||
if (rdbLoad(&rsi) != C_OK) {
|
||||
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
||||
cancelReplicationHandshake();
|
||||
/* Re-enable the AOF if we disabled it earlier, in order to restore
|
||||
|
@ -4668,9 +4668,9 @@ void loadDataFromDisk(void) {
|
||||
if (server.aof_state == AOF_ON) {
|
||||
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
|
||||
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
|
||||
} else if (server.rdb_filename != NULL) {
|
||||
} else if (server.rdb_filename != NULL || server.rdb_s3bucketpath != NULL) {
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
|
||||
if (rdbLoad(&rsi) == C_OK) {
|
||||
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
|
||||
(float)(ustime()-start)/1000000);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user