From fce67c145ff0379d2c4ba10cda1ac9556151d778 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 10 Dec 2019 01:01:45 -0500 Subject: [PATCH] A bunch of bug fixes Former-commit-id: 228339586a19874f869cf45dc3834a270f99768a --- src/db.cpp | 43 +++++++++++++++++++---------- src/evict.cpp | 64 ++++++++++++++++++++++++++++++++------------ src/object.cpp | 13 +++++++-- src/semiorderedset.h | 6 +++++ src/server.h | 8 +++--- src/snapshot.cpp | 8 +++--- 6 files changed, 102 insertions(+), 40 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index d2e7e9631..5bc3002ed 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2041,28 +2041,37 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) auto itr = m_pdbSnapshot->find_threadsafe(sdsKey); if (itr == m_pdbSnapshot->end()) return; // not found - if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + + if (itr.val() != nullptr) { - dictAdd(m_pdict, sdsdup(sdsKey), itr.val()); + if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + dictAdd(m_pdict, sdsdup(sdsKey), itr.val()); + } + else + { + sds strT = serializeStoredObject(itr.val()); + robj *objNew = deserializeStoredObject(this, sdsKey, strT, sdslen(strT)); + sdsfree(strT); + dictAdd(m_pdict, sdsdup(sdsKey), objNew); + serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); + serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); + } } else { - sds strT = serializeStoredObject(itr.val()); - robj *objNew = deserializeStoredObject(strT, sdslen(strT)); - sdsfree(strT); - dictAdd(m_pdict, sdsdup(sdsKey), objNew); - serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); - serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); + dictAdd(m_pdict, sdsdup(sdsKey), nullptr); } *pde = dictFind(m_pdict, sdsKey); dictAdd(m_pdictTombstone, sdsdup(sdsKey), nullptr); } } - else if (*pde != nullptr && dictGetVal(*pde) == nullptr) + + if (*pde != nullptr && dictGetVal(*pde) == nullptr) { serverAssert(m_spstorage != nullptr); m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(data, cb); + robj *o = deserializeStoredObject(this, (const char*)dictGetKey(*pde), data, cb); serverAssert(o != nullptr); dictSetVal(m_pdict, *pde, o); }); @@ -2106,6 +2115,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() { m_spstorage->clear(); storeDatabase(); + m_fAllChanged = false; } else { @@ -2182,20 +2192,25 @@ size_t redisDbPersistentData::size() const return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); } -void redisDbPersistentData::removeCachedValue(const char *key) +bool redisDbPersistentData::removeCachedValue(const char *key) { serverAssert(m_spstorage != nullptr); // First ensure its not a pending key for (auto &spkey : m_vecchanged) { if (sdscmp(spkey.get(), (sds)key) == 0) - return; // NOP + return false; // NOP } dictEntry *de = dictFind(m_pdict, key); serverAssert(de != nullptr); - decrRefCount((robj*)dictGetVal(de)); - dictSetVal(m_pdict, de, nullptr); + if (dictGetVal(de) != nullptr) + { + decrRefCount((robj*)dictGetVal(de)); + dictSetVal(m_pdict, de, nullptr); + return true; + } + return false; } void redisDbPersistentData::trackChanges() diff --git a/src/evict.cpp b/src/evict.cpp index 96a808d0b..d84fcf1e4 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -242,33 +242,46 @@ struct visitFunctor int dbid; dict *dbdict; struct evictionPoolEntry *pool; - int count; + int count = 0; + int tries = 0; bool operator()(const expireEntry &e) { dictEntry *de = dictFind(dbdict, e.key()); - processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); - ++count; - return count < g_pserver->maxmemory_samples; + if (dictGetVal(de) != nullptr) + { + processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); + ++count; + } + ++tries; + return tries < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool) +int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 }; setexpire->random_visit(visitor); + return visitor.count; } else { + int returnCount = 0; dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); - serverAssert(o != nullptr); // BUG!!! We have to get the info we need here without permanently rehydrating the obj - processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); + // If the object is in second tier storage we don't need to evict it (since it alrady is) + if (o != nullptr) + { + processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); + ++returnCount; + } } + return returnCount; } + return 0; } /* ---------------------------------------------------------------------------- @@ -468,6 +481,7 @@ int freeMemoryIfNeeded(void) { size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; long long delta; + int numtries = 0; int slaves = listLength(g_pserver->slaves); /* When clients are paused the dataset should be static not just from the @@ -506,16 +520,14 @@ int freeMemoryIfNeeded(void) { if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { if ((keys = db->size()) != 0) { - evictionPoolPopulate(i, db, nullptr, pool); - total_keys += keys; + total_keys += evictionPoolPopulate(i, db, nullptr, pool); } } else { keys = db->expireSize(); if (keys != 0) - evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool); - total_keys += keys; + total_keys += evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool); } } if (!total_keys) break; /* No keys to evict. */ @@ -561,9 +573,16 @@ int freeMemoryIfNeeded(void) { if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { if (db->size() != 0) { - auto itr = db->random(); - bestkey = itr.key(); - bestdbid = j; + for (int i = 0; i < 16; ++i) + { + auto itr = db->random_threadsafe(); + if (itr.val() != nullptr) + { + bestkey = itr.key(); + bestdbid = j; + break; + } + } break; } } @@ -587,7 +606,8 @@ int freeMemoryIfNeeded(void) { { // This key is in the storage so we only need to free the object delta = (long long) zmalloc_used_memory(); - db->removeCachedValue(bestkey); + if (!db->removeCachedValue(bestkey)) + keys_freed--; // didn't actuall free this one (we inc below) delta -= (long long) zmalloc_used_memory(); mem_freed += delta; } @@ -619,7 +639,9 @@ int freeMemoryIfNeeded(void) { keyobj, db->id); decrRefCount(keyobj); } - keys_freed++; + + if (delta != 0) // delta can be zero if a snapshot has a ref + keys_freed++; /* When the memory to free starts to be big enough, we may * start spending so much time here that is impossible to @@ -641,8 +663,16 @@ int freeMemoryIfNeeded(void) { } } } + ++numtries; - if (!keys_freed) { + if (keys_freed <= 0) { + if (g_pserver->m_pstorageFactory != nullptr) + { + // If we have external storage failure to evict doesn't mean there are no + // keys to free + if (bestkey != nullptr && numtries < 16) + continue; + } latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); goto cant_free; /* nothing to free... */ diff --git a/src/object.cpp b/src/object.cpp index 77c8f19d1..bfbd1524f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1509,7 +1509,7 @@ NULL void redisObject::SetFExpires(bool fExpire) { - serverAssert(this->refcount != OBJ_SHARED_REFCOUNT); + serverAssert(this->refcount != OBJ_SHARED_REFCOUNT || fExpire == FExpires()); if (fExpire) this->refcount.fetch_or(1U << 31, std::memory_order_relaxed); else @@ -1558,6 +1558,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) case OBJ_ENCODING_EMBSTR: newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); memcpy(newObject, data, cb); + newObject->SetFExpires(false); newObject->setrefcount(1); return newObject; @@ -1566,6 +1567,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) memcpy(newObject, data, sizeof(robj)); newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); + newObject->SetFExpires(false); newObject->setrefcount(1); return newObject; } @@ -1573,7 +1575,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) return nullptr; } -robj *deserializeStoredObject(const void *data, size_t cb) +robj *deserializeStoredObjectCore(const void *data, size_t cb) { switch (((char*)data)[0]) { @@ -1610,6 +1612,13 @@ robj *deserializeStoredObject(const void *data, size_t cb) serverPanic("Unknown object type loading from storage"); } +robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb) +{ + robj *o = deserializeStoredObjectCore(data, cb); + o->SetFExpires(db->setexpire()->exists(key)); + return o; +} + sds serializeStoredObject(robj_roptr o) { switch (o->type) diff --git a/src/semiorderedset.h b/src/semiorderedset.h index da704c4ce..e710c4cec 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -93,6 +93,12 @@ public: return end(); } + bool exists(const T_KEY &key) const + { + auto itr = const_cast*>(this)->find(key); + return itr != this->end(); + } + setiter end() const { setiter itr(const_cast*>(this)); diff --git a/src/server.h b/src/server.h index 5b81babc8..eee556bc5 100644 --- a/src/server.h +++ b/src/server.h @@ -1331,7 +1331,7 @@ public: dict *dictUnsafeKeyOnly() { return m_pdict; } expireset *setexpireUnsafe() { return m_setexpire; } - const expireset *setexpire() { return m_setexpire; } + const expireset *setexpire() const { return m_setexpire; } const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot); @@ -1339,7 +1339,7 @@ public: void consolidate_snapshot(); bool FStorageProvider() { return m_spstorage != nullptr; } - void removeCachedValue(const char *key); + bool removeCachedValue(const char *key); private: void ensure(const char *key); @@ -1410,7 +1410,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot friend int removeExpire(redisDb *db, robj *key); friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when); friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); - friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); + friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); friend int freeMemoryIfNeeded(void); friend void activeExpireCycle(int); @@ -2620,7 +2620,7 @@ int equalStringObjects(robj *a, robj *b); unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); -robj *deserializeStoredObject(const void *data, size_t cb); +robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); sds serializeStoredObject(robj_roptr o); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index b299c62ef..bd92dd7b0 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -186,14 +186,16 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn dictEntry *deExisting = dictFind(m_spdbSnapshotHOLDER->m_pdict, (const char*)dictGetKey(de)); if (deExisting != nullptr) { - decrRefCount((robj*)dictGetVal(deExisting)); + if (dictGetVal(deExisting) != nullptr) + decrRefCount((robj*)dictGetVal(deExisting)); dictSetVal(m_spdbSnapshotHOLDER->m_pdict, deExisting, dictGetVal(de)); } else { dictAdd(m_spdbSnapshotHOLDER->m_pdict, sdsdup((sds)dictGetKey(de)), dictGetVal(de)); } - incrRefCount((robj*)dictGetVal(de)); + if (dictGetVal(de) != nullptr) + incrRefCount((robj*)dictGetVal(de)); } dictReleaseIterator(di); @@ -279,7 +281,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionretrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){ - o = deserializeStoredObject(data, cb); + o = deserializeStoredObject(this, (const char*)dictGetKey(de), data, cb); }); }