Use rio.h functions in aof.c
This commit is contained in:
parent
15788d3c5a
commit
a67b0922d7
95
src/aof.c
95
src/aof.c
@ -1,5 +1,4 @@
|
||||
#include "redis.h"
|
||||
|
||||
#include <signal.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
@ -7,6 +6,7 @@
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/wait.h>
|
||||
#include "rio.h"
|
||||
|
||||
/* Called when the user switches from "appendonly yes" to "appendonly no"
|
||||
* at runtime using the CONFIG command. */
|
||||
@ -313,11 +313,26 @@ fmterr:
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Delegate writing an object to writing a bulk string or bulk long long.
|
||||
* This is not placed in rio.c since that adds the redis.h dependency. */
|
||||
int rioWriteBulkObject(rio *r, robj *obj) {
|
||||
/* Avoid using getDecodedObject to help copy-on-write (we are often
|
||||
* in a child process when this function is called). */
|
||||
if (obj->encoding == REDIS_ENCODING_INT) {
|
||||
return rioWriteBulkLongLong(r,(long)obj->ptr);
|
||||
} else if (obj->encoding == REDIS_ENCODING_RAW) {
|
||||
return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
|
||||
} else {
|
||||
redisPanic("Unknown string encoding");
|
||||
}
|
||||
}
|
||||
|
||||
/* Write a sequence of commands able to fully rebuild the dataset into
|
||||
* "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
|
||||
int rewriteAppendOnlyFile(char *filename) {
|
||||
dictIterator *di = NULL;
|
||||
dictEntry *de;
|
||||
rio aof;
|
||||
FILE *fp;
|
||||
char tmpfile[256];
|
||||
int j;
|
||||
@ -331,6 +346,8 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
|
||||
return REDIS_ERR;
|
||||
}
|
||||
|
||||
aof = rioInitWithFile(fp);
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
|
||||
redisDb *db = server.db+j;
|
||||
@ -343,8 +360,8 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
}
|
||||
|
||||
/* SELECT the new DB */
|
||||
if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkLongLong(fp,j) == 0) goto werr;
|
||||
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
|
||||
|
||||
/* Iterate this DB writing every entry */
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
@ -362,10 +379,10 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
if (o->type == REDIS_STRING) {
|
||||
/* Emit a SET command */
|
||||
char cmd[]="*3\r\n$3\r\nSET\r\n";
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
/* Key and value */
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,o) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
|
||||
} else if (o->type == REDIS_LIST) {
|
||||
/* Emit the RPUSHes needed to rebuild the list */
|
||||
char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
|
||||
@ -377,13 +394,13 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
long long vlong;
|
||||
|
||||
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (vstr) {
|
||||
if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
|
||||
if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
|
||||
goto werr;
|
||||
} else {
|
||||
if (fwriteBulkLongLong(fp,vlong) == 0)
|
||||
if (rioWriteBulkLongLong(&aof,vlong) == 0)
|
||||
goto werr;
|
||||
}
|
||||
p = ziplistNext(zl,p);
|
||||
@ -397,9 +414,9 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
while((ln = listNext(&li))) {
|
||||
robj *eleobj = listNodeValue(ln);
|
||||
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unknown list encoding");
|
||||
@ -412,18 +429,18 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
int ii = 0;
|
||||
int64_t llval;
|
||||
while(intsetGet(o->ptr,ii++,&llval)) {
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr;
|
||||
}
|
||||
} else if (o->encoding == REDIS_ENCODING_HT) {
|
||||
dictIterator *di = dictGetIterator(o->ptr);
|
||||
dictEntry *de;
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *eleobj = dictGetEntryKey(de);
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
} else {
|
||||
@ -450,14 +467,14 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkDouble(fp,score) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkDouble(&aof,score) == 0) goto werr;
|
||||
if (vstr != NULL) {
|
||||
if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
|
||||
if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
|
||||
goto werr;
|
||||
} else {
|
||||
if (fwriteBulkLongLong(fp,vll) == 0)
|
||||
if (rioWriteBulkLongLong(&aof,vll) == 0)
|
||||
goto werr;
|
||||
}
|
||||
zzlNext(zl,&eptr,&sptr);
|
||||
@ -471,10 +488,10 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
robj *eleobj = dictGetEntryKey(de);
|
||||
double *score = dictGetEntryVal(de);
|
||||
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkDouble(fp,*score) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkDouble(&aof,*score) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
} else {
|
||||
@ -490,11 +507,11 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
unsigned int flen, vlen;
|
||||
|
||||
while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkString(fp,(char*)field,flen) == 0)
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkString(&aof,(char*)field,flen) == 0)
|
||||
goto werr;
|
||||
if (fwriteBulkString(fp,(char*)val,vlen) == 0)
|
||||
if (rioWriteBulkString(&aof,(char*)val,vlen) == 0)
|
||||
goto werr;
|
||||
}
|
||||
} else {
|
||||
@ -505,10 +522,10 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
robj *field = dictGetEntryKey(de);
|
||||
robj *val = dictGetEntryVal(de);
|
||||
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,field) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,val) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,field) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,val) == 0) goto werr;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
@ -520,9 +537,9 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
|
||||
/* If this key is already expired skip it */
|
||||
if (expiretime < now) continue;
|
||||
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
|
||||
if (fwriteBulkObject(fp,&key) == 0) goto werr;
|
||||
if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
@ -841,11 +841,6 @@ unsigned long estimateObjectIdleTime(robj *o);
|
||||
int syncWrite(int fd, char *ptr, ssize_t size, int timeout);
|
||||
int syncRead(int fd, char *ptr, ssize_t size, int timeout);
|
||||
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout);
|
||||
int fwriteBulkString(FILE *fp, char *s, unsigned long len);
|
||||
int fwriteBulkDouble(FILE *fp, double d);
|
||||
int fwriteBulkLongLong(FILE *fp, long long l);
|
||||
int fwriteBulkObject(FILE *fp, robj *obj);
|
||||
int fwriteBulkCount(FILE *fp, char prefix, int count);
|
||||
|
||||
/* Replication */
|
||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
||||
|
67
src/syncio.c
67
src/syncio.c
@ -99,70 +99,3 @@ int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
|
||||
}
|
||||
return nread;
|
||||
}
|
||||
|
||||
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
|
||||
|
||||
/* Write binary-safe string into a file in the bulkformat
|
||||
* $<count>\r\n<payload>\r\n */
|
||||
int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
|
||||
char cbuf[128];
|
||||
int clen;
|
||||
|
||||
cbuf[0] = '$';
|
||||
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
|
||||
cbuf[clen++] = '\r';
|
||||
cbuf[clen++] = '\n';
|
||||
if (fwrite(cbuf,clen,1,fp) == 0) return 0;
|
||||
if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
|
||||
if (fwrite("\r\n",2,1,fp) == 0) return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Write a multi bulk count in the form "*<count>\r\n" */
|
||||
int fwriteBulkCount(FILE *fp, char prefix, int count) {
|
||||
char cbuf[128];
|
||||
int clen;
|
||||
|
||||
cbuf[0] = prefix;
|
||||
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
|
||||
cbuf[clen++] = '\r';
|
||||
cbuf[clen++] = '\n';
|
||||
if (fwrite(cbuf,clen,1,fp) == 0) return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
|
||||
int fwriteBulkDouble(FILE *fp, double d) {
|
||||
char buf[128], dbuf[128];
|
||||
|
||||
snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
|
||||
snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
|
||||
if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
|
||||
if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
|
||||
int fwriteBulkLongLong(FILE *fp, long long l) {
|
||||
char bbuf[128], lbuf[128];
|
||||
unsigned int blen, llen;
|
||||
llen = ll2string(lbuf,32,l);
|
||||
blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
|
||||
if (fwrite(bbuf,blen,1,fp) == 0) return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Delegate writing an object to writing a bulk string or bulk long long. */
|
||||
int fwriteBulkObject(FILE *fp, robj *obj) {
|
||||
/* Avoid using getDecodedObject to help copy-on-write (we are often
|
||||
* in a child process when this function is called). */
|
||||
if (obj->encoding == REDIS_ENCODING_INT) {
|
||||
return fwriteBulkLongLong(fp,(long)obj->ptr);
|
||||
} else if (obj->encoding == REDIS_ENCODING_RAW) {
|
||||
return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
|
||||
} else {
|
||||
redisPanic("Unknown string encoding");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user