diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 42660486e..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,58 +0,0 @@ -{ - "files.associations": { - "zmalloc.h": "c", - "stat.h": "c", - "array": "cpp", - "atomic": "cpp", - "*.tcc": "cpp", - "cctype": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "condition_variable": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "list": "cpp", - "unordered_map": "cpp", - "vector": "cpp", - "exception": "cpp", - "fstream": "cpp", - "functional": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "numeric": "cpp", - "optional": "cpp", - "ostream": "cpp", - "ratio": "cpp", - "scoped_allocator": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "streambuf": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "thread": "cpp", - "cinttypes": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "utility": "cpp", - "set": "cpp", - "algorithm": "cpp" - } -} diff --git a/src/aof.cpp b/src/aof.cpp index b9e70734e..cb01f3978 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -899,8 +899,8 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } fclose(fp); freeFakeClient(fakeClient); diff --git a/src/db.cpp b/src/db.cpp index 9adf8cd11..1324c6e14 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1469,6 +1469,14 @@ int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { return found; } +void redisDbPersistentData::resortExpire(expireEntry &e) +{ + auto itr = m_setexpire->find(e.key()); + expireEntry eT = std::move(e); + m_setexpire->erase(itr); + m_setexpire->insert(eT); +} + /* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds @@ -2257,13 +2265,12 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -redisDbPersistentData::changelist redisDbPersistentData::processChanges() +void redisDbPersistentData::processChanges() { serverAssert(GlobalLocksAcquired()); --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); - changelist vecRet; if (m_spstorage != nullptr) { @@ -2285,23 +2292,18 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() continue; robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); - vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); + m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate); + sdsfree(temp); } } m_setchanged.clear(); m_cnewKeysPending = 0; } } - - return vecRet; } -void redisDbPersistentData::commitChanges(const changelist &vec) +void redisDbPersistentData::commitChanges() { - for (auto &pair : vec) - { - m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), sdslen(pair.second.get()), pair.first.fUpdate); - } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); } @@ -2379,8 +2381,8 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - auto vec = processChanges(); - commitChanges(vec); + processChanges(); + commitChanges(); trackChanges(false); } diff --git a/src/evict.cpp b/src/evict.cpp index ddf8e1818..060440d25 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -429,9 +429,12 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * to subtract the slaves output buffers. We can just return ASAP. */ mem_reported = zmalloc_used_memory(); if (total) *total = mem_reported; + size_t maxmemory = g_pserver->maxmemory; + if (g_pserver->FRdbSaveInProgress()) + maxmemory *= 2; /* We may return ASAP if there is no need to compute the level. */ - int return_ok_asap = !g_pserver->maxmemory || mem_reported <= g_pserver->maxmemory; + int return_ok_asap = !maxmemory || mem_reported <= maxmemory; if (return_ok_asap && !level) return C_OK; /* Remove the size of slaves output buffers and AOF buffer from the @@ -442,20 +445,20 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev /* Compute the ratio of memory usage. */ if (level) { - if (!g_pserver->maxmemory) { + if (!maxmemory) { *level = 0; } else { - *level = (float)mem_used / (float)g_pserver->maxmemory; + *level = (float)mem_used / (float)maxmemory; } } if (return_ok_asap) return C_OK; /* Check if we are still over the memory limit. */ - if (mem_used <= g_pserver->maxmemory) return C_OK; + if (mem_used <= maxmemory) return C_OK; /* Compute how much memory we need to free. */ - mem_tofree = mem_used - g_pserver->maxmemory; + mem_tofree = mem_used - maxmemory; if (logical) *logical = mem_used; if (tofree) *tofree = mem_tofree; @@ -483,6 +486,8 @@ int freeMemoryIfNeeded(void) { mstime_t latency, eviction_latency; long long delta; int slaves = listLength(g_pserver->slaves); + const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider(); + /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of @@ -503,6 +508,7 @@ int freeMemoryIfNeeded(void) { sds bestkey = NULL; int bestdbid; redisDb *db; + bool fFallback = false; if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) @@ -534,7 +540,9 @@ int freeMemoryIfNeeded(void) { /* Go backward from best to worst element to evict. */ for (k = EVPOOL_SIZE-1; k >= 0; k--) { - if (pool[k].key == NULL) continue; + if (pool[k].key == NULL) { + continue; + } bestdbid = pool[k].dbid; sds key = nullptr; @@ -558,11 +566,14 @@ int freeMemoryIfNeeded(void) { } } } + if (bestkey == nullptr && fEvictToStorage) + fFallback = true; } /* volatile-random and allkeys-random policy */ - else if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || - g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || + g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM + || fEvictToStorage) { /* When evicting a random key, we try to evict a key for * each DB, so we use the static 'next_db' variable to @@ -570,10 +581,10 @@ int freeMemoryIfNeeded(void) { for (i = 0; i < cserver.dbnum; i++) { j = (++next_db) % cserver.dbnum; db = g_pserver->db[j]; - if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || fFallback) { if (db->size() != 0) { - auto itr = db->random_cache_threadsafe(); + auto itr = db->random_cache_threadsafe(true /*fPrimaryOnly*/); // primary only because we can't evict a snapshot key bestkey = itr.key(); bestdbid = j; break; @@ -595,7 +606,7 @@ int freeMemoryIfNeeded(void) { if (bestkey) { db = g_pserver->db[bestdbid]; - if (!cserver.delete_on_evict && db->FStorageProvider()) + if (fEvictToStorage) { // This key is in the storage so we only need to free the object delta = (long long) zmalloc_used_memory(); diff --git a/src/expire.cpp b/src/expire.cpp index b115802b4..34051b521 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -132,8 +132,19 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { pfat->popfrontExpireEntry(); } + robj *keyobj = nullptr; + + if (deleted || pfat->FEmpty()) + keyobj = createStringObject(e.key(),sdslen(e.key())); + if (deleted) { + if (!pfat->FEmpty()) + { + // We need to resort the expire entry since it may no longer be in the correct position + db->resortExpire(e); + } + robj objT; switch (val->type) { @@ -147,10 +158,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { if (pfat->FEmpty()) { - robj *keyobj = createStringObject(e.key(),sdslen(e.key())); removeExpire(db, keyobj); - decrRefCount(keyobj); } + + if (keyobj) + decrRefCount(keyobj); } int parseUnitString(const char *sz) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4f1a9ad5e..9c7577a16 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -43,6 +43,7 @@ #include #include #include +#include "config.h" #ifdef __APPLE__ #include @@ -60,6 +61,11 @@ #define UNUSED(x) ((void)x) #endif +#ifdef HAVE_BACKTRACE +#include +__attribute__((weak)) void logStackTrace(ucontext_t *) {} +#endif + extern int g_fInCrash; /**************************************************** @@ -149,6 +155,43 @@ __attribute__((weak)) void serverLog(int , const char *fmt, ...) printf("\n"); } +extern "C" pid_t gettid() +{ + static thread_local int pidCache = -1; +#ifdef __linux__ + if (pidCache == -1) + pidCache = syscall(SYS_gettid); +#else + if (pidCache == -1) { + uint64_t tidT; + pthread_threadid_np(nullptr, &tidT); + assert(tidT < UINT_MAX); + pidCache = (int)tidT; + } +#endif + return pidCache; +} + +void printTrace() +{ +#ifdef HAVE_BACKTRACE + serverLog(3 /*LL_WARNING*/, "printing backtrace for thread %d", gettid()); + ucontext_t ctxt; + getcontext(&ctxt); + logStackTrace(&ctxt); +#endif +} + + +#ifdef __linux__ +static int futex(volatile unsigned *uaddr, int futex_op, int val, + const struct timespec *timeout, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, + timeout, uaddr, val3); +} +#endif + class DeadlockDetector { std::map m_mapwait; @@ -156,9 +199,19 @@ class DeadlockDetector public: void registerwait(fastlock *lock, pid_t thispid) { + static volatile bool fInDeadlock = false; + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); + + if (fInDeadlock) + { + printTrace(); + fastlock_unlock(&m_lock); + return; + } + m_mapwait.insert(std::make_pair(thispid, lock)); // Detect cycles @@ -184,6 +237,19 @@ public: if (pidCheck == thispid) break; } + // Wake All sleeping threads so they can print their callstacks +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + int mask = -1; + fInDeadlock = true; + fastlock_unlock(&m_lock); + futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + futex(&itr->second->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + sleep(2); + fastlock_lock(&m_lock); + printTrace(); +#endif +#endif serverLog(3 /*LL_WARNING*/, "!!! KeyDB Will Now Crash !!!"); _serverPanic(__FILE__, __LINE__, "Deadlock detected"); } @@ -222,32 +288,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifdef __linux__ -static int futex(volatile unsigned *uaddr, int futex_op, int val, - const struct timespec *timeout, int val3) -{ - return syscall(SYS_futex, uaddr, futex_op, val, - timeout, uaddr, val3); -} -#endif - -extern "C" pid_t gettid() -{ - static thread_local int pidCache = -1; -#ifdef __linux__ - if (pidCache == -1) - pidCache = syscall(SYS_gettid); -#else - if (pidCache == -1) { - uint64_t tidT; - pthread_threadid_np(nullptr, &tidT); - assert(tidT < UINT_MAX); - pidCache = (int)tidT; - } -#endif - return pidCache; -} - extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) { #ifdef __linux__ diff --git a/src/networking.cpp b/src/networking.cpp index 8fe98c952..891d6c34f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -496,7 +496,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) if (c->querybuf && sdslen(c->querybuf)) { std::string str = escapeString(c->querybuf); - serverLog(LL_WARNING, "\tquerybuf: %s", str.c_str()); + printf("\tquerybuf: %s\n", str.c_str()); } c->master_error = 1; } @@ -1745,7 +1745,10 @@ void sendReplyToClient(connection *conn) { c->lock.lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) - freeClient(c); + { + if (!freeClient(c)) + c->lock.unlock(); + } } } @@ -3230,12 +3233,23 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; - client *c = serverTL->current_client; - if (c != nullptr) + std::vector vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) { - serverAssert(c->flags & CLIENT_PROTECTED); - c->lock.unlock(); + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } } + aeReleaseLock(); serverAssertDebug(!GlobalLocksAcquired()); @@ -3253,18 +3267,18 @@ int processEventsWhileBlocked(int iel) { { // Caller expects us to be locked so fix and rethrow AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); throw; } AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); return count; } diff --git a/src/rdb.cpp b/src/rdb.cpp index e350adf15..85a0e8393 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2231,6 +2231,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; + size_t ckeysLoaded = 0; robj *subexpireKey = nullptr; robj *key = nullptr; @@ -2374,7 +2375,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { incrRefCount(subexpireKey); } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { if (key == nullptr || subexpireKey == nullptr) { - serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? szFromObj(key) : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); } else { setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); @@ -2455,6 +2456,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) { decrRefCount(key); + key = nullptr; goto eoferr; } bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; @@ -2476,11 +2478,29 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } else { + /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, + do this every 16 keys to limit the perf impact */ + if (g_pserver->m_pstorageFactory && (ckeysLoaded % 16) == 0) + { + if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) + { + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); + g_pserver->db[idb]->trackChanges(true); + } + freeMemoryIfNeeded(); + } + } + /* Add the new object in the hash table */ int fInserted = dbMerge(db, key, val, rsi && rsi->fForceSetKey); // Note: dbMerge will incrRef if (fInserted) { + ++ckeysLoaded; + /* Set the expire time if needed */ if (expiretime != -1) setExpire(NULL,db,key,nullptr,expiretime); @@ -2493,8 +2513,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } - decrRefCount(key); - key = nullptr; } if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); @@ -2507,7 +2525,10 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } if (key != nullptr) + { decrRefCount(key); + key = nullptr; + } if (subexpireKey != nullptr) { @@ -2534,8 +2555,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } return C_OK; @@ -2544,6 +2565,17 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * the RDB file from a socket during initial SYNC (diskless replica mode), * we'll report the error to the caller, so that we can retry. */ eoferr: + if (key != nullptr) + { + decrRefCount(key); + key = nullptr; + } + if (subexpireKey != nullptr) + { + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } + serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); diff --git a/src/replication.cpp b/src/replication.cpp index bebc5e6c4..afb158160 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2875,8 +2875,6 @@ void freeMasterInfo(redisMaster *mi) zfree(mi->masteruser); if (mi->repl_transfer_tmpfile) zfree(mi->repl_transfer_tmpfile); - if (mi->clientFake) - freeClient(mi->clientFake); delete mi->staleKeyMap; if (mi->cached_master != nullptr) freeClientAsync(mi->cached_master); @@ -2955,11 +2953,6 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; - if (mi->clientFake) { - freeClient(mi->clientFake); - mi->clientFake = nullptr; - - } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3856,8 +3849,13 @@ bool FInReplicaReplay() return s_pstate != nullptr && s_pstate->nesting() > 0; } +struct RemoteMasterState +{ + uint64_t mvcc = 0; + client *cFake = nullptr; +}; -static std::unordered_map g_mapmvcc; +static std::unordered_map g_mapremote; void replicaReplayCommand(client *c) { @@ -3933,12 +3931,15 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; - redisMaster *mi = s_pstate->getMi(c); - client *cFake = mi->clientFake; - if (mi->clientFakeNesting != s_pstate->nesting()) - cFake = nullptr; - serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) + RemoteMasterState &remoteState = g_mapremote[uuid]; + if (remoteState.cFake == nullptr) + remoteState.cFake = createClient(nullptr, c->iel); + else + remoteState.cFake->iel = c->iel; + + client *cFake = remoteState.cFake; + + if (mvcc != 0 && remoteState.mvcc >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3947,8 +3948,6 @@ void replicaReplayCommand(client *c) // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - if (cFake == nullptr) - cFake = createClient(nullptr, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; @@ -3961,13 +3960,15 @@ void replicaReplayCommand(client *c) bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); if (cFake->master_error) - addReplyError(c, "Error in rreplay command, please check logs"); + { + addReplyError(c, "Error in rreplay command, please check logs."); + } if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - if (mvcc > g_mapmvcc[uuid]) - g_mapmvcc[uuid] = mvcc; + if (mvcc > remoteState.mvcc) + remoteState.mvcc = mvcc; } else { @@ -3975,17 +3976,6 @@ void replicaReplayCommand(client *c) addReplyError(c, "command did not execute"); } serverAssert(sdslen(cFake->querybuf) == 0); - if (cFake->flags & CLIENT_MULTI) - { - mi->clientFake = cFake; - mi->clientFakeNesting = s_pstate->nesting(); - } - else - { - if (mi->clientFake == cFake) - mi->clientFake = nullptr; - freeClient(cFake); - } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index bb5f47a80..dc6e88938 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1042,6 +1042,7 @@ struct redisCommand redisCommandTable[] = { /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ extern "C" void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +extern "C" pid_t gettid(); /* Low level logging. To use only for very big messages, otherwise * serverLog() is to prefer. */ @@ -1079,8 +1080,8 @@ void serverLogRaw(int level, const char *msg) { } else { role_char = (listLength(g_pserver->masters) ? 'S':'M'); /* Slave or Master. */ } - fprintf(fp,"%d:%c %s %c %s\n", - (int)getpid(),role_char, buf,c[level],msg); + fprintf(fp,"%d:%d:%c %s %c %s\n", + (int)getpid(),(int)gettid(),role_char, buf,c[level],msg); } fflush(fp); @@ -2339,21 +2340,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { static thread_local bool fFirstRun = true; // note: we also copy the DB pointer in case a DB swap is done while the lock is released - std::vector> vecchanges; + std::vector vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released if (!fFirstRun) { - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - auto vec = g_pserver->db[idb]->processChanges(); - vecchanges.emplace_back(g_pserver->db[idb], std::move(vec)); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + vecdb.push_back(g_pserver->db[idb]); + g_pserver->db[idb]->processChanges(); } - } - else { + } else { fFirstRun = false; } aeReleaseLock(); - for (auto &pair : vecchanges) - pair.first->commitChanges(pair.second); + for (redisDb *db : vecdb) + db->commitChanges(); + handleClientsWithPendingWrites(iel); if (serverTL->gcEpoch != 0) @@ -3834,8 +3834,6 @@ int processCommand(client *c, int callFlags) { } } - incrementMvccTstamp(); - /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering @@ -3980,6 +3978,7 @@ int processCommand(client *c, int callFlags) { return C_OK; } locker.arm(c); + incrementMvccTstamp(); call(c,callFlags); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) @@ -5219,6 +5218,20 @@ int checkForSentinelMode(int argc, char **argv) { /* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); + + if (g_pserver->m_pstorageFactory) + { + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + if (g_pserver->db[idb]->size() > 0) + { + serverLog(LL_NOTICE, "Not loading the RDB because a storage provider is set and the database is not empty"); + return; + } + } + serverLog(LL_NOTICE, "Loading the RDB even though we have a storage provider because the database is empty"); + } + if (g_pserver->aof_state == AOF_ON) { if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); @@ -5362,14 +5375,15 @@ void incrementMvccTstamp() msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds long long mst; - __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED); + __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_ACQUIRE); if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows { - atomicIncr(g_pserver->mvcc_tstamp, 1); + __atomic_fetch_add(&g_pserver->mvcc_tstamp, 1, __ATOMIC_RELEASE); } else { - atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)mst) << MVCC_MS_SHIFT); + uint64_t val = ((uint64_t)mst) << MVCC_MS_SHIFT; + __atomic_store(&g_pserver->mvcc_tstamp, &val, __ATOMIC_RELEASE); } } @@ -5401,7 +5415,7 @@ void OnTerminate() } } - serverAssert(false); + serverPanic("std::teminate() called"); } void *workerThreadMain(void *parg) diff --git a/src/server.h b/src/server.h index cb5ce5a3b..fe6efc11e 100644 --- a/src/server.h +++ b/src/server.h @@ -1277,6 +1277,7 @@ public: size_t expireSize() const { return m_setexpire->size(); } int removeExpire(robj *key, dict_iter itr); int removeSubkeyExpire(robj *key, robj *subkey); + void resortExpire(expireEntry &e); void clear(void(callback)(void*)); void emptyDbAsync(); // Note: If you do not need the obj then use the objless iterator version. It's faster @@ -1293,16 +1294,8 @@ public: // to allow you to release the global lock before commiting. To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - struct changedesc - { - sdsimmutablestring strkey; - bool fUpdate; - - changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} - }; - typedef std::vector> changelist; - changelist processChanges(); - void commitChanges(const changelist &vec); + void processChanges(); + void commitChanges(); // This should only be used if you look at the key, we do not fixup // objects stored elsewhere @@ -1326,6 +1319,13 @@ protected: uint64_t m_mvccCheckpoint = 0; private: + struct changedesc + { + sdsimmutablestring strkey; + bool fUpdate; + + changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} + }; struct changedescCmp { using is_transparent = void; // C++14 to allow comparisons with different types @@ -1380,7 +1380,7 @@ public: using redisDbPersistentData::endSnapshotAsync; using redisDbPersistentData::end; - dict_iter random_cache_threadsafe() const; + dict_iter random_cache_threadsafe(bool fPrimaryOnly = false) const; dict_iter find_cached_threadsafe(const char *key) const; expireEntry *getExpire(robj_roptr key) { return getExpire(szFromObj(key)); } @@ -1469,6 +1469,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::consolidate_snapshot; using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::dictUnsafeKeyOnly; + using redisDbPersistentData::resortExpire; public: expireset::setiter expireitr; @@ -1917,8 +1918,6 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; - client *clientFake; - int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ diff --git a/src/snapshot.cpp b/src/snapshot.cpp index ba8875ddd..a5a6830a2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -333,11 +333,11 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn serverAssert(sizeStart == size()); } -dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe() const +dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const { if (size() == 0) return dict_iter(nullptr); - if (m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0) + if (!fPrimaryOnly && m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0) { dict_iter iter(nullptr); double pctInSnapshot = (double)m_pdbSnapshot->size() / (size() + m_pdbSnapshot->size()); diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index bad0a471c..ae4dad08b 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -220,7 +220,7 @@ start_server {tags {"expire"}} { assert {$ttl <= 98 && $ttl > 90} } - test { EXPIREMEMBER works (set) } { + test {EXPIREMEMBER works (set)} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 1 @@ -228,7 +228,7 @@ start_server {tags {"expire"}} { assert_equal {2} [r scard testkey] } - test { EXPIREMEMBER works (hash) } { + test {EXPIREMEMBER works (hash)} { r flushall r hset testkey foo bar r expiremember testkey foo 1 @@ -236,7 +236,7 @@ start_server {tags {"expire"}} { r exists testkey } {0} - test { EXPIREMEMBER works (zset) } { + test {EXPIREMEMBER works (zset)} { r flushall r zadd testkey 1 foo r zadd testkey 2 bar @@ -246,7 +246,7 @@ start_server {tags {"expire"}} { assert_equal {1} [r zcard testkey] } - test { TTL for subkey expires works } { + test {TTL for subkey expires works} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 10000 @@ -265,4 +265,13 @@ start_server {tags {"expire"}} { set ttl [r ttl foo] assert {$ttl <= 100 && $ttl > 90} } + + test {Roundtrip for subkey expires works} { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + r save + r debug reload + assert [expr [r ttl testkey foo] > 0] + } }