Merge branch 'unstable' into keydbpro

Former-commit-id: 76ddbed0708277443660ffab2a2289e120fe87cd
This commit is contained in:
John Sully 2020-01-03 16:53:40 -05:00
commit 784f4c5b06
7 changed files with 63 additions and 19 deletions

View File

@ -84,7 +84,7 @@ fastlock g_lock("AE (global)");
#endif #endif
thread_local aeEventLoop *g_eventLoopThisThread = NULL; thread_local aeEventLoop *g_eventLoopThisThread = NULL;
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0) #define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)1) = 1; } while(0)
/* Include the best multiplexing layer supported by this system. /* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */ * The following should be ordered by performances, descending. */
@ -237,11 +237,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
cmd.clientData = clientData; cmd.clientData = clientData;
cmd.pctl = nullptr; cmd.pctl = nullptr;
if (fSynchronous) if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
if (fSynchronous)
cmd.pctl->mutexcv.lock(); cmd.pctl->mutexcv.lock();
}
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd)) if (size != sizeof(cmd))
{ {
@ -252,6 +252,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
if (fSynchronous) if (fSynchronous)
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
delete cmd.pctl; delete cmd.pctl;
@ -289,15 +290,17 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn); cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr; cmd.pctl = nullptr;
if (fSynchronous) if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
if (fSynchronous)
cmd.pctl->mutexcv.lock(); cmd.pctl->mutexcv.lock();
}
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(size == sizeof(cmd)); AE_ASSERT(size == sizeof(cmd));
int ret = AE_OK; int ret = AE_OK;
if (fSynchronous) if (fSynchronous)
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
delete cmd.pctl; delete cmd.pctl;

View File

@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/ /* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref); void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations. /* Defrag helper for generic allocations.
* *
@ -407,7 +407,7 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL; return NULL;
} }
void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey); auto itr = set.find(oldkey);
if (itr != set.end()) if (itr != set.end())
{ {
@ -415,7 +415,10 @@ void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
eNew.setKeyUnsafe(newkey); eNew.setKeyUnsafe(newkey);
set.erase(itr); set.erase(itr);
set.insert(eNew); set.insert(eNew);
serverAssert(set.find(newkey) != set.end());
return true;
} }
return false;
} }
long activeDefragQuickListNodes(quicklist *ql) { long activeDefragQuickListNodes(quicklist *ql) {
@ -777,18 +780,21 @@ long defragKey(redisDb *db, dictEntry *de) {
long defragged = 0; long defragged = 0;
sds newsds; sds newsds;
ob = (robj*)dictGetVal(de);
/* Try to defrag the key name. */ /* Try to defrag the key name. */
newsds = activeDefragSds(keysds); newsds = activeDefragSds(keysds);
if (newsds) if (newsds)
{
defragged++, de->key = newsds; defragged++, de->key = newsds;
if (!db->setexpire()->empty()) { if (!db->setexpire()->empty()) {
replaceSateliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds); bool fReplaced = replaceSateliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds);
serverAssert(fReplaced == ob->FExpires());
} else {
serverAssert(!ob->FExpires());
}
} }
/* Try to defrag robj and / or string value. */
ob = (robj*)dictGetVal(de);
if (ob == nullptr)
return defragged;
if ((newob = activeDefragStringOb(ob, &defragged))) { if ((newob = activeDefragStringOb(ob, &defragged))) {
de->v.val = newob; de->v.val = newob;
ob = newob; ob = newob;
@ -841,6 +847,7 @@ long defragKey(redisDb *db, dictEntry *de) {
} else { } else {
serverPanic("Unknown object type"); serverPanic("Unknown object type");
} }
return defragged; return defragged;
} }

View File

@ -293,7 +293,7 @@ extern "C" void fastlock_lock(struct fastlock *lock)
#elif defined(__arm__) #elif defined(__arm__)
__asm__ __volatile__ ("yield"); __asm__ __volatile__ ("yield");
#endif #endif
if ((++cloops % 0x10000) == 0) if ((++cloops % 0x100000) == 0)
{ {
fastlock_sleep(lock, tid, ticketT.u, mask); fastlock_sleep(lock, tid, ticketT.u, mask);
} }

View File

@ -45,7 +45,7 @@ fastlock_lock:
cmp dx, ax # is our ticket up? cmp dx, ax # is our ticket up?
je .LLocked # leave the loop je .LLocked # leave the loop
pause pause
add ecx, 0x10000 # Have we been waiting a long time? (oflow if we have) add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have)
# 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
jnc .LLoop # If so, give up our timeslice to someone who's doing real work jnc .LLoop # If so, give up our timeslice to someone who's doing real work
# Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"

View File

@ -1987,7 +1987,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
redisDb *db = g_pserver->db[dbid]; redisDb *db = g_pserver->db[dbid];
char buf[1024]; char buf[1024];
/* Key-specific attributes, set by opcodes before the key type. */ /* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0; long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
robj *subexpireKey = nullptr; robj *subexpireKey = nullptr;

View File

@ -2297,7 +2297,8 @@ int connectWithMaster(redisMaster *mi) {
fd = anetTcpNonBlockBestEffortBindConnect(NULL, fd = anetTcpNonBlockBestEffortBindConnect(NULL,
mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR); mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) { if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s", int sev = g_pserver->enable_multimaster ? LL_NOTICE : LL_WARNING; // with multimaster its not unheard of to intentiallionall have downed masters
serverLog(sev,"Unable to connect to MASTER: %s",
strerror(errno)); strerror(errno));
return C_ERR; return C_ERR;
} }
@ -3060,7 +3061,7 @@ void replicationCron(void) {
} }
/* Timed out master when we are an already connected replica? */ /* Timed out master when we are an already connected replica? */
if (mi->masterhost && mi->repl_state == REPL_STATE_CONNECTED && if (mi->masterhost && mi->master && mi->repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout) (time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout)
{ {
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");

View File

@ -219,4 +219,37 @@ start_server {tags {"expire"}} {
set ttl [r ttl foo] set ttl [r ttl foo]
assert {$ttl <= 98 && $ttl > 90} assert {$ttl <= 98 && $ttl > 90}
} }
test { EXPIREMEMBER works (set) } {
r flushall
r sadd testkey foo bar baz
r expiremember testkey foo 1
after 1500
assert_equal {2} [r scard testkey]
}
test { EXPIREMEMBER works (hash) } {
r flushall
r hset testkey foo bar
r expiremember testkey foo 1
after 1500
r exists testkey
} {0}
test { EXPIREMEMBER works (zset) } {
r flushall
r zadd testkey 1 foo
r zadd testkey 2 bar
assert_equal {2} [r zcard testkey]
r expiremember testkey foo 1
after 1500
assert_equal {1} [r zcard testkey]
}
test { TTL for subkey expires works } {
r flushall
r sadd testkey foo bar baz
r expiremember testkey foo 10000
assert [expr [r ttl testkey foo] > 0]
}
} }