Tracking: first conversion from hashing to key names.
This commit is contained in:
parent
9c00bdd86e
commit
92357b2d61
@ -4221,7 +4221,7 @@ sds genRedisInfoString(const char *section) {
|
||||
"active_defrag_misses:%lld\r\n"
|
||||
"active_defrag_key_hits:%lld\r\n"
|
||||
"active_defrag_key_misses:%lld\r\n"
|
||||
"tracking_used_slots:%lld\r\n",
|
||||
"tracking_tracked_keys:%lld\r\n",
|
||||
server.stat_numconnections,
|
||||
server.stat_numcommands,
|
||||
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
||||
@ -4249,7 +4249,7 @@ sds genRedisInfoString(const char *section) {
|
||||
server.stat_active_defrag_misses,
|
||||
server.stat_active_defrag_key_hits,
|
||||
server.stat_active_defrag_key_misses,
|
||||
trackingGetUsedSlots());
|
||||
(unsigned long long) trackingGetTotalItems());
|
||||
}
|
||||
|
||||
/* Replication */
|
||||
|
@ -1654,7 +1654,7 @@ void trackingRememberKeys(client *c);
|
||||
void trackingInvalidateKey(robj *keyobj);
|
||||
void trackingInvalidateKeysOnFlush(int dbid);
|
||||
void trackingLimitUsedSlots(void);
|
||||
unsigned long long trackingGetUsedSlots(void);
|
||||
uint64_t trackingGetTotalItems(void);
|
||||
|
||||
/* List data type */
|
||||
void listTypeTryConversion(robj *subject, robj *value);
|
||||
|
206
src/tracking.c
206
src/tracking.c
@ -30,37 +30,22 @@
|
||||
|
||||
#include "server.h"
|
||||
|
||||
/* The tracking table is constituted by 2^24 radix trees (each tree, and the
|
||||
* table itself, are allocated in a lazy way only when needed) tracking
|
||||
* clients that may have certain keys in their local, client side, cache.
|
||||
*
|
||||
* Keys are grouped into 2^24 slots, in a way similar to Redis Cluster hash
|
||||
* slots, however here the function we use is crc64, taking the least
|
||||
* significant 24 bits of the output.
|
||||
/* The tracking table is constituted by a radix tree of keys, each pointing
|
||||
* to a radix tree of client IDs, used to track the clients that may have
|
||||
* certain keys in their local, client side, cache.
|
||||
*
|
||||
* When a client enables tracking with "CLIENT TRACKING on", each key served to
|
||||
* the client is hashed to one of such slots, and Redis will remember what
|
||||
* client may have keys about such slot. Later, when a key in a given slot is
|
||||
* modified, all the clients that may have local copies of keys in that slot
|
||||
* will receive an invalidation message. There is no distinction of database
|
||||
* number: a single table is used.
|
||||
* the client is remembered in the table mapping the keys to the client IDs.
|
||||
* Later, when a key is modified, all the clients that may have local copy
|
||||
* of such key will receive an invalidation message.
|
||||
*
|
||||
* Clients will normally take frequently requested objects in memory, removing
|
||||
* them when invalidation messages are received. A strategy clients may use is
|
||||
* to just cache objects in a dictionary, associating to each cached object
|
||||
* some incremental epoch, or just a timestamp. When invalidation messages are
|
||||
* received clients may store, in a different table, the timestamp (or epoch)
|
||||
* of the invalidation of such given slot: later when accessing objects, the
|
||||
* eviction of stale objects may be performed in a lazy way by checking if the
|
||||
* cached object timestamp is older than the invalidation timestamp for such
|
||||
* objects.
|
||||
*
|
||||
* The output of the 24 bit hash function is very large (more than 16 million
|
||||
* possible slots), so clients that may want to use less resources may only
|
||||
* use the most significant bits instead of the full 24 bits. */
|
||||
#define TRACKING_TABLE_SIZE (1<<24)
|
||||
rax **TrackingTable = NULL;
|
||||
unsigned long TrackingTableUsedSlots = 0;
|
||||
* them when invalidation messages are received. */
|
||||
rax *TrackingTable = NULL;
|
||||
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
||||
the whole tracking table. This givesn
|
||||
an hint about the total memory we
|
||||
are using server side for CSC. */
|
||||
robj *TrackingChannelName;
|
||||
|
||||
/* Remove the tracking state from the client 'c'. Note that there is not much
|
||||
@ -90,7 +75,7 @@ void enableTracking(client *c, uint64_t redirect_to) {
|
||||
c->client_tracking_redirection = redirect_to;
|
||||
server.tracking_clients++;
|
||||
if (TrackingTable == NULL) {
|
||||
TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
|
||||
TrackingTable = raxNew();
|
||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||
}
|
||||
}
|
||||
@ -108,19 +93,20 @@ void trackingRememberKeys(client *c) {
|
||||
for(int j = 0; j < numkeys; j++) {
|
||||
int idx = keys[j];
|
||||
sds sdskey = c->argv[idx]->ptr;
|
||||
uint64_t hash = crc64(0,
|
||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
||||
if (TrackingTable[hash] == NULL) {
|
||||
TrackingTable[hash] = raxNew();
|
||||
TrackingTableUsedSlots++;
|
||||
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||
if (ids == raxNotFound) {
|
||||
ids = raxNew();
|
||||
int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
|
||||
sdslen(sdskey),ids, NULL);
|
||||
serverAssert(inserted == 1);
|
||||
}
|
||||
raxTryInsert(TrackingTable[hash],
|
||||
(unsigned char*)&c->id,sizeof(c->id),NULL,NULL);
|
||||
if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
|
||||
TrackingTableTotalItems++;
|
||||
}
|
||||
getKeysFreeResult(keys);
|
||||
}
|
||||
|
||||
void sendTrackingMessage(client *c, long long hash) {
|
||||
void sendTrackingMessage(client *c, char *keyname, size_t keylen) {
|
||||
int using_redirection = 0;
|
||||
if (c->client_tracking_redirection) {
|
||||
client *redir = lookupClientByID(c->client_tracking_redirection);
|
||||
@ -146,49 +132,44 @@ void sendTrackingMessage(client *c, long long hash) {
|
||||
if (c->resp > 2) {
|
||||
addReplyPushLen(c,2);
|
||||
addReplyBulkCBuffer(c,"invalidate",10);
|
||||
addReplyLongLong(c,hash);
|
||||
addReplyBulkCBuffer(c,keyname,keylen);
|
||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||
robj *msg = createStringObjectFromLongLong(hash);
|
||||
addReplyPubsubMessage(c,TrackingChannelName,msg);
|
||||
decrRefCount(msg);
|
||||
/* We use a static object to speedup things, however we assume
|
||||
* that addReplyPubsubMessage() will not take a reference. */
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,keyname);
|
||||
addReplyPubsubMessage(c,TrackingChannelName,&keyobj);
|
||||
serverAssert(keyobj.refcount == 1);
|
||||
}
|
||||
}
|
||||
|
||||
/* Invalidates a caching slot: this is actually the low level implementation
|
||||
* of the API that Redis calls externally, that is trackingInvalidateKey(). */
|
||||
void trackingInvalidateSlot(uint64_t slot) {
|
||||
if (TrackingTable == NULL || TrackingTable[slot] == NULL) return;
|
||||
|
||||
raxIterator ri;
|
||||
raxStart(&ri,TrackingTable[slot]);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id;
|
||||
memcpy(&id,ri.key,sizeof(id));
|
||||
client *c = lookupClientByID(id);
|
||||
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
||||
sendTrackingMessage(c,slot);
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
||||
/* Free the tracking table: we'll create the radix tree and populate it
|
||||
* again if more keys will be modified in this caching slot. */
|
||||
raxFree(TrackingTable[slot]);
|
||||
TrackingTable[slot] = NULL;
|
||||
TrackingTableUsedSlots--;
|
||||
}
|
||||
|
||||
/* This function is called from signalModifiedKey() or other places in Redis
|
||||
* when a key changes value. In the context of keys tracking, our task here is
|
||||
* to send a notification to every client that may have keys about such caching
|
||||
* slot. */
|
||||
void trackingInvalidateKey(robj *keyobj) {
|
||||
if (TrackingTable == NULL || TrackingTableUsedSlots == 0) return;
|
||||
|
||||
if (TrackingTable == NULL) return;
|
||||
sds sdskey = keyobj->ptr;
|
||||
uint64_t hash = crc64(0,
|
||||
(unsigned char*)sdskey,sdslen(sdskey))&(TRACKING_TABLE_SIZE-1);
|
||||
trackingInvalidateSlot(hash);
|
||||
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||
if (ids == raxNotFound) return;;
|
||||
|
||||
raxIterator ri;
|
||||
raxStart(&ri,ids);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id;
|
||||
memcpy(&id,ri.key,sizeof(id));
|
||||
client *c = lookupClientByID(id);
|
||||
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
|
||||
sendTrackingMessage(c,sdskey,sdslen(sdskey));
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
||||
/* Free the tracking table: we'll create the radix tree and populate it
|
||||
* again if more keys will be modified in this caching slot. */
|
||||
TrackingTableTotalItems -= raxSize(ids);
|
||||
raxFree(ids);
|
||||
raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
|
||||
}
|
||||
|
||||
/* This function is called when one or all the Redis databases are flushed
|
||||
@ -205,6 +186,10 @@ void trackingInvalidateKey(robj *keyobj) {
|
||||
* we just send the invalidation message to all the clients, but don't
|
||||
* flush the table: it will slowly get garbage collected as more keys
|
||||
* are modified in the used caching slots. */
|
||||
void freeTrackingRadixTree(void *rt) {
|
||||
raxFree(rt);
|
||||
}
|
||||
|
||||
void trackingInvalidateKeysOnFlush(int dbid) {
|
||||
if (server.tracking_clients) {
|
||||
listNode *ln;
|
||||
@ -213,84 +198,69 @@ void trackingInvalidateKeysOnFlush(int dbid) {
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = listNodeValue(ln);
|
||||
if (c->flags & CLIENT_TRACKING) {
|
||||
sendTrackingMessage(c,-1);
|
||||
sendTrackingMessage(c,"",1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* In case of FLUSHALL, reclaim all the memory used by tracking. */
|
||||
if (dbid == -1 && TrackingTable) {
|
||||
for (int j = 0; j < TRACKING_TABLE_SIZE && TrackingTableUsedSlots > 0; j++) {
|
||||
if (TrackingTable[j] != NULL) {
|
||||
raxFree(TrackingTable[j]);
|
||||
TrackingTable[j] = NULL;
|
||||
TrackingTableUsedSlots--;
|
||||
}
|
||||
}
|
||||
|
||||
/* If there are no clients with tracking enabled, we can even
|
||||
* reclaim the memory used by the table itself. The code assumes
|
||||
* the table is allocated only if there is at least one client alive
|
||||
* with tracking enabled. */
|
||||
if (server.tracking_clients == 0) {
|
||||
zfree(TrackingTable);
|
||||
TrackingTable = NULL;
|
||||
}
|
||||
raxFreeWithCallback(TrackingTable,freeTrackingRadixTree);
|
||||
TrackingTableTotalItems = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Tracking forces Redis to remember information about which client may have
|
||||
* keys about certian caching slots. In workloads where there are a lot of
|
||||
* reads, but keys are hardly modified, the amount of information we have
|
||||
* to remember server side could be a lot: for each 16 millions of caching
|
||||
* slots we may end with a radix tree containing many entries.
|
||||
* certain keys. In workloads where there are a lot of reads, but keys are
|
||||
* hardly modified, the amount of information we have to remember server side
|
||||
* could be a lot, with the number of keys being totally not bound.
|
||||
*
|
||||
* So Redis allows the user to configure a maximum fill rate for the
|
||||
* So Redis allows the user to configure a maximum number of keys for the
|
||||
* invalidation table. This function makes sure that we don't go over the
|
||||
* specified fill rate: if we are over, we can just evict informations about
|
||||
* random caching slots, and send invalidation messages to clients like if
|
||||
* the key was modified. */
|
||||
* a random key, and send invalidation messages to clients like if the key was
|
||||
* modified. */
|
||||
void trackingLimitUsedSlots(void) {
|
||||
static unsigned int timeout_counter = 0;
|
||||
|
||||
if (TrackingTable == NULL) return;
|
||||
if (server.tracking_table_max_fill == 0) return; /* No limits set. */
|
||||
unsigned int max_slots =
|
||||
(TRACKING_TABLE_SIZE/100) * server.tracking_table_max_fill;
|
||||
if (TrackingTableUsedSlots <= max_slots) {
|
||||
size_t max_keys = server.tracking_table_max_fill;
|
||||
if (raxSize(TrackingTable) <= max_keys) {
|
||||
timeout_counter = 0;
|
||||
return; /* Limit not reached. */
|
||||
}
|
||||
|
||||
/* We have to invalidate a few slots to reach the limit again. The effort
|
||||
/* We have to invalidate a few keys to reach the limit again. The effort
|
||||
* we do here is proportional to the number of times we entered this
|
||||
* function and found that we are still over the limit. */
|
||||
int effort = 100 * (timeout_counter+1);
|
||||
|
||||
/* Let's start at a random position, and perform linear probing, in order
|
||||
* to improve cache locality. However once we are able to find an used
|
||||
* slot, jump again randomly, in order to avoid creating big holes in the
|
||||
* table (that will make this funciton use more resourced later). */
|
||||
/* We just remove one key after another by using a random walk. */
|
||||
raxIterator ri;
|
||||
raxStart(&ri,TrackingTable);
|
||||
while(effort > 0) {
|
||||
unsigned int idx = rand() % TRACKING_TABLE_SIZE;
|
||||
do {
|
||||
effort--;
|
||||
idx = (idx+1) % TRACKING_TABLE_SIZE;
|
||||
if (TrackingTable[idx] != NULL) {
|
||||
trackingInvalidateSlot(idx);
|
||||
if (TrackingTableUsedSlots <= max_slots) {
|
||||
timeout_counter = 0;
|
||||
return; /* Return ASAP: we are again under the limit. */
|
||||
} else {
|
||||
break; /* Jump to next random position. */
|
||||
}
|
||||
}
|
||||
} while(effort > 0);
|
||||
effort--;
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
raxRandomWalk(&ri,0);
|
||||
rax *ids = ri.data;
|
||||
TrackingTableTotalItems -= raxSize(ids);
|
||||
raxFree(ids);
|
||||
raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
|
||||
if (raxSize(TrackingTable) <= max_keys) {
|
||||
timeout_counter = 0;
|
||||
raxStop(&ri);
|
||||
return; /* Return ASAP: we are again under the limit. */
|
||||
}
|
||||
}
|
||||
|
||||
/* If we reach this point, we were not able to go under the configured
|
||||
* limit using the maximum effort we had for this run. */
|
||||
raxStop(&ri);
|
||||
timeout_counter++;
|
||||
}
|
||||
|
||||
/* This is just used in order to access the amount of used slots in the
|
||||
* tracking table. */
|
||||
unsigned long long trackingGetUsedSlots(void) {
|
||||
return TrackingTableUsedSlots;
|
||||
uint64_t trackingGetTotalItems(void) {
|
||||
return TrackingTableTotalItems;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user