From 25c335488eef9adcded5f8972746471ed46b79bf Mon Sep 17 00:00:00 2001
From: John Sully
Date: Wed, 13 Mar 2019 16:53:37 -0400
Subject: [PATCH] Implement load database dumps from S3. We already save.

Former-commit-id: a45f212693956a6fb1aacf465d88e940bbbfd56f
---
Reviewed revision: rdbLoadS3 now always reaps the aws child and no longer
double-closes the pipe read end; rdbLoad gained a header comment. The blob
ids on the index lines below predate these edits.

 src/debug.c         |  2 +-
 src/rdb-s3.cpp      | 69 ++++++++++++++++++++++++++++++++++++++++++++-
 src/rdb.c           | 18 +++++++++++-
 src/rdb.h           |  3 +-
 src/replication.cpp |  2 +-
 src/server.cpp      |  4 +--
 6 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/src/debug.c b/src/debug.c
index d24c9ef9c..34e0ab22c 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -362,7 +362,7 @@ NULL
         }
         emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
         protectClient(c);
-        int ret = rdbLoad(server.rdb_filename,NULL);
+        int ret = rdbLoad(NULL);
         unprotectClient(c);
         if (ret != C_OK) {
             addReplyError(c,"Error trying to load the RDB dump");
diff --git a/src/rdb-s3.cpp b/src/rdb-s3.cpp
index bd00bb2bd..f28bd07d5 100644
--- a/src/rdb-s3.cpp
+++ b/src/rdb-s3.cpp
@@ -48,4 +48,71 @@ extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
         serverLog(LL_NOTICE,"DB saved on AWS S3");
 
     return (status == EXIT_SUCCESS) ? C_OK : C_ERR;
-}
\ No newline at end of file
+}
+
+
+/* Load an RDB payload from the descriptor fd. Takes ownership of fd: it is
+ * closed on every path, so callers must not close it again. */
+int rdbLoadS3Core(int fd, rdbSaveInfo *rsi)
+{
+    FILE *fp;
+    rio rdb;
+    int retval;
+
+    if ((fp = fdopen(fd, "rb")) == NULL)
+    {
+        close(fd);
+        return C_ERR;
+    }
+    startLoading(fp);
+    rioInitWithFile(&rdb,fileno(fp));
+    retval = rdbLoadRio(&rdb,rsi,0);
+    fclose(fp);
+    stopLoading();
+    return retval;
+}
+
+/* Load the database by streaming `aws s3 cp <s3bucket> -` through a pipe.
+ * Returns C_OK only if both the RDB load and the aws child succeeded. */
+extern "C" int rdbLoadS3(char *s3bucket, rdbSaveInfo *rsi)
+{
+    int status = EXIT_FAILURE;
+    int fd[2];
+    if (pipe(fd) != 0)
+        return C_ERR;
+
+    pid_t pid = fork();
+    if (pid < 0)
+    {
+        close(fd[0]);
+        close(fd[1]);
+        return C_ERR;
+    }
+
+    if (pid == 0)
+    {
+        // child process
+        dup2(fd[1], STDOUT_FILENO);
+        close(fd[1]);
+        close(fd[0]);
+        execlp("aws", "aws", "s3", "cp", s3bucket, "-", nullptr);
+        _exit(EXIT_FAILURE);    // only reached if exec failed
+    }
+
+    // parent process
+    close(fd[1]);
+    // rdbLoadS3Core closes fd[0] itself; closing it again here could hit
+    // an unrelated descriptor that reused the number.
+    int retval = rdbLoadS3Core(fd[0], rsi);
+    // Reap the child even when the load failed so it is not left a zombie.
+    waitpid(pid, &status, 0);
+
+    if (retval != C_OK || status != EXIT_SUCCESS)
+    {
+        serverLog(LL_WARNING, "Failed to load DB from AWS S3");
+        return C_ERR;
+    }
+
+    serverLog(LL_NOTICE,"DB loaded from AWS S3");
+    return C_OK;
+}
diff --git a/src/rdb.c b/src/rdb.c
index 4a504e815..31db9d67f 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -2097,6 +2097,22 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
     return C_ERR; /* Just to avoid warning */
 }
 
+int rdbLoadFile(char *filename, rdbSaveInfo *rsi);
+/* Load the dataset: try the local RDB file (server.rdb_filename) first and,
+ * if that is unset or fails, fall back to server.rdb_s3bucketpath when one
+ * is set. Returns the last loader's result, or C_ERR if none was tried. */
+int rdbLoad(rdbSaveInfo *rsi)
+{
+    int err = C_ERR;
+    if (server.rdb_filename != NULL)
+        err = rdbLoadFile(server.rdb_filename, rsi);
+
+    if ((err == C_ERR) && server.rdb_s3bucketpath != NULL)
+        err = rdbLoadS3(server.rdb_s3bucketpath, rsi);
+
+    return err;
+}
+
 /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
  * filename is open for reading and a rio stream object created in order
  * to do the actual loading. Moreover the ETA displayed in the INFO
@@ -2104,7 +2120,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
  *
  * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
  * loading code will fiil the information fields in the structure. */
-int rdbLoad(char *filename, rdbSaveInfo *rsi) {
+int rdbLoadFile(char *filename, rdbSaveInfo *rsi) {
     FILE *fp;
     rio rdb;
     int retval;
diff --git a/src/rdb.h b/src/rdb.h
index 2daa49984..fcd44e742 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -135,7 +135,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
 int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
 int rdbSaveObjectType(rio *rdb, robj *o);
 int rdbLoadObjectType(rio *rdb);
-int rdbLoad(char *filename, rdbSaveInfo *rsi);
+int rdbLoad(rdbSaveInfo *rsi);
 int rdbSaveBackground(rdbSaveInfo *rsi);
 int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
 void rdbRemoveTempFile(pid_t childpid);
@@ -143,6 +143,7 @@ int rdbSave(rdbSaveInfo *rsi);
 int rdbSaveFile(char *filename, rdbSaveInfo *rsi);
 int rdbSaveFd(int fd, rdbSaveInfo *rsi);
 int rdbSaveS3(char *path, rdbSaveInfo *rsi);
+int rdbLoadS3(char *path, rdbSaveInfo *rsi);
 ssize_t rdbSaveObject(rio *rdb, robj *o);
 size_t rdbSavedObjectLen(robj *o);
 robj *rdbLoadObject(int type, rio *rdb);
diff --git a/src/replication.cpp b/src/replication.cpp
index 899fa3dcf..e86110320 100644
--- a/src/replication.cpp
+++ b/src/replication.cpp
@@ -1334,7 +1334,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
         aeDeleteFileEvent(el,server.repl_transfer_s,AE_READABLE);
         serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
         rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
-        if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
+        if (rdbLoad(&rsi) != C_OK) {
             serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
             cancelReplicationHandshake();
             /* Re-enable the AOF if we disabled it earlier, in order to restore
diff --git a/src/server.cpp b/src/server.cpp
index 9fd917866..b8075d20d 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -4668,9 +4668,9 @@ void loadDataFromDisk(void) {
     if (server.aof_state == AOF_ON) {
         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
-    } else if (server.rdb_filename != NULL) {
+    } else if (server.rdb_filename != NULL || server.rdb_s3bucketpath != NULL) {
         rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
-        if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
+        if (rdbLoad(&rsi) == C_OK) {
             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                 (float)(ustime()-start)/1000000);
 