Cleanup key tracking documentation and table management (#8039)

Cleanup key tracking documentation, always cleanup the tracking table, and free the tracking table in an async manner when applicable.
This commit is contained in:
Madelyn Olson 2020-12-23 19:13:12 -08:00 committed by GitHub
parent efaf09ee4b
commit 59ff42c421
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 121 additions and 75 deletions

View File

@ -691,7 +691,7 @@ replica-priority 100
# Redis implements server assisted support for client side caching of values. # Redis implements server assisted support for client side caching of values.
# This is implemented using an invalidation table that remembers, using # This is implemented using an invalidation table that remembers, using
# 16 millions of slots, what clients may have certain subsets of keys. In turn # a radix key indexed by key name, what clients have which keys. In turn
# this is used in order to send invalidation messages to clients. Please # this is used in order to send invalidation messages to clients. Please
# check this page to understand more about the feature: # check this page to understand more about the feature:
# #

View File

@ -206,7 +206,7 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified /* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */ * file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) { void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); bioCreateFsyncJob(fd);
} }
/* Kills an AOFRW child process if exists */ /* Kills an AOFRW child process if exists */
@ -1909,7 +1909,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
server.aof_state = AOF_ON; server.aof_state = AOF_ON;
/* Asynchronously close the overwritten AOF. */ /* Asynchronously close the overwritten AOF. */
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); if (oldfd != -1) bioCreateCloseJob(oldfd);
serverLog(LL_VERBOSE, serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now); "Background AOF rewrite signal handler took %lldus", ustime()-now);

View File

@ -78,15 +78,13 @@ static unsigned long long bio_pending[BIO_NUM_OPS];
* file as the API does not expose the internals at all. */ * file as the API does not expose the internals at all. */
struct bio_job { struct bio_job {
time_t time; /* Time at which the job was created. */ time_t time; /* Time at which the job was created. */
/* Job specific arguments pointers. If we need to pass more than three /* Job specific arguments.*/
* arguments we can just pass a pointer to a structure or alike. */ int fd; /* Fd for file based background jobs */
void *arg1, *arg2, *arg3; lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
}; };
void *bioProcessBackgroundJobs(void *arg); void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
/* Make sure we have enough stack to perform all the things we do in the /* Make sure we have enough stack to perform all the things we do in the
* main thread. */ * main thread. */
@ -128,13 +126,8 @@ void bioInit(void) {
} }
} }
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { void bioSubmitJob(int type, struct bio_job *job) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL); job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]); pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job); listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++; bio_pending[type]++;
@ -142,6 +135,35 @@ void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
pthread_mutex_unlock(&bio_mutex[type]); pthread_mutex_unlock(&bio_mutex[type]);
} }
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
va_list valist;
/* Allocate memory for the job structure and all required
* arguments */
struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
job->free_fn = free_fn;
va_start(valist, arg_count);
for (int i = 0; i < arg_count; i++) {
job->free_args[i] = va_arg(valist, void *);
}
va_end(valist);
bioSubmitJob(BIO_LAZY_FREE, job);
}
void bioCreateCloseJob(int fd) {
struct bio_job *job = zmalloc(sizeof(*job));
job->fd = fd;
bioSubmitJob(BIO_CLOSE_FILE, job);
}
void bioCreateFsyncJob(int fd) {
struct bio_job *job = zmalloc(sizeof(*job));
job->fd = fd;
bioSubmitJob(BIO_AOF_FSYNC, job);
}
void *bioProcessBackgroundJobs(void *arg) { void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job; struct bio_job *job;
unsigned long type = (unsigned long) arg; unsigned long type = (unsigned long) arg;
@ -196,20 +218,11 @@ void *bioProcessBackgroundJobs(void *arg) {
/* Process the job accordingly to its type. */ /* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) { if (type == BIO_CLOSE_FILE) {
close((long)job->arg1); close(job->fd);
} else if (type == BIO_AOF_FSYNC) { } else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1); redis_fsync(job->fd);
} else if (type == BIO_LAZY_FREE) { } else if (type == BIO_LAZY_FREE) {
/* What we free changes depending on what arguments are set: job->free_fn(job->free_args);
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the radix tree. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else { } else {
serverPanic("Wrong job type in bioProcessBackgroundJobs()."); serverPanic("Wrong job type in bioProcessBackgroundJobs().");
} }

View File

@ -30,13 +30,17 @@
#ifndef __BIO_H #ifndef __BIO_H
#define __BIO_H #define __BIO_H
typedef void lazy_free_fn(void *args[]);
/* Exported API */ /* Exported API */
void bioInit(void); void bioInit(void);
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
unsigned long long bioPendingJobsOfType(int type); unsigned long long bioPendingJobsOfType(int type);
unsigned long long bioWaitStepOfType(int type); unsigned long long bioWaitStepOfType(int type);
time_t bioOlderJobOfType(int type); time_t bioOlderJobOfType(int type);
void bioKillThreads(void); void bioKillThreads(void);
void bioCreateCloseJob(int fd);
void bioCreateFsyncJob(int fd);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
/* Background job opcodes */ /* Background job opcodes */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */

View File

@ -433,7 +433,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
/* Make sure the WATCHed keys are affected by the FLUSH* commands. /* Make sure the WATCHed keys are affected by the FLUSH* commands.
* Note that we need to call the function while the keys are still * Note that we need to call the function while the keys are still
* there. */ * there. */
signalFlushedDb(dbnum); signalFlushedDb(dbnum, async);
/* Empty redis database structure. */ /* Empty redis database structure. */
removed = emptyDbStructure(server.db, dbnum, async, callback); removed = emptyDbStructure(server.db, dbnum, async, callback);
@ -572,9 +572,9 @@ void signalModifiedKey(client *c, redisDb *db, robj *key) {
trackingInvalidateKey(c,key); trackingInvalidateKey(c,key);
} }
void signalFlushedDb(int dbid) { void signalFlushedDb(int dbid, int async) {
touchWatchedKeysOnFlush(dbid); touchWatchedKeysOnFlush(dbid);
trackingInvalidateKeysOnFlush(dbid); trackingInvalidateKeysOnFlush(async);
} }
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------

View File

@ -6,6 +6,49 @@
static redisAtomic size_t lazyfree_objects = 0; static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0; static redisAtomic size_t lazyfreed_objects = 0;
/* Release objects from the lazyfree thread. It's just decrRefCount()
* updating the count of objects to release. */
void lazyfreeFreeObject(void *args[]) {
robj *o = (robj *) args[0];
decrRefCount(o);
atomicDecr(lazyfree_objects,1);
atomicIncr(lazyfreed_objects,1);
}
/* Release a database from the lazyfree thread. The 'db' pointer is the
* database which was substituted with a fresh one in the main thread
* when the database was logically deleted. */
void lazyfreeFreeDatabase(void *args[]) {
dict *ht1 = (dict *) args[0];
dict *ht2 = (dict *) args[1];
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
}
/* Release the skiplist mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyfreeFreeSlotsMap(void *args[]) {
rax *rt = args[0];
size_t len = rt->numele;
raxFree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}
/* Release the rax mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = args[0];
size_t len = rt->numele;
raxFree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}
/* Return the number of currently pending objects to free. */ /* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) { size_t lazyfreeGetPendingObjectsCount(void) {
size_t aux; size_t aux;
@ -120,7 +163,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
* equivalent to just calling decrRefCount(). */ * equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1); atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
dictSetVal(db->dict,de,NULL); dictSetVal(db->dict,de,NULL);
} }
} }
@ -141,7 +184,7 @@ void freeObjAsync(robj *key, robj *obj) {
size_t free_effort = lazyfreeGetFreeEffort(key,obj); size_t free_effort = lazyfreeGetFreeEffort(key,obj);
if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
atomicIncr(lazyfree_objects,1); atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,obj,NULL,NULL); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
} else { } else {
decrRefCount(obj); decrRefCount(obj);
} }
@ -155,39 +198,17 @@ void emptyDbAsync(redisDb *db) {
db->dict = dictCreate(&dbDictType,NULL); db->dict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&dbExpiresDictType,NULL); db->expires = dictCreate(&dbExpiresDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
} }
/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */
void freeSlotsToKeysMapAsync(rax *rt) { void freeSlotsToKeysMapAsync(rax *rt) {
atomicIncr(lazyfree_objects,rt->numele); atomicIncr(lazyfree_objects,rt->numele);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt); bioCreateLazyFreeJob(lazyfreeFreeSlotsMap,1,rt);
} }
/* Release objects from the lazyfree thread. It's just decrRefCount() /* Free an object, if the object is huge enough, free it in async way. */
* updating the count of objects to release. */ void freeTrackingRadixTreeAsync(rax *tracking) {
void lazyfreeFreeObjectFromBioThread(robj *o) { atomicIncr(lazyfree_objects,tracking->numele);
decrRefCount(o); bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
atomicDecr(lazyfree_objects,1);
atomicIncr(lazyfreed_objects,1);
}
/* Release a database from the lazyfree thread. The 'db' pointer is the
* database which was substituted with a fresh one in the main thread
* when the database was logically deleted. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
}
/* Release the radix tree mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
size_t len = rt->numele;
raxFree(rt);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
} }

View File

@ -102,7 +102,7 @@ int bg_unlink(const char *filename) {
errno = old_errno; errno = old_errno;
return -1; return -1;
} }
bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)fd,NULL,NULL); bioCreateCloseJob(fd);
return 0; /* Success. */ return 0; /* Success. */
} }
} }
@ -1752,7 +1752,7 @@ void readSyncBulkPayload(connection *conn) {
return; return;
} }
/* Close old rdb asynchronously. */ /* Close old rdb asynchronously. */
if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL); if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd);
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING, serverLog(LL_WARNING,

View File

@ -1826,7 +1826,8 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr
void disableTracking(client *c); void disableTracking(client *c);
void trackingRememberKeys(client *c); void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj); void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid); void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTreeAsync(rax *rt);
void trackingLimitUsedSlots(void); void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalItems(void);
uint64_t trackingGetTotalKeys(void); uint64_t trackingGetTotalKeys(void);
@ -2260,7 +2261,7 @@ void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*));
int selectDb(client *c, int id); int selectDb(client *c, int id);
void signalModifiedKey(client *c, redisDb *db, robj *key); void signalModifiedKey(client *c, redisDb *db, robj *key);
void signalFlushedDb(int dbid); void signalFlushedDb(int dbid, int async);
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot);

View File

@ -350,19 +350,22 @@ void trackingInvalidateKey(client *c, robj *keyobj) {
} }
/* This function is called when one or all the Redis databases are /* This function is called when one or all the Redis databases are
* flushed (dbid == -1 in case of FLUSHALL). Caching keys are not * flushed. Caching keys are not specific for each DB but are global:
* specific for each DB but are global: currently what we do is send a * currently what we do is send a special notification to clients with
* special notification to clients with tracking enabled, sending a * tracking enabled, sending a RESP NULL, which means, "all the keys",
* RESP NULL, which means, "all the keys", in order to avoid flooding * in order to avoid flooding clients with many invalidation messages
* clients with many invalidation messages for all the keys they may * for all the keys they may hold.
* hold.
*/ */
void freeTrackingRadixTree(void *rt) { void freeTrackingRadixTreeCallback(void *rt) {
raxFree(rt); raxFree(rt);
} }
void freeTrackingRadixTree(rax *rt) {
raxFreeWithCallback(rt,freeTrackingRadixTreeCallback);
}
/* A RESP NULL is sent to indicate that all keys are invalid */ /* A RESP NULL is sent to indicate that all keys are invalid */
void trackingInvalidateKeysOnFlush(int dbid) { void trackingInvalidateKeysOnFlush(int async) {
if (server.tracking_clients) { if (server.tracking_clients) {
listNode *ln; listNode *ln;
listIter li; listIter li;
@ -376,8 +379,12 @@ void trackingInvalidateKeysOnFlush(int dbid) {
} }
/* In case of FLUSHALL, reclaim all the memory used by tracking. */ /* In case of FLUSHALL, reclaim all the memory used by tracking. */
if (dbid == -1 && TrackingTable) { if (TrackingTable) {
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); if (async) {
freeTrackingRadixTreeAsync(TrackingTable);
} else {
freeTrackingRadixTree(TrackingTable);
}
TrackingTable = raxNew(); TrackingTable = raxNew();
TrackingTableTotalItems = 0; TrackingTableTotalItems = 0;
} }