Add back user space buffering of RDB save
Former-commit-id: d918ca6fa57a6149b86b4effc787dbdde7350133
This commit is contained in:
parent
a686622540
commit
a43a383361
@ -752,7 +752,7 @@ int loadAppendOnlyFile(char *filename) {
|
||||
|
||||
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
|
||||
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
|
||||
rioInitWithFile(&rdb,fileno(fp));
|
||||
rioInitWithFile(&rdb,fp);
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (rdbLoadRio(&rdb,&rsi,1) != C_OK) {
|
||||
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
|
||||
@ -1400,7 +1400,7 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
}
|
||||
|
||||
g_pserver->aof_child_diff = sdsempty();
|
||||
rioInitWithFile(&aof,fileno(fp));
|
||||
rioInitWithFile(&aof,fp);
|
||||
|
||||
if (g_pserver->aof_rewrite_incremental_fsync)
|
||||
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
|
||||
|
@ -33,12 +33,19 @@ int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
|
||||
else
|
||||
{
|
||||
close(fd[0]);
|
||||
if (rdbSaveFd(fd[1], rsi) != C_OK)
|
||||
FILE *fp = fdopen(fd[1], "w");
|
||||
if (fp == NULL)
|
||||
{
|
||||
close(fd[1]);
|
||||
close (fd[1]);
|
||||
return C_ERR;
|
||||
}
|
||||
close(fd[1]);
|
||||
|
||||
if (rdbSaveFp(fp, rsi) != C_OK)
|
||||
{
|
||||
fclose(fp);
|
||||
return C_ERR;
|
||||
}
|
||||
fclose(fp);
|
||||
waitpid(pid, &status, 0);
|
||||
}
|
||||
|
||||
@ -59,7 +66,7 @@ int rdbLoadS3Core(int fd, rdbSaveInfo *rsi)
|
||||
|
||||
if ((fp = fdopen(fd, "rb")) == NULL) return C_ERR;
|
||||
startLoading(fp);
|
||||
rioInitWithFile(&rdb,fileno(fp));
|
||||
rioInitWithFile(&rdb,fp);
|
||||
retval = rdbLoadRio(&rdb,rsi,0);
|
||||
fclose(fp);
|
||||
stopLoading();
|
||||
|
@ -1220,12 +1220,12 @@ werr: /* Write error. */
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
int rdbSaveFd(int fd, rdbSaveInfo *rsi)
|
||||
int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi)
|
||||
{
|
||||
int error = 0;
|
||||
rio rdb;
|
||||
|
||||
rioInitWithFile(&rdb,fd);
|
||||
rioInitWithFile(&rdb,fp);
|
||||
|
||||
if (g_pserver->rdb_save_incremental_fsync)
|
||||
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
|
||||
@ -1267,7 +1267,7 @@ int rdbSaveFile(char *filename, rdbSaveInfo *rsi) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (rdbSaveFd(fileno(fp), rsi) == C_ERR){
|
||||
if (rdbSaveFp(fp, rsi) == C_ERR){
|
||||
goto werr;
|
||||
}
|
||||
|
||||
@ -2151,7 +2151,7 @@ int rdbLoadFile(char *filename, rdbSaveInfo *rsi) {
|
||||
|
||||
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
|
||||
startLoading(fp);
|
||||
rioInitWithFile(&rdb,fileno(fp));
|
||||
rioInitWithFile(&rdb,fp);
|
||||
retval = rdbLoadRio(&rdb,rsi,0);
|
||||
fclose(fp);
|
||||
stopLoading();
|
||||
|
@ -141,7 +141,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
|
||||
void rdbRemoveTempFile(pid_t childpid);
|
||||
int rdbSave(rdbSaveInfo *rsi);
|
||||
int rdbSaveFile(char *filename, rdbSaveInfo *rsi);
|
||||
int rdbSaveFd(int fd, rdbSaveInfo *rsi);
|
||||
int rdbSaveFp(FILE *pf, rdbSaveInfo *rsi);
|
||||
int rdbSaveS3(char *path, rdbSaveInfo *rsi);
|
||||
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key);
|
||||
|
@ -186,7 +186,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
|
||||
int closefile = (fp == NULL);
|
||||
if (fp == NULL && (fp = fopen(rdbfilename,"r")) == NULL) return 1;
|
||||
|
||||
rioInitWithFile(&rdb,fileno(fp));
|
||||
rioInitWithFile(&rdb,fp);
|
||||
rdbstate.rio = &rdb;
|
||||
rdb.update_cksum = rdbLoadProgressCallback;
|
||||
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
|
||||
|
15
src/rio.cpp
15
src/rio.cpp
@ -109,13 +109,14 @@ void rioInitWithBuffer(rio *r, sds s) {
|
||||
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
||||
size_t retval;
|
||||
|
||||
retval = write(r->io.file.fd,buf,len);
|
||||
retval = fwrite(buf,len,1,r->io.file.fp);
|
||||
r->io.file.buffered += len;
|
||||
|
||||
if (r->io.file.autosync &&
|
||||
r->io.file.buffered >= r->io.file.autosync)
|
||||
{
|
||||
redis_fsync(r->io.file.fd);
|
||||
fflush(r->io.file.fp);
|
||||
redis_fsync(fileno(r->io.file.fp));
|
||||
r->io.file.buffered = 0;
|
||||
}
|
||||
return retval;
|
||||
@ -123,18 +124,18 @@ static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
||||
|
||||
/* Returns 1 or 0 for success/failure. */
|
||||
static size_t rioFileRead(rio *r, void *buf, size_t len) {
|
||||
return read(r->io.file.fd,buf,len);
|
||||
return fread(buf,len,1,r->io.file.fp);
|
||||
}
|
||||
|
||||
/* Returns read/write position in file. */
|
||||
static off_t rioFileTell(rio *r) {
|
||||
return lseek(r->io.file.fd, 0, SEEK_CUR);
|
||||
return ftello(r->io.file.fp);
|
||||
}
|
||||
|
||||
/* Flushes any buffer to target device if applicable. Returns 1 on success
|
||||
* and 0 on failures. */
|
||||
static int rioFileFlush(rio *r) {
|
||||
return (fsync(r->io.file.fd) == 0) ? 1 : 0;
|
||||
return (fflush(r->io.file.fp) == 0) ? 1 : 0;
|
||||
}
|
||||
|
||||
static const rio rioFileIO = {
|
||||
@ -149,9 +150,9 @@ static const rio rioFileIO = {
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
};
|
||||
|
||||
void rioInitWithFile(rio *r, int fd) {
|
||||
void rioInitWithFile(rio *r, FILE *fp) {
|
||||
*r = rioFileIO;
|
||||
r->io.file.fd = fd;
|
||||
r->io.file.fp = fp;
|
||||
r->io.file.buffered = 0;
|
||||
r->io.file.autosync = 0;
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ struct _rio {
|
||||
} buffer;
|
||||
/* Stdio file pointer target. */
|
||||
struct {
|
||||
int fd;
|
||||
FILE *fp;
|
||||
off_t buffered; /* Bytes written since last fsync. */
|
||||
off_t autosync; /* fsync after 'autosync' bytes written. */
|
||||
} file;
|
||||
@ -128,7 +128,7 @@ static inline int rioFlush(rio *r) {
|
||||
return r->flush(r);
|
||||
}
|
||||
|
||||
void rioInitWithFile(rio *r, int fd);
|
||||
void rioInitWithFile(rio *r, FILE *fp);
|
||||
void rioInitWithBuffer(rio *r, sds s);
|
||||
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user