Nasty bug of the new DB format fixed, objects sharing implemented

This commit is contained in:
antirez 2009-03-25 21:00:48 +01:00
parent d9f650b990
commit 10c43610de
3 changed files with 95 additions and 31 deletions

1
TODO
View File

@ -8,5 +8,6 @@
- check 'server.dirty' everywhere - check 'server.dirty' everywhere
- replication automated tests - replication automated tests
- a command, or an external tool, to perform the MD5SUM of the whole dataset, so that if the dataset between two servers is identical, so will be the MD5SUM - a command, or an external tool, to perform the MD5SUM of the whole dataset, so that if the dataset between two servers is identical, so will be the MD5SUM
- objects sharing, "objectsharing yes", "objectsharingpool 1024"
* Include Lua and Perl bindings * Include Lua and Perl bindings

119
redis.c
View File

@ -95,10 +95,10 @@
* 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
* 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
* 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
* 11|000000 [64 bit integer] => if it's 11, a full 64 bit len will follow * 11|000000 reserved for future uses
* *
64 bit lengths are not used currently. Lengths up to 63 are stored using * Lengths up to 63 are stored using a single byte, most DB keys, and many
* a single byte, most DB keys, and many values, will fit inside. */ * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0 #define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1 #define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2 #define REDIS_RDB_32BITLEN 2
@ -173,6 +173,8 @@ struct redisServer {
int port; int port;
int fd; int fd;
dict **dict; dict **dict;
dict *sharingpool;
unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */ long long dirty; /* changes to DB from the last save */
list *clients; list *clients;
list *slaves, *monitors; list *slaves, *monitors;
@ -199,6 +201,7 @@ struct redisServer {
char *logfile; char *logfile;
char *bindaddr; char *bindaddr;
char *dbfilename; char *dbfilename;
int shareobjects;
/* Replication related */ /* Replication related */
int isslave; int isslave;
char *masterhost; char *masterhost;
@ -258,6 +261,7 @@ static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len); static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void); static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static void pingCommand(redisClient *c); static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c); static void echoCommand(redisClient *c);
@ -627,7 +631,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use",
listLength(server.clients)-listLength(server.slaves), listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves), listLength(server.slaves),
server.usedmemory); server.usedmemory,
dictGetHashTableUsed(server.sharingpool));
} }
/* Close connections of timedout clients */ /* Close connections of timedout clients */
@ -739,6 +744,7 @@ static void initServerConfig() {
server.daemonize = 0; server.daemonize = 0;
server.pidfile = "/var/run/redis.pid"; server.pidfile = "/var/run/redis.pid";
server.dbfilename = "dump.rdb"; server.dbfilename = "dump.rdb";
server.shareobjects = 0;
ResetServerSaveParams(); ResetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@ -765,6 +771,8 @@ static void initServer() {
createSharedObjects(); createSharedObjects();
server.el = aeCreateEventLoop(); server.el = aeCreateEventLoop();
server.dict = zmalloc(sizeof(dict*)*server.dbnum); server.dict = zmalloc(sizeof(dict*)*server.dbnum);
server.sharingpool = dictCreate(&setDictType,NULL);
server.sharingpoolsize = 1024;
if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
oom("server initialization"); /* Fatal OOM */ oom("server initialization"); /* Fatal OOM */
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
@ -772,11 +780,8 @@ static void initServer() {
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
exit(1); exit(1);
} }
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++)
server.dict[j] = dictCreate(&hashDictType,NULL); server.dict[j] = dictCreate(&hashDictType,NULL);
if (!server.dict[j])
oom("dictCreate"); /* Fatal OOM */
}
server.cronloops = 0; server.cronloops = 0;
server.bgsaveinprogress = 0; server.bgsaveinprogress = 0;
server.lastsave = time(NULL); server.lastsave = time(NULL);
@ -895,6 +900,13 @@ static void loadServerConfig(char *filename) {
else { else {
err = "argument must be 'yes' or 'no'"; goto loaderr; err = "argument must be 'yes' or 'no'"; goto loaderr;
} }
} else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
sdstolower(argv[1]);
if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
else {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcmp(argv[0],"daemonize") && argc == 2) { } else if (!strcmp(argv[0],"daemonize") && argc == 2) {
sdstolower(argv[1]); sdstolower(argv[1]);
if (!strcmp(argv[1],"yes")) server.daemonize = 1; if (!strcmp(argv[1],"yes")) server.daemonize = 1;
@ -1102,6 +1114,12 @@ static int processCommand(redisClient *c) {
return 1; return 1;
} }
} }
/* Let's try to share objects on the command arguments vector */
if (server.shareobjects) {
int j;
for(j = 1; j < c->argc; j++)
c->argv[j] = tryObjectSharing(c->argv[j]);
}
/* Exec the command */ /* Exec the command */
dirty = server.dirty; dirty = server.dirty;
cmd->proc(c); cmd->proc(c);
@ -1412,6 +1430,51 @@ static void decrRefCount(void *obj) {
} }
} }
/* Try to share an object against the shared objects pool.
 *
 * 'o' is expected to be a REDIS_STRING object (asserted). The function
 * returns the pointer the caller must use from now on in place of 'o':
 *
 * - If an equal object is found in server.sharingpool, its usage counter
 *   (stored as the dict entry value) is incremented, the pooled object gets
 *   one more reference, 'o' is released with decrRefCount(), and the pooled
 *   object is returned.
 * - Otherwise 'o' itself is returned, possibly after being added to the pool
 *   (in which case the pool takes its own reference on it).
 *
 * When 'o' is NULL or server.shareobjects is 0 the argument is returned
 * untouched. */
static robj *tryObjectSharing(robj *o) {
    struct dictEntry *de;
    unsigned long c;

    /* Nothing to share: avoid dereferencing a NULL object below. */
    if (o == NULL || server.shareobjects == 0) return o;

    assert(o->type == REDIS_STRING);
    de = dictFind(server.sharingpool,o);
    if (de) {
        /* Hit: bump the usage counter and hand back the pooled object
         * instead of the caller's copy. */
        robj *shared = dictGetEntryKey(de);

        c = ((unsigned long) dictGetEntryVal(de))+1;
        dictGetEntryVal(de) = (void*) c;
        incrRefCount(shared);
        decrRefCount(o);
        return shared;
    }

    /* Miss. Here we are using a stream algorithm: every time an object is
     * shared we increment its count, every time there is a miss we
     * decrement the counter of a random object. If this object reaches
     * zero we remove the object and put the current object instead. */
    if (dictGetHashTableUsed(server.sharingpool) >=
            server.sharingpoolsize) {
        de = dictGetRandomKey(server.sharingpool);
        /* No entry could be picked (e.g. the pool is empty because its
         * size limit is zero): leave the object unshared rather than
         * dereferencing a NULL entry. */
        if (de == NULL) return o;
        c = ((unsigned long) dictGetEntryVal(de))-1;
        dictGetEntryVal(de) = (void*) c;
        if (c == 0) {
            dictDelete(server.sharingpool,dictGetEntryKey(de));
        }
    } else {
        c = 0; /* There is still room in the pool: add this object */
    }
    if (c == 0) {
        int retval;

        /* The lookup above missed, so the insertion is expected to succeed. */
        retval = dictAdd(server.sharingpool,o,(void*)1);
        assert(retval == DICT_OK);
        /* The pool keeps its own reference to the object. */
        incrRefCount(o);
    }
    return o;
}
/*============================ DB saving/loading ============================ */ /*============================ DB saving/loading ============================ */
static int rdbSaveType(FILE *fp, unsigned char type) { static int rdbSaveType(FILE *fp, unsigned char type) {
@ -1424,16 +1487,16 @@ static int rdbSaveLen(FILE *fp, uint32_t len) {
if (len < (1<<6)) { if (len < (1<<6)) {
/* Save a 6 bit len */ /* Save a 6 bit len */
buf[0] = (len&0xFF)|REDIS_RDB_6BITLEN; buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1; if (fwrite(buf,1,1,fp) == 0) return -1;
} else if (len < (1<<14)) { } else if (len < (1<<14)) {
/* Save a 14 bit len */ /* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|REDIS_RDB_14BITLEN; buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
buf[1] = len&0xFF; buf[1] = len&0xFF;
if (fwrite(buf,4,1,fp) == 0) return -1; if (fwrite(buf,4,1,fp) == 0) return -1;
} else { } else {
/* Save a 32 bit len */ /* Save a 32 bit len */
buf[0] = REDIS_RDB_32BITLEN; buf[0] = (REDIS_RDB_32BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1; if (fwrite(buf,1,1,fp) == 0) return -1;
len = htonl(len); len = htonl(len);
if (fwrite(&len,4,1,fp) == 0) return -1; if (fwrite(&len,4,1,fp) == 0) return -1;
@ -1441,6 +1504,14 @@ static int rdbSaveLen(FILE *fp, uint32_t len) {
return 0; return 0;
} }
/* Write a string object to 'fp': first its length via rdbSaveLen(), then
 * the raw bytes pointed to by obj->ptr.
 *
 * Returns 0 on success, -1 if either write fails. */
static int rdbSaveStringObject(FILE *fp, robj *obj) {
    size_t nbytes = sdslen(obj->ptr);

    if (rdbSaveLen(fp,nbytes) == -1)
        return -1;

    /* An empty string has no payload: the length prefix is all we emit. */
    if (nbytes == 0)
        return 0;

    if (fwrite(obj->ptr,nbytes,1,fp) == 0)
        return -1;
    return 0;
}
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
static int rdbSave(char *filename) { static int rdbSave(char *filename) {
dictIterator *di = NULL; dictIterator *di = NULL;
@ -1475,15 +1546,10 @@ static int rdbSave(char *filename) {
robj *o = dictGetEntryVal(de); robj *o = dictGetEntryVal(de);
if (rdbSaveType(fp,o->type) == -1) goto werr; if (rdbSaveType(fp,o->type) == -1) goto werr;
if (rdbSaveLen(fp,sdslen(key->ptr)) == -1) goto werr; if (rdbSaveStringObject(fp,key) == -1) goto werr;
if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr;
if (o->type == REDIS_STRING) { if (o->type == REDIS_STRING) {
/* Save a string value */ /* Save a string value */
sds sval = o->ptr; if (rdbSaveStringObject(fp,o) == -1) goto werr;
if (rdbSaveLen(fp,sdslen(sval)) == -1) goto werr;
if (sdslen(sval) &&
fwrite(sval,sdslen(sval),1,fp) == 0) goto werr;
} else if (o->type == REDIS_LIST) { } else if (o->type == REDIS_LIST) {
/* Save a list value */ /* Save a list value */
list *list = o->ptr; list *list = o->ptr;
@ -1493,10 +1559,7 @@ static int rdbSave(char *filename) {
while(ln) { while(ln) {
robj *eleobj = listNodeValue(ln); robj *eleobj = listNodeValue(ln);
if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr; if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
if (sdslen(eleobj->ptr) &&
fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
goto werr;
ln = ln->next; ln = ln->next;
} }
} else if (o->type == REDIS_SET) { } else if (o->type == REDIS_SET) {
@ -1508,13 +1571,9 @@ static int rdbSave(char *filename) {
if (!set) oom("dictGetIteraotr"); if (!set) oom("dictGetIteraotr");
if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr; if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr;
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
robj *eleobj; robj *eleobj = dictGetEntryKey(de);
eleobj = dictGetEntryKey(de); if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr;
if (sdslen(eleobj->ptr) &&
fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
goto werr;
} }
dictReleaseIterator(di); dictReleaseIterator(di);
} else { } else {
@ -1600,7 +1659,6 @@ static uint32_t rdbLoadLen(FILE *fp, int rdbver) {
return ntohl(len); return ntohl(len);
} }
} }
return 0;
} }
static robj *rdbLoadStringObject(FILE*fp,int rdbver) { static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
@ -1613,7 +1671,7 @@ static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
sdsfree(val); sdsfree(val);
return NULL; return NULL;
} }
return createObject(REDIS_STRING,val); return tryObjectSharing(createObject(REDIS_STRING,val));
} }
static int rdbLoad(char *filename) { static int rdbLoad(char *filename) {
@ -1625,7 +1683,6 @@ static int rdbLoad(char *filename) {
dict *d = server.dict[0]; dict *d = server.dict[0];
char buf[1024]; char buf[1024];
int rdbver; int rdbver;
fp = fopen(filename,"r"); fp = fopen(filename,"r");
if (!fp) return REDIS_ERR; if (!fp) return REDIS_ERR;
if (fread(buf,9,1,fp) == 0) goto eoferr; if (fread(buf,9,1,fp) == 0) goto eoferr;

View File

@ -68,3 +68,9 @@ databases 16
# single TCP packet. Uses a bit more CPU but most of the times it is a win # single TCP packet. Uses a bit more CPU but most of the times it is a win
# in terms of number of queries per second. Use 'yes' if unsure. # in terms of number of queries per second. Use 'yes' if unsure.
glueoutputbuf yes glueoutputbuf yes
# Use object sharing. Can save a lot of memory if you have many common
# strings in your dataset, but performs lookups against the shared objects
# pool so it uses more CPU and can be a bit slower. Usually it's a good
# idea.
shareobjects no