diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 42660486e..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,58 +0,0 @@ -{ - "files.associations": { - "zmalloc.h": "c", - "stat.h": "c", - "array": "cpp", - "atomic": "cpp", - "*.tcc": "cpp", - "cctype": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "condition_variable": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "list": "cpp", - "unordered_map": "cpp", - "vector": "cpp", - "exception": "cpp", - "fstream": "cpp", - "functional": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "numeric": "cpp", - "optional": "cpp", - "ostream": "cpp", - "ratio": "cpp", - "scoped_allocator": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "streambuf": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "thread": "cpp", - "cinttypes": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "utility": "cpp", - "set": "cpp", - "algorithm": "cpp" - } -} diff --git a/src/expire.cpp b/src/expire.cpp index b115802b4..b7cb49eaf 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -134,6 +134,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { if (deleted) { + if (!pfat->FEmpty()) + { + // We need to resort the expire entry since it may no longer be in the correct position + auto itr = db->setexpire->find(e.key()); + expireEntry eT = std::move(e); + db->setexpire->erase(itr); + db->setexpire->insert(eT); + } + robj objT; switch (val->type) { diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4f1a9ad5e..9c7577a16 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -43,6 +43,7 @@ #include #include #include +#include "config.h" #ifdef __APPLE__ #include @@ -60,6 +61,11 @@ #define UNUSED(x) ((void)x) #endif +#ifdef HAVE_BACKTRACE +#include +__attribute__((weak)) void logStackTrace(ucontext_t *) {} +#endif + extern int g_fInCrash; /**************************************************** @@ -149,6 +155,43 @@ __attribute__((weak)) void serverLog(int , const char *fmt, ...) printf("\n"); } +extern "C" pid_t gettid() +{ + static thread_local int pidCache = -1; +#ifdef __linux__ + if (pidCache == -1) + pidCache = syscall(SYS_gettid); +#else + if (pidCache == -1) { + uint64_t tidT; + pthread_threadid_np(nullptr, &tidT); + assert(tidT < UINT_MAX); + pidCache = (int)tidT; + } +#endif + return pidCache; +} + +void printTrace() +{ +#ifdef HAVE_BACKTRACE + serverLog(3 /*LL_WARNING*/, "printing backtrace for thread %d", gettid()); + ucontext_t ctxt; + getcontext(&ctxt); + logStackTrace(&ctxt); +#endif +} + + +#ifdef __linux__ +static int futex(volatile unsigned *uaddr, int futex_op, int val, + const struct timespec *timeout, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, + timeout, uaddr, val3); +} +#endif + class DeadlockDetector { std::map m_mapwait; @@ -156,9 +199,19 @@ class DeadlockDetector public: void registerwait(fastlock *lock, pid_t thispid) { + static volatile bool fInDeadlock = false; + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); + + if (fInDeadlock) + { + printTrace(); + fastlock_unlock(&m_lock); + return; + } + m_mapwait.insert(std::make_pair(thispid, lock)); // Detect cycles @@ -184,6 +237,19 @@ public: if (pidCheck == thispid) break; } + // Wake All sleeping threads so they can print their callstacks +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + int mask = -1; + fInDeadlock = true; + fastlock_unlock(&m_lock); + futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + futex(&itr->second->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + sleep(2); + fastlock_lock(&m_lock); + printTrace(); +#endif +#endif serverLog(3 /*LL_WARNING*/, "!!! KeyDB Will Now Crash !!!"); _serverPanic(__FILE__, __LINE__, "Deadlock detected"); } @@ -222,32 +288,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifdef __linux__ -static int futex(volatile unsigned *uaddr, int futex_op, int val, - const struct timespec *timeout, int val3) -{ - return syscall(SYS_futex, uaddr, futex_op, val, - timeout, uaddr, val3); -} -#endif - -extern "C" pid_t gettid() -{ - static thread_local int pidCache = -1; -#ifdef __linux__ - if (pidCache == -1) - pidCache = syscall(SYS_gettid); -#else - if (pidCache == -1) { - uint64_t tidT; - pthread_threadid_np(nullptr, &tidT); - assert(tidT < UINT_MAX); - pidCache = (int)tidT; - } -#endif - return pidCache; -} - extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) { #ifdef __linux__ diff --git a/src/networking.cpp b/src/networking.cpp index 8fe98c952..00702c656 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1745,7 +1745,10 @@ void sendReplyToClient(connection *conn) { c->lock.lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) - freeClient(c); + { + if (!freeClient(c)) + c->lock.unlock(); + } } } @@ -3230,12 +3233,23 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; - client *c = serverTL->current_client; - if (c != nullptr) + std::vector vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) { - serverAssert(c->flags & CLIENT_PROTECTED); - c->lock.unlock(); + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } } + aeReleaseLock(); serverAssertDebug(!GlobalLocksAcquired()); @@ -3253,18 +3267,18 @@ int processEventsWhileBlocked(int iel) { { // Caller expects us to be locked so fix and rethrow AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); throw; } AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); return count; } diff --git a/src/rdb.cpp b/src/rdb.cpp index 6554a10bb..85a0e8393 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2375,7 +2375,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { incrRefCount(subexpireKey); } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { if (key == nullptr || subexpireKey == nullptr) { - serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? szFromObj(key) : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); } else { setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); @@ -2456,6 +2456,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) { decrRefCount(key); + key = nullptr; goto eoferr; } bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; @@ -2512,8 +2513,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } - decrRefCount(key); - key = nullptr; } if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); @@ -2526,7 +2525,10 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } if (key != nullptr) + { decrRefCount(key); + key = nullptr; + } if (subexpireKey != nullptr) { @@ -2563,6 +2565,17 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * the RDB file from a socket during initial SYNC (diskless replica mode), * we'll report the error to the caller, so that we can retry. */ eoferr: + if (key != nullptr) + { + decrRefCount(key); + key = nullptr; + } + if (subexpireKey != nullptr) + { + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } + serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); diff --git a/src/server.cpp b/src/server.cpp index dbcb4f542..7e5682e67 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1042,6 +1042,7 @@ struct redisCommand redisCommandTable[] = { /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ extern "C" void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +extern "C" pid_t gettid(); /* Low level logging. To use only for very big messages, otherwise * serverLog() is to prefer. */ @@ -1079,8 +1080,8 @@ void serverLogRaw(int level, const char *msg) { } else { role_char = (listLength(g_pserver->masters) ? 'S':'M'); /* Slave or Master. */ } - fprintf(fp,"%d:%c %s %c %s\n", - (int)getpid(),role_char, buf,c[level],msg); + fprintf(fp,"%d:%d:%c %s %c %s\n", + (int)getpid(),(int)gettid(),role_char, buf,c[level],msg); } fflush(fp); diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index bad0a471c..ae4dad08b 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -220,7 +220,7 @@ start_server {tags {"expire"}} { assert {$ttl <= 98 && $ttl > 90} } - test { EXPIREMEMBER works (set) } { + test {EXPIREMEMBER works (set)} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 1 @@ -228,7 +228,7 @@ start_server {tags {"expire"}} { assert_equal {2} [r scard testkey] } - test { EXPIREMEMBER works (hash) } { + test {EXPIREMEMBER works (hash)} { r flushall r hset testkey foo bar r expiremember testkey foo 1 @@ -236,7 +236,7 @@ start_server {tags {"expire"}} { r exists testkey } {0} - test { EXPIREMEMBER works (zset) } { + test {EXPIREMEMBER works (zset)} { r flushall r zadd testkey 1 foo r zadd testkey 2 bar @@ -246,7 +246,7 @@ start_server {tags {"expire"}} { assert_equal {1} [r zcard testkey] } - test { TTL for subkey expires works } { + test {TTL for subkey expires works} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 10000 @@ -265,4 +265,13 @@ start_server {tags {"expire"}} { set ttl [r ttl foo] assert {$ttl <= 100 && $ttl > 90} } + + test {Roundtrip for subkey expires works} { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + r save + r debug reload + assert [expr [r ttl testkey foo] > 0] + } }