Merge pull request #7037 from guybe7/fix_module_replicate_multi

Modules: Test MULTI/EXEC replication of RM_Replicate
This commit is contained in:
Salvatore Sanfilippo 2020-03-31 17:00:57 +02:00 committed by GitHub
commit 0c52ce6c8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 9 deletions

View File

@ -831,7 +831,7 @@ int loadAppendOnlyFile(char *filename) {
if (cmd == server.multiCommand) valid_before_multi = valid_up_to; if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
/* Run the command in the context of a fake client */ /* Run the command in the context of a fake client */
fakeClient->cmd = cmd; fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flags & CLIENT_MULTI && if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand) fakeClient->cmd->proc != execCommand)
{ {

View File

@ -373,13 +373,16 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
* will produce an error. However it is useful to log such events since * will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */ * they are rare and may hint at errors in a script or a bug in Redis. */
int ctype = getClientType(c); int ctype = getClientType(c);
if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) { if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica"; char* to = c->id == CLIENT_ID_AOF ? "AOF-client" :
char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master"; ctype == CLIENT_TYPE_MASTER ? "master" : "replica";
char* from = c->id == CLIENT_ID_AOF ? "server" :
ctype == CLIENT_TYPE_MASTER ? "replica" : "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command " "to its %s: '%s' after processing the command "
"'%s'", from, to, s, cmdname); "'%s'", from, to, s, cmdname);
server.stat_unexpected_error_replies++;
} }
} }

View File

@ -673,7 +673,7 @@ int REDISMODULE_API_FUNC(RedisModule_AuthenticateClientWithUser)(RedisModuleCtx
void REDISMODULE_API_FUNC(RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id); void REDISMODULE_API_FUNC(RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id);
#endif #endif
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) #define RedisModule_IsAOFClient(id) ((id) == CLIENT_ID_AOF)
/* This is included inline inside each Redis module. */ /* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused)); static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));

View File

@ -2661,6 +2661,7 @@ void resetServerStats(void) {
} }
server.stat_net_input_bytes = 0; server.stat_net_input_bytes = 0;
server.stat_net_output_bytes = 0; server.stat_net_output_bytes = 0;
server.stat_unexpected_error_replies = 0;
server.aof_delayed_fsync = 0; server.aof_delayed_fsync = 0;
} }
@ -3266,7 +3267,7 @@ void call(client *c, int flags) {
if (flags & CMD_CALL_PROPAGATE) { if (flags & CMD_CALL_PROPAGATE) {
int multi_emitted = 0; int multi_emitted = 0;
/* Wrap the commands in server.also_propagate array, /* Wrap the commands in server.also_propagate array,
* but don't wrap it if we are already in MULIT context, * but don't wrap it if we are already in MULTI context,
* in case the nested MULTI/EXEC. * in case the nested MULTI/EXEC.
* *
* And if the array contains only one command, no need to * And if the array contains only one command, no need to
@ -3975,7 +3976,7 @@ sds genRedisInfoString(const char *section) {
"client_recent_max_output_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n"
"blocked_clients:%d\r\n" "blocked_clients:%d\r\n"
"tracking_clients:%d\r\n" "tracking_clients:%d\r\n"
"clients_in_timeout_table:%lld\r\n", "clients_in_timeout_table:%ld\r\n",
listLength(server.clients)-listLength(server.slaves), listLength(server.clients)-listLength(server.slaves),
maxin, maxout, maxin, maxout,
server.blocked_clients, server.blocked_clients,
@ -4230,7 +4231,8 @@ sds genRedisInfoString(const char *section) {
"active_defrag_key_hits:%lld\r\n" "active_defrag_key_hits:%lld\r\n"
"active_defrag_key_misses:%lld\r\n" "active_defrag_key_misses:%lld\r\n"
"tracking_total_keys:%lld\r\n" "tracking_total_keys:%lld\r\n"
"tracking_total_items:%lld\r\n", "tracking_total_items:%lld\r\n"
"unexpected_error_replies:%lld\r\n",
server.stat_numconnections, server.stat_numconnections,
server.stat_numcommands, server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND), getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -4259,7 +4261,8 @@ sds genRedisInfoString(const char *section) {
server.stat_active_defrag_key_hits, server.stat_active_defrag_key_hits,
server.stat_active_defrag_key_misses, server.stat_active_defrag_key_misses,
(unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems()); (unsigned long long) trackingGetTotalItems(),
server.stat_unexpected_error_replies);
} }
/* Replication */ /* Replication */

View File

@ -1129,6 +1129,7 @@ struct redisServer {
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
/* The following two are used to track instantaneous metrics, like /* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */ * number of operations per second, network traffic. */
struct { struct {

View File

@ -24,7 +24,40 @@ tags "modules" {
} else { } else {
fail "The two counters don't match the expected value." fail "The two counters don't match the expected value."
} }
$master propagate-test-2
$master propagate-test-3
$master multi
$master propagate-test-2
$master propagate-test-3
$master exec
wait_for_ofs_sync $master $replica
assert_equal [s -1 unexpected_error_replies] 0
} }
} }
} }
} }
tags "modules aof" {
test {Modules RM_Replicate replicates MULTI/EXEC correctly} {
start_server [list overrides [list loadmodule "$testmodule"]] {
# Enable the AOF
r config set appendonly yes
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
waitForBgrewriteaof r
r propagate-test-2
r propagate-test-3
r multi
r propagate-test-2
r propagate-test-3
r exec
# Load the AOF
r debug loadaof
assert_equal [s 0 unexpected_error_replies] 0
}
}
}