Support AWS S3 saving via the s3 cli tools

This commit is contained in:
John Sully 2019-02-06 01:06:48 -05:00
parent 3e9e84ca19
commit e38d1e6c7f
8 changed files with 46 additions and 23 deletions

View File

@ -550,6 +550,9 @@ void loadServerConfigFromString(char *config) {
} }
zfree(server.rdb_filename); zfree(server.rdb_filename);
server.rdb_filename = zstrdup(argv[1]); server.rdb_filename = zstrdup(argv[1]);
} else if(!strcasecmp(argv[0],"db-s3-object") && argc == 2) {
zfree(server.rdb_s3bucketpath);
server.rdb_s3bucketpath = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) { } else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) {
server.active_defrag_threshold_lower = atoi(argv[1]); server.active_defrag_threshold_lower = atoi(argv[1]);
if (server.active_defrag_threshold_lower < 0 || if (server.active_defrag_threshold_lower < 0 ||

View File

@ -455,7 +455,7 @@ void flushallCommand(client *c) {
int saved_dirty = server.dirty; int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr); rdbSave(rsiptr);
server.dirty = saved_dirty; server.dirty = saved_dirty;
} }
server.dirty++; server.dirty++;

View File

@ -356,7 +356,7 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) { } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) { if (rdbSave(rsiptr) != C_OK) {
addReply(c,shared.err); addReply(c,shared.err);
return; return;
} }

View File

@ -8,6 +8,7 @@ extern "C" {
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */ /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi) extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
{ {
int status = EXIT_FAILURE;
int fd[2]; int fd[2];
if (pipe(fd) != 0) if (pipe(fd) != 0)
return C_ERR; return C_ERR;
@ -23,21 +24,28 @@ extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
if (pid == 0) if (pid == 0)
{ {
// child process // child process
dup2(fd[1], STDIN_FILENO); dup2(fd[0], STDIN_FILENO);
execlp("aws", "s3", "cp", "-", s3bucket, nullptr); close(fd[1]);
close(fd[0]);
execlp("aws", "aws", "s3", "cp", "-", s3bucket, nullptr);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
else else
{ {
close(fd[0]);
if (rdbSaveFd(fd[1], rsi) != C_OK)
{
close(fd[1]);
return C_ERR;
}
close(fd[1]); close(fd[1]);
rdbSaveFd(fd[0], rsi);
int status;
waitpid(pid, &status, 0); waitpid(pid, &status, 0);
} }
close(fd[0]); if (status != EXIT_SUCCESS)
serverLog(LL_WARNING, "Failed to save DB to AWS S3");
else
// NOP serverLog(LL_NOTICE,"DB saved on AWS S3");
return C_ERR;
return (status == EXIT_SUCCESS) ? C_OK : C_ERR;
} }

View File

@ -1233,8 +1233,19 @@ int rdbSaveFd(int fd, rdbSaveInfo *rsi)
return C_OK; return C_OK;
} }
int rdbSave(rdbSaveInfo *rsi)
{
int err = C_OK;
if (server.rdb_filename != NULL)
err = rdbSaveFile(server.rdb_filename, rsi);
if (err == C_OK && server.rdb_s3bucketpath != NULL)
err = rdbSaveS3(server.rdb_s3bucketpath, rsi);
return err;
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */ /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) { int rdbSaveFile(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256]; char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp; FILE *fp;
@ -1289,7 +1300,7 @@ werr:
return C_ERR; return C_ERR;
} }
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { int rdbSaveBackground(rdbSaveInfo *rsi) {
pid_t childpid; pid_t childpid;
long long start; long long start;
@ -1306,7 +1317,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
/* Child */ /* Child */
closeListeningSockets(0); closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave"); redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi); retval = rdbSave(rsi);
if (retval == C_OK) { if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1); size_t private_dirty = zmalloc_get_private_dirty(-1);
@ -2413,7 +2424,7 @@ void saveCommand(client *c) {
} }
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { if (rdbSave(rsiptr) == C_OK) {
addReply(c,shared.ok); addReply(c,shared.ok);
} else { } else {
addReply(c,shared.err); addReply(c,shared.err);
@ -2450,7 +2461,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible."); "possible.");
} }
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) { } else if (rdbSaveBackground(rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started"); addReplyStatus(c,"Background saving started");
} else { } else {
addReply(c,shared.err); addReply(c,shared.err);

View File

@ -136,10 +136,11 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o); int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb); int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi); int rdbLoad(char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid); void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi); int rdbSave(rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, rdbSaveInfo *rsi);
int rdbSaveFd(int fd, rdbSaveInfo *rsi); int rdbSaveFd(int fd, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, rdbSaveInfo *rsi); int rdbSaveS3(char *path, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o); ssize_t rdbSaveObject(rio *rdb, robj *o);

View File

@ -577,7 +577,7 @@ int startBgsaveForReplication(int mincapa) {
if (socket_target) if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr); retval = rdbSaveToSlavesSockets(rsiptr);
else else
retval = rdbSaveBackground(server.rdb_filename,rsiptr); retval = rdbSaveBackground(rsiptr);
} else { } else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR; retval = C_ERR;

View File

@ -1946,7 +1946,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
sp->changes, (int)sp->seconds); sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr); rdbSaveBackground(rsiptr);
break; break;
} }
} }
@ -2019,7 +2019,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{ {
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) if (rdbSaveBackground(rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0; server.rdb_bgsave_scheduled = 0;
} }
@ -2266,7 +2266,7 @@ void initServerConfig(void) {
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL; server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.rdb_filename = NULL;
server.rdb_s3bucketpath = NULL; server.rdb_s3bucketpath = NULL;
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
server.acl_filename = zstrdup(CONFIG_DEFAULT_ACL_FILENAME); server.acl_filename = zstrdup(CONFIG_DEFAULT_ACL_FILENAME);
@ -3542,7 +3542,7 @@ int prepareForShutdown(int flags) {
/* Snapshotting. Perform a SYNC SAVE and exit */ /* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) { if (rdbSave(rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue /* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process, * operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background * in the next cron() Redis will be notified that the background