
Redis 6.0 introduced I/O threads, which are efficient, and we used C11 _Atomic to establish inter-thread synchronization without a mutex. However, this means that only compilers supporting C11 _Atomic can compile the Redis code, which is inconvenient because some common platforms, such as CentOS 7, do not support it by default, so we want to implement a Redis atomic type to make the code more portable. Redis already has its own atomic variable implementation in src/atomicvar.h, but it only provides 'relaxed' operations, so we add some 'sequentially-consistent' operations, matching the default behavior of C11 _Atomic, which can establish inter-thread synchronization. We then replace all uses of C11 _Atomic with the Redis atomic variable. The implementation of the Redis atomic variable uses C11 _Atomic, __atomic or __sync macros, whichever is available; this supports most common platforms, and the feature in use is detected automatically. In the Makefile we use a dummy file to detect whether the compiler supports C11 _Atomic. For gcc, Redis can now theoretically be compiled with any version not older than 4.1.2 (the first version to support the __sync_xxx operations). In addition, we remove the mutex fallback implementation of the Redis atomic variable, for performance and testing reasons. You will get compile errors if your compiler does not support any of the features above. To cover the Redis atomic variable in tests, we add CI jobs that build Redis on CentOS 6 and CentOS 7, and daily workflow jobs that run the tests on them. On those platforms we just install the default gcc in order to cover different compiler versions: gcc is 4.4.7 by default on CentOS 6 and 4.8.5 on CentOS 7. We restore the ability to test Redis with Helgrind to find data race errors. However, you first need to install Valgrind in the default path configuration before running your tests, since we use macros from helgrind.h to explicitly tell Helgrind about inter-thread happens-before relationships in order to avoid false positives.
Please open an issue on GitHub if you find data race errors related to this commit. Unrelated: - Fix redefinition of typedef 'RedisModuleUserChangedFunc'. Some old compiler versions report errors or warnings if a function type is re-defined.
176 lines
6.6 KiB
C
176 lines
6.6 KiB
C
#include "server.h"
|
|
#include "bio.h"
|
|
#include "atomicvar.h"
|
|
#include "cluster.h"
|
|
|
|
/* Counter of objects handed to the lazy free machinery and not yet released.
 * Within this file it is only read and written through the atomicGet(),
 * atomicIncr() and atomicDecr() macros. */
static redisAtomic size_t lazyfree_objects = 0;
|
|
|
|
/* Return the number of currently pending objects to free. */
|
|
size_t lazyfreeGetPendingObjectsCount(void) {
|
|
size_t aux;
|
|
atomicGet(lazyfree_objects,aux);
|
|
return aux;
|
|
}
|
|
|
|
/* Return the amount of work needed in order to free an object.
|
|
* The return value is not always the actual number of allocations the
|
|
* object is composed of, but a number proportional to it.
|
|
*
|
|
* For strings the function always returns 1.
|
|
*
|
|
* For aggregated objects represented by hash tables or other data structures
|
|
* the function just returns the number of elements the object is composed of.
|
|
*
|
|
* Objects composed of single allocations are always reported as having a
|
|
* single item even if they are actually logical composed of multiple
|
|
* elements.
|
|
*
|
|
* For lists the function returns the number of elements in the quicklist
|
|
* representing the list. */
|
|
size_t lazyfreeGetFreeEffort(robj *obj) {
|
|
if (obj->type == OBJ_LIST) {
|
|
quicklist *ql = obj->ptr;
|
|
return ql->len;
|
|
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
|
|
dict *ht = obj->ptr;
|
|
return dictSize(ht);
|
|
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
|
|
zset *zs = obj->ptr;
|
|
return zs->zsl->length;
|
|
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
|
|
dict *ht = obj->ptr;
|
|
return dictSize(ht);
|
|
} else if (obj->type == OBJ_STREAM) {
|
|
size_t effort = 0;
|
|
stream *s = obj->ptr;
|
|
|
|
/* Make a best effort estimate to maintain constant runtime. Every macro
|
|
* node in the Stream is one allocation. */
|
|
effort += s->rax->numnodes;
|
|
|
|
/* Every consumer group is an allocation and so are the entries in its
|
|
* PEL. We use size of the first group's PEL as an estimate for all
|
|
* others. */
|
|
if (s->cgroups) {
|
|
raxIterator ri;
|
|
streamCG *cg;
|
|
raxStart(&ri,s->cgroups);
|
|
raxSeek(&ri,"^",NULL,0);
|
|
/* There must be at least one group so the following should always
|
|
* work. */
|
|
serverAssert(raxNext(&ri));
|
|
cg = ri.data;
|
|
effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
|
|
raxStop(&ri);
|
|
}
|
|
return effort;
|
|
} else {
|
|
return 1; /* Everything else is a single allocation. */
|
|
}
|
|
}
|
|
|
|
/* Delete a key, value, and associated expiration entry if any, from the DB.
|
|
* If there are enough allocations to free the value object may be put into
|
|
* a lazy free list instead of being freed synchronously. The lazy free list
|
|
* will be reclaimed in a different bio.c thread. */
|
|
#define LAZYFREE_THRESHOLD 64
|
|
int dbAsyncDelete(redisDb *db, robj *key) {
|
|
/* Deleting an entry from the expires dict will not free the sds of
|
|
* the key, because it is shared with the main dictionary. */
|
|
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
|
|
|
|
/* If the value is composed of a few allocations, to free in a lazy way
|
|
* is actually just slower... So under a certain limit we just free
|
|
* the object synchronously. */
|
|
dictEntry *de = dictUnlink(db->dict,key->ptr);
|
|
if (de) {
|
|
robj *val = dictGetVal(de);
|
|
size_t free_effort = lazyfreeGetFreeEffort(val);
|
|
|
|
/* If releasing the object is too much work, do it in the background
|
|
* by adding the object to the lazy free list.
|
|
* Note that if the object is shared, to reclaim it now it is not
|
|
* possible. This rarely happens, however sometimes the implementation
|
|
* of parts of the Redis core may call incrRefCount() to protect
|
|
* objects, and then call dbDelete(). In this case we'll fall
|
|
* through and reach the dictFreeUnlinkedEntry() call, that will be
|
|
* equivalent to just calling decrRefCount(). */
|
|
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
|
|
atomicIncr(lazyfree_objects,1);
|
|
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
|
|
dictSetVal(db->dict,de,NULL);
|
|
}
|
|
}
|
|
|
|
/* Release the key-val pair, or just the key if we set the val
|
|
* field to NULL in order to lazy free it later. */
|
|
if (de) {
|
|
dictFreeUnlinkedEntry(db->dict,de);
|
|
if (server.cluster_enabled) slotToKeyDel(key->ptr);
|
|
return 1;
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* Free an object, if the object is huge enough, free it in async way. */
|
|
void freeObjAsync(robj *o) {
|
|
size_t free_effort = lazyfreeGetFreeEffort(o);
|
|
if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
|
|
atomicIncr(lazyfree_objects,1);
|
|
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
|
|
} else {
|
|
decrRefCount(o);
|
|
}
|
|
}
|
|
|
|
/* Empty a Redis DB asynchronously. What the function does actually is to
|
|
* create a new empty set of hash tables and scheduling the old ones for
|
|
* lazy freeing. */
|
|
void emptyDbAsync(redisDb *db) {
|
|
dict *oldht1 = db->dict, *oldht2 = db->expires;
|
|
db->dict = dictCreate(&dbDictType,NULL);
|
|
db->expires = dictCreate(&keyptrDictType,NULL);
|
|
atomicIncr(lazyfree_objects,dictSize(oldht1));
|
|
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
|
|
}
|
|
|
|
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
|
|
* and scheduling the old for lazy freeing. */
|
|
void slotToKeyFlushAsync(void) {
|
|
rax *old = server.cluster->slots_to_keys;
|
|
|
|
server.cluster->slots_to_keys = raxNew();
|
|
memset(server.cluster->slots_keys_count,0,
|
|
sizeof(server.cluster->slots_keys_count));
|
|
atomicIncr(lazyfree_objects,old->numele);
|
|
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
|
|
}
|
|
|
|
/* Release objects from the lazyfree thread. It's just decrRefCount()
|
|
* updating the count of objects to release. */
|
|
void lazyfreeFreeObjectFromBioThread(robj *o) {
|
|
decrRefCount(o);
|
|
atomicDecr(lazyfree_objects,1);
|
|
}
|
|
|
|
/* Release a database from the lazyfree thread. The 'db' pointer is the
|
|
* database which was substituted with a fresh one in the main thread
|
|
* when the database was logically deleted. 'sl' is a skiplist used by
|
|
* Redis Cluster in order to take the hash slots -> keys mapping. This
|
|
* may be NULL if Redis Cluster is disabled. */
|
|
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
|
|
size_t numkeys = dictSize(ht1);
|
|
dictRelease(ht1);
|
|
dictRelease(ht2);
|
|
atomicDecr(lazyfree_objects,numkeys);
|
|
}
|
|
|
|
/* Release the skiplist mapping Redis Cluster keys to slots in the
|
|
* lazyfree thread. */
|
|
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
|
|
size_t len = rt->numele;
|
|
raxFree(rt);
|
|
atomicDecr(lazyfree_objects,len);
|
|
}
|