Remove read-only flag from non-keyspace cmds, different approach for EXEC to propagate MULTI (#8216)
In the distant history there was only the read flag for commands, and whatever command that didn't have the read flag was a write one. Then we added the write flag, but some portions of the code still used !read Also some commands that don't work on the keyspace at all, still have the read flag. Changes in this commit: 1. remove the read-only flag from TIME, ECHO, ROLE and LASTSAVE 2. EXEC command used to decides if it should propagate a MULTI by looking at the command flags (!read & !admin). When i was about to change it to look at the write flag instead, i realized that this would cause it not to propagate a MULTI for PUBLISH, EVAL, and SCRIPT, all 3 are not marked as either a read command or a write one (as they should), but all 3 are calling forceCommandPropagation. So instead of introducing a new flag to denote a command that "writes" but not into the keyspace, and still needs propagation, i decided to rely on the forceCommandPropagation, and just fix the code to propagate MULTI when needed rather than depending on the command flags at all. The implication of my change then is that now it won't decide to propagate MULTI when it sees one of these: SELECT, PING, INFO, COMMAND, TIME and other commands which are neither read nor write. 3. Changing getNodeByQuery and clusterRedirectBlockedClientIfNeeded in cluster.c to look at !write rather than read flag. This should have no implications, since these code paths are only reachable for commands which access keys, and these are always marked as either read or write. This commit improve MULTI propagation tests, for modules and a bunch of other special cases, all of which used to pass already before that commit. the only one that test change that uncovered a change of behavior is the one that DELs a non-existing key, it used to propagate an empty multi-exec block, and no longer does.
This commit is contained in:
parent
4bc14da2b3
commit
411c18bbce
@ -5763,7 +5763,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
* cluster is down. */
|
||||
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
|
||||
return NULL;
|
||||
} else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
|
||||
} else if ((cmd->flags & CMD_WRITE) && !(cmd->proc == evalCommand)
|
||||
&& !(cmd->proc == evalShaCommand))
|
||||
{
|
||||
/* The cluster is configured to allow read only commands
|
||||
@ -5812,11 +5812,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
/* Handle the read-only client case reading from a slave: if this
|
||||
* node is a slave and the request is about a hash slot our master
|
||||
* is serving, we can reply without redirection. */
|
||||
int is_readonly_command = (c->cmd->flags & CMD_READONLY) ||
|
||||
(c->cmd->proc == execCommand && !(c->mstate.cmd_inv_flags & CMD_READONLY));
|
||||
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
|
||||
if (c->flags & CLIENT_READONLY &&
|
||||
(is_readonly_command || cmd->proc == evalCommand ||
|
||||
cmd->proc == evalShaCommand) &&
|
||||
(!is_write_command || cmd->proc == evalCommand || cmd->proc == evalShaCommand) &&
|
||||
nodeIsSlave(myself) &&
|
||||
myself->slaveof == n)
|
||||
{
|
||||
@ -5901,7 +5900,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
|
||||
/* if the client is read-only and attempting to access key that our
|
||||
* replica can handle, allow it. */
|
||||
if ((c->flags & CLIENT_READONLY) &&
|
||||
(c->lastcmd->flags & CMD_READONLY) &&
|
||||
!(c->lastcmd->flags & CMD_WRITE) &&
|
||||
nodeIsSlave(myself) && myself->slaveof == node)
|
||||
{
|
||||
node = myself;
|
||||
|
@ -1629,7 +1629,7 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
|
||||
ctx->saved_oparray = server.also_propagate;
|
||||
redisOpArrayInit(&server.also_propagate);
|
||||
}
|
||||
execCommandPropagateMulti(ctx->client);
|
||||
execCommandPropagateMulti(ctx->client->db->id);
|
||||
}
|
||||
|
||||
/* Replicate the specified command and arguments to slaves and AOF, as effect
|
||||
|
24
src/multi.c
24
src/multi.c
@ -127,15 +127,15 @@ void beforePropagateMultiOrExec(int multi) {
|
||||
|
||||
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
|
||||
* implementation for more information. */
|
||||
void execCommandPropagateMulti(client *c) {
|
||||
void execCommandPropagateMulti(int dbid) {
|
||||
beforePropagateMultiOrExec(1);
|
||||
propagate(server.multiCommand,c->db->id,&shared.multi,1,
|
||||
propagate(server.multiCommand,dbid,&shared.multi,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
|
||||
void execCommandPropagateExec(client *c) {
|
||||
void execCommandPropagateExec(int dbid) {
|
||||
beforePropagateMultiOrExec(0);
|
||||
propagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
propagate(server.execCommand,dbid,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
|
||||
@ -162,7 +162,6 @@ void execCommand(client *c) {
|
||||
robj **orig_argv;
|
||||
int orig_argc;
|
||||
struct redisCommand *orig_cmd;
|
||||
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
|
||||
int was_master = server.masterhost == NULL;
|
||||
|
||||
if (!(c->flags & CLIENT_MULTI)) {
|
||||
@ -202,19 +201,6 @@ void execCommand(client *c) {
|
||||
c->argv = c->mstate.commands[j].argv;
|
||||
c->cmd = c->mstate.commands[j].cmd;
|
||||
|
||||
/* Propagate a MULTI request once we encounter the first command which
|
||||
* is not readonly nor an administrative one.
|
||||
* This way we'll deliver the MULTI/..../EXEC block as a whole and
|
||||
* both the AOF and the replication link will have the same consistency
|
||||
* and atomicity guarantees. */
|
||||
if (!must_propagate &&
|
||||
!server.loading &&
|
||||
!(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
|
||||
{
|
||||
execCommandPropagateMulti(c);
|
||||
must_propagate = 1;
|
||||
}
|
||||
|
||||
/* ACL permissions are also checked at the time of execution in case
|
||||
* they were changed after the commands were ququed. */
|
||||
int acl_errpos;
|
||||
@ -265,7 +251,7 @@ void execCommand(client *c) {
|
||||
|
||||
/* Make sure the EXEC command will be propagated as well if MULTI
|
||||
* was already propagated. */
|
||||
if (must_propagate) {
|
||||
if (server.propagate_in_transaction) {
|
||||
int is_master = server.masterhost == NULL;
|
||||
server.dirty++;
|
||||
beforePropagateMultiOrExec(0);
|
||||
|
@ -721,7 +721,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
server.lua_write_dirty &&
|
||||
server.lua_repl != PROPAGATE_NONE)
|
||||
{
|
||||
execCommandPropagateMulti(server.lua_caller);
|
||||
execCommandPropagateMulti(server.lua_caller->db->id);
|
||||
server.lua_multi_emitted = 1;
|
||||
/* Now we are in the MULTI context, the lua_client should be
|
||||
* flag as CLIENT_MULTI. */
|
||||
@ -1638,7 +1638,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
if (server.lua_replicate_commands) {
|
||||
preventCommandPropagation(c);
|
||||
if (server.lua_multi_emitted) {
|
||||
execCommandPropagateExec(c);
|
||||
execCommandPropagateExec(c->db->id);
|
||||
}
|
||||
}
|
||||
|
||||
|
26
src/server.c
26
src/server.c
@ -115,9 +115,9 @@ struct redisServer server; /* Server global state */
|
||||
*
|
||||
* write: Write command (may modify the key space).
|
||||
*
|
||||
* read-only: All the non special commands just reading from keys without
|
||||
* changing the content, or returning other information like
|
||||
* the TIME command. Special commands such administrative commands
|
||||
* read-only: Commands just reading from keys without changing the content.
|
||||
* Note that commands that don't read from the keyspace such as
|
||||
* TIME, SELECT, INFO, administrative commands, and connection
|
||||
* or transaction related commands (multi, exec, discard, ...)
|
||||
* are not flagged as read-only commands, since they affect the
|
||||
* server or the connection in other ways.
|
||||
@ -685,7 +685,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"echo",echoCommand,2,
|
||||
"read-only fast @connection",
|
||||
"fast @connection",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"save",saveCommand,1,
|
||||
@ -705,7 +705,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"lastsave",lastsaveCommand,1,
|
||||
"read-only random fast ok-loading ok-stale @admin @dangerous",
|
||||
"random fast ok-loading ok-stale @admin @dangerous",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"type",typeCommand,2,
|
||||
@ -781,7 +781,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"role",roleCommand,1,
|
||||
"ok-loading ok-stale no-script fast read-only @dangerous",
|
||||
"ok-loading ok-stale no-script fast @dangerous",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"debug",debugCommand,-2,
|
||||
@ -891,7 +891,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"time",timeCommand,1,
|
||||
"read-only random fast ok-loading ok-stale",
|
||||
"random fast ok-loading ok-stale",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"bitop",bitopCommand,-4,
|
||||
@ -3377,6 +3377,14 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
|
||||
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
|
||||
int flags)
|
||||
{
|
||||
/* Propagate a MULTI request once we encounter the first command which
|
||||
* is a write command.
|
||||
* This way we'll deliver the MULTI/..../EXEC block as a whole and
|
||||
* both the AOF and the replication link will have the same consistency
|
||||
* and atomicity guarantees. */
|
||||
if (server.in_exec && !server.propagate_in_transaction)
|
||||
execCommandPropagateMulti(dbid);
|
||||
|
||||
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
|
||||
feedAppendOnlyFile(cmd,dbid,argv,argc);
|
||||
if (flags & PROPAGATE_REPL)
|
||||
@ -3607,7 +3615,7 @@ void call(client *c, int flags) {
|
||||
!(c->flags & CLIENT_MULTI) &&
|
||||
!(flags & CMD_CALL_NOWRAP))
|
||||
{
|
||||
execCommandPropagateMulti(c);
|
||||
execCommandPropagateMulti(c->db->id);
|
||||
multi_emitted = 1;
|
||||
}
|
||||
|
||||
@ -3622,7 +3630,7 @@ void call(client *c, int flags) {
|
||||
}
|
||||
|
||||
if (multi_emitted) {
|
||||
execCommandPropagateExec(c);
|
||||
execCommandPropagateExec(c->db->id);
|
||||
}
|
||||
}
|
||||
redisOpArrayFree(&server.also_propagate);
|
||||
|
@ -1861,8 +1861,8 @@ void touchWatchedKeysOnFlush(int dbid);
|
||||
void discardTransaction(client *c);
|
||||
void flagTransaction(client *c);
|
||||
void execCommandAbort(client *c, sds error);
|
||||
void execCommandPropagateMulti(client *c);
|
||||
void execCommandPropagateExec(client *c);
|
||||
void execCommandPropagateMulti(int dbid);
|
||||
void execCommandPropagateExec(int dbid);
|
||||
void beforePropagateMultiOrExec(int multi);
|
||||
|
||||
/* Redis object implementation */
|
||||
|
@ -51,18 +51,31 @@ void timerHandler(RedisModuleCtx *ctx, void *data) {
|
||||
RedisModule_Replicate(ctx,"INCR","c","timer");
|
||||
times++;
|
||||
|
||||
if (times < 10)
|
||||
if (times < 3)
|
||||
RedisModule_CreateTimer(ctx,100,timerHandler,NULL);
|
||||
else
|
||||
times = 0;
|
||||
}
|
||||
|
||||
int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
RedisModuleTimerID timer_id =
|
||||
RedisModule_CreateTimer(ctx,100,timerHandler,NULL);
|
||||
REDISMODULE_NOT_USED(timer_id);
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx,"OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* The thread entry point. */
|
||||
void *threadMain(void *arg) {
|
||||
REDISMODULE_NOT_USED(arg);
|
||||
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL);
|
||||
RedisModule_SelectDb(ctx,9); /* Tests ran in database number 9. */
|
||||
for (int i = 0; i < 10; i++) {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
RedisModule_ThreadSafeContextLock(ctx);
|
||||
RedisModule_Replicate(ctx,"INCR","c","a-from-thread");
|
||||
RedisModule_Replicate(ctx,"INCR","c","b-from-thread");
|
||||
@ -72,15 +85,11 @@ void *threadMain(void *arg) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int propagateTestCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
int propagateTestThreadCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
RedisModuleTimerID timer_id =
|
||||
RedisModule_CreateTimer(ctx,100,timerHandler,NULL);
|
||||
REDISMODULE_NOT_USED(timer_id);
|
||||
|
||||
pthread_t tid;
|
||||
if (pthread_create(&tid,NULL,threadMain,NULL) != 0)
|
||||
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
||||
@ -90,7 +99,7 @@ int propagateTestCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int propagateTest2Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -102,7 +111,7 @@ int propagateTest2Command(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int propagateTest3Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -129,18 +138,23 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_Init(ctx,"propagate-test",1,REDISMODULE_APIVER_1)
|
||||
== REDISMODULE_ERR) return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test",
|
||||
propagateTestCommand,
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test.timer",
|
||||
propagateTestTimerCommand,
|
||||
"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test-2",
|
||||
propagateTest2Command,
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test.thread",
|
||||
propagateTestThreadCommand,
|
||||
"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test-3",
|
||||
propagateTest3Command,
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test.simple",
|
||||
propagateTestSimpleCommand,
|
||||
"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"propagate-test.mixed",
|
||||
propagateTestMixedCommand,
|
||||
"",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
|
@ -14,25 +14,103 @@ tags "modules" {
|
||||
# Start the replication process...
|
||||
$replica replicaof $master_host $master_port
|
||||
wait_for_sync $replica
|
||||
|
||||
after 1000
|
||||
$master propagate-test
|
||||
|
||||
wait_for_condition 5000 10 {
|
||||
([$replica get timer] eq "10") && \
|
||||
([$replica get a-from-thread] eq "10")
|
||||
} else {
|
||||
fail "The two counters don't match the expected value."
|
||||
test {module propagates from timer} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
$master propagate-test.timer
|
||||
|
||||
wait_for_condition 5000 10 {
|
||||
[$replica get timer] eq "3"
|
||||
} else {
|
||||
fail "The two counters don't match the expected value."
|
||||
}
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{incr timer}
|
||||
{exec}
|
||||
{multi}
|
||||
{incr timer}
|
||||
{exec}
|
||||
{multi}
|
||||
{incr timer}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
$master propagate-test-2
|
||||
$master propagate-test-3
|
||||
$master multi
|
||||
$master propagate-test-2
|
||||
$master propagate-test-3
|
||||
$master exec
|
||||
wait_for_ofs_sync $master $replica
|
||||
test {module propagates from thread} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
$master propagate-test.thread
|
||||
|
||||
wait_for_condition 5000 10 {
|
||||
[$replica get a-from-thread] eq "3"
|
||||
} else {
|
||||
fail "The two counters don't match the expected value."
|
||||
}
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{incr a-from-thread}
|
||||
{incr b-from-thread}
|
||||
{incr a-from-thread}
|
||||
{incr b-from-thread}
|
||||
{incr a-from-thread}
|
||||
{incr b-from-thread}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
test {module propagates from from command} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
$master propagate-test.simple
|
||||
$master propagate-test.mixed
|
||||
|
||||
# Note the 'after-call' propagation below is out of order (known limitation)
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{incr counter-1}
|
||||
{incr counter-2}
|
||||
{exec}
|
||||
{multi}
|
||||
{incr using-call}
|
||||
{incr after-call}
|
||||
{incr counter-1}
|
||||
{incr counter-2}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
test {module propagates from from multi-exec} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
$master multi
|
||||
$master propagate-test.simple
|
||||
$master propagate-test.mixed
|
||||
$master exec
|
||||
wait_for_ofs_sync $master $replica
|
||||
|
||||
# Note the 'after-call' propagation below is out of order (known limitation)
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{incr counter-1}
|
||||
{incr counter-2}
|
||||
{incr using-call}
|
||||
{incr after-call}
|
||||
{incr counter-1}
|
||||
{incr counter-2}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
assert_equal [s -1 unexpected_error_replies] 0
|
||||
}
|
||||
}
|
||||
@ -47,11 +125,11 @@ tags "modules aof" {
|
||||
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
|
||||
waitForBgrewriteaof r
|
||||
|
||||
r propagate-test-2
|
||||
r propagate-test-3
|
||||
r propagate-test.simple
|
||||
r propagate-test.mixed
|
||||
r multi
|
||||
r propagate-test-2
|
||||
r propagate-test-3
|
||||
r propagate-test.simple
|
||||
r propagate-test.mixed
|
||||
r exec
|
||||
|
||||
# Load the AOF
|
||||
|
@ -299,10 +299,14 @@ start_server {tags {"multi"}} {
|
||||
r multi
|
||||
r del foo
|
||||
r exec
|
||||
|
||||
# add another command so that when we see it we know multi-exec wasn't
|
||||
# propagated
|
||||
r incr foo
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{exec}
|
||||
{incr foo}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
@ -521,4 +525,83 @@ start_server {tags {"multi"}} {
|
||||
|
||||
list $m $res
|
||||
} {OK {{} {} {} {} {} {} {} {}}}
|
||||
|
||||
test {MULTI propagation of PUBLISH} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that PUBLISH inside MULTI is propagated in a transaction
|
||||
r multi
|
||||
r publish bla bla
|
||||
r exec
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{publish bla bla}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
test {MULTI propagation of SCRIPT LOAD} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that SCRIPT LOAD inside MULTI is propagated in a transaction
|
||||
r multi
|
||||
r script load {redis.call('set', KEYS[1], 'foo')}
|
||||
set res [r exec]
|
||||
set sha [lindex $res 0]
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{script load *}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
test {MULTI propagation of SCRIPT LOAD} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# make sure that EVAL inside MULTI is propagated in a transaction
|
||||
r config set lua-replicate-commands no
|
||||
r multi
|
||||
r eval {redis.call('set', KEYS[1], 'bar')} 1 bar
|
||||
r exec
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{multi}
|
||||
{eval *}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
|
||||
tags {"stream"} {
|
||||
test {MULTI propagation of XREADGROUP} {
|
||||
# stream is a special case because it calls propagate() directly for XREADGROUP
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
r XADD mystream * foo bar
|
||||
r XGROUP CREATE mystream mygroup 0
|
||||
|
||||
# make sure the XCALIM (propagated by XREADGROUP) is indeed inside MULTI/EXEC
|
||||
r multi
|
||||
r XREADGROUP GROUP mygroup consumer1 STREAMS mystream ">"
|
||||
r exec
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{xadd *}
|
||||
{xgroup CREATE *}
|
||||
{multi}
|
||||
{xclaim *}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user