diff --git a/redis.conf b/redis.conf index af4b4be1f..891bb1663 100644 --- a/redis.conf +++ b/redis.conf @@ -691,7 +691,7 @@ replica-priority 100 # Redis implements server assisted support for client side caching of values. # This is implemented using an invalidation table that remembers, using -# 16 millions of slots, what clients may have certain subsets of keys. In turn +# a radix tree indexed by key name, what clients have which keys. In turn # this is used in order to send invalidation messages to clients. Please # check this page to understand more about the feature: # diff --git a/src/aof.c b/src/aof.c index 79b2f1284..2accc781e 100644 --- a/src/aof.c +++ b/src/aof.c @@ -206,7 +206,7 @@ int aofFsyncInProgress(void) { /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { - bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); + bioCreateFsyncJob(fd); } /* Kills an AOFRW child process if exists */ @@ -1909,7 +1909,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_state = AOF_ON; /* Asynchronously close the overwritten AOF. */ - if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); + if (oldfd != -1) bioCreateCloseJob(oldfd); serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); diff --git a/src/bio.c b/src/bio.c index a11bcb18b..c6e17f49d 100644 --- a/src/bio.c +++ b/src/bio.c @@ -78,15 +78,13 @@ static unsigned long long bio_pending[BIO_NUM_OPS]; * file as the API does not expose the internals at all. */ struct bio_job { time_t time; /* Time at which the job was created. */ - /* Job specific arguments pointers. If we need to pass more than three - * arguments we can just pass a pointer to a structure or alike. 
*/ - void *arg1, *arg2, *arg3; + /* Job specific arguments.*/ + int fd; /* Fd for file based background jobs */ + lazy_free_fn *free_fn; /* Function that will free the provided arguments */ + void *free_args[]; /* List of arguments to be passed to the free function */ }; void *bioProcessBackgroundJobs(void *arg); -void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); -void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ @@ -128,13 +126,8 @@ void bioInit(void) { } } -void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { - struct bio_job *job = zmalloc(sizeof(*job)); - +void bioSubmitJob(int type, struct bio_job *job) { job->time = time(NULL); - job->arg1 = arg1; - job->arg2 = arg2; - job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; @@ -142,6 +135,35 @@ void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { pthread_mutex_unlock(&bio_mutex[type]); } +void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) 
{ + va_list valist; + /* Allocate memory for the job structure and all required + * arguments */ + struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); + job->free_fn = free_fn; + + va_start(valist, arg_count); + for (int i = 0; i < arg_count; i++) { + job->free_args[i] = va_arg(valist, void *); + } + va_end(valist); + bioSubmitJob(BIO_LAZY_FREE, job); +} + +void bioCreateCloseJob(int fd) { + struct bio_job *job = zmalloc(sizeof(*job)); + job->fd = fd; + + bioSubmitJob(BIO_CLOSE_FILE, job); +} + +void bioCreateFsyncJob(int fd) { + struct bio_job *job = zmalloc(sizeof(*job)); + job->fd = fd; + + bioSubmitJob(BIO_AOF_FSYNC, job); +} + void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; @@ -196,20 +218,11 @@ void *bioProcessBackgroundJobs(void *arg) { /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { - close((long)job->arg1); + close(job->fd); } else if (type == BIO_AOF_FSYNC) { - redis_fsync((long)job->arg1); + redis_fsync(job->fd); } else if (type == BIO_LAZY_FREE) { - /* What we free changes depending on what arguments are set: - * arg1 -> free the object at pointer. - * arg2 & arg3 -> free two dictionaries (a Redis DB). - * only arg3 -> free the radix tree. 
*/ - if (job->arg1) - lazyfreeFreeObjectFromBioThread(job->arg1); - else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); - else if (job->arg3) - lazyfreeFreeSlotsMapFromBioThread(job->arg3); + job->free_fn(job->free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/bio.h b/src/bio.h index 6c2155941..1e6e97297 100644 --- a/src/bio.h +++ b/src/bio.h @@ -30,13 +30,17 @@ #ifndef __BIO_H #define __BIO_H +typedef void lazy_free_fn(void *args[]); + /* Exported API */ void bioInit(void); -void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); unsigned long long bioPendingJobsOfType(int type); unsigned long long bioWaitStepOfType(int type); time_t bioOlderJobOfType(int type); void bioKillThreads(void); +void bioCreateCloseJob(int fd); +void bioCreateFsyncJob(int fd); +void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); /* Background job opcodes */ #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ diff --git a/src/db.c b/src/db.c index d5e8b07d3..64a473bf3 100644 --- a/src/db.c +++ b/src/db.c @@ -433,7 +433,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { /* Make sure the WATCHed keys are affected by the FLUSH* commands. * Note that we need to call the function while the keys are still * there. */ - signalFlushedDb(dbnum); + signalFlushedDb(dbnum, async); /* Empty redis database structure. 
*/ removed = emptyDbStructure(server.db, dbnum, async, callback); @@ -572,9 +572,9 @@ void signalModifiedKey(client *c, redisDb *db, robj *key) { trackingInvalidateKey(c,key); } -void signalFlushedDb(int dbid) { +void signalFlushedDb(int dbid, int async) { touchWatchedKeysOnFlush(dbid); - trackingInvalidateKeysOnFlush(dbid); + trackingInvalidateKeysOnFlush(async); } /*----------------------------------------------------------------------------- diff --git a/src/lazyfree.c b/src/lazyfree.c index 125e6a1b0..8b9f0e2dc 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -6,6 +6,53 @@ static redisAtomic size_t lazyfree_objects = 0; static redisAtomic size_t lazyfreed_objects = 0; +/* Release objects from the lazyfree thread. It's just decrRefCount() + * updating the count of objects to release. */ +void lazyfreeFreeObject(void *args[]) { + robj *o = (robj *) args[0]; + decrRefCount(o); + atomicDecr(lazyfree_objects,1); + atomicIncr(lazyfreed_objects,1); +} + +/* Release a database from the lazyfree thread. The 'db' pointer is the + * database which was substituted with a fresh one in the main thread + * when the database was logically deleted. */ +void lazyfreeFreeDatabase(void *args[]) { + dict *ht1 = (dict *) args[0]; + dict *ht2 = (dict *) args[1]; + + size_t numkeys = dictSize(ht1); + dictRelease(ht1); + dictRelease(ht2); + atomicDecr(lazyfree_objects,numkeys); + atomicIncr(lazyfreed_objects,numkeys); +} + +/* Release the radix tree mapping Redis Cluster keys to slots in the + * lazyfree thread. */ +void lazyfreeFreeSlotsMap(void *args[]) { + rax *rt = args[0]; + size_t len = rt->numele; + raxFree(rt); + atomicDecr(lazyfree_objects,len); + atomicIncr(lazyfreed_objects,len); +} + +/* Defined in tracking.c. */ +void freeTrackingRadixTree(rax *rt); + +/* Release the key tracking table from the lazyfree thread. The values + * stored in the table are radix trees themselves, so a plain raxFree() + * would leak them: use the same deep free as the synchronous path. 
*/ +void lazyFreeTrackingTable(void *args[]) { + rax *rt = args[0]; + size_t len = rt->numele; + freeTrackingRadixTree(rt); + atomicDecr(lazyfree_objects,len); + atomicIncr(lazyfreed_objects,len); +} + /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { size_t aux; @@ -120,7 +167,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { * equivalent to just calling decrRefCount(). */ if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { atomicIncr(lazyfree_objects,1); - bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); + bioCreateLazyFreeJob(lazyfreeFreeObject,1, val); dictSetVal(db->dict,de,NULL); } } @@ -141,7 +188,7 @@ void freeObjAsync(robj *key, robj *obj) { size_t free_effort = lazyfreeGetFreeEffort(key,obj); if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { atomicIncr(lazyfree_objects,1); - bioCreateBackgroundJob(BIO_LAZY_FREE,obj,NULL,NULL); + bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); } else { decrRefCount(obj); } @@ -155,39 +202,17 @@ void emptyDbAsync(redisDb *db) { db->dict = dictCreate(&dbDictType,NULL); db->expires = dictCreate(&dbExpiresDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); + bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2); } /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ void freeSlotsToKeysMapAsync(rax *rt) { atomicIncr(lazyfree_objects,rt->numele); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt); + bioCreateLazyFreeJob(lazyfreeFreeSlotsMap,1,rt); } -/* Release objects from the lazyfree thread. It's just decrRefCount() - * updating the count of objects to release. */ -void lazyfreeFreeObjectFromBioThread(robj *o) { - decrRefCount(o); - atomicDecr(lazyfree_objects,1); - atomicIncr(lazyfreed_objects,1); -} - -/* Release a database from the lazyfree thread. 
The 'db' pointer is the - * database which was substituted with a fresh one in the main thread - * when the database was logically deleted. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { - size_t numkeys = dictSize(ht1); - dictRelease(ht1); - dictRelease(ht2); - atomicDecr(lazyfree_objects,numkeys); - atomicIncr(lazyfreed_objects,numkeys); -} - -/* Release the radix tree mapping Redis Cluster keys to slots in the - * lazyfree thread. */ -void lazyfreeFreeSlotsMapFromBioThread(rax *rt) { - size_t len = rt->numele; - raxFree(rt); - atomicDecr(lazyfree_objects,len); - atomicIncr(lazyfreed_objects,len); +/* Release the key tracking radix tree asynchronously, in the lazyfree thread. */ +void freeTrackingRadixTreeAsync(rax *tracking) { + atomicIncr(lazyfree_objects,tracking->numele); + bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); } diff --git a/src/replication.c b/src/replication.c index 92d7294f5..8a15c01a7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -102,7 +102,7 @@ int bg_unlink(const char *filename) { errno = old_errno; return -1; } - bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)fd,NULL,NULL); + bioCreateCloseJob(fd); return 0; /* Success. */ } } @@ -1752,7 +1752,7 @@ void readSyncBulkPayload(connection *conn) { return; } /* Close old rdb asynchronously. 
*/ - if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL); + if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd); if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, diff --git a/src/server.h b/src/server.h index 04fd709a8..887f7752e 100644 --- a/src/server.h +++ b/src/server.h @@ -1826,7 +1826,8 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(client *c, robj *keyobj); -void trackingInvalidateKeysOnFlush(int dbid); +void trackingInvalidateKeysOnFlush(int async); +void freeTrackingRadixTreeAsync(rax *rt); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); @@ -2260,7 +2261,7 @@ void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)); int selectDb(client *c, int id); void signalModifiedKey(client *c, redisDb *db, robj *key); -void signalFlushedDb(int dbid); +void signalFlushedDb(int dbid, int async); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); unsigned int countKeysInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); diff --git a/src/tracking.c b/src/tracking.c index 913577eab..852fa229b 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -350,19 +350,22 @@ void trackingInvalidateKey(client *c, robj *keyobj) { } /* This function is called when one or all the Redis databases are - * flushed (dbid == -1 in case of FLUSHALL). Caching keys are not - * specific for each DB but are global: currently what we do is send a - * special notification to clients with tracking enabled, sending a - * RESP NULL, which means, "all the keys", in order to avoid flooding - * clients with many invalidation messages for all the keys they may - * hold. + * flushed. 
Caching keys are not specific for each DB but are global: + * currently what we do is send a special notification to clients with + * tracking enabled, sending a RESP NULL, which means, "all the keys", + * in order to avoid flooding clients with many invalidation messages + * for all the keys they may hold. */ -void freeTrackingRadixTree(void *rt) { +void freeTrackingRadixTreeCallback(void *rt) { raxFree(rt); } +void freeTrackingRadixTree(rax *rt) { + raxFreeWithCallback(rt,freeTrackingRadixTreeCallback); +} + /* A RESP NULL is sent to indicate that all keys are invalid */ -void trackingInvalidateKeysOnFlush(int dbid) { +void trackingInvalidateKeysOnFlush(int async) { if (server.tracking_clients) { listNode *ln; listIter li; @@ -376,8 +379,12 @@ void trackingInvalidateKeysOnFlush(int dbid) { } /* In case of FLUSHALL, reclaim all the memory used by tracking. */ - if (dbid == -1 && TrackingTable) { - raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); + if (TrackingTable) { + if (async) { + freeTrackingRadixTreeAsync(TrackingTable); + } else { + freeTrackingRadixTree(TrackingTable); + } TrackingTable = raxNew(); TrackingTableTotalItems = 0; }