Merge tag '6.0-rc3' into redis_6_merge

Redis 6.0 RC3.


Former-commit-id: b2cb10de5f39b4d8e1ee19877c2bdaf19eefd9db
This commit is contained in:
John Sully 2020-04-14 22:56:19 -04:00
commit d3ebe3b6bd
17 changed files with 486 additions and 75 deletions

View File

@ -11,6 +11,323 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release. SECURITY: There are security fixes in the release.
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
================================================================================
Redis 6.0-rc3 Released Tue Mar 31 17:42:39 CEST 2020
================================================================================
Upgrade urgency CRITICAL: A connection management bug introduced with the
SSL implementation can crash Redis easily.
Dear users, this is a list of the major changes in this release, please check
the list of commits for detail:
* Fix crash due to refactoring for SSL, for the connection code.
* Precise timeouts for blocking commands. Now the timeouts have HZ
resolution regardless of the number of connected clinets. New timeouts
are stored in a radix tree and sorted by expire time.
* Fix rare crash when resizing the event loop because of CONFIG maxclients.
* Fix systemd readiness after successful partial resync.
* Redis-cli ask password mode to be prompted at startup (for additional safety).
* Keyspace notifications added to MIGRATE / RESTORE.
* Threaded I/O bugs fixed.
* Implement new ACL style AUTH in Sentinel.
* Make 'requirepass' more backward compatible with Redis <= 5.
* ACL: Handle default user as disabled if it's off regardless of "nopass".
* Fix a potential inconsistency when upgrading an instance in Redis Cluster
and restarting it. The instance will act as a replica but will actually be
set as a master immediately. However the choice of what to do with already
expired keys, on loading, was made from the POV of replicas.
* Abort transactions after -READONLY error.
* Many different fixes to module APIs.
* BITFIELD_RO added to call the command on read only replicas.
* PSYNC2: meaningful offset implementation. Allow the disconnected master
that is still sending PINGs to replicas, to be able to successfully
PSYNC incrementally to new slaves, discarding the last part of the
replication backlog consisting only of PINGs.
* Fix pipelined MULTI/EXEC during Lua scripts are in BUSY state.
* Re-fix propagation API in modules, broken again after other changes.
antirez in commit ef1b1f01:
cast raxSize() to avoid warning with format spec.
1 file changed, 1 insertion(+), 1 deletion(-)
antirez in commit 9f347fab:
Minor changes to #7037.
2 files changed, 14 insertions(+), 5 deletions(-)
Guy Benoish in commit a509400d:
Modules: Test MULTI/EXEC replication of RM_Replicate
6 files changed, 49 insertions(+), 9 deletions(-)
Guy Benoish in commit 805c8c94:
RENAME can unblock XREADGROUP
3 files changed, 25 insertions(+), 1 deletion(-)
antirez in commit 97b80b57:
Fix the propagate Tcl test after module changes.
1 file changed, 1 insertion(+), 1 deletion(-)
antirez in commit 4f6b6b80:
Modify the propagate unit test to show more cases.
1 file changed, 30 insertions(+), 2 deletions(-)
antirez in commit 616b1cb7:
Fix module commands propagation double MULTI bug.
4 files changed, 25 insertions(+), 8 deletions(-)
antirez in commit 08fdef4b:
Fix RM_Call() stale comment due to cut&paste.
1 file changed, 1 insertion(+), 3 deletions(-)
OMG-By in commit 26b79ca1:
fix: dict.c->dictResize()->minimal type
1 file changed, 1 insertion(+), 1 deletion(-)
zhaozhao.zz in commit fa418637:
PSYNC2: reset backlog_idx and master_repl_offset correctly
1 file changed, 10 insertions(+), 5 deletions(-)
antirez in commit bbbc80ac:
Precise timeouts: reference client pointer directly.
1 file changed, 13 insertions(+), 16 deletions(-)
antirez in commit c3b268a0:
timeout.c created: move client timeouts code there.
5 files changed, 198 insertions(+), 167 deletions(-)
Oran Agra in commit 0f7dfc37:
AOFRW on an empty stream created with MKSTREAM loads badkly
2 files changed, 15 insertions(+), 1 deletion(-)
antirez in commit 67643ead:
Precise timeouts: cleaup the table on unblock.
3 files changed, 21 insertions(+), 2 deletions(-)
antirez in commit ad94066e:
Precise timeouts: fix comments after functional change.
2 files changed, 6 insertions(+), 6 deletions(-)
antirez in commit a443ec2e:
Precise timeouts: use only radix tree for timeouts.
3 files changed, 15 insertions(+), 38 deletions(-)
antirez in commit 6862fd70:
Precise timeouts: fast exit for clientsHandleShortTimeout().
1 file changed, 1 insertion(+)
antirez in commit 30f1df8c:
Precise timeouts: fix bugs in initial implementation.
2 files changed, 5 insertions(+), 1 deletion(-)
antirez in commit 7add0f24:
Precise timeouts: working initial implementation.
3 files changed, 110 insertions(+), 28 deletions(-)
antirez in commit 9d6d1779:
Precise timeouts: refactor unblocking on timeout.
2 files changed, 33 insertions(+), 13 deletions(-)
antirez in commit 316a8f15:
PSYNC2: fix backlog_idx when adjusting for meaningful offset
1 file changed, 3 insertions(+)
伯成 in commit 11db53f8:
Boost up performance for redis PUB-SUB patterns matching
3 files changed, 43 insertions(+), 11 deletions(-)
antirez in commit e257f121:
PSYNC2: meaningful offset test.
2 files changed, 62 insertions(+)
antirez in commit 5f72f696:
PSYNC2: meaningful offset implemented.
3 files changed, 40 insertions(+), 1 deletion(-)
antirez in commit 8caa2714:
Explain why we allow transactions in -BUSY state.
1 file changed, 9 insertions(+), 2 deletions(-)
Oran Agra in commit e43cd831:
MULTI/EXEC during LUA script timeout are messed up
2 files changed, 73 insertions(+)
antirez in commit 34b89832:
Improve comments of replicationCacheMasterUsingMyself().
1 file changed, 6 insertions(+), 1 deletion(-)
antirez in commit 70a98a43:
Fix BITFIELD_RO test.
2 files changed, 5 insertions(+), 5 deletions(-)
antirez in commit 8783304a:
Abort transactions after -READONLY error. Fix #7014.
1 file changed, 1 insertion(+)
antirez in commit ec9cf002:
Minor changes to BITFIELD_RO PR #6951.
1 file changed, 9 insertions(+), 6 deletions(-)
bodong.ybd in commit b3e4abf0:
Added BITFIELD_RO variants for read-only operations.
4 files changed, 54 insertions(+), 1 deletion(-)
antirez in commit 50f8f950:
Modules: updated function doc after #7003.
1 file changed, 6 insertions(+), 1 deletion(-)
Guy Benoish in commit f2f3dc5e:
Allow RM_GetContextFlags to work with ctx==NULL
1 file changed, 16 insertions(+), 14 deletions(-)
hwware in commit eb808879:
fix potentical memory leak in redis-cli
1 file changed, 2 insertions(+)
Yossi Gottlieb in commit cdcab0e8:
Fix crashes related to failed/rejected accepts.
1 file changed, 6 insertions(+), 5 deletions(-)
Yossi Gottlieb in commit 50dcd9f9:
Cluster: fix misleading accept errors.
1 file changed, 4 insertions(+), 3 deletions(-)
Yossi Gottlieb in commit 87dbd8f5:
Conns: Fix connClose() / connAccept() behavior.
3 files changed, 48 insertions(+), 32 deletions(-)
hwware in commit 81e8686c:
remove redundant Semicolon
1 file changed, 1 insertion(+), 1 deletion(-)
hwware in commit c7524a7e:
clean CLIENT_TRACKING_CACHING flag when disabled caching
1 file changed, 1 insertion(+), 1 deletion(-)
hwware in commit 2dd1ca6a:
add missing commands in cluster help
1 file changed, 2 insertions(+), 1 deletion(-)
artix in commit 95324b81:
Support Redis Cluster Proxy PROXY INFO command
1 file changed, 5 insertions(+), 1 deletion(-)
박승현 in commit 04c53fa1:
Update redis.conf
1 file changed, 1 insertion(+), 1 deletion(-)
WuYunlong in commit 0578157d:
Fix master replica inconsistency for upgrading scenario.
3 files changed, 9 insertions(+), 2 deletions(-)
WuYunlong in commit 299f1d02:
Add 14-consistency-check.tcl to prove there is a data consistency issue.
1 file changed, 87 insertions(+)
antirez in commit 61b98f32:
Regression test for #7011.
1 file changed, 7 insertions(+)
antirez in commit 34ea2f4e:
ACL: default user off should not allow automatic authentication.
2 files changed, 3 insertions(+), 2 deletions(-)
antirez in commit cbbf9b39:
Sentinel: document auth-user directive.
1 file changed, 12 insertions(+)
antirez in commit 9c2e42dd:
ACL: Make Redis 6 more backward compatible with requirepass.
4 files changed, 17 insertions(+), 15 deletions(-)
antirez in commit d387f67d:
Sentinel: implement auth-user directive for ACLs.
1 file changed, 38 insertions(+), 7 deletions(-)
zhaozhao.zz in commit 7c078416:
Threaded IO: bugfix client kill may crash redis
1 file changed, 11 insertions(+), 5 deletions(-)
zhaozhao.zz in commit 9cc7038e:
Threaded IO: handle pending reads clients ASAP after event loop
1 file changed, 3 insertions(+), 1 deletion(-)
antirez in commit da8c7c49:
Example sentinel conf: document requirepass.
1 file changed, 8 insertions(+)
antirez in commit bdb338cf:
Aesthetic changes in PR #6989.
1 file changed, 9 insertions(+), 5 deletions(-)
zhaozhao.zz in commit b3e03054:
Threaded IO: bugfix #6988 process events while blocked
1 file changed, 5 insertions(+)
antirez in commit e628f944:
Restore newline at the end of redis-cli.c
1 file changed, 2 insertions(+), 1 deletion(-)
chendianqiang in commit 5d4c4df3:
use correct list for moduleUnregisterUsedAPI
1 file changed, 1 insertion(+), 1 deletion(-)
guodongxiaren in commit da14982d:
string literal should be const char*
1 file changed, 1 insertion(+), 1 deletion(-)
Itamar Haber in commit dc8885a1:
Adds keyspace notifications to migrate and restore
1 file changed, 3 insertions(+), 1 deletion(-)
bodong.ybd in commit bfb18e55:
Remove duplicate obj files in Makefile
1 file changed, 2 insertions(+), 2 deletions(-)
bodong.ybd in commit 76d57161:
Fix bug of tcl test using external server
2 files changed, 8 insertions(+), 2 deletions(-)
fengpf in commit 0e5820d8:
fix comments in latency.c
2 files changed, 2 insertions(+), 1 deletion(-)
antirez in commit 916dd79f:
Update linenoise.
1 file changed, 2 insertions(+), 1 deletion(-)
lifubang in commit c0c67c9b:
add askpass mode
1 file changed, 19 insertions(+), 1 deletion(-)
lifubang in commit e1c29434:
update linenoise to https://github.com/antirez/linenoise/tree/fc9667a81d43911a6690fb1e68c16e6e3bb8df05
4 files changed, 59 insertions(+), 4 deletions(-)
Jamie Scott in commit e5a063bc:
Remove default guidance in Redis.conf
1 file changed, 1 insertion(+), 2 deletions(-)
Jamie Scott in commit d28cbaf7:
Update Redis.conf to improve TLS usability
1 file changed, 2 insertions(+), 1 deletion(-)
Johannes Truschnigg in commit 23d5e8b8:
Signal systemd readiness atfer Partial Resync
1 file changed, 4 insertions(+)
Oran Agra in commit 61738154:
fix for flaky psync2 test
1 file changed, 21 insertions(+)
antirez in commit 70e0e499:
ae.c: fix crash when resizing the event loop.
1 file changed, 6 insertions(+), 2 deletions(-)
antirez in commit b3e4aa67:
Fix release notes spelling mistake.
1 file changed, 1 insertion(+), 1 deletion(-)
================================================================================ ================================================================================
Redis 6.0 RC2 Released Thu Mar 05 15:40:53 CET 2020 Redis 6.0 RC2 Released Thu Mar 05 15:40:53 CET 2020
================================================================================ ================================================================================

View File

@ -857,7 +857,7 @@ int loadAppendOnlyFile(char *filename) {
if (cmd == cserver.multiCommand) valid_before_multi = valid_up_to; if (cmd == cserver.multiCommand) valid_before_multi = valid_up_to;
/* Run the command in the context of a fake client */ /* Run the command in the context of a fake client */
fakeClient->cmd = cmd; fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flags & CLIENT_MULTI && if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand) fakeClient->cmd->proc != execCommand)
{ {

View File

@ -224,6 +224,11 @@ void dbAdd(redisDb *db, robj *key, robj *val)
{ {
int retval = dbAddCore(db, key, val); int retval = dbAddCore(db, key, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK); serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET ||
val->type == OBJ_STREAM)
signalKeyAsReady(db, key);
if (g_pserver->cluster_enabled) slotToKeyAdd(key);
} }
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)

View File

@ -846,6 +846,8 @@ void serverLogObjectDebugInfo(robj_roptr o) {
serverLog(LL_WARNING,"Sorted set size: %d", (int) zsetLength(o)); serverLog(LL_WARNING,"Sorted set size: %d", (int) zsetLength(o));
if (o->encoding == OBJ_ENCODING_SKIPLIST) if (o->encoding == OBJ_ENCODING_SKIPLIST)
serverLog(LL_WARNING,"Skiplist level: %d", (int) ((const zset*)ptrFromObj(o))->zsl->level); serverLog(LL_WARNING,"Skiplist level: %d", (int) ((const zset*)ptrFromObj(o))->zsl->level);
} else if (o->type == OBJ_STREAM) {
serverLog(LL_WARNING,"Stream size: %d", (int) streamLength(o));
} }
} }

View File

@ -134,7 +134,7 @@ int _dictInit(dict *d, dictType *type,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */ * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d) int dictResize(dict *d)
{ {
int minimal; unsigned long minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR; if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used; minimal = d->ht[0].used;

View File

@ -3402,13 +3402,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
* a Lua script in the context of AOF and slaves. */ * a Lua script in the context of AOF and slaves. */
if (replicate) moduleReplicateMultiIfNeeded(ctx); if (replicate) moduleReplicateMultiIfNeeded(ctx);
if (ctx->client->flags & CLIENT_MULTI ||
ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
c->flags |= CLIENT_MULTI;
}
/* Run the command */ /* Run the command */
call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP;
if (replicate) { if (replicate) {
if (!(flags & REDISMODULE_ARGV_NO_AOF)) if (!(flags & REDISMODULE_ARGV_NO_AOF))
call_flags |= CMD_CALL_PROPAGATE_AOF; call_flags |= CMD_CALL_PROPAGATE_AOF;
@ -3417,9 +3412,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
} }
call(c,call_flags); call(c,call_flags);
/* Convert the result of the Redis command into a suitable Lua type. /* Convert the result of the Redis command into a module reply. */
* The first thing we need is to create a single string from the client
* output buffers. */
proto = sdsnewlen(c->buf,c->bufpos); proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0; c->bufpos = 0;
while(listLength(c->reply)) { while(listLength(c->reply)) {

View File

@ -488,9 +488,20 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
* will produce an error. However it is useful to log such events since * will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */ * they are rare and may hint at errors in a script or a bug in Redis. */
int ctype = getClientType(c); int ctype = getClientType(c);
if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) { if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
const char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica"; const char *to, *from;
const char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master";
if (c->id == CLIENT_ID_AOF) {
to = "AOF-loading-client";
from = "server";
} else if (ctype == CLIENT_TYPE_MASTER) {
to = "master";
from = "replica";
} else {
to = "replica";
from = "master";
}
const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command " "to its %s: '%s' after processing the command "
@ -500,7 +511,8 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
std::string str = escapeString(c->querybuf); std::string str = escapeString(c->querybuf);
printf("\tquerybuf: %s\n", str.c_str()); printf("\tquerybuf: %s\n", str.c_str());
} }
c->master_error = 1;
g_pserver->stat_unexpected_error_replies++;
} }
} }

View File

@ -677,7 +677,7 @@ int REDISMODULE_API_FUNC(RedisModule_AuthenticateClientWithUser)(RedisModuleCtx
void REDISMODULE_API_FUNC(RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id); void REDISMODULE_API_FUNC(RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id);
#endif #endif
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) #define RedisModule_IsAOFClient(id) ((id) == CLIENT_ID_AOF)
/* This is included inline inside each Redis module. */ /* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused)); static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));

View File

@ -3255,11 +3255,16 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
g_pserver->master_repl_offset, g_pserver->master_repl_offset,
delta); delta);
mi->master_initial_offset = g_pserver->master_repl_meaningful_offset; mi->master_initial_offset = g_pserver->master_repl_meaningful_offset;
g_pserver->repl_backlog_histlen -= delta; g_pserver->master_repl_offset = g_pserver->master_repl_meaningful_offset;
g_pserver->repl_backlog_idx = if (g_pserver->repl_backlog_histlen <= delta) {
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) % g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_size; g_pserver->repl_backlog_idx = 0;
if (g_pserver->repl_backlog_histlen < 0) g_pserver->repl_backlog_histlen = 0; } else {
g_pserver->repl_backlog_histlen -= delta;
g_pserver->repl_backlog_idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) %
g_pserver->repl_backlog_size;
}
} }
/* The master client we create can be set to any DBID, because /* The master client we create can be set to any DBID, because

View File

@ -2796,6 +2796,7 @@ void resetServerStats(void) {
} }
g_pserver->stat_net_input_bytes = 0; g_pserver->stat_net_input_bytes = 0;
g_pserver->stat_net_output_bytes = 0; g_pserver->stat_net_output_bytes = 0;
g_pserver->stat_unexpected_error_replies = 0;
g_pserver->aof_delayed_fsync = 0; g_pserver->aof_delayed_fsync = 0;
} }
@ -3470,12 +3471,16 @@ void call(client *c, int flags) {
if (flags & CMD_CALL_PROPAGATE) { if (flags & CMD_CALL_PROPAGATE) {
bool multi_emitted = false; bool multi_emitted = false;
/* Wrap the commands in g_pserver->also_propagate array, /* Wrap the commands in g_pserver->also_propagate array,
* but don't wrap it if we are already in MULIT context, * but don't wrap it if we are already in MULTI context,
* in case the nested MULIT/EXEC. * in case the nested MULTI/EXEC.
* *
* And if the array contains only one command, no need to * And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */ * wrap it, since the single command is atomic. */
if (g_pserver->also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) { if (g_pserver->also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
execCommandPropagateMulti(c); execCommandPropagateMulti(c);
multi_emitted = true; multi_emitted = true;
} }
@ -4473,7 +4478,8 @@ sds genRedisInfoString(const char *section) {
"active_defrag_key_hits:%lld\r\n" "active_defrag_key_hits:%lld\r\n"
"active_defrag_key_misses:%lld\r\n" "active_defrag_key_misses:%lld\r\n"
"tracking_total_keys:%lld\r\n" "tracking_total_keys:%lld\r\n"
"tracking_total_items:%llu\r\n", "tracking_total_items:%llu\r\n"
"unexpected_error_replies:%lld\r\n",
g_pserver->stat_numconnections, g_pserver->stat_numconnections,
g_pserver->stat_numcommands, g_pserver->stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND), getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -4502,7 +4508,8 @@ sds genRedisInfoString(const char *section) {
g_pserver->stat_active_defrag_key_hits, g_pserver->stat_active_defrag_key_hits,
g_pserver->stat_active_defrag_key_misses, g_pserver->stat_active_defrag_key_misses,
(unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems()); (unsigned long long) trackingGetTotalItems(),
g_pserver->stat_unexpected_error_replies);
} }
/* Replication */ /* Replication */

View File

@ -563,6 +563,8 @@ public:
#define CMD_CALL_PROPAGATE_REPL (1<<3) #define CMD_CALL_PROPAGATE_REPL (1<<3)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL) #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) #define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
#define CMD_CALL_NOWRAP (1<<4) /* Don't wrap also propagate array into
MULTI/EXEC: the caller will handle it. */
/* Command propagation flags, see propagate() function */ /* Command propagation flags, see propagate() function */
#define PROPAGATE_NONE 0 #define PROPAGATE_NONE 0
@ -1704,6 +1706,7 @@ struct redisServer {
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
/* The following two are used to track instantaneous metrics, like /* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */ * number of operations per second, network traffic. */
struct { struct {

View File

@ -98,7 +98,7 @@ struct client;
stream *streamNew(void); stream *streamNew(void);
void freeStream(stream *s); void freeStream(stream *s);
unsigned long streamLength(const robj *subject); unsigned long streamLength(robj_roptr subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);

View File

@ -68,7 +68,7 @@ void freeStream(stream *s) {
} }
/* Return the length of a stream. */ /* Return the length of a stream. */
unsigned long streamLength(const robj *subject) { unsigned long streamLength(robj_roptr subject) {
stream *s = (stream*)ptrFromObj(subject); stream *s = (stream*)ptrFromObj(subject);
return s->length; return s->length;
} }

View File

@ -94,18 +94,19 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ #define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
/* Given client ID and timeout, write the resulting radix tree key in buf. */ /* Given client ID and timeout, write the resulting radix tree key in buf. */
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, client *c) {
timeout = htonu64(timeout); timeout = htonu64(timeout);
memcpy(buf,&timeout,sizeof(timeout)); memcpy(buf,&timeout,sizeof(timeout));
memcpy(buf+8,&id,sizeof(id)); memcpy(buf+8,&c,sizeof(c));
if (sizeof(c) == 4) memset(buf+12,0,4); /* Zero padding for 32bit target. */
} }
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write /* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
* the timeout into *toptr and the client ID into *idptr. */ * the timeout into *toptr and the client pointer into *cptr. */
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) {
memcpy(toptr,buf,sizeof(*toptr)); memcpy(toptr,buf,sizeof(*toptr));
*toptr = ntohu64(*toptr); *toptr = ntohu64(*toptr);
memcpy(idptr,buf+8,sizeof(*idptr)); memcpy(cptr,buf+8,sizeof(*cptr));
} }
/* Add the specified client id / timeout as a key in the radix tree we use /* Add the specified client id / timeout as a key in the radix tree we use
@ -114,9 +115,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
void addClientToTimeoutTable(client *c) { void addClientToTimeoutTable(client *c) {
if (c->bpop.timeout == 0) return; if (c->bpop.timeout == 0) return;
uint64_t timeout = c->bpop.timeout; uint64_t timeout = c->bpop.timeout;
uint64_t id = c->id;
unsigned char buf[CLIENT_ST_KEYLEN]; unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,id); encodeTimeoutKey(buf,timeout,c);
if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL)) if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL))
c->flags |= CLIENT_IN_TO_TABLE; c->flags |= CLIENT_IN_TO_TABLE;
} }
@ -127,40 +127,11 @@ void removeClientFromTimeoutTable(client *c) {
if (!(c->flags & CLIENT_IN_TO_TABLE)) return; if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
c->flags &= ~CLIENT_IN_TO_TABLE; c->flags &= ~CLIENT_IN_TO_TABLE;
uint64_t timeout = c->bpop.timeout; uint64_t timeout = c->bpop.timeout;
uint64_t id = c->id;
unsigned char buf[CLIENT_ST_KEYLEN]; unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,id); encodeTimeoutKey(buf,timeout,c);
raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL); raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL);
} }
/* This function is called in beforeSleep() in order to unblock clients
* that are waiting in blocking operations with a timeout set. */
void clientsHandleTimeout(void) {
serverAssert(GlobalLocksAcquired());
if (raxSize(g_pserver->clients_timeout_table) == 0) return;
uint64_t now = mstime();
raxIterator ri;
raxStart(&ri,g_pserver->clients_timeout_table);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
uint64_t id, timeout;
decodeTimeoutKey(ri.key,&timeout,&id);
if (timeout >= now) break; /* All the timeouts are in the future. */
client *c = lookupClientByID(id);
if (c) {
std::unique_lock<fastlock> l(c->lock, std::defer_lock);
if (FCorrectThread(c))
l.lock();
c->flags &= ~CLIENT_IN_TO_TABLE;
checkBlockedClientTimeout(c,now);
}
raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL);
raxSeek(&ri,"^",NULL,0);
}
}
/* This function is called in beforeSleep() in order to unblock clients /* This function is called in beforeSleep() in order to unblock clients
* that are waiting in blocking operations with a timeout set. */ * that are waiting in blocking operations with a timeout set. */
void handleBlockedClientsTimeout(void) { void handleBlockedClientsTimeout(void) {
@ -171,15 +142,13 @@ void handleBlockedClientsTimeout(void) {
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) { while(raxNext(&ri)) {
uint64_t id, timeout; uint64_t timeout;
decodeTimeoutKey(ri.key,&timeout,&id); client *c;
decodeTimeoutKey(ri.key,&timeout,&c);
if (timeout >= now) break; /* All the timeouts are in the future. */ if (timeout >= now) break; /* All the timeouts are in the future. */
client *c = lookupClientByID(id); std::unique_lock<fastlock> lock(c->lock);
if (c) { c->flags &= ~CLIENT_IN_TO_TABLE;
std::unique_lock<fastlock> lock(c->lock); checkBlockedClientTimeout(c,now);
c->flags &= ~CLIENT_IN_TO_TABLE;
checkBlockedClientTimeout(c,now);
}
raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL); raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL);
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
} }

View File

@ -64,7 +64,8 @@ void *threadMain(void *arg) {
RedisModule_SelectDb(ctx,9); /* Tests ran in database number 9. */ RedisModule_SelectDb(ctx,9); /* Tests ran in database number 9. */
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
RedisModule_ThreadSafeContextLock(ctx); RedisModule_ThreadSafeContextLock(ctx);
RedisModule_Replicate(ctx,"INCR","c","thread"); RedisModule_Replicate(ctx,"INCR","c","a-from-thread");
RedisModule_Replicate(ctx,"INCR","c","b-from-thread");
RedisModule_ThreadSafeContextUnlock(ctx); RedisModule_ThreadSafeContextUnlock(ctx);
} }
RedisModule_FreeThreadSafeContext(ctx); RedisModule_FreeThreadSafeContext(ctx);
@ -89,6 +90,38 @@ int propagateTestCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
return REDISMODULE_OK; return REDISMODULE_OK;
} }
int propagateTest2Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
/* Replicate two commands to test MULTI/EXEC wrapping. */
RedisModule_Replicate(ctx,"INCR","c","counter-1");
RedisModule_Replicate(ctx,"INCR","c","counter-2");
RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}
int propagateTest3Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleCallReply *reply;
/* This test mixes multiple propagation systems. */
reply = RedisModule_Call(ctx, "INCR", "c!", "using-call");
RedisModule_FreeCallReply(reply);
RedisModule_Replicate(ctx,"INCR","c","counter-1");
RedisModule_Replicate(ctx,"INCR","c","counter-2");
reply = RedisModule_Call(ctx, "INCR", "c!", "after-call");
RedisModule_FreeCallReply(reply);
RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argc);
@ -100,5 +133,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
propagateTestCommand, propagateTestCommand,
"",1,1,1) == REDISMODULE_ERR) "",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"propagate-test-2",
propagateTest2Command,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"propagate-test-3",
propagateTest3Command,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -20,11 +20,44 @@ tags "modules" {
wait_for_condition 5000 10 { wait_for_condition 5000 10 {
([$replica get timer] eq "10") && \ ([$replica get timer] eq "10") && \
([$replica get thread] eq "10") ([$replica get a-from-thread] eq "10")
} else { } else {
fail "The two counters don't match the expected value." fail "The two counters don't match the expected value."
} }
$master propagate-test-2
$master propagate-test-3
$master multi
$master propagate-test-2
$master propagate-test-3
$master exec
wait_for_ofs_sync $master $replica
assert_equal [s -1 unexpected_error_replies] 0
} }
} }
} }
} }
tags "modules aof" {
test {Modules RM_Replicate replicates MULTI/EXEC correctly} {
start_server [list overrides [list loadmodule "$testmodule"]] {
# Enable the AOF
r config set appendonly yes
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
waitForBgrewriteaof r
r propagate-test-2
r propagate-test-3
r multi
r propagate-test-2
r propagate-test-3
r exec
# Load the AOF
r debug loadaof
assert_equal [s 0 unexpected_error_replies] 0
}
}
}

View File

@ -170,6 +170,27 @@ start_server {
assert_error "*NOGROUP*" {$rd read} assert_error "*NOGROUP*" {$rd read}
} }
test {RENAME can unblock XREADGROUP with data} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r XGROUP CREATE mystream2 mygroup $ MKSTREAM
r XADD mystream2 100 f1 v1
r RENAME mystream2 mystream
assert_equal "{mystream {{100-0 {f1 v1}}}}" [$rd read] ;# mystream2 had mygroup before RENAME
}
test {RENAME can unblock XREADGROUP with -NOGROUP} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r XADD mystream2 100 f1 v1
r RENAME mystream2 mystream
assert_error "*NOGROUP*" {$rd read} ;# mystream2 didn't have mygroup before RENAME
}
test {XCLAIM can claim PEL items from another consumer} { test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group # Add 3 items into the stream, and create a consumer group
r del mystream r del mystream