Tracking: send eviction messages when evicting entries.

A fix for #7249.
This commit is contained in:
antirez 2020-05-14 10:41:28 +02:00
parent b726d64229
commit a4d0720fa1
2 changed files with 29 additions and 12 deletions

View File

@ -2053,6 +2053,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Stop the I/O threads if we don't have enough pending work. */ /* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded(); stopThreadedIOIfNeeded();
/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
* executed changes the value via CONFIG SET, the server will perform
* the operation even if completely idle. */
if (server.tracking_clients) trackingLimitUsedSlots();
/* Start a scheduled BGSAVE if the corresponding flag is set. This is /* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF * useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress. * rewrite is in progress.

View File

@ -279,15 +279,22 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
* *
* Note that 'c' may be NULL in case the operation was performed outside the * Note that 'c' may be NULL in case the operation was performed outside the
* context of a client modifying the database (for instance when we delete a * context of a client modifying the database (for instance when we delete a
* key because of expire). */ * key because of expire).
void trackingInvalidateKey(client *c, robj *keyobj) { *
* The last argument 'bcast' tells the function if it should also schedule
* the key for broadcasting to clients in BCAST mode. This is the case when
* the function is called from the Redis core once a key is modified, however
* we also call the function in order to evict keys in the key table in case
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
if (TrackingTable == NULL) return; if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
if (raxSize(PrefixTable) > 0) if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey)); trackingRememberKeyToBroadcast(c,key,keylen);
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
if (ids == raxNotFound) return; if (ids == raxNotFound) return;
raxIterator ri; raxIterator ri;
@ -317,7 +324,7 @@ void trackingInvalidateKey(client *c, robj *keyobj) {
continue; continue;
} }
sendTrackingMessage(target,sdskey,sdslen(sdskey),0); sendTrackingMessage(target,key,keylen,0);
} }
raxStop(&ri); raxStop(&ri);
@ -325,7 +332,13 @@ void trackingInvalidateKey(client *c, robj *keyobj) {
* again if more keys will be modified in this caching slot. */ * again if more keys will be modified in this caching slot. */
TrackingTableTotalItems -= raxSize(ids); TrackingTableTotalItems -= raxSize(ids);
raxFree(ids); raxFree(ids);
raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}
/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1);
} }
/* This function is called when one or all the Redis databases are flushed /* This function is called when one or all the Redis databases are flushed
@ -392,10 +405,8 @@ void trackingLimitUsedSlots(void) {
effort--; effort--;
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
raxRandomWalk(&ri,0); raxRandomWalk(&ri,0);
rax *ids = ri.data; if (raxEOF(&ri)) break;
TrackingTableTotalItems -= raxSize(ids); trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0);
raxFree(ids);
raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
if (raxSize(TrackingTable) <= max_keys) { if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0; timeout_counter = 0;
raxStop(&ri); raxStop(&ri);