Make main headers C++ safe, and change rdb to use file descriptor instead of FILE pointer

Former-commit-id: 3c9dd6ffc254d089e4208ad39da7338b6fb0fba7
This commit is contained in:
John Sully 2019-02-05 23:36:40 -05:00
parent 2c3777f1ee
commit 01a552e651
38 changed files with 282 additions and 222 deletions

View File

@ -1,5 +1,6 @@
{ {
"files.associations": { "files.associations": {
"zmalloc.h": "c" "zmalloc.h": "c",
"stat.h": "c"
} }
} }

View File

@ -21,12 +21,13 @@ NODEPS:=clean distclean
# Default settings # Default settings
STD=-std=c99 -pedantic -DREDIS_STATIC='' STD=-std=c99 -pedantic -DREDIS_STATIC=''
CXX_STD=-std=c++14 -pedantic
ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S))) ifneq (,$(findstring FreeBSD,$(uname_S)))
STD+=-Wno-c11-extensions STD+=-Wno-c11-extensions
endif endif
endif endif
WARN=-Wall -W -Wno-missing-field-initializers WARN=-Wall -Werror -W -Wno-missing-field-initializers
OPT=$(OPTIMIZATION) OPT=$(OPTIMIZATION)
PREFIX?=/usr/local PREFIX?=/usr/local
@ -73,6 +74,7 @@ endif
-include .make-settings -include .make-settings
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
FINAL_LIBS=-lm FINAL_LIBS=-lm
DEBUG=-g -ggdb DEBUG=-g -ggdb
@ -128,6 +130,7 @@ endif
endif endif
# Include paths to dependencies # Include paths to dependencies
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src
ifeq ($(MALLOC),tcmalloc) ifeq ($(MALLOC),tcmalloc)
FINAL_CFLAGS+= -DUSE_TCMALLOC FINAL_CFLAGS+= -DUSE_TCMALLOC
@ -152,7 +155,8 @@ ifeq ($(MALLOC),memkind)
endif endif
REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS) REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS)
REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS)
REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
CCCOLOR="\033[34m" CCCOLOR="\033[34m"
@ -170,7 +174,7 @@ endif
REDIS_SERVER_NAME=redis-server REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel REDIS_SENTINEL_NAME=redis-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o storage.o REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o acl.o storage.o rdb-s3.o
REDIS_CLI_NAME=redis-cli REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o
REDIS_BENCHMARK_NAME=redis-benchmark REDIS_BENCHMARK_NAME=redis-benchmark
@ -253,6 +257,9 @@ dict-benchmark: dict.c zmalloc.c sds.c siphash.c
%.o: %.c .make-prerequisites %.o: %.c .make-prerequisites
$(REDIS_CC) -c $< $(REDIS_CC) -c $<
%.o: %.cpp .make-prerequisites
$(REDIS_CXX) -c $<
clean: clean:
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark

View File

@ -580,7 +580,7 @@ user *ACLGetUserByName(const char *name, size_t namelen) {
* command, the second if the command is denied because the user is trying * command, the second if the command is denied because the user is trying
* to access keys that are not among the specified patterns. */ * to access keys that are not among the specified patterns. */
int ACLCheckCommandPerm(client *c) { int ACLCheckCommandPerm(client *c) {
user *u = c->user; user *u = c->puser;
uint64_t id = c->cmd->id; uint64_t id = c->cmd->id;
/* If there is no associated user, the connection can run anything. */ /* If there is no associated user, the connection can run anything. */
@ -615,7 +615,7 @@ int ACLCheckCommandPerm(client *c) {
/* Check if the user can execute commands explicitly touching the keys /* Check if the user can execute commands explicitly touching the keys
* mentioned in the command arguments. */ * mentioned in the command arguments. */
if (!(c->user->flags & USER_FLAG_ALLKEYS) && if (!(c->puser->flags & USER_FLAG_ALLKEYS) &&
(c->cmd->getkeys_proc || c->cmd->firstkey)) (c->cmd->getkeys_proc || c->cmd->firstkey))
{ {
int numkeys; int numkeys;
@ -684,8 +684,8 @@ void aclCommand(client *c) {
} }
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(sub,"whoami")) { } else if (!strcasecmp(sub,"whoami")) {
if (c->user != NULL) { if (c->puser != NULL) {
addReplyBulkCBuffer(c,c->user->name,sdslen(c->user->name)); addReplyBulkCBuffer(c,c->puser->name,sdslen(c->puser->name));
} else { } else {
addReplyNull(c); addReplyNull(c);
} }

View File

@ -648,7 +648,7 @@ struct client *createFakeClient(void) {
c->watched_keys = listCreate(); c->watched_keys = listCreate();
c->peerid = NULL; c->peerid = NULL;
c->resp = 2; c->resp = 2;
c->user = NULL; c->puser = NULL;
listSetFreeMethod(c->reply,freeClientReplyValue); listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
initClientMultiState(c); initClientMultiState(c);
@ -717,7 +717,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr; if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fileno(fp));
if (rdbLoadRio(&rdb,NULL,1) != C_OK) { if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr; goto readerr;
@ -1017,7 +1017,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
} }
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) { } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr; zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict); dictIterator *di = dictGetIterator(zs->pdict);
dictEntry *de; dictEntry *de;
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
@ -1272,7 +1272,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j; redisDb *db = server.db+j;
dict *d = db->dict; dict *d = db->pdict;
if (dictSize(d) == 0) continue; if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d); di = dictGetSafeIterator(d);
@ -1361,7 +1361,7 @@ int rewriteAppendOnlyFile(char *filename) {
} }
server.aof_child_diff = sdsempty(); server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp); rioInitWithFile(&aof,fileno(fp));
if (server.aof_rewrite_incremental_fsync) if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

View File

@ -3888,7 +3888,7 @@ int verifyClusterConfigWithData(void) {
/* Make sure we only have keys in DB0. */ /* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) { for (j = 1; j < server.dbnum; j++) {
if (dictSize(server.db[j].dict)) return C_ERR; if (dictSize(server.db[j].pdict)) return C_ERR;
} }
/* Check that all the slots we see populated memory have a corresponding /* Check that all the slots we see populated memory have a corresponding
@ -4264,7 +4264,7 @@ NULL
clusterReplyMultiBulkSlots(c); clusterReplyMultiBulkSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */ /* CLUSTER FLUSHSLOTS */
if (dictSize(server.db[0].dict) != 0) { if (dictSize(server.db[0].pdict) != 0) {
addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
return; return;
} }
@ -4599,7 +4599,7 @@ NULL
* slots nor keys to accept to replicate some other node. * slots nor keys to accept to replicate some other node.
* Slaves can switch to another master without issues. */ * Slaves can switch to another master without issues. */
if (nodeIsMaster(myself) && if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) { (myself->numslots != 0 || dictSize(server.db[0].pdict) != 0)) {
addReplyError(c, addReplyError(c,
"To set a master the node must be empty and " "To set a master the node must be empty and "
"without assigned slots."); "without assigned slots.");
@ -4756,7 +4756,7 @@ NULL
/* Slaves can be reset while containing data, but not master nodes /* Slaves can be reset while containing data, but not master nodes
* that must be empty. */ * that must be empty. */
if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) { if (nodeIsMaster(myself) && dictSize(c->db->pdict) != 0) {
addReplyError(c,"CLUSTER RESET can't be called with " addReplyError(c,"CLUSTER RESET can't be called with "
"master nodes containing keys"); "master nodes containing keys");
return; return;

View File

@ -53,7 +53,7 @@ void updateLFU(robj *val) {
* implementations that should instead rely on lookupKeyRead(), * implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */ * lookupKeyWrite() and lookupKeyReadWithFlags(). */
robj *lookupKey(redisDb *db, robj *key, int flags) { robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr); dictEntry *de = dictFind(db->pdict,key->ptr);
if (de) { if (de) {
robj *val = dictGetVal(de); robj *val = dictGetVal(de);
@ -172,7 +172,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* The program is aborted if the key already exists. */ * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) { void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr); sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val); int retval = dictAdd(db->pdict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK); serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST || if (val->type == OBJ_LIST ||
@ -187,7 +187,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
* *
* The program is aborted if the key was not already present. */ * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) { void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->dict,key->ptr); dictEntry *de = dictFind(db->pdict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL); serverAssertWithInfo(NULL,key,de != NULL);
dictEntry auxentry = *de; dictEntry auxentry = *de;
@ -195,14 +195,14 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru; val->lru = old->lru;
} }
dictSetVal(db->dict, de, val); dictSetVal(db->pdict, de, val);
if (server.lazyfree_lazy_server_del) { if (server.lazyfree_lazy_server_del) {
freeObjAsync(old); freeObjAsync(old);
dictSetVal(db->dict, &auxentry, NULL); dictSetVal(db->pdict, &auxentry, NULL);
} }
dictFreeVal(db->dict, &auxentry); dictFreeVal(db->pdict, &auxentry);
} }
/* High level Set operation. This function can be used in order to set /* High level Set operation. This function can be used in order to set
@ -225,7 +225,7 @@ void setKey(redisDb *db, robj *key, robj *val) {
} }
int dbExists(redisDb *db, robj *key) { int dbExists(redisDb *db, robj *key) {
return dictFind(db->dict,key->ptr) != NULL; return dictFind(db->pdict,key->ptr) != NULL;
} }
/* Return a random key, in form of a Redis object. /* Return a random key, in form of a Redis object.
@ -235,13 +235,13 @@ int dbExists(redisDb *db, robj *key) {
robj *dbRandomKey(redisDb *db) { robj *dbRandomKey(redisDb *db) {
dictEntry *de; dictEntry *de;
int maxtries = 100; int maxtries = 100;
int allvolatile = dictSize(db->dict) == dictSize(db->expires); int allvolatile = dictSize(db->pdict) == dictSize(db->expires);
while(1) { while(1) {
sds key; sds key;
robj *keyobj; robj *keyobj;
de = dictGetRandomKey(db->dict); de = dictGetRandomKey(db->pdict);
if (de == NULL) return NULL; if (de == NULL) return NULL;
key = dictGetKey(de); key = dictGetKey(de);
@ -272,7 +272,7 @@ int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of /* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */ * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) { if (dictDelete(db->pdict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key); if (server.cluster_enabled) slotToKeyDel(key);
return 1; return 1;
} else { } else {
@ -357,11 +357,11 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
} }
for (int j = startdb; j <= enddb; j++) { for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict); removed += dictSize(server.db[j].pdict);
if (async) { if (async) {
emptyDbAsync(&server.db[j]); emptyDbAsync(&server.db[j]);
} else { } else {
dictEmpty(server.db[j].dict,callback); dictEmpty(server.db[j].pdict,callback);
dictEmpty(server.db[j].expires,callback); dictEmpty(server.db[j].expires,callback);
} }
} }
@ -538,7 +538,7 @@ void keysCommand(client *c) {
unsigned long numkeys = 0; unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c); void *replylen = addReplyDeferredLen(c);
di = dictGetSafeIterator(c->db->dict); di = dictGetSafeIterator(c->db->pdict);
allkeys = (pattern[0] == '*' && pattern[1] == '\0'); allkeys = (pattern[0] == '*' && pattern[1] == '\0');
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de); sds key = dictGetKey(de);
@ -677,7 +677,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
/* Handle the case of a hash table. */ /* Handle the case of a hash table. */
ht = NULL; ht = NULL;
if (o == NULL) { if (o == NULL) {
ht = c->db->dict; ht = c->db->pdict;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr; ht = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
@ -685,7 +685,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
count *= 2; /* We return key / value for this type. */ count *= 2; /* We return key / value for this type. */
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr; zset *zs = o->ptr;
ht = zs->dict; ht = zs->pdict;
count *= 2; /* We return key / value for this type. */ count *= 2; /* We return key / value for this type. */
} }
@ -803,7 +803,7 @@ void scanCommand(client *c) {
} }
void dbsizeCommand(client *c) { void dbsizeCommand(client *c) {
addReplyLongLong(c,dictSize(c->db->dict)); addReplyLongLong(c,dictSize(c->db->pdict));
} }
void lastsaveCommand(client *c) { void lastsaveCommand(client *c) {
@ -1004,11 +1004,11 @@ int dbSwapDatabases(int id1, int id2) {
/* Swap hash tables. Note that we don't swap blocking_keys, /* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to * ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */ * remain in the same DB they were. */
db1->dict = db2->dict; db1->pdict = db2->pdict;
db1->expires = db2->expires; db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl; db1->avg_ttl = db2->avg_ttl;
db2->dict = aux.dict; db2->pdict = aux.pdict;
db2->expires = aux.expires; db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl; db2->avg_ttl = aux.avg_ttl;
@ -1062,7 +1062,7 @@ void swapdbCommand(client *c) {
int removeExpire(redisDb *db, robj *key) { int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the /* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */ * main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); serverAssertWithInfo(NULL,key,dictFind(db->pdict,key->ptr) != NULL);
return dictDelete(db->expires,key->ptr) == DICT_OK; return dictDelete(db->expires,key->ptr) == DICT_OK;
} }
@ -1074,7 +1074,7 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de; dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr); kde = dictFind(db->pdict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL); serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde)); de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when); dictSetSignedIntegerVal(de,when);
@ -1095,7 +1095,7 @@ long long getExpire(redisDb *db, robj *key) {
/* The entry was found in the expire dict, this means it should also /* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */ * be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); serverAssertWithInfo(NULL,key,dictFind(db->pdict,key->ptr) != NULL);
return dictGetSignedIntegerVal(de); return dictGetSignedIntegerVal(de);
} }

View File

@ -179,7 +179,7 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
} }
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) { } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr; zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict); dictIterator *di = dictGetIterator(zs->pdict);
dictEntry *de; dictEntry *de;
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
@ -267,8 +267,8 @@ void computeDatasetDigest(unsigned char *final) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j; redisDb *db = server.db+j;
if (dictSize(db->dict) == 0) continue; if (dictSize(db->pdict) == 0) continue;
di = dictGetSafeIterator(db->dict); di = dictGetSafeIterator(db->pdict);
/* hash the DB id, so the same dataset moved in a different /* hash the DB id, so the same dataset moved in a different
* DB will lead to a different digest */ * DB will lead to a different digest */
@ -388,7 +388,7 @@ NULL
robj *val; robj *val;
char *strenc; char *strenc;
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { if ((de = dictFind(c->db->pdict,c->argv[2]->ptr)) == NULL) {
addReply(c,shared.nokeyerr); addReply(c,shared.nokeyerr);
return; return;
} }
@ -440,7 +440,7 @@ NULL
robj *val; robj *val;
sds key; sds key;
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { if ((de = dictFind(c->db->pdict,c->argv[2]->ptr)) == NULL) {
addReply(c,shared.nokeyerr); addReply(c,shared.nokeyerr);
return; return;
} }
@ -480,7 +480,7 @@ NULL
if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK) if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK)
return; return;
dictExpand(c->db->dict,keys); dictExpand(c->db->pdict,keys);
for (j = 0; j < keys; j++) { for (j = 0; j < keys; j++) {
long valsize = 0; long valsize = 0;
snprintf(buf,sizeof(buf),"%s:%lu", snprintf(buf,sizeof(buf),"%s:%lu",
@ -631,7 +631,7 @@ NULL
} }
stats = sdscatprintf(stats,"[Dictionary HT]\n"); stats = sdscatprintf(stats,"[Dictionary HT]\n");
dictGetStats(buf,sizeof(buf),server.db[dbid].dict); dictGetStats(buf,sizeof(buf),server.db[dbid].pdict);
stats = sdscat(stats,buf); stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires HT]\n"); stats = sdscatprintf(stats,"[Expires HT]\n");
@ -651,7 +651,7 @@ NULL
case OBJ_ENCODING_SKIPLIST: case OBJ_ENCODING_SKIPLIST:
{ {
zset *zs = o->ptr; zset *zs = o->ptr;
ht = zs->dict; ht = zs->pdict;
} }
break; break;
case OBJ_ENCODING_HT: case OBJ_ENCODING_HT:
@ -1192,7 +1192,7 @@ void logCurrentClient(void) {
dictEntry *de; dictEntry *de;
key = getDecodedObject(cc->argv[1]); key = getDecodedObject(cc->argv[1]);
de = dictFind(cc->db->dict, key->ptr); de = dictFind(cc->db->pdict, key->ptr);
if (de) { if (de) {
val = dictGetVal(de); val = dictGetVal(de);
serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr); serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr);

View File

@ -256,7 +256,7 @@ long activeDefragZsetEntry(zset *zs, dictEntry *de) {
defragged++, de->key = newsds; defragged++, de->key = newsds;
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds); newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
if (newscore) { if (newscore) {
dictSetVal(zs->dict, de, newscore); dictSetVal(zs->pdict, de, newscore);
defragged++; defragged++;
} }
return defragged; return defragged;
@ -464,7 +464,7 @@ long scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
return 0; return 0;
zset *zs = (zset*)ob->ptr; zset *zs = (zset*)ob->ptr;
dict *d = zs->dict; dict *d = zs->pdict;
scanLaterZsetData data = {zs, 0}; scanLaterZsetData data = {zs, 0};
*cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
return data.defragged; return data.defragged;
@ -539,20 +539,20 @@ long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
defragged++, zs->zsl = newzsl; defragged++, zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header))) if ((newheader = activeDefragAlloc(zs->zsl->header)))
defragged++, zs->zsl->header = newheader; defragged++, zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) if (dictSize(zs->pdict) > server.active_defrag_max_scan_fields)
defragLater(db, kde); defragLater(db, kde);
else { else {
dictIterator *di = dictGetIterator(zs->dict); dictIterator *di = dictGetIterator(zs->pdict);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
defragged += activeDefragZsetEntry(zs, de); defragged += activeDefragZsetEntry(zs, de);
} }
dictReleaseIterator(di); dictReleaseIterator(di);
} }
/* handle the dict struct */ /* handle the dict struct */
if ((newdict = activeDefragAlloc(zs->dict))) if ((newdict = activeDefragAlloc(zs->pdict)))
defragged++, zs->dict = newdict; defragged++, zs->pdict = newdict;
/* defrag the dict tables */ /* defrag the dict tables */
defragged += dictDefragTables(zs->dict); defragged += dictDefragTables(zs->pdict);
return defragged; return defragged;
} }
@ -775,7 +775,7 @@ long defragKey(redisDb *db, dictEntry *de) {
/* Dirty code: /* Dirty code:
* I can't search in db->expires for that key after i already released * I can't search in db->expires for that key after i already released
* the pointer it holds it won't be able to do the string compare */ * the pointer it holds it won't be able to do the string compare */
uint64_t hash = dictGetHash(db->dict, de->key); uint64_t hash = dictGetHash(db->pdict, de->key);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
} }
@ -953,7 +953,7 @@ int defragLaterStep(redisDb *db, long long endtime) {
} }
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
dictEntry *de = dictFind(db->dict, current_key); dictEntry *de = dictFind(db->pdict, current_key);
key_defragged = server.stat_active_defrag_hits; key_defragged = server.stat_active_defrag_hits;
do { do {
int quit = 0; int quit = 0;
@ -1106,7 +1106,7 @@ void activeDefragCycle(void) {
break; /* this will exit the function and we'll continue on the next cycle */ break; /* this will exit the function and we'll continue on the next cycle */
} }
cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db); cursor = dictScan(db->pdict, cursor, defragScanCallback, defragDictBucketCallback, db);
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
* (if we have a lot of pointers in one hash bucket or rehasing), * (if we have a lot of pointers in one hash bucket or rehasing),

View File

@ -489,9 +489,9 @@ int freeMemoryIfNeeded(void) {
for (i = 0; i < server.dbnum; i++) { for (i = 0; i < server.dbnum; i++) {
db = server.db+i; db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires; db->pdict : db->expires;
if ((keys = dictSize(dict)) != 0) { if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool); evictionPoolPopulate(i, dict, db->pdict, pool);
total_keys += keys; total_keys += keys;
} }
} }
@ -503,7 +503,7 @@ int freeMemoryIfNeeded(void) {
bestdbid = pool[k].dbid; bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict, de = dictFind(server.db[pool[k].dbid].pdict,
pool[k].key); pool[k].key);
} else { } else {
de = dictFind(server.db[pool[k].dbid].expires, de = dictFind(server.db[pool[k].dbid].expires,
@ -539,7 +539,7 @@ int freeMemoryIfNeeded(void) {
j = (++next_db) % server.dbnum; j = (++next_db) % server.dbnum;
db = server.db+j; db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires; db->pdict : db->expires;
if (dictSize(dict) != 0) { if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict); de = dictGetRandomKey(dict);
bestkey = dictGetKey(de); bestkey = dictGetKey(de);

View File

@ -33,9 +33,11 @@
#define _BSD_SOURCE #define _BSD_SOURCE
#if defined(__linux__) #if defined(__linux__)
#ifndef __cplusplus
#define _GNU_SOURCE #define _GNU_SOURCE
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#endif #endif
#endif
#if defined(_AIX) #if defined(_AIX)
#define _ALL_SOURCE #define _ALL_SOURCE
@ -55,7 +57,9 @@
#define _POSIX_C_SOURCE 199506L #define _POSIX_C_SOURCE 199506L
#endif #endif
#ifndef _LARGEFILE_SOURCE
#define _LARGEFILE_SOURCE #define _LARGEFILE_SOURCE
#endif
#define _FILE_OFFSET_BITS 64 #define _FILE_OFFSET_BITS 64
#endif #endif

View File

@ -651,7 +651,7 @@ void georadiusGeneric(client *c, int flags) {
if (maxelelen < elelen) maxelelen = elelen; if (maxelelen < elelen) maxelelen = elelen;
znode = zslInsert(zs->zsl,score,gp->member); znode = zslInsert(zs->zsl,score,gp->member);
serverAssert(dictAdd(zs->dict,gp->member,&znode->score) == DICT_OK); serverAssert(dictAdd(zs->pdict,gp->member,&znode->score) == DICT_OK);
gp->member = NULL; gp->member = NULL;
} }

View File

@ -32,10 +32,16 @@
#define __INTSET_H #define __INTSET_H
#include <stdint.h> #include <stdint.h>
#ifdef __cplusplus
#define ZERO_LENGTH_ARRAY_LENGTH 1
#else
#define ZERO_LENGTH_ARRAY_LENGTH
#endif
typedef struct intset { typedef struct intset {
uint32_t encoding; uint32_t encoding;
uint32_t length; uint32_t length;
int8_t contents[]; int8_t contents[ZERO_LENGTH_ARRAY_LENGTH];
} intset; } intset;
intset *intsetNew(void); intset *intsetNew(void);

View File

@ -59,7 +59,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* If the value is composed of a few allocations, to free in a lazy way /* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free * is actually just slower... So under a certain limit we just free
* the object synchronously. */ * the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr); dictEntry *de = dictUnlink(db->pdict,key->ptr);
if (de) { if (de) {
robj *val = dictGetVal(de); robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val); size_t free_effort = lazyfreeGetFreeEffort(val);
@ -75,14 +75,14 @@ int dbAsyncDelete(redisDb *db, robj *key) {
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1); atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL); dictSetVal(db->pdict,de,NULL);
} }
} }
/* Release the key-val pair, or just the key if we set the val /* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */ * field to NULL in order to lazy free it later. */
if (de) { if (de) {
dictFreeUnlinkedEntry(db->dict,de); dictFreeUnlinkedEntry(db->pdict,de);
if (server.cluster_enabled) slotToKeyDel(key); if (server.cluster_enabled) slotToKeyDel(key);
return 1; return 1;
} else { } else {
@ -105,8 +105,8 @@ void freeObjAsync(robj *o) {
* create a new empty set of hash tables and scheduling the old ones for * create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */ * lazy freeing. */
void emptyDbAsync(redisDb *db) { void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires; dict *oldht1 = db->pdict, *oldht2 = db->expires;
db->dict = dictCreate(&dbDictType,NULL); db->pdict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL); db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);

View File

@ -3074,11 +3074,11 @@ void moduleRDBLoadError(RedisModuleIO *io) {
void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) { void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
if (io->error) return; if (io->error) return;
/* Save opcode. */ /* Save opcode. */
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT); int retval = rdbSaveLen(io->prio, RDB_MODULE_OPCODE_UINT);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
/* Save value. */ /* Save value. */
retval = rdbSaveLen(io->rio, value); retval = rdbSaveLen(io->prio, value);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
return; return;
@ -3092,11 +3092,11 @@ saveerr:
* new data types. */ * new data types. */
uint64_t RM_LoadUnsigned(RedisModuleIO *io) { uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
if (io->ver == 2) { if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL); uint64_t opcode = rdbLoadLen(io->prio,NULL);
if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr; if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
} }
uint64_t value; uint64_t value;
int retval = rdbLoadLenByRef(io->rio, NULL, &value); int retval = rdbLoadLenByRef(io->prio, NULL, &value);
if (retval == -1) goto loaderr; if (retval == -1) goto loaderr;
return value; return value;
@ -3128,11 +3128,11 @@ int64_t RM_LoadSigned(RedisModuleIO *io) {
void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) { void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
if (io->error) return; if (io->error) return;
/* Save opcode. */ /* Save opcode. */
ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); ssize_t retval = rdbSaveLen(io->prio, RDB_MODULE_OPCODE_STRING);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
/* Save value. */ /* Save value. */
retval = rdbSaveStringObject(io->rio, s); retval = rdbSaveStringObject(io->prio, s);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
return; return;
@ -3146,11 +3146,11 @@ saveerr:
void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) { void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
if (io->error) return; if (io->error) return;
/* Save opcode. */ /* Save opcode. */
ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); ssize_t retval = rdbSaveLen(io->prio, RDB_MODULE_OPCODE_STRING);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
/* Save value. */ /* Save value. */
retval = rdbSaveRawString(io->rio, (unsigned char*)str,len); retval = rdbSaveRawString(io->prio, (unsigned char*)str,len);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
return; return;
@ -3162,10 +3162,10 @@ saveerr:
/* Implements RM_LoadString() and RM_LoadStringBuffer() */ /* Implements RM_LoadString() and RM_LoadStringBuffer() */
void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
if (io->ver == 2) { if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL); uint64_t opcode = rdbLoadLen(io->prio,NULL);
if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr; if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
} }
void *s = rdbGenericLoadStringObject(io->rio, void *s = rdbGenericLoadStringObject(io->prio,
plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr); plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
if (s == NULL) goto loaderr; if (s == NULL) goto loaderr;
return s; return s;
@ -3205,11 +3205,11 @@ char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
void RM_SaveDouble(RedisModuleIO *io, double value) { void RM_SaveDouble(RedisModuleIO *io, double value) {
if (io->error) return; if (io->error) return;
/* Save opcode. */ /* Save opcode. */
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE); int retval = rdbSaveLen(io->prio, RDB_MODULE_OPCODE_DOUBLE);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
/* Save value. */ /* Save value. */
retval = rdbSaveBinaryDoubleValue(io->rio, value); retval = rdbSaveBinaryDoubleValue(io->prio, value);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
return; return;
@ -3222,11 +3222,11 @@ saveerr:
* double value saved by RedisModule_SaveDouble(). */ * double value saved by RedisModule_SaveDouble(). */
double RM_LoadDouble(RedisModuleIO *io) { double RM_LoadDouble(RedisModuleIO *io) {
if (io->ver == 2) { if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL); uint64_t opcode = rdbLoadLen(io->prio,NULL);
if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr; if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
} }
double value; double value;
int retval = rdbLoadBinaryDoubleValue(io->rio, &value); int retval = rdbLoadBinaryDoubleValue(io->prio, &value);
if (retval == -1) goto loaderr; if (retval == -1) goto loaderr;
return value; return value;
@ -3241,11 +3241,11 @@ loaderr:
void RM_SaveFloat(RedisModuleIO *io, float value) { void RM_SaveFloat(RedisModuleIO *io, float value) {
if (io->error) return; if (io->error) return;
/* Save opcode. */ /* Save opcode. */
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT); int retval = rdbSaveLen(io->prio, RDB_MODULE_OPCODE_FLOAT);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
/* Save value. */ /* Save value. */
retval = rdbSaveBinaryFloatValue(io->rio, value); retval = rdbSaveBinaryFloatValue(io->prio, value);
if (retval == -1) goto saveerr; if (retval == -1) goto saveerr;
io->bytes += retval; io->bytes += retval;
return; return;
@ -3258,11 +3258,11 @@ saveerr:
* float value saved by RedisModule_SaveFloat(). */ * float value saved by RedisModule_SaveFloat(). */
float RM_LoadFloat(RedisModuleIO *io) { float RM_LoadFloat(RedisModuleIO *io) {
if (io->ver == 2) { if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL); uint64_t opcode = rdbLoadLen(io->prio,NULL);
if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr; if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
} }
float value; float value;
int retval = rdbLoadBinaryFloatValue(io->rio, &value); int retval = rdbLoadBinaryFloatValue(io->prio, &value);
if (retval == -1) goto loaderr; if (retval == -1) goto loaderr;
return value; return value;
@ -3373,12 +3373,12 @@ void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
} }
/* Bulk count. */ /* Bulk count. */
if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0) if (!io->error && rioWriteBulkCount(io->prio,'*',argc) == 0)
io->error = 1; io->error = 1;
/* Arguments. */ /* Arguments. */
for (j = 0; j < argc; j++) { for (j = 0; j < argc; j++) {
if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0) if (!io->error && rioWriteBulkObject(io->prio,argv[j]) == 0)
io->error = 1; io->error = 1;
decrRefCount(argv[j]); decrRefCount(argv[j]);
} }

View File

@ -329,7 +329,7 @@ void touchWatchedKeysOnFlush(int dbid) {
* key exists, mark the client as dirty, as the key will be * key exists, mark the client as dirty, as the key will be
* removed. */ * removed. */
if (dbid == -1 || wk->db->id == dbid) { if (dbid == -1 || wk->db->id == dbid) {
if (dictFind(wk->db->dict, wk->key->ptr) != NULL) if (dictFind(wk->db->pdict, wk->key->ptr) != NULL)
c->flags |= CLIENT_DIRTY_CAS; c->flags |= CLIENT_DIRTY_CAS;
} }
} }

View File

@ -119,7 +119,7 @@ client *createClient(int fd) {
c->argc = 0; c->argc = 0;
c->argv = NULL; c->argv = NULL;
c->cmd = c->lastcmd = NULL; c->cmd = c->lastcmd = NULL;
c->user = DefaultUser; c->puser = DefaultUser;
c->multibulklen = 0; c->multibulklen = 0;
c->bulklen = -1; c->bulklen = -1;
c->sentlen = 0; c->sentlen = 0;
@ -127,7 +127,7 @@ client *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime; c->ctime = c->lastinteraction = server.unixtime;
/* If the default user does not require authentication, the user is /* If the default user does not require authentication, the user is
* directly authenticated. */ * directly authenticated. */
c->authenticated = (c->user->flags & USER_FLAG_NOPASS) != 0; c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) != 0;
c->replstate = REPL_STATE_NONE; c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0; c->repl_put_online_on_ack = 0;
c->reploff = 0; c->reploff = 0;

View File

@ -250,7 +250,7 @@ robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs), MALLOC_SHARED); zset *zs = zmalloc(sizeof(*zs), MALLOC_SHARED);
robj *o; robj *o;
zs->dict = dictCreate(&zsetDictType,NULL); zs->pdict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate(); zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs); o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST; o->encoding = OBJ_ENCODING_SKIPLIST;
@ -310,7 +310,7 @@ void freeZsetObject(robj *o) {
switch (o->encoding) { switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: case OBJ_ENCODING_SKIPLIST:
zs = o->ptr; zs = o->ptr;
dictRelease(zs->dict); dictRelease(zs->pdict);
zslFree(zs->zsl); zslFree(zs->zsl);
zfree(zs); zfree(zs);
break; break;
@ -823,7 +823,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
if (o->encoding == OBJ_ENCODING_ZIPLIST) { if (o->encoding == OBJ_ENCODING_ZIPLIST) {
asize = sizeof(*o)+(ziplistBlobLen(o->ptr)); asize = sizeof(*o)+(ziplistBlobLen(o->ptr));
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) { } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
d = ((zset*)o->ptr)->dict; d = ((zset*)o->ptr)->pdict;
zskiplist *zsl = ((zset*)o->ptr)->zsl; zskiplist *zsl = ((zset*)o->ptr)->zsl;
zskiplistNode *znode = zsl->header->level[0].forward; zskiplistNode *znode = zsl->header->level[0].forward;
asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d)); asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d));
@ -859,14 +859,14 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
} else if (o->type == OBJ_STREAM) { } else if (o->type == OBJ_STREAM) {
stream *s = o->ptr; stream *s = o->ptr;
asize = sizeof(*o); asize = sizeof(*o);
asize += streamRadixTreeMemoryUsage(s->rax); asize += streamRadixTreeMemoryUsage(s->prax);
/* Now we have to add the listpacks. The last listpack is often non /* Now we have to add the listpacks. The last listpack is often non
* complete, so we estimate the size of the first N listpacks, and * complete, so we estimate the size of the first N listpacks, and
* use the average to compute the size of the first N-1 listpacks, and * use the average to compute the size of the first N-1 listpacks, and
* finally add the real size of the last node. */ * finally add the real size of the last node. */
raxIterator ri; raxIterator ri;
raxStart(&ri,s->rax); raxStart(&ri,s->prax);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
size_t lpsize = 0, samples = 0; size_t lpsize = 0, samples = 0;
while(samples < sample_size && raxNext(&ri)) { while(samples < sample_size && raxNext(&ri)) {
@ -874,11 +874,11 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
lpsize += lpBytes(lp); lpsize += lpBytes(lp);
samples++; samples++;
} }
if (s->rax->numele <= samples) { if (s->prax->numele <= samples) {
asize += lpsize; asize += lpsize;
} else { } else {
if (samples) lpsize /= samples; /* Compute the average. */ if (samples) lpsize /= samples; /* Compute the average. */
asize += lpsize * (s->rax->numele-1); asize += lpsize * (s->prax->numele-1);
/* No need to check if seek succeeded, we enter this branch only /* No need to check if seek succeeded, we enter this branch only
* if there are a few elements in the radix tree. */ * if there are a few elements in the radix tree. */
raxSeek(&ri,"$",NULL,0); raxSeek(&ri,"$",NULL,0);
@ -1031,16 +1031,16 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j; redisDb *db = server.db+j;
long long keyscount = dictSize(db->dict); long long keyscount = dictSize(db->pdict);
if (keyscount==0) continue; if (keyscount==0) continue;
mh->total_keys += keyscount; mh->total_keys += keyscount;
mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1), MALLOC_LOCAL); mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1), MALLOC_LOCAL);
mh->db[mh->num_dbs].dbid = j; mh->db[mh->num_dbs].dbid = j;
mem = dictSize(db->dict) * sizeof(dictEntry) + mem = dictSize(db->pdict) * sizeof(dictEntry) +
dictSlots(db->dict) * sizeof(dictEntry*) + dictSlots(db->pdict) * sizeof(dictEntry*) +
dictSize(db->dict) * sizeof(robj); dictSize(db->pdict) * sizeof(robj);
mh->db[mh->num_dbs].overhead_ht_main = mem; mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem; mem_total+=mem;
@ -1222,7 +1222,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
robj *objectCommandLookup(client *c, robj *key) { robj *objectCommandLookup(client *c, robj *key) {
dictEntry *de; dictEntry *de;
if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL; if ((de = dictFind(c->db->pdict,key->ptr)) == NULL) return NULL;
return (robj*) dictGetVal(de); return (robj*) dictGetVal(de);
} }
@ -1315,7 +1315,7 @@ NULL
return; return;
} }
} }
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { if ((de = dictFind(c->db->pdict,c->argv[2]->ptr)) == NULL) {
addReplyNull(c); addReplyNull(c);
return; return;
} }

View File

@ -130,7 +130,7 @@ void freePubsubPattern(void *p) {
int listMatchPubsubPattern(void *a, void *b) { int listMatchPubsubPattern(void *a, void *b) {
pubsubPattern *pa = a, *pb = b; pubsubPattern *pa = a, *pb = b;
return (pa->client == pb->client) && return (pa->pclient == pb->pclient) &&
(equalStringObjects(pa->pattern,pb->pattern)); (equalStringObjects(pa->pattern,pb->pattern));
} }
@ -211,7 +211,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
incrRefCount(pattern); incrRefCount(pattern);
pat = zmalloc(sizeof(*pat), MALLOC_LOCAL); pat = zmalloc(sizeof(*pat), MALLOC_LOCAL);
pat->pattern = getDecodedObject(pattern); pat->pattern = getDecodedObject(pattern);
pat->client = c; pat->pclient = c;
listAddNodeTail(server.pubsub_patterns,pat); listAddNodeTail(server.pubsub_patterns,pat);
} }
/* Notify the client */ /* Notify the client */
@ -230,7 +230,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
retval = 1; retval = 1;
listDelNode(c->pubsub_patterns,ln); listDelNode(c->pubsub_patterns,ln);
pat.client = c; pat.pclient = c;
pat.pattern = pattern; pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat); ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln); listDelNode(server.pubsub_patterns,ln);
@ -309,7 +309,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
(char*)channel->ptr, (char*)channel->ptr,
sdslen(channel->ptr),0)) sdslen(channel->ptr),0))
{ {
addReplyPubsubPatMessage(pat->client, addReplyPubsubPatMessage(pat->pclient,
pat->pattern,channel,message); pat->pattern,channel,message);
receivers++; receivers++;
} }

View File

@ -75,7 +75,7 @@ static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
do { \ do { \
(e)->zi = (e)->value = NULL; \ (e)->zi = (e)->value = NULL; \
(e)->longval = -123456789; \ (e)->longval = -123456789; \
(e)->quicklist = NULL; \ (e)->qlist = NULL; \
(e)->node = NULL; \ (e)->node = NULL; \
(e)->offset = 123456789; \ (e)->offset = 123456789; \
(e)->sz = 0; \ (e)->sz = 0; \
@ -634,7 +634,7 @@ REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) { void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev; quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next; quicklistNode *next = entry->node->next;
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist, int deleted_node = quicklistDelIndex((quicklist *)entry->qlist,
entry->node, &entry->zi); entry->node, &entry->zi);
/* after delete, the zi is now invalid for any future usage. */ /* after delete, the zi is now invalid for any future usage. */
@ -1059,7 +1059,7 @@ quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
} }
iter->direction = direction; iter->direction = direction;
iter->quicklist = quicklist; iter->qlist = quicklist;
iter->zi = NULL; iter->zi = NULL;
@ -1088,7 +1088,7 @@ quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
* If we still have a valid current node, then re-encode current node. */ * If we still have a valid current node, then re-encode current node. */
void quicklistReleaseIterator(quicklistIter *iter) { void quicklistReleaseIterator(quicklistIter *iter) {
if (iter->current) if (iter->current)
quicklistCompress(iter->quicklist, iter->current); quicklistCompress(iter->qlist, iter->current);
zfree(iter); zfree(iter);
} }
@ -1122,7 +1122,7 @@ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
return 0; return 0;
} }
entry->quicklist = iter->quicklist; entry->qlist = iter->qlist;
entry->node = iter->current; entry->node = iter->current;
if (!iter->current) { if (!iter->current) {
@ -1160,7 +1160,7 @@ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
} else { } else {
/* We ran out of ziplist entries. /* We ran out of ziplist entries.
* Pick next node, update offset, then re-run retrieval. */ * Pick next node, update offset, then re-run retrieval. */
quicklistCompress(iter->quicklist, iter->current); quicklistCompress(iter->qlist, iter->current);
if (iter->direction == AL_START_HEAD) { if (iter->direction == AL_START_HEAD) {
/* Forward traversal */ /* Forward traversal */
D("Jumping to start of next node"); D("Jumping to start of next node");
@ -1230,7 +1230,7 @@ int quicklistIndex(const quicklist *quicklist, const long long idx,
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */ int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
initEntry(entry); initEntry(entry);
entry->quicklist = quicklist; entry->qlist = quicklist;
if (!forward) { if (!forward) {
index = (-idx) - 1; index = (-idx) - 1;

View File

@ -31,6 +31,12 @@
#ifndef __QUICKLIST_H__ #ifndef __QUICKLIST_H__
#define __QUICKLIST_H__ #define __QUICKLIST_H__
#ifdef __cplusplus
#define ZERO_LENGTH_ARRAY_LENGTH 1
#else
#define ZERO_LENGTH_ARRAY_LENGTH
#endif
/* Node, quicklist, and Iterator are the only data structures used currently. */ /* Node, quicklist, and Iterator are the only data structures used currently. */
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist. /* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
@ -61,7 +67,7 @@ typedef struct quicklistNode {
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */ * When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF { typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/ unsigned int sz; /* LZF size in bytes*/
char compressed[]; char compressed[ZERO_LENGTH_ARRAY_LENGTH];
} quicklistLZF; } quicklistLZF;
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist. /* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
@ -80,7 +86,7 @@ typedef struct quicklist {
} quicklist; } quicklist;
typedef struct quicklistIter { typedef struct quicklistIter {
const quicklist *quicklist; const quicklist *qlist;
quicklistNode *current; quicklistNode *current;
unsigned char *zi; unsigned char *zi;
long offset; /* offset in current ziplist */ long offset; /* offset in current ziplist */
@ -88,7 +94,7 @@ typedef struct quicklistIter {
} quicklistIter; } quicklistIter;
typedef struct quicklistEntry { typedef struct quicklistEntry {
const quicklist *quicklist; const quicklist *qlist;
quicklistNode *node; quicklistNode *node;
unsigned char *zi; unsigned char *zi;
unsigned char *value; unsigned char *value;

View File

@ -31,6 +31,12 @@
#ifndef RAX_H #ifndef RAX_H
#define RAX_H #define RAX_H
#ifdef __cplusplus
#define ZERO_LENGTH_ARRAY_LENGTH 1
#else
#define ZERO_LENGTH_ARRAY_LENGTH
#endif
#include <stdint.h> #include <stdint.h>
/* Representation of a radix tree as implemented in this file, that contains /* Representation of a radix tree as implemented in this file, that contains
@ -127,7 +133,7 @@ typedef struct raxNode {
* children, an additional value pointer is present (as you can see * children, an additional value pointer is present (as you can see
* in the representation above as "value-ptr" field). * in the representation above as "value-ptr" field).
*/ */
unsigned char data[]; unsigned char data[ZERO_LENGTH_ARRAY_LENGTH];
} raxNode; } raxNode;
typedef struct rax { typedef struct rax {

13
src/rdb-s3.cpp Normal file
View File

@ -0,0 +1,13 @@
extern "C" {
#include "rio.h"
#include "server.h"
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
{
(void)s3bucket;
(void)rsi;
// NOP
return C_ERR;
}

View File

@ -895,7 +895,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
} else if (o->type == OBJ_STREAM) { } else if (o->type == OBJ_STREAM) {
/* Store how many listpacks we have inside the radix tree. */ /* Store how many listpacks we have inside the radix tree. */
stream *s = o->ptr; stream *s = o->ptr;
rax *rax = s->rax; rax *rax = s->prax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1; if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n; nwritten += n;
@ -1116,7 +1116,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j; redisDb *db = server.db+j;
dict *d = db->dict; dict *d = db->pdict;
if (dictSize(d) == 0) continue; if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d); di = dictGetSafeIterator(d);
@ -1129,7 +1129,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* However this does not limit the actual size of the DB to load since * However this does not limit the actual size of the DB to load since
* these sizes are just hints to resize the hash tables. */ * these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size; uint64_t db_size, expires_size;
db_size = dictSize(db->dict); db_size = dictSize(db->pdict);
expires_size = dictSize(db->expires); expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr;
@ -1216,13 +1216,28 @@ werr: /* Write error. */
return C_ERR; return C_ERR;
} }
int rdbSaveFd(int fd, rdbSaveInfo *rsi)
{
int error = 0;
rio rdb;
rioInitWithFile(&rdb,fd);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
return C_ERR;
}
return C_OK;
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */ /* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) { int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256]; char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp; FILE *fp;
rio rdb;
int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w"); fp = fopen(tmpfile,"w");
@ -1237,13 +1252,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
return C_ERR; return C_ERR;
} }
rioInitWithFile(&rdb,fp); if (rdbSaveFd(fileno(fp), rsi) == C_ERR){
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr; goto werr;
} }
@ -1459,7 +1468,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
zs = o->ptr; zs = o->ptr;
if (zsetlen > DICT_HT_INITIAL_SIZE) if (zsetlen > DICT_HT_INITIAL_SIZE)
dictExpand(zs->dict,zsetlen); dictExpand(zs->pdict,zsetlen);
/* Load every single element of the sorted set. */ /* Load every single element of the sorted set. */
while(zsetlen--) { while(zsetlen--) {
@ -1480,7 +1489,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele); if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);
znode = zslInsert(zs->zsl,score,sdsele); znode = zslInsert(zs->zsl,score,sdsele);
dictAdd(zs->dict,sdsele,&znode->score); dictAdd(zs->pdict,sdsele,&znode->score);
} }
/* Convert *after* loading, since sorted sets are not stored ordered. */ /* Convert *after* loading, since sorted sets are not stored ordered. */
@ -1667,7 +1676,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
} }
/* Insert the key in the radix tree. */ /* Insert the key in the radix tree. */
int retval = raxInsert(s->rax, int retval = raxInsert(s->prax,
(unsigned char*)nodekey,sizeof(streamID),lp,NULL); (unsigned char*)nodekey,sizeof(streamID),lp,NULL);
sdsfree(nodekey); sdsfree(nodekey);
if (!retval) if (!retval)
@ -1928,7 +1937,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
goto eoferr; goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr; goto eoferr;
dictExpand(db->dict,db_size); dictExpand(db->pdict,db_size);
dictExpand(db->expires,expires_size); dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) { } else if (type == RDB_OPCODE_AUX) {
@ -2074,7 +2083,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR; if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp); startLoading(fp);
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fileno(fp));
retval = rdbLoadRio(&rdb,rsi,0); retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp); fclose(fp);
stopLoading(); stopLoading();

View File

@ -140,6 +140,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid); void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi); int rdbSave(char *filename, rdbSaveInfo *rsi);
int rdbSaveFd(int fd, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o); ssize_t rdbSaveObject(rio *rdb, robj *o);
size_t rdbSavedObjectLen(robj *o); size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb); robj *rdbLoadObject(int type, rio *rdb);

View File

@ -186,7 +186,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
int closefile = (fp == NULL); int closefile = (fp == NULL);
if (fp == NULL && (fp = fopen(rdbfilename,"r")) == NULL) return 1; if (fp == NULL && (fp = fopen(rdbfilename,"r")) == NULL) return 1;
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fileno(fp));
rdbstate.rio = &rdb; rdbstate.rio = &rdb;
rdb.update_cksum = rdbLoadProgressCallback; rdb.update_cksum = rdbLoadProgressCallback;
if (rioRead(&rdb,buf,9) == 0) goto eoferr; if (rioRead(&rdb,buf,9) == 0) goto eoferr;

View File

@ -1080,7 +1080,7 @@ void replicationCreateMasterClient(int fd, int dbid) {
server.master->authenticated = 1; server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset; server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff; server.master->read_reploff = server.master->reploff;
server.master->user = NULL; /* This client can do everything. */ server.master->puser = NULL; /* This client can do everything. */
memcpy(server.master->replid, server.master_replid, memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid)); sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not /* If master offset is set to -1, this master is old and is not

View File

@ -109,14 +109,13 @@ void rioInitWithBuffer(rio *r, sds s) {
static size_t rioFileWrite(rio *r, const void *buf, size_t len) { static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval; size_t retval;
retval = fwrite(buf,len,1,r->io.file.fp); retval = write(r->io.file.fd,buf,len);
r->io.file.buffered += len; r->io.file.buffered += len;
if (r->io.file.autosync && if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync) r->io.file.buffered >= r->io.file.autosync)
{ {
fflush(r->io.file.fp); redis_fsync(r->io.file.fd);
redis_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0; r->io.file.buffered = 0;
} }
return retval; return retval;
@ -124,18 +123,18 @@ static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
/* Returns 1 or 0 for success/failure. */ /* Returns 1 or 0 for success/failure. */
static size_t rioFileRead(rio *r, void *buf, size_t len) { static size_t rioFileRead(rio *r, void *buf, size_t len) {
return fread(buf,len,1,r->io.file.fp); return read(r->io.file.fd,buf,len);
} }
/* Returns read/write position in file. */ /* Returns read/write position in file. */
static off_t rioFileTell(rio *r) { static off_t rioFileTell(rio *r) {
return ftello(r->io.file.fp); return lseek(r->io.file.fd, 0, SEEK_CUR);
} }
/* Flushes any buffer to target device if applicable. Returns 1 on success /* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */ * and 0 on failures. */
static int rioFileFlush(rio *r) { static int rioFileFlush(rio *r) {
return (fflush(r->io.file.fp) == 0) ? 1 : 0; return (fsync(r->io.file.fd) == 0) ? 1 : 0;
} }
static const rio rioFileIO = { static const rio rioFileIO = {
@ -150,9 +149,9 @@ static const rio rioFileIO = {
{ { NULL, 0 } } /* union for io-specific vars */ { { NULL, 0 } } /* union for io-specific vars */
}; };
void rioInitWithFile(rio *r, FILE *fp) { void rioInitWithFile(rio *r, int fd) {
*r = rioFileIO; *r = rioFileIO;
r->io.file.fp = fp; r->io.file.fd = fd;
r->io.file.buffered = 0; r->io.file.buffered = 0;
r->io.file.autosync = 0; r->io.file.autosync = 0;
} }

View File

@ -69,7 +69,7 @@ struct _rio {
} buffer; } buffer;
/* Stdio file pointer target. */ /* Stdio file pointer target. */
struct { struct {
FILE *fp; int fd;
off_t buffered; /* Bytes written since last fsync. */ off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */ off_t autosync; /* fsync after 'autosync' bytes written. */
} file; } file;
@ -124,7 +124,7 @@ static inline int rioFlush(rio *r) {
return r->flush(r); return r->flush(r);
} }
void rioInitWithFile(rio *r, FILE *fp); void rioInitWithFile(rio *r, int fd);
void rioInitWithBuffer(rio *r, sds s); void rioInitWithBuffer(rio *r, sds s);
void rioInitWithFdset(rio *r, int *fds, int numfds); void rioInitWithFdset(rio *r, int *fds, int numfds);

View File

@ -34,43 +34,49 @@
#define __SDS_H #define __SDS_H
#define SDS_MAX_PREALLOC (1024*1024) #define SDS_MAX_PREALLOC (1024*1024)
const char *SDS_NOINIT; extern const char *SDS_NOINIT;
#include <sys/types.h> #include <sys/types.h>
#include <stdarg.h> #include <stdarg.h>
#include <stdint.h> #include <stdint.h>
#ifdef __cplusplus
#define ZERO_LENGTH_ARRAY_LENGTH 1
#else
#define ZERO_LENGTH_ARRAY_LENGTH
#endif
typedef char *sds; typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags byte directly. /* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */ * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 { struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */ unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
}; };
struct __attribute__ ((__packed__)) sdshdr8 { struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */ uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */ uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */ unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
}; };
struct __attribute__ ((__packed__)) sdshdr16 { struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */ uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */ uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */ unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
}; };
struct __attribute__ ((__packed__)) sdshdr32 { struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */ uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */ uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */ unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
}; };
struct __attribute__ ((__packed__)) sdshdr64 { struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */ uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */ uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */ unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
}; };
#define SDS_TYPE_5 0 #define SDS_TYPE_5 0
@ -80,7 +86,7 @@ struct __attribute__ ((__packed__)) sdshdr64 {
#define SDS_TYPE_64 4 #define SDS_TYPE_64 4
#define SDS_TYPE_MASK 7 #define SDS_TYPE_MASK 7
#define SDS_TYPE_BITS 3 #define SDS_TYPE_BITS 3
#define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (void*)((s)-(sizeof(struct sdshdr##T))); #define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (struct sdshdr##T *)(((void*)((s)-(sizeof(struct sdshdr##T)))));
#define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T)))) #define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T))))
#define SDS_TYPE_5_LEN(f) ((f)>>SDS_TYPE_BITS) #define SDS_TYPE_5_LEN(f) ((f)>>SDS_TYPE_BITS)

View File

@ -1283,7 +1283,7 @@ dictType zsetDictType = {
NULL /* val destructor */ NULL /* val destructor */
}; };
/* Db->dict, keys are sds strings, vals are Redis objects. */ /* db->pdict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = { dictType dbDictType = {
dictSdsHash, /* hash function */ dictSdsHash, /* hash function */
NULL, /* key dup */ NULL, /* key dup */
@ -1414,8 +1414,8 @@ int htNeedsResize(dict *dict) {
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory */ * we resize the hash table to save memory */
void tryResizeHashTables(int dbid) { void tryResizeHashTables(int dbid) {
if (htNeedsResize(server.db[dbid].dict)) if (htNeedsResize(server.db[dbid].pdict))
dictResize(server.db[dbid].dict); dictResize(server.db[dbid].pdict);
if (htNeedsResize(server.db[dbid].expires)) if (htNeedsResize(server.db[dbid].expires))
dictResize(server.db[dbid].expires); dictResize(server.db[dbid].expires);
} }
@ -1429,8 +1429,8 @@ void tryResizeHashTables(int dbid) {
* is returned. */ * is returned. */
int incrementallyRehash(int dbid) { int incrementallyRehash(int dbid) {
/* Keys dictionary */ /* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) { if (dictIsRehashing(server.db[dbid].pdict)) {
dictRehashMilliseconds(server.db[dbid].dict,1); dictRehashMilliseconds(server.db[dbid].pdict,1);
return 1; /* already used our millisecond for this loop... */ return 1; /* already used our millisecond for this loop... */
} }
/* Expires */ /* Expires */
@ -1856,8 +1856,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys; long long size, used, vkeys;
size = dictSlots(server.db[j].dict); size = dictSlots(server.db[j].pdict);
used = dictSize(server.db[j].dict); used = dictSize(server.db[j].pdict);
vkeys = dictSize(server.db[j].expires); vkeys = dictSize(server.db[j].expires);
if (used || vkeys) { if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
@ -2267,6 +2267,7 @@ void initServerConfig(void) {
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL; server.pidfile = NULL;
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
server.rdb_s3bucketpath = NULL;
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
@ -2757,7 +2758,7 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */ /* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].pdict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
@ -3630,7 +3631,7 @@ void authCommand(client *c) {
if (ACLCheckUserCredentials(username,password) == C_OK) { if (ACLCheckUserCredentials(username,password) == C_OK) {
c->authenticated = 1; c->authenticated = 1;
c->user = ACLGetUserByName(username->ptr,sdslen(username->ptr)); c->puser = ACLGetUserByName(username->ptr,sdslen(username->ptr));
addReply(c,shared.ok); addReply(c,shared.ok);
} else { } else {
addReplyError(c,"-WRONGPASS invalid username-password pair"); addReplyError(c,"-WRONGPASS invalid username-password pair");
@ -4364,7 +4365,7 @@ sds genRedisInfoString(char *section) {
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys; long long keys, vkeys;
keys = dictSize(server.db[j].dict); keys = dictSize(server.db[j].pdict);
vkeys = dictSize(server.db[j].expires); vkeys = dictSize(server.db[j].expires);
if (keys || vkeys) { if (keys || vkeys) {
info = sdscatprintf(info, info = sdscatprintf(info,

View File

@ -570,7 +570,7 @@ typedef struct moduleValue {
* to care about error conditions. */ * to care about error conditions. */
typedef struct RedisModuleIO { typedef struct RedisModuleIO {
size_t bytes; /* Bytes read / written so far. */ size_t bytes; /* Bytes read / written so far. */
rio *rio; /* Rio stream. */ rio *prio; /* Rio stream. */
moduleType *type; /* Module type doing the operation. */ moduleType *type; /* Module type doing the operation. */
int error; /* True if error condition happened. */ int error; /* True if error condition happened. */
int ver; /* Module serialization version: 1 (old), int ver; /* Module serialization version: 1 (old),
@ -581,7 +581,7 @@ typedef struct RedisModuleIO {
/* Macro to initialize an IO context. Note that the 'ver' field is populated /* Macro to initialize an IO context. Note that the 'ver' field is populated
* inside rdb.c according to the version of the value to load. */ * inside rdb.c according to the version of the value to load. */
#define moduleInitIOContext(iovar,mtype,rioptr) do { \ #define moduleInitIOContext(iovar,mtype,rioptr) do { \
iovar.rio = rioptr; \ iovar.prio = rioptr; \
iovar.type = mtype; \ iovar.type = mtype; \
iovar.bytes = 0; \ iovar.bytes = 0; \
iovar.error = 0; \ iovar.error = 0; \
@ -652,14 +652,14 @@ struct evictionPoolEntry; /* Defined in evict.c */
* which is actually a linked list of blocks like that, that is: client->reply. */ * which is actually a linked list of blocks like that, that is: client->reply. */
typedef struct clientReplyBlock { typedef struct clientReplyBlock {
size_t size, used; size_t size, used;
char buf[]; char buf[ZERO_LENGTH_ARRAY_LENGTH];
} clientReplyBlock; } clientReplyBlock;
/* Redis database representation. There are multiple databases identified /* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured * by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */ * database. The database number is the 'id' field in the structure. */
typedef struct redisDb { typedef struct redisDb {
dict *dict; /* The keyspace for this DB */ dict *pdict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */ dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */ dict *ready_keys; /* Blocked keys that received a PUSH */
@ -791,7 +791,7 @@ typedef struct client {
int argc; /* Num of arguments of current command. */ int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */ robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */
user *user; /* User associated with this connection. If the user *puser; /* User associated with this connection. If the
user is set to NULL the connection can do user is set to NULL the connection can do
anything (admin). */ anything (admin). */
int reqtype; /* Request protocol type: PROTO_REQ_* */ int reqtype; /* Request protocol type: PROTO_REQ_* */
@ -873,7 +873,7 @@ typedef struct zskiplistNode {
struct zskiplistLevel { struct zskiplistLevel {
struct zskiplistNode *forward; struct zskiplistNode *forward;
unsigned long span; unsigned long span;
} level[]; } level[ZERO_LENGTH_ARRAY_LENGTH];
} zskiplistNode; } zskiplistNode;
typedef struct zskiplist { typedef struct zskiplist {
@ -883,7 +883,7 @@ typedef struct zskiplist {
} zskiplist; } zskiplist;
typedef struct zset { typedef struct zset {
dict *dict; dict *pdict;
zskiplist *zsl; zskiplist *zsl;
} zset; } zset;
@ -1165,6 +1165,7 @@ struct redisServer {
struct saveparam *saveparams; /* Save points array for RDB */ struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */ int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */ char *rdb_filename; /* Name of RDB file */
char *rdb_s3bucketpath; /* Path for AWS S3 backup of RDB file */
int rdb_compression; /* Use compression in RDB? */ int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */ int rdb_checksum; /* Use RDB checksum? */
time_t lastsave; /* Unix time of last successful save */ time_t lastsave; /* Unix time of last successful save */
@ -1353,7 +1354,7 @@ struct redisServer {
}; };
typedef struct pubsubPattern { typedef struct pubsubPattern {
client *client; client *pclient;
robj *pattern; robj *pattern;
} pubsubPattern; } pubsubPattern;
@ -1553,7 +1554,7 @@ void freeClientsInAsyncFreeQueue(void);
void asyncCloseClientOnOutputBufferLimitReached(client *c); void asyncCloseClientOnOutputBufferLimitReached(client *c);
int getClientType(client *c); int getClientType(client *c);
int getClientTypeByName(char *name); int getClientTypeByName(char *name);
char *getClientTypeName(int class); char *getClientTypeName(int cclass);
void flushSlavesOutputBuffers(void); void flushSlavesOutputBuffers(void);
void disconnectSlaves(void); void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count); int listenToPort(int port, int *fds, int *count);

View File

@ -309,7 +309,7 @@ void sortCommand(client *c) {
switch(sortval->type) { switch(sortval->type) {
case OBJ_LIST: vectorlen = listTypeLength(sortval); break; case OBJ_LIST: vectorlen = listTypeLength(sortval); break;
case OBJ_SET: vectorlen = setTypeSize(sortval); break; case OBJ_SET: vectorlen = setTypeSize(sortval); break;
case OBJ_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; case OBJ_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->pdict); break;
default: vectorlen = 0; serverPanic("Bad SORT type"); /* Avoid GCC warning */ default: vectorlen = 0; serverPanic("Bad SORT type"); /* Avoid GCC warning */
} }
@ -404,7 +404,7 @@ void sortCommand(client *c) {
/* Check if starting point is trivial, before doing log(N) lookup. */ /* Check if starting point is trivial, before doing log(N) lookup. */
if (desc) { if (desc) {
long zsetlen = dictSize(((zset*)sortval->ptr)->dict); long zsetlen = dictSize(((zset*)sortval->ptr)->pdict);
ln = zsl->tail; ln = zsl->tail;
if (start > 0) if (start > 0)
@ -428,7 +428,7 @@ void sortCommand(client *c) {
end -= start; end -= start;
start = 0; start = 0;
} else if (sortval->type == OBJ_ZSET) { } else if (sortval->type == OBJ_ZSET) {
dict *set = ((zset*)sortval->ptr)->dict; dict *set = ((zset*)sortval->ptr)->pdict;
dictIterator *di; dictIterator *di;
dictEntry *setele; dictEntry *setele;
sds sdsele; sds sdsele;

View File

@ -11,10 +11,10 @@ enum MALLOC_CLASS
void storage_init(const char *tmpfilePath, size_t cbFileReserve); void storage_init(const char *tmpfilePath, size_t cbFileReserve);
void *salloc(size_t cb, enum MALLOC_CLASS class); void *salloc(size_t cb, enum MALLOC_CLASS mclass);
void *scalloc(size_t cb, size_t c, enum MALLOC_CLASS class); void *scalloc(size_t cb, size_t c, enum MALLOC_CLASS mclass);
void sfree(void*); void sfree(void*);
void *srealloc(void *pv, size_t cb, enum MALLOC_CLASS class); void *srealloc(void *pv, size_t cb, enum MALLOC_CLASS mclass);
size_t salloc_usable_size(void *ptr); size_t salloc_usable_size(void *ptr);
#endif #endif

View File

@ -14,7 +14,7 @@ typedef struct streamID {
} streamID; } streamID;
typedef struct stream { typedef struct stream {
rax *rax; /* The radix tree holding the stream. */ rax *prax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */ uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */ streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
@ -27,7 +27,7 @@ typedef struct stream {
* rewriting code that also needs to iterate the stream to emit the XADD * rewriting code that also needs to iterate the stream to emit the XADD
* commands. */ * commands. */
typedef struct streamIterator { typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */ stream *pstream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */ streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */ uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */ unsigned char *master_fields_start; /* Master entries start in listpack. */

View File

@ -144,10 +144,10 @@ void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
sds str = value->ptr; sds str = value->ptr;
size_t len = sdslen(str); size_t len = sdslen(str);
if (where == LIST_TAIL) { if (where == LIST_TAIL) {
quicklistInsertAfter((quicklist *)entry->entry.quicklist, quicklistInsertAfter((quicklist *)entry->entry.qlist,
&entry->entry, str, len); &entry->entry, str, len);
} else if (where == LIST_HEAD) { } else if (where == LIST_HEAD) {
quicklistInsertBefore((quicklist *)entry->entry.quicklist, quicklistInsertBefore((quicklist *)entry->entry.qlist,
&entry->entry, str, len); &entry->entry, str, len);
} }
decrRefCount(value); decrRefCount(value);

View File

@ -51,7 +51,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
/* Create a new stream data structure. */ /* Create a new stream data structure. */
stream *streamNew(void) { stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s), MALLOC_SHARED); stream *s = zmalloc(sizeof(*s), MALLOC_SHARED);
s->rax = raxNew(); s->prax = raxNew();
s->length = 0; s->length = 0;
s->last_id.ms = 0; s->last_id.ms = 0;
s->last_id.seq = 0; s->last_id.seq = 0;
@ -61,7 +61,7 @@ stream *streamNew(void) {
/* Free a stream, including the listpacks stored inside the radix tree. */ /* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) { void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); raxFreeWithCallback(s->prax,(void(*)(void*))lpFree);
if (s->cgroups) if (s->cgroups)
raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
zfree(s); zfree(s);
@ -179,7 +179,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Add the new entry. */ /* Add the new entry. */
raxIterator ri; raxIterator ri;
raxStart(&ri,s->rax); raxStart(&ri,s->prax);
raxSeek(&ri,"$",NULL,0); raxSeek(&ri,"$",NULL,0);
size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
@ -265,7 +265,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
} }
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); raxInsert(s->prax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the /* The first entry we insert, has obviously the same fields of the
* master entry. */ * master entry. */
flags |= STREAM_ITEM_FLAG_SAMEFIELDS; flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
@ -350,7 +350,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
/* Insert back into the tree in order to update the listpack pointer. */ /* Insert back into the tree in order to update the listpack pointer. */
if (ri.data != lp) if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); raxInsert(s->prax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++; s->length++;
s->last_id = id; s->last_id = id;
if (added_id) *added_id = id; if (added_id) *added_id = id;
@ -375,7 +375,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0; if (s->length <= maxlen) return 0;
raxIterator ri; raxIterator ri;
raxStart(&ri,s->rax); raxStart(&ri,s->prax);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0; int64_t deleted = 0;
@ -387,7 +387,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* least maxlen elements. */ * least maxlen elements. */
if (s->length - entries >= maxlen) { if (s->length - entries >= maxlen) {
lpFree(lp); lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL); raxRemove(s->prax,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len); raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries; s->length -= entries;
deleted += entries; deleted += entries;
@ -454,7 +454,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
} }
/* Update the listpack with the new pointer. */ /* Update the listpack with the new pointer. */
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); raxInsert(s->prax,ri.key,ri.key_len,lp,NULL);
break; /* If we are here, there was enough to delete in the current break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */ node, so no need to go to the next node. */
@ -503,7 +503,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
} }
/* Seek the correct node in the radix tree. */ /* Seek the correct node in the radix tree. */
raxStart(&si->ri,s->rax); raxStart(&si->ri,s->prax);
if (!rev) { if (!rev) {
if (start && (start->ms || start->seq)) { if (start && (start->ms || start->seq)) {
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
@ -521,7 +521,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
raxSeek(&si->ri,"$",NULL,0); raxSeek(&si->ri,"$",NULL,0);
} }
} }
si->stream = s; si->pstream = s;
si->lp = NULL; /* There is no current listpack right now. */ si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */ si->lp_ele = NULL; /* Current listpack cursor. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
@ -718,7 +718,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* If this is the last element in the listpack, we can remove the whole /* If this is the last element in the listpack, we can remove the whole
* node. */ * node. */
lpFree(lp); lpFree(lp);
raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL); raxRemove(si->pstream->prax,si->ri.key,si->ri.key_len,NULL);
} else { } else {
/* In the base case we alter the counters of valid/deleted entries. */ /* In the base case we alter the counters of valid/deleted entries. */
lp = lpReplaceInteger(lp,&p,aux-1); lp = lpReplaceInteger(lp,&p,aux-1);
@ -728,11 +728,11 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
/* Update the listpack with the new pointer. */ /* Update the listpack with the new pointer. */
if (si->lp != lp) if (si->lp != lp)
raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL); raxInsert(si->pstream->prax,si->ri.key,si->ri.key_len,lp,NULL);
} }
/* Update the number of entries counter. */ /* Update the number of entries counter. */
si->stream->length--; si->pstream->length--;
/* Re-seek the iterator to fix the now messed up state. */ /* Re-seek the iterator to fix the now messed up state. */
streamID start, end; streamID start, end;
@ -744,7 +744,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
streamDecodeID(si->end_key,&end); streamDecodeID(si->end_key,&end);
} }
streamIteratorStop(si); streamIteratorStop(si);
streamIteratorStart(si,si->stream,&start,&end,si->rev); streamIteratorStart(si,si->pstream,&start,&end,si->rev);
/* TODO: perform a garbage collection here if the ration between /* TODO: perform a garbage collection here if the ration between
* deleted and valid goes over a certain limit. */ * deleted and valid goes over a certain limit. */
@ -2517,9 +2517,9 @@ NULL
addReplyBulkCString(c,"length"); addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length); addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys"); addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax)); addReplyLongLong(c,raxSize(s->prax));
addReplyBulkCString(c,"radix-tree-nodes"); addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes); addReplyLongLong(c,s->prax->numnodes);
addReplyBulkCString(c,"groups"); addReplyBulkCString(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
addReplyBulkCString(c,"last-generated-id"); addReplyBulkCString(c,"last-generated-id");

View File

@ -1183,7 +1183,7 @@ void zsetConvert(robj *zobj, int encoding) {
serverPanic("Unknown target encoding"); serverPanic("Unknown target encoding");
zs = zmalloc(sizeof(*zs), MALLOC_SHARED); zs = zmalloc(sizeof(*zs), MALLOC_SHARED);
zs->dict = dictCreate(&zsetDictType,NULL); zs->pdict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate(); zs->zsl = zslCreate();
eptr = ziplistIndex(zl,0); eptr = ziplistIndex(zl,0);
@ -1200,7 +1200,7 @@ void zsetConvert(robj *zobj, int encoding) {
ele = sdsnewlen((char*)vstr,vlen); ele = sdsnewlen((char*)vstr,vlen);
node = zslInsert(zs->zsl,score,ele); node = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); serverAssert(dictAdd(zs->pdict,ele,&node->score) == DICT_OK);
zzlNext(zl,&eptr,&sptr); zzlNext(zl,&eptr,&sptr);
} }
@ -1216,7 +1216,7 @@ void zsetConvert(robj *zobj, int encoding) {
/* Approach similar to zslFree(), since we want to free the skiplist at /* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */ * the same time as creating the ziplist. */
zs = zobj->ptr; zs = zobj->ptr;
dictRelease(zs->dict); dictRelease(zs->pdict);
node = zs->zsl->header->level[0].forward; node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header); zfree(zs->zsl->header);
zfree(zs->zsl); zfree(zs->zsl);
@ -1259,7 +1259,7 @@ int zsetScore(robj *zobj, sds member, double *score) {
if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR; if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr; zset *zs = zobj->ptr;
dictEntry *de = dictFind(zs->dict, member); dictEntry *de = dictFind(zs->pdict, member);
if (de == NULL) return C_ERR; if (de == NULL) return C_ERR;
*score = *(double*)dictGetVal(de); *score = *(double*)dictGetVal(de);
} else { } else {
@ -1373,7 +1373,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
zskiplistNode *znode; zskiplistNode *znode;
dictEntry *de; dictEntry *de;
de = dictFind(zs->dict,ele); de = dictFind(zs->pdict,ele);
if (de != NULL) { if (de != NULL) {
/* NX? Return, same element already exists. */ /* NX? Return, same element already exists. */
if (nx) { if (nx) {
@ -1405,7 +1405,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
} else if (!xx) { } else if (!xx) {
ele = sdsdup(ele); ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele); znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); serverAssert(dictAdd(zs->pdict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED; *flags |= ZADD_ADDED;
if (newscore) *newscore = score; if (newscore) *newscore = score;
return 1; return 1;
@ -1434,7 +1434,7 @@ int zsetDel(robj *zobj, sds ele) {
dictEntry *de; dictEntry *de;
double score; double score;
de = dictUnlink(zs->dict,ele); de = dictUnlink(zs->pdict,ele);
if (de != NULL) { if (de != NULL) {
/* Get the score in order to delete from the skiplist later. */ /* Get the score in order to delete from the skiplist later. */
score = *(double*)dictGetVal(de); score = *(double*)dictGetVal(de);
@ -1444,13 +1444,13 @@ int zsetDel(robj *zobj, sds ele) {
* actually releases the SDS string representing the element, * actually releases the SDS string representing the element,
* which is shared between the skiplist and the hash table, so * which is shared between the skiplist and the hash table, so
* we need to delete from the skiplist as the final step. */ * we need to delete from the skiplist as the final step. */
dictFreeUnlinkedEntry(zs->dict,de); dictFreeUnlinkedEntry(zs->pdict,de);
/* Delete from skiplist. */ /* Delete from skiplist. */
int retval = zslDelete(zs->zsl,score,ele,NULL); int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval); serverAssert(retval);
if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (htNeedsResize(zs->pdict)) dictResize(zs->pdict);
return 1; return 1;
} }
} else { } else {
@ -1507,7 +1507,7 @@ long zsetRank(robj *zobj, sds ele, int reverse) {
dictEntry *de; dictEntry *de;
double score; double score;
de = dictFind(zs->dict,ele); de = dictFind(zs->pdict,ele);
if (de != NULL) { if (de != NULL) {
score = *(double*)dictGetVal(de); score = *(double*)dictGetVal(de);
rank = zslGetRank(zsl,score,ele); rank = zslGetRank(zsl,score,ele);
@ -1758,17 +1758,17 @@ void zremrangeGenericCommand(client *c, int rangetype) {
zset *zs = zobj->ptr; zset *zs = zobj->ptr;
switch(rangetype) { switch(rangetype) {
case ZRANGE_RANK: case ZRANGE_RANK:
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->pdict);
break; break;
case ZRANGE_SCORE: case ZRANGE_SCORE:
deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict); deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->pdict);
break; break;
case ZRANGE_LEX: case ZRANGE_LEX:
deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict); deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->pdict);
break; break;
} }
if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (htNeedsResize(zs->pdict)) dictResize(zs->pdict);
if (dictSize(zs->dict) == 0) { if (dictSize(zs->pdict) == 0) {
dbDelete(c->db,key); dbDelete(c->db,key);
keyremoved = 1; keyremoved = 1;
} }
@ -2117,7 +2117,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
} else if (op->encoding == OBJ_ENCODING_SKIPLIST) { } else if (op->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = op->subject->ptr; zset *zs = op->subject->ptr;
dictEntry *de; dictEntry *de;
if ((de = dictFind(zs->dict,val->ele)) != NULL) { if ((de = dictFind(zs->pdict,val->ele)) != NULL) {
*score = *(double*)dictGetVal(de); *score = *(double*)dictGetVal(de);
return 1; return 1;
} else { } else {
@ -2303,7 +2303,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
if (j == setnum) { if (j == setnum) {
tmp = zuiNewSdsFromValue(&zval); tmp = zuiNewSdsFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp); znode = zslInsert(dstzset->zsl,score,tmp);
dictAdd(dstzset->dict,tmp,&znode->score); dictAdd(dstzset->pdict,tmp,&znode->score);
if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp); if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
} }
} }
@ -2363,13 +2363,13 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
/* We now are aware of the final size of the resulting sorted set, /* We now are aware of the final size of the resulting sorted set,
* let's resize the dictionary embedded inside the sorted set to the * let's resize the dictionary embedded inside the sorted set to the
* right size, in order to save rehashing time. */ * right size, in order to save rehashing time. */
dictExpand(dstzset->dict,dictSize(accumulator)); dictExpand(dstzset->pdict,dictSize(accumulator));
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de); sds ele = dictGetKey(de);
score = dictGetDoubleVal(de); score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl,score,ele); znode = zslInsert(dstzset->zsl,score,ele);
dictAdd(dstzset->dict,ele,&znode->score); dictAdd(dstzset->pdict,ele,&znode->score);
} }
dictReleaseIterator(di); dictReleaseIterator(di);
dictRelease(accumulator); dictRelease(accumulator);

View File

@ -85,9 +85,9 @@
#define HAVE_DEFRAG #define HAVE_DEFRAG
#endif #endif
void *zmalloc(size_t size, enum MALLOC_CLASS class); void *zmalloc(size_t size, enum MALLOC_CLASS mclass);
void *zcalloc(size_t size, enum MALLOC_CLASS class); void *zcalloc(size_t size, enum MALLOC_CLASS mclass);
void *zrealloc(void *ptr, size_t size, enum MALLOC_CLASS class); void *zrealloc(void *ptr, size_t size, enum MALLOC_CLASS mclass);
void zfree(void *ptr); void zfree(void *ptr);
char *zstrdup(const char *s); char *zstrdup(const char *s);
size_t zmalloc_used_memory(void); size_t zmalloc_used_memory(void);