active defrag improvements
This commit is contained in:
parent
7aa9e6d2ae
commit
5ab6a54cc6
@ -457,8 +457,9 @@ void debugCommand(client *c) {
|
|||||||
if (valsize==0)
|
if (valsize==0)
|
||||||
val = createStringObject(buf,strlen(buf));
|
val = createStringObject(buf,strlen(buf));
|
||||||
else {
|
else {
|
||||||
|
int buflen = strlen(buf);
|
||||||
val = createStringObject(NULL,valsize);
|
val = createStringObject(NULL,valsize);
|
||||||
memset(val->ptr, 0, valsize);
|
memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen);
|
||||||
}
|
}
|
||||||
dbAdd(c->db,key,val);
|
dbAdd(c->db,key,val);
|
||||||
signalModifiedKey(c->db,key);
|
signalModifiedKey(c->db,key);
|
||||||
|
70
src/defrag.c
70
src/defrag.c
@ -94,15 +94,17 @@ sds activeDefragSds(sds sdsptr) {
|
|||||||
* returns NULL in case the allocatoin wasn't moved.
|
* returns NULL in case the allocatoin wasn't moved.
|
||||||
* when it returns a non-null value, the old pointer was already released
|
* when it returns a non-null value, the old pointer was already released
|
||||||
* and should NOT be accessed. */
|
* and should NOT be accessed. */
|
||||||
robj *activeDefragStringOb(robj* ob) {
|
robj *activeDefragStringOb(robj* ob, int *defragged) {
|
||||||
robj *ret = NULL;
|
robj *ret = NULL;
|
||||||
if (ob->refcount!=1)
|
if (ob->refcount!=1)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
/* try to defrag robj (only if not an EMBSTR type (handled below) */
|
/* try to defrag robj (only if not an EMBSTR type (handled below) */
|
||||||
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
||||||
if ((ret = activeDefragAlloc(ob)))
|
if ((ret = activeDefragAlloc(ob))) {
|
||||||
ob = ret;
|
ob = ret;
|
||||||
|
(*defragged)++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* try to defrag string object */
|
/* try to defrag string object */
|
||||||
@ -111,18 +113,14 @@ robj *activeDefragStringOb(robj* ob) {
|
|||||||
sds newsds = activeDefragSds((sds)ob->ptr);
|
sds newsds = activeDefragSds((sds)ob->ptr);
|
||||||
if (newsds) {
|
if (newsds) {
|
||||||
ob->ptr = newsds;
|
ob->ptr = newsds;
|
||||||
/* we don't need to change the return value here.
|
(*defragged)++;
|
||||||
* we can return NULL if 'ret' is still NULL (since the object pointer itself wasn't changed).
|
|
||||||
* but we set return value to ob as an indication that we defragged a pointer (for stats).
|
|
||||||
* NOTE: if ret is already set and the robj was moved, then our stats will be a bit off
|
|
||||||
* since two pointers were moved, but we show only one in the stats */
|
|
||||||
ret = ob;
|
|
||||||
}
|
}
|
||||||
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
||||||
/* the sds is embedded in the object allocation, calculate the offset and update the pointer in the new allocation */
|
/* the sds is embedded in the object allocation, calculate the offset and update the pointer in the new allocation */
|
||||||
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
||||||
if ((ret = activeDefragAlloc(ob))) {
|
if ((ret = activeDefragAlloc(ob))) {
|
||||||
ret->ptr = (void*)((intptr_t)ret + ofs);
|
ret->ptr = (void*)((intptr_t)ret + ofs);
|
||||||
|
(*defragged)++;
|
||||||
}
|
}
|
||||||
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
||||||
serverPanic("Unknown string encoding");
|
serverPanic("Unknown string encoding");
|
||||||
@ -191,18 +189,18 @@ void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnod
|
|||||||
if (update[i]->level[i].forward == oldnode)
|
if (update[i]->level[i].forward == oldnode)
|
||||||
update[i]->level[i].forward = newnode;
|
update[i]->level[i].forward = newnode;
|
||||||
}
|
}
|
||||||
if (zsl->header==oldnode)
|
serverAssert(zsl->header!=oldnode);
|
||||||
zsl->header = newnode;
|
|
||||||
if (zsl->tail==oldnode)
|
|
||||||
zsl->tail = newnode;
|
|
||||||
if (newnode->level[0].forward) {
|
if (newnode->level[0].forward) {
|
||||||
serverAssert(newnode->level[0].forward->backward==oldnode);
|
serverAssert(newnode->level[0].forward->backward==oldnode);
|
||||||
newnode->level[0].forward->backward = newnode;
|
newnode->level[0].forward->backward = newnode;
|
||||||
|
} else {
|
||||||
|
serverAssert(zsl->tail==oldnode);
|
||||||
|
zsl->tail = newnode;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag helper for sorted set.
|
/* Defrag helper for sorted set.
|
||||||
* Update the robj pointer, defrag the struct and return the new score reference.
|
* Update the robj pointer, defrag the skiplist struct and return the new score reference.
|
||||||
* we may not access oldele pointer (not even the pointer stored in the skiplist), as it was already freed.
|
* we may not access oldele pointer (not even the pointer stored in the skiplist), as it was already freed.
|
||||||
* newele may be null, in which case we only need to defrag the skiplist, but not update the obj pointer.
|
* newele may be null, in which case we only need to defrag the skiplist, but not update the obj pointer.
|
||||||
* when return value is non-NULL, it is the score reference that must be updated in the dict record. */
|
* when return value is non-NULL, it is the score reference that must be updated in the dict record. */
|
||||||
@ -239,6 +237,28 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Utility function that replaces an old key pointer in the dictionary with a new pointer.
|
||||||
|
* Additionally, we try to defrag the dictEntry in that dict.
|
||||||
|
* oldkey mey be a dead pointer and should not be accessed (we get a pre-calculated hash value).
|
||||||
|
* newkey may be null if the key pointer wasn't moved.
|
||||||
|
* return value is the the dictEntry if found, or NULL if not found.
|
||||||
|
* NOTE: this is very ugly code, but it let's us avoid the complication of doing a scan on another dict. */
|
||||||
|
dictEntry* replaceSateliteDictKeyPtrAndOrDifragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, int *defragged) {
|
||||||
|
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
|
||||||
|
if (deref) {
|
||||||
|
dictEntry *de = *deref;
|
||||||
|
dictEntry *newde = activeDefragAlloc(de);
|
||||||
|
if (newde) {
|
||||||
|
de = *deref = newde;
|
||||||
|
(*defragged)++;
|
||||||
|
}
|
||||||
|
if (newkey)
|
||||||
|
de->key = newkey;
|
||||||
|
return de;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* for each key we scan in the main dict, this function will attempt to defrag all the various pointers it has.
|
/* for each key we scan in the main dict, this function will attempt to defrag all the various pointers it has.
|
||||||
* returns a stat of how many pointers were moved. */
|
* returns a stat of how many pointers were moved. */
|
||||||
int defargKey(redisDb *db, dictEntry *de) {
|
int defargKey(redisDb *db, dictEntry *de) {
|
||||||
@ -252,24 +272,21 @@ int defargKey(redisDb *db, dictEntry *de) {
|
|||||||
|
|
||||||
/* try to defrag the key name */
|
/* try to defrag the key name */
|
||||||
newsds = activeDefragSds(keysds);
|
newsds = activeDefragSds(keysds);
|
||||||
if (newsds) {
|
if (newsds)
|
||||||
de->key = newsds;
|
defragged++, de->key = newsds;
|
||||||
if (dictSize(db->expires)) {
|
if (dictSize(db->expires)) {
|
||||||
/* Dirty code:
|
/* Dirty code:
|
||||||
* i can't search in db->expires for that key after i already released the pointer it holds
|
* i can't search in db->expires for that key after i already released the pointer it holds
|
||||||
* it won't be able to do the string compare */
|
* it won't be able to do the string compare */
|
||||||
unsigned int hash = dictGetHash(db->dict, newsds);
|
unsigned int hash = dictGetHash(db->dict, de->key);
|
||||||
dictReplaceKeyPtr(db->expires, keysds, newsds, hash);
|
replaceSateliteDictKeyPtrAndOrDifragDictEntry(db->expires, keysds, newsds, hash, &defragged);
|
||||||
}
|
|
||||||
defragged++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* try to defrag robj and / or string value */
|
/* try to defrag robj and / or string value */
|
||||||
ob = dictGetVal(de);
|
ob = dictGetVal(de);
|
||||||
if ((newob = activeDefragStringOb(ob))) {
|
if ((newob = activeDefragStringOb(ob, &defragged))) {
|
||||||
de->v.val = newob;
|
de->v.val = newob;
|
||||||
ob = newob;
|
ob = newob;
|
||||||
defragged++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ob->type == OBJ_STRING) {
|
if (ob->type == OBJ_STRING) {
|
||||||
@ -328,13 +345,15 @@ int defargKey(redisDb *db, dictEntry *de) {
|
|||||||
defragged++, ob->ptr = newzl;
|
defragged++, ob->ptr = newzl;
|
||||||
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
zset *zs = (zset*)ob->ptr;
|
zset *zs = (zset*)ob->ptr;
|
||||||
zset *newzs = activeDefragAlloc(zs);
|
zset *newzs;
|
||||||
zskiplist *newzsl;
|
zskiplist *newzsl;
|
||||||
if (newzs)
|
struct zskiplistNode *newheader;
|
||||||
|
if ((newzs = activeDefragAlloc(zs)))
|
||||||
defragged++, ob->ptr = zs = newzs;
|
defragged++, ob->ptr = zs = newzs;
|
||||||
newzsl = activeDefragAlloc(zs->zsl);
|
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
||||||
if (newzsl)
|
|
||||||
defragged++, zs->zsl = newzsl;
|
defragged++, zs->zsl = newzsl;
|
||||||
|
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
||||||
|
defragged++, zs->zsl->header = newheader;
|
||||||
d = zs->dict;
|
d = zs->dict;
|
||||||
di = dictGetIterator(d);
|
di = dictGetIterator(d);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
@ -383,7 +402,6 @@ int defargKey(redisDb *db, dictEntry *de) {
|
|||||||
|
|
||||||
/* defrag scan callback for the main db dictionary */
|
/* defrag scan callback for the main db dictionary */
|
||||||
void defragScanCallback(void *privdata, const dictEntry *de) {
|
void defragScanCallback(void *privdata, const dictEntry *de) {
|
||||||
/* TODO: defrag the dictEntry (and also the entriy in expire dict). */
|
|
||||||
int defragged = defargKey((redisDb*)privdata, (dictEntry*)de);
|
int defragged = defargKey((redisDb*)privdata, (dictEntry*)de);
|
||||||
server.stat_active_defrag_hits += defragged;
|
server.stat_active_defrag_hits += defragged;
|
||||||
if(defragged)
|
if(defragged)
|
||||||
|
20
src/dict.c
20
src/dict.c
@ -1048,25 +1048,25 @@ unsigned int dictGetHash(dict *d, const void *key) {
|
|||||||
return dictHashKey(d, key);
|
return dictHashKey(d, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Replace an old key pointer in the dictionary with a new pointer.
|
/* Finds the dictEntry reference by using pointer and pre-calculated hash.
|
||||||
* oldkey is a dead pointer and should not be accessed.
|
* oldkey is a dead pointer and should not be accessed.
|
||||||
* the hash value should be provided using dictGetHash.
|
* the hash value should be provided using dictGetHash.
|
||||||
* no string / key comparison is performed.
|
* no string / key comparison is performed.
|
||||||
* return value is the dictEntry if found, or NULL if not found. */
|
* return value is the reference to the dictEntry if found, or NULL if not found. */
|
||||||
dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash) {
|
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, unsigned int hash) {
|
||||||
dictEntry *he;
|
dictEntry *he, **heref;
|
||||||
unsigned int idx, table;
|
unsigned int idx, table;
|
||||||
|
|
||||||
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
|
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
|
||||||
for (table = 0; table <= 1; table++) {
|
for (table = 0; table <= 1; table++) {
|
||||||
idx = hash & d->ht[table].sizemask;
|
idx = hash & d->ht[table].sizemask;
|
||||||
he = d->ht[table].table[idx];
|
heref = &d->ht[table].table[idx];
|
||||||
|
he = *heref;
|
||||||
while(he) {
|
while(he) {
|
||||||
if (oldptr==he->key) {
|
if (oldptr==he->key)
|
||||||
he->key = newptr;
|
return heref;
|
||||||
return he;
|
heref = &he->next;
|
||||||
}
|
he = *heref;
|
||||||
he = he->next;
|
|
||||||
}
|
}
|
||||||
if (!dictIsRehashing(d)) return NULL;
|
if (!dictIsRehashing(d)) return NULL;
|
||||||
}
|
}
|
||||||
|
@ -179,7 +179,7 @@ void dictSetHashFunctionSeed(unsigned int initval);
|
|||||||
unsigned int dictGetHashFunctionSeed(void);
|
unsigned int dictGetHashFunctionSeed(void);
|
||||||
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
|
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
|
||||||
unsigned int dictGetHash(dict *d, const void *key);
|
unsigned int dictGetHash(dict *d, const void *key);
|
||||||
dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash);
|
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, unsigned int hash);
|
||||||
|
|
||||||
/* Hash table types */
|
/* Hash table types */
|
||||||
extern dictType dictTypeHeapStringCopyKey;
|
extern dictType dictTypeHeapStringCopyKey;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user