diff --git a/src/ae.cpp b/src/ae.cpp index 87212f704..90c148510 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -84,7 +84,7 @@ fastlock g_lock("AE (global)"); #endif thread_local aeEventLoop *g_eventLoopThisThread = NULL; -#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0) +#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)1) = 1; } while(0) /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ @@ -237,11 +237,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, cmd.clientData = clientData; cmd.pctl = nullptr; if (fSynchronous) + { cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); - - std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); - if (fSynchronous) cmd.pctl->mutexcv.lock(); + } + auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) { @@ -252,6 +252,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { + std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; delete cmd.pctl; @@ -289,15 +290,17 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.pfn = new (MALLOC_LOCAL) std::function(fn); cmd.pctl = nullptr; if (fSynchronous) + { cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); - std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); - if (fSynchronous) cmd.pctl->mutexcv.lock(); + } + auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) { + std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; delete cmd.pctl; diff --git a/src/defrag.cpp b/src/defrag.cpp index 242acd184..4b4b38997 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,7 +407,7 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); if (itr != set.end()) { @@ -415,7 +415,10 @@ void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); + serverAssert(set.find(newkey) != set.end()); + return true; } + return false; } long activeDefragQuickListNodes(quicklist *ql) { @@ -777,18 +780,21 @@ long defragKey(redisDb *db, dictEntry *de) { long defragged = 0; sds newsds; + ob = (robj*)dictGetVal(de); + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) + { defragged++, de->key = newsds; - if (!db->setexpire()->empty()) { - replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); + if (!db->setexpire()->empty()) { + bool fReplaced = replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); + serverAssert(fReplaced == ob->FExpires()); + } else { + serverAssert(!ob->FExpires()); + } } - /* Try to defrag robj and / or string value. */ - ob = (robj*)dictGetVal(de); - if (ob == nullptr) - return defragged; if ((newob = activeDefragStringOb(ob, &defragged))) { de->v.val = newob; ob = newob; @@ -841,6 +847,7 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown object type"); } + return defragged; } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 9acc854b3..afa4e4b36 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -293,7 +293,7 @@ extern "C" void fastlock_lock(struct fastlock *lock) #elif defined(__arm__) __asm__ __volatile__ ("yield"); #endif - if ((++cloops % 0x10000) == 0) + if ((++cloops % 0x100000) == 0) { fastlock_sleep(lock, tid, ticketT.u, mask); } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index f7ab6316e..7c9990a6d 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -45,7 +45,7 @@ fastlock_lock: cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause - add ecx, 0x10000 # Have we been waiting a long time? (oflow if we have) + add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) jnc .LLoop # If so, give up our timeslice to someone who's doing real work # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" diff --git a/src/rdb.cpp b/src/rdb.cpp index 5faf4958a..dea96066e 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1987,7 +1987,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { redisDb *db = g_pserver->db[dbid]; char buf[1024]; /* Key-specific attributes, set by opcodes before the key type. */ - long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); + long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; robj *subexpireKey = nullptr; diff --git a/src/replication.cpp b/src/replication.cpp index 857a5680d..730746081 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2297,7 +2297,8 @@ int connectWithMaster(redisMaster *mi) { fd = anetTcpNonBlockBestEffortBindConnect(NULL, mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { - serverLog(LL_WARNING,"Unable to connect to MASTER: %s", + int sev = g_pserver->enable_multimaster ? LL_NOTICE : LL_WARNING; // with multimaster its not unheard of to intentiallionall have downed masters + serverLog(sev,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } @@ -3060,7 +3061,7 @@ void replicationCron(void) { } /* Timed out master when we are an already connected replica? */ - if (mi->masterhost && mi->repl_state == REPL_STATE_CONNECTED && + if (mi->masterhost && mi->master && mi->repl_state == REPL_STATE_CONNECTED && (time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout) { serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index de24eabed..477df0242 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -219,4 +219,37 @@ start_server {tags {"expire"}} { set ttl [r ttl foo] assert {$ttl <= 98 && $ttl > 90} } + + test { EXPIREMEMBER works (set) } { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 1 + after 1500 + assert_equal {2} [r scard testkey] + } + + test { EXPIREMEMBER works (hash) } { + r flushall + r hset testkey foo bar + r expiremember testkey foo 1 + after 1500 + r exists testkey + } {0} + + test { EXPIREMEMBER works (zset) } { + r flushall + r zadd testkey 1 foo + r zadd testkey 2 bar + assert_equal {2} [r zcard testkey] + r expiremember testkey foo 1 + after 1500 + assert_equal {1} [r zcard testkey] + } + + test { TTL for subkey expires works } { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + assert [expr [r ttl testkey foo] > 0] + } }