Merge branch 'unstable' into keydbpro

Former-commit-id: 461eea07260a31cd75753d5b7be691f5793a6f1b
This commit is contained in:
John Sully 2020-06-07 16:41:21 -04:00
commit 4001a99481
10 changed files with 54 additions and 13 deletions

View File

@ -288,9 +288,9 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK;
}
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock)
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
{
if (eventLoop == g_eventLoopThisThread)
if (eventLoop == g_eventLoopThisThread && !fForceQueue)
{
fn();
return AE_OK;

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus
} // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true);
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
extern "C" {
#endif
void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -81,6 +81,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
robj objKey;
initStaticStringObject(objKey, (char*)e.key());
bool fTtlChanged = false;
while (!pfat->FEmpty())
{
@ -127,7 +128,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
break;
case OBJ_CRON:
executeCronJobExpireHook(e.key(), val);
{
sds keyCopy = sdsdup(e.key());
incrRefCount(val);
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{
executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy);
decrRefCount(val);
}, false, true /*fLock*/, true /*fForceQueue*/);
}
return;
case OBJ_LIST:
@ -140,16 +149,16 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
pfat->popfrontExpireEntry();
fTtlChanged = true;
}
if (!pfat->FEmpty() && fTtlChanged)
{
// We need to resort the expire entry since it may no longer be in the correct position
db->resortExpire(e);
if (deleted)
{
if (!pfat->FEmpty())
{
// We need to resort the expire entry since it may no longer be in the correct position
db->resortExpire(e);
}
switch (val->type)
{
case OBJ_SET:

View File

@ -5,7 +5,6 @@
void *operator new(size_t size, enum MALLOC_CLASS mclass);
#ifndef SANITIZE
[[deprecated]]
void *operator new(size_t size);
void operator delete(void * p) noexcept;

View File

@ -160,7 +160,7 @@ start_server {tags {"repl"}} {
test {FLUSHALL should replicate} {
r -1 flushall
if {$::valgrind} {after 2000}
if {$::valgrind} {after 2000} else {after 500}
list [r -1 dbsize] [r 0 dbsize]
} {0 0}

View File

@ -216,7 +216,7 @@ proc s {args} {
# test server, so that the test server will send them again to
# clients once the clients are idle.
proc run_solo {name code} {
if {$::numclients == 1 || $::loop || $::external} {
if {$::numclients == 1 || $::loop < 0 || $::external} {
# run_solo is not supported in these scenarios, just run the code.
eval $code
return

View File

@ -245,6 +245,7 @@ start_server {tags {"dump"}} {
set rd [redis_deferring_client]
$rd debug sleep 1.0 ; # Make second server unable to reply.
after 100
set e {}
catch {r -1 migrate $second_host $second_port key 9 500} e
assert_match {IOERR*} $e

View File

@ -291,4 +291,26 @@ start_server {tags {"expire"}} {
fail "Server reported corrupt subexpire"
}
}
test {Stress subkey expires} {
r flushall
set rd [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd2 multi
for {set j 0} {$j < 1000} {incr j} {
for {set k 0} {$k < 1000} {incr k} {
$rd hset "hash_$k" "field_$j" "foo"
$rd expiremember "hash_$k" "field_$j" [expr int(floor(rand() * 1000))] ms
if {rand() < 0.3} {
$rd2 hdel "hash_$k" "field_$j"
}
if {rand() < 0.01} {
$rd del "hash_$k"
}
}
}
$rd2 exec
after 3000
assert_equal [r dbsize] 0
}
}

View File

@ -17,7 +17,9 @@ start_server {tags {"lazyfree"}} {
fail "Memory is not reclaimed by UNLINK"
}
}
}
start_server {tags {"lazyfree"}} {
test "FLUSHDB ASYNC can reclaim memory in background" {
set orig_mem [s used_memory]
set args {}

View File

@ -176,6 +176,7 @@ start_server {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
after 100
r XADD s2 * new abcd1234
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
@ -185,6 +186,7 @@ start_server {
test {Blocking XREAD waiting old data} {
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
after 100
r XADD s2 * foo abcd1234
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
@ -198,6 +200,7 @@ start_server {
r XDEL s1 667
set rd [redis_deferring_client]
$rd XREAD BLOCK 10 STREAMS s1 666
after 100
after 20
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}}
}
@ -206,6 +209,7 @@ start_server {
set rd [redis_deferring_client]
r del s1
$rd XREAD BLOCK 20000 STREAMS s1 $
after 100
r multi
r XADD s1 * old abcd1234
r DEL s1
@ -220,6 +224,7 @@ start_server {
set rd [redis_deferring_client]
r del s1
$rd XREAD BLOCK 20000 STREAMS s1 $
after 100
r multi
r XADD s1 * old abcd1234
r DEL s1
@ -236,6 +241,7 @@ start_server {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
after 100
r XADD s2 * new abcd1234
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
@ -246,6 +252,7 @@ start_server {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
after 100
r MULTI
r XADD s2 * field one
r XADD s2 * field two
@ -353,6 +360,7 @@ start_server {
r del x
set rd [redis_deferring_client]
$rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
after 100
r XADD x 1-1 f v
r XADD x 1-18446744073709551615 f v
r XADD x 2-1 f v