Lists AOF rewrite using variadic RPUSH (work in progress)
This commit is contained in:
parent
43093dff2d
commit
5b25009656
94
src/aof.c
94
src/aof.c
@ -422,8 +422,62 @@ int rioWriteBulkObject(rio *r, robj *obj) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Emit the commands needed to rebuild a list object.
|
||||||
|
* The function returns 0 on error, 1 on success. */
|
||||||
|
int rewriteListObject(rio *r, robj *key, robj *o) {
|
||||||
|
long long count = 0, items = listTypeLength(o);
|
||||||
|
|
||||||
|
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||||
|
unsigned char *zl = o->ptr;
|
||||||
|
unsigned char *p = ziplistIndex(zl,0);
|
||||||
|
unsigned char *vstr;
|
||||||
|
unsigned int vlen;
|
||||||
|
long long vlong;
|
||||||
|
|
||||||
|
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
|
||||||
|
if (count == 0) {
|
||||||
|
int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
|
||||||
|
REDIS_AOFREWRITE_ITEMS_PER_CMD : items;
|
||||||
|
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
|
||||||
|
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
|
||||||
|
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||||
|
}
|
||||||
|
if (vstr) {
|
||||||
|
if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
|
||||||
|
} else {
|
||||||
|
if (rioWriteBulkLongLong(r,vlong) == 0) return 0;
|
||||||
|
}
|
||||||
|
p = ziplistNext(zl,p);
|
||||||
|
if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
|
||||||
|
items--;
|
||||||
|
}
|
||||||
|
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
|
||||||
|
list *list = o->ptr;
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
|
||||||
|
listRewind(list,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
robj *eleobj = listNodeValue(ln);
|
||||||
|
|
||||||
|
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
|
||||||
|
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
|
||||||
|
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||||
|
if (rioWriteBulkObject(r,eleobj) == 0) return 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
redisPanic("Unknown list encoding");
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* Write a sequence of commands able to fully rebuild the dataset into
|
/* Write a sequence of commands able to fully rebuild the dataset into
|
||||||
* "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
|
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
|
||||||
|
*
|
||||||
|
* In order to minimize the number of commands needed in the rewritten
|
||||||
|
* log Redis uses variadic commands when possible, such as RPUSH, SADD
|
||||||
|
* and ZADD. However at max REDIS_AOFREWRITE_ITEMS_PER_CMD items per time
|
||||||
|
* are inserted using a single command. */
|
||||||
int rewriteAppendOnlyFile(char *filename) {
|
int rewriteAppendOnlyFile(char *filename) {
|
||||||
dictIterator *di = NULL;
|
dictIterator *di = NULL;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -479,43 +533,7 @@ int rewriteAppendOnlyFile(char *filename) {
|
|||||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||||
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
|
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
|
||||||
} else if (o->type == REDIS_LIST) {
|
} else if (o->type == REDIS_LIST) {
|
||||||
/* Emit the RPUSHes needed to rebuild the list */
|
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
|
||||||
char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
|
|
||||||
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
||||||
unsigned char *zl = o->ptr;
|
|
||||||
unsigned char *p = ziplistIndex(zl,0);
|
|
||||||
unsigned char *vstr;
|
|
||||||
unsigned int vlen;
|
|
||||||
long long vlong;
|
|
||||||
|
|
||||||
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
|
|
||||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
|
||||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
|
||||||
if (vstr) {
|
|
||||||
if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
|
|
||||||
goto werr;
|
|
||||||
} else {
|
|
||||||
if (rioWriteBulkLongLong(&aof,vlong) == 0)
|
|
||||||
goto werr;
|
|
||||||
}
|
|
||||||
p = ziplistNext(zl,p);
|
|
||||||
}
|
|
||||||
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
||||||
list *list = o->ptr;
|
|
||||||
listNode *ln;
|
|
||||||
listIter li;
|
|
||||||
|
|
||||||
listRewind(list,&li);
|
|
||||||
while((ln = listNext(&li))) {
|
|
||||||
robj *eleobj = listNodeValue(ln);
|
|
||||||
|
|
||||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
|
||||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
|
||||||
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
redisPanic("Unknown list encoding");
|
|
||||||
}
|
|
||||||
} else if (o->type == REDIS_SET) {
|
} else if (o->type == REDIS_SET) {
|
||||||
char cmd[]="*3\r\n$4\r\nSADD\r\n";
|
char cmd[]="*3\r\n$4\r\nSADD\r\n";
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@
|
|||||||
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
|
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
|
||||||
#define REDIS_AUTO_AOFREWRITE_PERC 100
|
#define REDIS_AUTO_AOFREWRITE_PERC 100
|
||||||
#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
|
#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
|
||||||
|
#define REDIS_AOFREWRITE_ITEMS_PER_CMD 64
|
||||||
#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
|
#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
|
||||||
#define REDIS_SLOWLOG_MAX_LEN 64
|
#define REDIS_SLOWLOG_MAX_LEN 64
|
||||||
#define REDIS_MAX_CLIENTS 10000
|
#define REDIS_MAX_CLIENTS 10000
|
||||||
|
Loading…
x
Reference in New Issue
Block a user