Allow blocked XREAD on a cluster replica (#7881)
I suppose that it was overlooked, since till recently none of the blocked commands were readonly. other changes: - add test for the above. - add better support for additional (and deferring) clients for cluster tests - improve a test which left the client in MULTI state.
This commit is contained in:
parent
6f3b91fe26
commit
216c110609
@ -5855,6 +5855,15 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
|
|||||||
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
|
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
|
||||||
clusterNode *node = server.cluster->slots[slot];
|
clusterNode *node = server.cluster->slots[slot];
|
||||||
|
|
||||||
|
/* if the client is read-only and attempting to access key that our
|
||||||
|
* replica can handle, allow it. */
|
||||||
|
if ((c->flags & CLIENT_READONLY) &&
|
||||||
|
(c->lastcmd->flags & CMD_READONLY) &&
|
||||||
|
nodeIsSlave(myself) && myself->slaveof == node)
|
||||||
|
{
|
||||||
|
node = myself;
|
||||||
|
}
|
||||||
|
|
||||||
/* We send an error and unblock the client if:
|
/* We send an error and unblock the client if:
|
||||||
* 1) The slot is unassigned, emitting a cluster down error.
|
* 1) The slot is unassigned, emitting a cluster down error.
|
||||||
* 2) The slot is not handled by this node, nor being imported. */
|
* 2) The slot is not handled by this node, nor being imported. */
|
||||||
|
@ -45,4 +45,25 @@ test "MULTI-EXEC with write operations is MOVED" {
|
|||||||
$replica MULTI
|
$replica MULTI
|
||||||
catch {$replica HSET h b 4} err
|
catch {$replica HSET h b 4} err
|
||||||
assert {[string range $err 0 4] eq {MOVED}}
|
assert {[string range $err 0 4] eq {MOVED}}
|
||||||
|
catch {$replica exec} err
|
||||||
|
assert {[string range $err 0 8] eq {EXECABORT}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "read-only blocking operations from replica" {
|
||||||
|
set rd [redis_deferring_client redis 1]
|
||||||
|
$rd readonly
|
||||||
|
$rd read
|
||||||
|
$rd XREAD BLOCK 0 STREAMS k 0
|
||||||
|
|
||||||
|
wait_for_condition 1000 50 {
|
||||||
|
[RI 1 blocked_clients] eq {1}
|
||||||
|
} else {
|
||||||
|
fail "client wasn't blocked"
|
||||||
|
}
|
||||||
|
|
||||||
|
$primary XADD k * foo bar
|
||||||
|
set res [$rd read]
|
||||||
|
set res [lindex [lindex [lindex [lindex $res 0] 1] 0] 1]
|
||||||
|
assert {$res eq {foo bar}}
|
||||||
|
$rd close
|
||||||
}
|
}
|
||||||
|
@ -606,3 +606,16 @@ proc restart_instance {type id} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
proc redis_deferring_client {type id} {
|
||||||
|
set port [get_instance_attrib $type $id port]
|
||||||
|
set host [get_instance_attrib $type $id host]
|
||||||
|
set client [redis $host $port 1 $::tls]
|
||||||
|
return $client
|
||||||
|
}
|
||||||
|
|
||||||
|
proc redis_client {type id} {
|
||||||
|
set port [get_instance_attrib $type $id port]
|
||||||
|
set host [get_instance_attrib $type $id host]
|
||||||
|
set client [redis $host $port 0 $::tls]
|
||||||
|
return $client
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user