diff --git a/src/ae.cpp b/src/ae.cpp index 58e23e3f0..d99855170 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -288,9 +288,9 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock, bool fForceQueue) { - if (eventLoop == g_eventLoopThisThread) + if (eventLoop == g_eventLoopThisThread && !fForceQueue) { fn(); return AE_OK; diff --git a/src/ae.h b/src/ae.h index 3f1ddbf06..fdd444d3a 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/expire.cpp b/src/expire.cpp index c999ce708..961e75393 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -81,6 +81,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { robj objKey; initStaticStringObject(objKey, (char*)e.key()); + bool fTtlChanged = false; while (!pfat->FEmpty()) { @@ -127,7 +128,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { break; case OBJ_CRON: - executeCronJobExpireHook(e.key(), val); + { + sds keyCopy = sdsdup(e.key()); + incrRefCount(val); + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{ + executeCronJobExpireHook(keyCopy, val); + sdsfree(keyCopy); + decrRefCount(val); + }, false, true /*fLock*/, true /*fForceQueue*/); + } return; case OBJ_LIST: @@ -140,16 +149,16 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); pfat->popfrontExpireEntry(); + fTtlChanged = true; } + if (!pfat->FEmpty() && fTtlChanged) + { + // We need to resort the expire entry since it may no longer be in the correct position + db->resortExpire(e); + if (deleted) { - if (!pfat->FEmpty()) - { - // We need to resort the expire entry since it may no longer be in the correct position - db->resortExpire(e); - } - switch (val->type) { case OBJ_SET: diff --git a/src/new.h b/src/new.h index d37ac04f8..573594eba 100644 --- a/src/new.h +++ b/src/new.h @@ -5,7 +5,6 @@ void *operator new(size_t size, enum MALLOC_CLASS mclass); #ifndef SANITIZE -[[deprecated]] void *operator new(size_t size); void operator delete(void * p) noexcept; diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index b67f5428c..68f1ff69c 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -160,7 +160,7 @@ start_server {tags {"repl"}} { test {FLUSHALL should replicate} { r -1 flushall - if {$::valgrind} {after 2000} + if {$::valgrind} {after 2000} else {after 500} list [r -1 dbsize] [r 0 dbsize] } {0 0} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index eed91c484..8b3596844 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -216,7 +216,7 @@ proc s {args} { # test server, so that the test server will send them again to # clients once the clients are idle. proc run_solo {name code} { - if {$::numclients == 1 || $::loop || $::external} { + if {$::numclients == 1 || $::loop < 0 || $::external} { # run_solo is not supported in these scenarios, just run the code. eval $code return diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 062d803b5..aa0f6ee11 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -245,6 +245,7 @@ start_server {tags {"dump"}} { set rd [redis_deferring_client] $rd debug sleep 1.0 ; # Make second server unable to reply. + after 100 set e {} catch {r -1 migrate $second_host $second_port key 9 500} e assert_match {IOERR*} $e diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 67b2db4c2..16006d7da 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -291,4 +291,26 @@ start_server {tags {"expire"}} { fail "Server reported corrupt subexpire" } } + + test {Stress subkey expires} { + r flushall + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + $rd2 multi + for {set j 0} {$j < 1000} {incr j} { + for {set k 0} {$k < 1000} {incr k} { + $rd hset "hash_$k" "field_$j" "foo" + $rd expiremember "hash_$k" "field_$j" [expr int(floor(rand() * 1000))] ms + if {rand() < 0.3} { + $rd2 hdel "hash_$k" "field_$j" + } + if {rand() < 0.01} { + $rd del "hash_$k" + } + } + } + $rd2 exec + after 3000 + assert_equal [r dbsize] 0 + } } diff --git a/tests/unit/lazyfree.tcl b/tests/unit/lazyfree.tcl index 8efb3aecd..be3939fd2 100644 --- a/tests/unit/lazyfree.tcl +++ b/tests/unit/lazyfree.tcl @@ -17,7 +17,9 @@ start_server {tags {"lazyfree"}} { fail "Memory is not reclaimed by UNLINK" } } +} +start_server {tags {"lazyfree"}} { test "FLUSHDB ASYNC can reclaim memory in background" { set orig_mem [s used_memory] set args {} diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index c2b524d7f..144a207b9 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -176,6 +176,7 @@ start_server { r XADD s2 * old abcd1234 set rd [redis_deferring_client] $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $ + after 100 r XADD s2 * new abcd1234 set res [$rd read] assert {[lindex $res 0 0] eq {s2}} @@ -185,6 +186,7 @@ start_server { test {Blocking XREAD waiting old data} { set rd [redis_deferring_client] $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $ + after 100 r XADD s2 * foo abcd1234 set res [$rd read] assert {[lindex $res 0 0] eq {s2}} @@ -198,6 +200,7 @@ start_server { r XDEL s1 667 set rd [redis_deferring_client] $rd XREAD BLOCK 10 STREAMS s1 666 + after 100 after 20 assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}} } @@ -206,6 +209,7 @@ start_server { set rd [redis_deferring_client] r del s1 $rd XREAD BLOCK 20000 STREAMS s1 $ + after 100 r multi r XADD s1 * old abcd1234 r DEL s1 @@ -220,6 +224,7 @@ start_server { set rd [redis_deferring_client] r del s1 $rd XREAD BLOCK 20000 STREAMS s1 $ + after 100 r multi r XADD s1 * old abcd1234 r DEL s1 @@ -236,6 +241,7 @@ start_server { r XADD s2 * old abcd1234 set rd [redis_deferring_client] $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + after 100 r XADD s2 * new abcd1234 set res [$rd read] assert {[lindex $res 0 0] eq {s2}} @@ -246,6 +252,7 @@ start_server { r XADD s2 * old abcd1234 set rd [redis_deferring_client] $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $ + after 100 r MULTI r XADD s2 * field one r XADD s2 * field two @@ -353,6 +360,7 @@ start_server { r del x set rd [redis_deferring_client] $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 + after 100 r XADD x 1-1 f v r XADD x 1-18446744073709551615 f v r XADD x 2-1 f v