Delete key doesn't dirty client who watched stale key (#10256)
When WATCH is called on a key that's already logically expired, avoid discarding the transaction when the keys is actually deleted. When WATCH is called, a flag is stored if the key is already expired at the time of watch. The expired key is not deleted, only checked. When a key is "touched", if it is deleted and it was already expired when a client watched it, the client is not marked as dirty. Co-authored-by: Oran Agra <oran@redislabs.com> Co-authored-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
This commit is contained in:
parent
47c51d0c78
commit
e9ae03787e
22
src/db.c
22
src/db.c
@ -1340,6 +1340,11 @@ int dbSwapDatabases(int id1, int id2) {
|
|||||||
redisDb aux = server.db[id1];
|
redisDb aux = server.db[id1];
|
||||||
redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
|
redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
|
||||||
|
|
||||||
|
/* Swapdb should make transaction fail if there is any
|
||||||
|
* client watching keys */
|
||||||
|
touchAllWatchedKeysInDb(db1, db2);
|
||||||
|
touchAllWatchedKeysInDb(db2, db1);
|
||||||
|
|
||||||
/* Swap hash tables. Note that we don't swap blocking_keys,
|
/* Swap hash tables. Note that we don't swap blocking_keys,
|
||||||
* ready_keys and watched_keys, since we want clients to
|
* ready_keys and watched_keys, since we want clients to
|
||||||
* remain in the same DB they were. */
|
* remain in the same DB they were. */
|
||||||
@ -1361,14 +1366,9 @@ int dbSwapDatabases(int id1, int id2) {
|
|||||||
* However normally we only do this check for efficiency reasons
|
* However normally we only do this check for efficiency reasons
|
||||||
* in dbAdd() when a list is created. So here we need to rescan
|
* in dbAdd() when a list is created. So here we need to rescan
|
||||||
* the list of clients blocked on lists and signal lists as ready
|
* the list of clients blocked on lists and signal lists as ready
|
||||||
* if needed.
|
* if needed. */
|
||||||
*
|
|
||||||
* Also the swapdb should make transaction fail if there is any
|
|
||||||
* client watching keys */
|
|
||||||
scanDatabaseForReadyLists(db1);
|
scanDatabaseForReadyLists(db1);
|
||||||
touchAllWatchedKeysInDb(db1, db2);
|
|
||||||
scanDatabaseForReadyLists(db2);
|
scanDatabaseForReadyLists(db2);
|
||||||
touchAllWatchedKeysInDb(db2, db1);
|
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1387,6 +1387,10 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
|||||||
redisDb aux = server.db[i];
|
redisDb aux = server.db[i];
|
||||||
redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
|
redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
|
||||||
|
|
||||||
|
/* Swapping databases should make transaction fail if there is any
|
||||||
|
* client watching keys. */
|
||||||
|
touchAllWatchedKeysInDb(activedb, newdb);
|
||||||
|
|
||||||
/* Swap hash tables. Note that we don't swap blocking_keys,
|
/* Swap hash tables. Note that we don't swap blocking_keys,
|
||||||
* ready_keys and watched_keys, since clients
|
* ready_keys and watched_keys, since clients
|
||||||
* remain in the same DB they were. */
|
* remain in the same DB they were. */
|
||||||
@ -1408,12 +1412,8 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
|||||||
* However normally we only do this check for efficiency reasons
|
* However normally we only do this check for efficiency reasons
|
||||||
* in dbAdd() when a list is created. So here we need to rescan
|
* in dbAdd() when a list is created. So here we need to rescan
|
||||||
* the list of clients blocked on lists and signal lists as ready
|
* the list of clients blocked on lists and signal lists as ready
|
||||||
* if needed.
|
* if needed. */
|
||||||
*
|
|
||||||
* Also the swapdb should make transaction fail if there is any
|
|
||||||
* client watching keys. */
|
|
||||||
scanDatabaseForReadyLists(activedb);
|
scanDatabaseForReadyLists(activedb);
|
||||||
touchAllWatchedKeysInDb(activedb, newdb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
trackingInvalidateKeysOnFlush(1);
|
trackingInvalidateKeysOnFlush(1);
|
||||||
|
59
src/multi.c
59
src/multi.c
@ -257,10 +257,13 @@ void execCommand(client *c) {
|
|||||||
|
|
||||||
/* In the client->watched_keys list we need to use watchedKey structures
|
/* In the client->watched_keys list we need to use watchedKey structures
|
||||||
* as in order to identify a key in Redis we need both the key name and the
|
* as in order to identify a key in Redis we need both the key name and the
|
||||||
* DB */
|
* DB. This struct is also referenced from db->watched_keys dict, where the
|
||||||
|
* values are lists of watchedKey pointers. */
|
||||||
typedef struct watchedKey {
|
typedef struct watchedKey {
|
||||||
robj *key;
|
robj *key;
|
||||||
redisDb *db;
|
redisDb *db;
|
||||||
|
client *client;
|
||||||
|
unsigned expired:1; /* Flag that we're watching an already expired key. */
|
||||||
} watchedKey;
|
} watchedKey;
|
||||||
|
|
||||||
/* Watch for the specified key */
|
/* Watch for the specified key */
|
||||||
@ -284,13 +287,15 @@ void watchForKey(client *c, robj *key) {
|
|||||||
dictAdd(c->db->watched_keys,key,clients);
|
dictAdd(c->db->watched_keys,key,clients);
|
||||||
incrRefCount(key);
|
incrRefCount(key);
|
||||||
}
|
}
|
||||||
listAddNodeTail(clients,c);
|
|
||||||
/* Add the new key to the list of keys watched by this client */
|
/* Add the new key to the list of keys watched by this client */
|
||||||
wk = zmalloc(sizeof(*wk));
|
wk = zmalloc(sizeof(*wk));
|
||||||
wk->key = key;
|
wk->key = key;
|
||||||
|
wk->client = c;
|
||||||
wk->db = c->db;
|
wk->db = c->db;
|
||||||
|
wk->expired = keyIsExpired(c->db, key);
|
||||||
incrRefCount(key);
|
incrRefCount(key);
|
||||||
listAddNodeTail(c->watched_keys,wk);
|
listAddNodeTail(c->watched_keys,wk);
|
||||||
|
listAddNodeTail(clients,wk);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
|
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
|
||||||
@ -305,12 +310,12 @@ void unwatchAllKeys(client *c) {
|
|||||||
list *clients;
|
list *clients;
|
||||||
watchedKey *wk;
|
watchedKey *wk;
|
||||||
|
|
||||||
/* Lookup the watched key -> clients list and remove the client
|
/* Lookup the watched key -> clients list and remove the client's wk
|
||||||
* from the list */
|
* from the list */
|
||||||
wk = listNodeValue(ln);
|
wk = listNodeValue(ln);
|
||||||
clients = dictFetchValue(wk->db->watched_keys, wk->key);
|
clients = dictFetchValue(wk->db->watched_keys, wk->key);
|
||||||
serverAssertWithInfo(c,NULL,clients != NULL);
|
serverAssertWithInfo(c,NULL,clients != NULL);
|
||||||
listDelNode(clients,listSearchKey(clients,c));
|
listDelNode(clients,listSearchKey(clients,wk));
|
||||||
/* Kill the entry at all if this was the only client */
|
/* Kill the entry at all if this was the only client */
|
||||||
if (listLength(clients) == 0)
|
if (listLength(clients) == 0)
|
||||||
dictDelete(wk->db->watched_keys, wk->key);
|
dictDelete(wk->db->watched_keys, wk->key);
|
||||||
@ -321,8 +326,8 @@ void unwatchAllKeys(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* iterates over the watched_keys list and
|
/* Iterates over the watched_keys list and looks for an expired key. Keys which
|
||||||
* look for an expired key . */
|
* were expired already when WATCH was called are ignored. */
|
||||||
int isWatchedKeyExpired(client *c) {
|
int isWatchedKeyExpired(client *c) {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
@ -331,6 +336,7 @@ int isWatchedKeyExpired(client *c) {
|
|||||||
listRewind(c->watched_keys,&li);
|
listRewind(c->watched_keys,&li);
|
||||||
while ((ln = listNext(&li))) {
|
while ((ln = listNext(&li))) {
|
||||||
wk = listNodeValue(ln);
|
wk = listNodeValue(ln);
|
||||||
|
if (wk->expired) continue; /* was expired when WATCH was called */
|
||||||
if (keyIsExpired(wk->db, wk->key)) return 1;
|
if (keyIsExpired(wk->db, wk->key)) return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -352,13 +358,31 @@ void touchWatchedKey(redisDb *db, robj *key) {
|
|||||||
/* Check if we are already watching for this key */
|
/* Check if we are already watching for this key */
|
||||||
listRewind(clients,&li);
|
listRewind(clients,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *c = listNodeValue(ln);
|
watchedKey *wk = listNodeValue(ln);
|
||||||
|
client *c = wk->client;
|
||||||
|
|
||||||
|
if (wk->expired) {
|
||||||
|
/* The key was already expired when WATCH was called. */
|
||||||
|
if (db == wk->db &&
|
||||||
|
equalStringObjects(key, wk->key) &&
|
||||||
|
dictFind(db->dict, key->ptr) == NULL)
|
||||||
|
{
|
||||||
|
/* Already expired key is deleted, so logically no change. Clear
|
||||||
|
* the flag. Deleted keys are not flagged as expired. */
|
||||||
|
wk->expired = 0;
|
||||||
|
goto skip_client;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
c->flags |= CLIENT_DIRTY_CAS;
|
c->flags |= CLIENT_DIRTY_CAS;
|
||||||
/* As the client is marked as dirty, there is no point in getting here
|
/* As the client is marked as dirty, there is no point in getting here
|
||||||
* again in case that key (or others) are modified again (or keep the
|
* again in case that key (or others) are modified again (or keep the
|
||||||
* memory overhead till EXEC). */
|
* memory overhead till EXEC). */
|
||||||
unwatchAllKeys(c);
|
unwatchAllKeys(c);
|
||||||
|
|
||||||
|
skip_client:
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,14 +403,31 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
|
|||||||
dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
|
dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
robj *key = dictGetKey(de);
|
robj *key = dictGetKey(de);
|
||||||
if (dictFind(emptied->dict, key->ptr) ||
|
int exists_in_emptied = dictFind(emptied->dict, key->ptr) != NULL;
|
||||||
|
if (exists_in_emptied ||
|
||||||
(replaced_with && dictFind(replaced_with->dict, key->ptr)))
|
(replaced_with && dictFind(replaced_with->dict, key->ptr)))
|
||||||
{
|
{
|
||||||
list *clients = dictGetVal(de);
|
list *clients = dictGetVal(de);
|
||||||
if (!clients) continue;
|
if (!clients) continue;
|
||||||
listRewind(clients,&li);
|
listRewind(clients,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *c = listNodeValue(ln);
|
watchedKey *wk = listNodeValue(ln);
|
||||||
|
if (wk->expired) {
|
||||||
|
if (!replaced_with || !dictFind(replaced_with->dict, key->ptr)) {
|
||||||
|
/* Expired key now deleted. No logical change. Clear the
|
||||||
|
* flag. Deleted keys are not flagged as expired. */
|
||||||
|
wk->expired = 0;
|
||||||
|
continue;
|
||||||
|
} else if (keyIsExpired(replaced_with, key)) {
|
||||||
|
/* Expired key remains expired. */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else if (!exists_in_emptied && keyIsExpired(replaced_with, key)) {
|
||||||
|
/* Non-existing key is replaced with an expired key. */
|
||||||
|
wk->expired = 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
client *c = wk->client;
|
||||||
c->flags |= CLIENT_DIRTY_CAS;
|
c->flags |= CLIENT_DIRTY_CAS;
|
||||||
/* As the client is marked as dirty, there is no point in getting here
|
/* As the client is marked as dirty, there is no point in getting here
|
||||||
* again for others keys (or keep the memory overhead till EXEC). */
|
* again for others keys (or keep the memory overhead till EXEC). */
|
||||||
|
@ -147,6 +147,45 @@ start_server {tags {"multi"}} {
|
|||||||
r debug set-active-expire 1
|
r debug set-active-expire 1
|
||||||
} {OK} {needs:debug}
|
} {OK} {needs:debug}
|
||||||
|
|
||||||
|
test {WATCH stale keys should not fail EXEC} {
|
||||||
|
r del x
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r set x foo px 1
|
||||||
|
after 2
|
||||||
|
r watch x
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {needs:debug}
|
||||||
|
|
||||||
|
test {Delete WATCHed stale keys should not fail EXEC} {
|
||||||
|
r del x
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r set x foo px 1
|
||||||
|
after 2
|
||||||
|
r watch x
|
||||||
|
# EXISTS triggers lazy expiry/deletion
|
||||||
|
assert_equal 0 [r exists x]
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {needs:debug}
|
||||||
|
|
||||||
|
test {FLUSHDB while watching stale keys should not fail EXEC} {
|
||||||
|
r del x
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r set x foo px 1
|
||||||
|
after 2
|
||||||
|
r watch x
|
||||||
|
r flushdb
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {needs:debug}
|
||||||
|
|
||||||
test {After successful EXEC key is no longer watched} {
|
test {After successful EXEC key is no longer watched} {
|
||||||
r set x 30
|
r set x 30
|
||||||
r watch x
|
r watch x
|
||||||
@ -245,6 +284,52 @@ start_server {tags {"multi"}} {
|
|||||||
r exec
|
r exec
|
||||||
} {} {singledb:skip}
|
} {} {singledb:skip}
|
||||||
|
|
||||||
|
test {SWAPDB does not touch watched stale keys} {
|
||||||
|
r flushall
|
||||||
|
r select 1
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r set x foo px 1
|
||||||
|
after 2
|
||||||
|
r watch x
|
||||||
|
r swapdb 0 1 ; # expired key replaced with no key => no change
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {singledb:skip needs:debug}
|
||||||
|
|
||||||
|
test {SWAPDB does not touch non-existing key replaced with stale key} {
|
||||||
|
r flushall
|
||||||
|
r select 0
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r set x foo px 1
|
||||||
|
after 2
|
||||||
|
r select 1
|
||||||
|
r watch x
|
||||||
|
r swapdb 0 1 ; # no key replaced with expired key => no change
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {singledb:skip needs:debug}
|
||||||
|
|
||||||
|
test {SWAPDB does not touch stale key replaced with another stale key} {
|
||||||
|
r flushall
|
||||||
|
r debug set-active-expire 0
|
||||||
|
r select 1
|
||||||
|
r set x foo px 1
|
||||||
|
r select 0
|
||||||
|
r set x bar px 1
|
||||||
|
after 2
|
||||||
|
r select 1
|
||||||
|
r watch x
|
||||||
|
r swapdb 0 1 ; # no key replaced with expired key => no change
|
||||||
|
r multi
|
||||||
|
r ping
|
||||||
|
assert_equal {PONG} [r exec]
|
||||||
|
r debug set-active-expire 1
|
||||||
|
} {OK} {singledb:skip needs:debug}
|
||||||
|
|
||||||
test {WATCH is able to remember the DB a key belongs to} {
|
test {WATCH is able to remember the DB a key belongs to} {
|
||||||
r select 5
|
r select 5
|
||||||
r set x 30
|
r set x 30
|
||||||
|
Loading…
x
Reference in New Issue
Block a user