Make tracking invalidation messages always after command's reply (#9422)

Tracking invalidation messages were sometimes sent in inconsistent order,
before the command's reply rather than after.
In addition to that, they were sometimes embedded inside other commands
responses, like MULTI-EXEC and MGET.
This commit is contained in:
Huang Zhw 2021-10-07 20:13:42 +08:00 committed by GitHub
parent d98d1ad574
commit fd135f3e2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 182 additions and 12 deletions

View File

@ -295,6 +295,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* call. */
if (dstkey) incrRefCount(dstkey);
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
serveClientBlockedOnList(receiver, o,
@ -303,6 +305,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
&deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
if (dstkey) decrRefCount(dstkey);
@ -343,11 +347,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
? 1 : 0;
int reply_nil_when_empty = use_nested_array;
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
/* Replicate the command. */
int argc = 2;
@ -442,6 +450,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
@ -470,6 +480,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
}
}
}
@ -514,12 +526,16 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
moduleUnblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
}
}
}

View File

@ -570,7 +570,7 @@ long long dbTotalServerKeyCount() {
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
trackingInvalidateKey(c,key,1);
}
void signalFlushedDb(int dbid, int async) {

View File

@ -2933,6 +2933,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* our clients. */
updateFailoverStatus();
/* Since we rely on current_client to send scheduled invalidation messages
* we have to flush them after each command, so when we get here, the list
* must be empty. */
serverAssert(listLength(server.tracking_pending_keys) == 0);
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
@ -3663,6 +3668,7 @@ void initServer(void) {
server.current_client = NULL;
server.errors = raxNew();
server.fixed_time_expire = 0;
server.in_nested_call = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
@ -3675,6 +3681,7 @@ void initServer(void) {
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.tracking_pending_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.client_pause_type = CLIENT_PAUSE_OFF;
@ -4322,6 +4329,7 @@ void call(client *c, int flags) {
if (server.fixed_time_expire++ == 0) {
updateCachedTime(0);
}
server.in_nested_call++;
elapsedStart(&call_timer);
c->cmd->proc(c);
@ -4330,6 +4338,8 @@ void call(client *c, int flags) {
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
server.in_nested_call--;
/* Update failed command calls if required.
* We leverage a static variable (prev_err_count) to retain
* the counter across nested function calls and avoid logging
@ -4504,6 +4514,9 @@ void call(client *c, int flags) {
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;
/* Do some maintenance job and cleanup */
afterCommand(c);
}
/* Used when a command that is ready for execution needs to be rejected, due to
@ -4541,6 +4554,14 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
}
}
/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
UNUSED(c);
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
if (!server.in_nested_call) trackingHandlePendingKeyInvalidations();
}
/* Returns 1 for commands that may have key names in their arguments, but the legacy range
* spec doesn't cover all of them. */
void populateCommandMovableKeys(struct redisCommand *cmd) {
@ -4713,6 +4734,13 @@ int processCommand(client *c) {
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = (performEvictions() == EVICT_FAIL);
/* performEvictions may evict keys, so we need flush pending tracking
* invalidation keys. If we don't do this, we may get an invalidation
* message after we perform operation on the key, where in fact this
* message belongs to the old value of the key before it gets evicted.*/
trackingHandlePendingKeyInvalidations();
/* performEvictions may flush slave output buffers. This may result
* in a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;

View File

@ -1314,6 +1314,7 @@ struct redisServer {
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
int in_nested_call; /* If > 0, in a nested call of a call */
rax *clients_index; /* Active clients dictionary by client ID. */
pause_type client_pause_type; /* True if clients are currently paused */
list *paused_clients; /* List of pause clients */
@ -1598,6 +1599,7 @@ struct redisServer {
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
list *tracking_pending_keys; /* tracking invalidation keys pending to flush */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
@ -2108,7 +2110,9 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKey(client *c, robj *keyobj, int bcast);
void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj);
void trackingHandlePendingKeyInvalidations(void);
void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
@ -2432,6 +2436,8 @@ void preventCommandAOF(client *c);
void preventCommandReplication(client *c);
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
int prepareForShutdown(int flags);
void afterCommand(client *c);
int inNestedCall(void);
#ifdef __GNUC__
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));

View File

@ -348,13 +348,16 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
if (TrackingTable == NULL) return;
if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,key,keylen);
unsigned char *key = (unsigned char*)keyobj->ptr;
size_t keylen = sdslen(keyobj->ptr);
rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,(char *)key,keylen);
rax *ids = raxFind(TrackingTable,key,keylen);
if (ids == raxNotFound) return;
raxIterator ri;
@ -384,7 +387,15 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
continue;
}
sendTrackingMessage(target,key,keylen,0);
/* If target is current client, we need schedule key invalidation.
* As the invalidation messages may be interleaved with command
* response and should after command response */
if (target == server.current_client){
incrRefCount(keyobj);
listAddNodeTail(server.tracking_pending_keys, keyobj);
} else {
sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0);
}
}
raxStop(&ri);
@ -395,10 +406,22 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}
/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1);
void trackingHandlePendingKeyInvalidations() {
if (!listLength(server.tracking_pending_keys)) return;
listNode *ln;
listIter li;
listRewind(server.tracking_pending_keys,&li);
while ((ln = listNext(&li)) != NULL) {
robj *key = listNodeValue(ln);
/* current_client maybe freed, so we need to send invalidation
* message only when current_client is still alive */
if (server.current_client != NULL)
sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0);
decrRefCount(key);
}
listEmpty(server.tracking_pending_keys);
}
/* This function is called when one or all the Redis databases are
@ -475,7 +498,9 @@ void trackingLimitUsedSlots(void) {
raxSeek(&ri,"^",NULL,0);
raxRandomWalk(&ri,0);
if (raxEOF(&ri)) break;
trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0);
robj *keyobj = createStringObject((char*)ri.key,ri.key_len);
trackingInvalidateKey(NULL,keyobj,0);
decrRefCount(keyobj);
if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
raxStop(&ri);

View File

@ -369,6 +369,101 @@ start_server {tags {"tracking network"}} {
$r CLIENT TRACKING OFF
}
test {hdel deliver invlidate message after response in the same connection} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r HSET myhash f 1
r HGET myhash f
set res [r HDEL myhash f]
assert_equal $res 1
set res [r read]
assert_equal $res {invalidate myhash}
}
test {Tracking invalidation message is not interleaved with multiple keys response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
# We need disable active expire, so we can trigger lazy expire
r DEBUG SET-ACTIVE-EXPIRE 0
r MULTI
r MSET x{t} 1 y{t} 2
r PEXPIRE y{t} 100
r GET y{t}
r EXEC
after 110
# Read expired key y{t}, generate invalidate message about this key
set res [r MGET x{t} y{t}]
assert_equal $res {1 {}}
# Consume the invalidate message which is after command response
set res [r read]
assert_equal $res {invalidate y{t}}
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
test {Tracking invalidation message is not interleaved with transaction response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r MSET a{t} 1 b{t} 2
r GET a{t}
# Start a transaction, make a{t} generate an invalidate message
r MULTI
r INCR a{t}
r GET b{t}
set res [r EXEC]
assert_equal $res {2 2}
set res [r read]
# Consume the invalidate message which is after command response
assert_equal $res {invalidate a{t}}
}
test {Tracking invalidation message of eviction keys should be before response} {
# Get the current memory limit and calculate a new limit.
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
set used [s used_memory]
set limit [expr {$used+100*1024}]
set old_policy [lindex [r config get maxmemory-policy] 1]
r config set maxmemory $limit
# We set policy volatile-random, so only keys with ttl will be evicted
r config set maxmemory-policy volatile-random
# Add a volatile key and tracking it.
r setex volatile-key 10000 x
r get volatile-key
# We use SETBIT here, so we can set a big key and get the used_memory
# bigger than maxmemory. Next command will evict volatile keys. We
# can't use SET, as SET uses big input buffer, so it will fail.
r setbit big-key 1000000 0
# volatile-key is evicted before response.
set res [r getbit big-key 0]
assert_equal $res {invalidate volatile-key}
set res [r read]
assert_equal $res 0
r config set maxmemory-policy $old_policy
r config set maxmemory 0
}
test {Unblocked BLMOVE gets notification after response} {
r RPUSH list2{t} a
$rd HELLO 3
$rd read
$rd CLIENT TRACKING on
$rd read
# Tracking key list2{t}
$rd LRANGE list2{t} 0 -1
$rd read
# We block on list1{t}
$rd BLMOVE list1{t} list2{t} left left 0
wait_for_blocked_clients_count 1
# unblock $rd, list2{t} gets element and generate invalidation message
r rpush list1{t} foo
assert_equal [$rd read] {foo}
assert_equal [$rd read] {invalidate list2{t}}
}
test {Tracking gets notification on tracking table key eviction} {
r CLIENT TRACKING off
r CLIENT TRACKING on REDIRECT $redir_id NOLOOP