Merge branch 'unstable' of github.com:/antirez/redis into unstable
commit faea46710c
src/blocked.c
@@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
 
             if (streamCompareID(&s->last_id, gt) > 0) {
                 streamID start = *gt;
-                start.seq++; /* Can't overflow, it's an uint64_t */
+                streamIncrID(&start);
 
                 /* Lookup the consumer for the group, if any. */
                 streamConsumer *consumer = NULL;
src/config.c
@@ -2137,7 +2137,7 @@ standardConfig configs[] = {
     createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
     createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
     createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, server.io_threads_do_reads, 0,NULL, NULL), /* Read + parse from threads? */
-    createBoolConfig("lua-replicate-commands", NULL, IMMUTABLE_CONFIG, server.lua_always_replicate_commands, 1, NULL, NULL),
+    createBoolConfig("lua-replicate-commands", NULL, MODIFIABLE_CONFIG, server.lua_always_replicate_commands, 1, NULL, NULL),
     createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
     createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
     createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
@@ -2209,7 +2209,7 @@ standardConfig configs[] = {
     createIntConfig("maxmemory-samples", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.maxmemory_samples, 5, INTEGER_CONFIG, NULL, NULL),
     createIntConfig("timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.maxidletime, 0, INTEGER_CONFIG, NULL, NULL), /* Default client timeout: infinite */
     createIntConfig("replica-announce-port", "slave-announce-port", MODIFIABLE_CONFIG, 0, 65535, server.slave_announce_port, 0, INTEGER_CONFIG, NULL, NULL),
-    createIntConfig("tcp-backlog", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
+    createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */
     createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, NULL), /* Default: Use +10000 offset. */
     createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.port */
     createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL),
@@ -2235,9 +2235,9 @@ standardConfig configs[] = {
     createLongLongConfig("cluster-node-timeout", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.cluster_node_timeout, 15000, INTEGER_CONFIG, NULL, NULL),
     createLongLongConfig("slowlog-log-slower-than", NULL, MODIFIABLE_CONFIG, -1, LLONG_MAX, server.slowlog_log_slower_than, 10000, INTEGER_CONFIG, NULL, NULL),
     createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL),
-    createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
-    createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
-    createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
+    createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
+    createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
+    createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
 
     /* Unsigned Long Long configs */
     createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
@@ -2246,7 +2246,7 @@ standardConfig configs[] = {
     createSizeTConfig("hash-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_ziplist_entries, 512, INTEGER_CONFIG, NULL, NULL),
     createSizeTConfig("set-max-intset-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.set_max_intset_entries, 512, INTEGER_CONFIG, NULL, NULL),
     createSizeTConfig("zset-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_entries, 128, INTEGER_CONFIG, NULL, NULL),
-    createSizeTConfig("active-defrag-ignore-bytes", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_ignore_bytes, 100<<20, MEMORY_CONFIG, NULL, NULL), /* Default: don't defrag if frag overhead is below 100mb */
+    createSizeTConfig("active-defrag-ignore-bytes", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.active_defrag_ignore_bytes, 100<<20, MEMORY_CONFIG, NULL, NULL), /* Default: don't defrag if frag overhead is below 100mb */
     createSizeTConfig("hash-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL),
     createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL),
     createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL),
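Note (not part of the commit): two kinds of config.c changes appear above. The flag swaps make lua-replicate-commands settable at runtime via CONFIG SET (MODIFIABLE_CONFIG) and tcp-backlog startup-only (IMMUTABLE_CONFIG), which is presumably why the introspection test's parameter list further down changes in the same way. The LONG_MAX to LLONG_MAX edits widen the declared upper bounds of the long long options; the config table's declared bounds are enforced when a value is set, and with LONG_MAX the effective ceiling would depend on the platform's long width. A simplified, hypothetical stand-in for that bounds check (made-up names, not the actual config.c implementation):

/* Hypothetical sketch of a numeric config entry and its bounds check;
 * illustrates why LLONG_MAX is the natural upper bound for a long long
 * option, while LONG_MAX is only 2^31-1 on 32-bit builds. */
#include <limits.h>
#include <stdio.h>

typedef struct {
    const char *name;
    long long lower, upper;   /* bounds declared alongside the option */
    long long value;
} llConfigSketch;

static int sketchSet(llConfigSketch *cfg, long long v) {
    if (v < cfg->lower || v > cfg->upper) return 0;   /* out of range: rejected */
    cfg->value = v;
    return 1;                                         /* accepted */
}

int main(void) {
    llConfigSketch cfg = {"proto-max-bulk-len", 0, LLONG_MAX, 512ll*1024*1024};
    long long four_gb = 4ll*1024*1024*1024;
    printf("upper bound: %lld, set 4gb: %s\n",
           cfg.upper, sketchSet(&cfg, four_gb) ? "accepted" : "rejected");
    return 0;
}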
src/stream.h
@@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer);
 void streamDecodeID(void *buf, streamID *id);
 int streamCompareID(streamID *a, streamID *b);
 void streamFreeNACK(streamNACK *na);
+void streamIncrID(streamID *id);
 
 #endif
src/t_stream.c
@@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) {
     return s->length;
 }
 
+/* Set 'id' to be its successor streamID */
+void streamIncrID(streamID *id) {
+    if (id->seq == UINT64_MAX) {
+        if (id->ms == UINT64_MAX) {
+            /* Special case where 'id' is the last possible streamID... */
+            id->ms = id->seq = 0;
+        } else {
+            id->ms++;
+            id->seq = 0;
+        }
+    } else {
+        id->seq++;
+    }
+}
+
 /* Generate the next stream item ID given the previous one. If the current
  * milliseconds Unix time is greater than the previous one, just use this
  * as time part and start with sequence part of zero. Otherwise we use the
@@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) {
         new_id->ms = ms;
         new_id->seq = 0;
     } else {
-        new_id->ms = last_id->ms;
-        new_id->seq = last_id->seq+1;
+        *new_id = *last_id;
+        streamIncrID(new_id);
     }
 }
 
@@ -1220,6 +1235,13 @@ void xaddCommand(client *c) {
     if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
     s = o->ptr;
 
+    /* Return ASAP if the stream has reached the last possible ID */
+    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
+        addReplyError(c,"The stream has exhausted the last possible ID, "
+                        "unable to add more items");
+        return;
+    }
+
     /* Append using the low level function and return the ID. */
     if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
         &id, id_given ? &id : NULL)
@@ -1509,7 +1531,7 @@ void xreadCommand(client *c) {
             * so start from the next ID, since we want only messages with
            * IDs greater than start. */
           streamID start = *gt;
-          start.seq++; /* uint64_t can't overflow in this context. */
+          streamIncrID(&start);
 
           /* Emit the two elements sub-array consisting of the name
            * of the stream and the data we extracted from it. */
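For context (not part of the commit): the replaced start.seq++ call sites silently wrap when the sequence part is already UINT64_MAX, producing a start ID that is not greater than the one requested, which is exactly the edge the new tests below exercise. The following is a self-contained sketch that mirrors the streamIncrID() helper added above, with the streamID struct redeclared locally so it compiles outside the Redis tree; it walks the three cases (plain seq bump, carry into the ms part, and the absolute last ID wrapping to 0-0, which xaddCommand now refuses before it can happen):

/* Standalone sketch mirroring streamIncrID(); streamID is redeclared here
 * only so this builds outside the Redis source tree. */
#include <stdint.h>
#include <stdio.h>

typedef struct { uint64_t ms, seq; } streamID;   /* local stand-in */

static void incrID(streamID *id) {
    if (id->seq == UINT64_MAX) {
        if (id->ms == UINT64_MAX) {
            id->ms = id->seq = 0;   /* last possible ID wraps to 0-0 */
        } else {
            id->ms++;               /* carry into the ms part... */
            id->seq = 0;            /* ...and restart the sequence */
        }
    } else {
        id->seq++;                  /* common case: bump the sequence */
    }
}

int main(void) {
    streamID a = {1, 1}, b = {1, UINT64_MAX}, c = {UINT64_MAX, UINT64_MAX};
    incrID(&a); incrID(&b); incrID(&c);
    printf("%llu-%llu\n", (unsigned long long)a.ms, (unsigned long long)a.seq); /* 1-2 */
    printf("%llu-%llu\n", (unsigned long long)b.ms, (unsigned long long)b.seq); /* 2-0 */
    printf("%llu-%llu\n", (unsigned long long)c.ms, (unsigned long long)c.seq); /* 0-0 */
    return 0;
}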
tests/unit/introspection.tcl
@@ -65,7 +65,7 @@ start_server {tags {"introspection"}} {
        rdbchecksum
        daemonize
        io-threads-do-reads
-       lua-replicate-commands
+       tcp-backlog
        always-show-logo
        syslog-enabled
        cluster-enabled
tests/unit/type/stream.tcl
@@ -328,6 +328,33 @@ start_server {
 
        assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
    }
 
+    test {XREAD streamID edge (no-blocking)} {
+        r del x
+        r XADD x 1-1 f v
+        r XADD x 1-18446744073709551615 f v
+        r XADD x 2-1 f v
+        set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615]
+        assert {[lindex $res 0 1 0] == {2-1 {f v}}}
+    }
+
+    test {XREAD streamID edge (blocking)} {
+        r del x
+        set rd [redis_deferring_client]
+        $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
+        r XADD x 1-1 f v
+        r XADD x 1-18446744073709551615 f v
+        r XADD x 2-1 f v
+        set res [$rd read]
+        assert {[lindex $res 0 1 0] == {2-1 {f v}}}
+    }
+
+    test {XADD streamID edge} {
+        r del x
+        r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future
+        r XADD x * f2 v2
+        assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}}
+    }
+
 }
 
 start_server {tags {"stream"} overrides {appendonly yes}} {