A bunch of bug fixes
Former-commit-id: 228339586a19874f869cf45dc3834a270f99768a
This commit is contained in:
parent
74cbadb753
commit
fce67c145f
43
src/db.cpp
43
src/db.cpp
@ -2041,28 +2041,37 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
|
||||
auto itr = m_pdbSnapshot->find_threadsafe(sdsKey);
|
||||
if (itr == m_pdbSnapshot->end())
|
||||
return; // not found
|
||||
if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||
|
||||
if (itr.val() != nullptr)
|
||||
{
|
||||
dictAdd(m_pdict, sdsdup(sdsKey), itr.val());
|
||||
if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||
{
|
||||
dictAdd(m_pdict, sdsdup(sdsKey), itr.val());
|
||||
}
|
||||
else
|
||||
{
|
||||
sds strT = serializeStoredObject(itr.val());
|
||||
robj *objNew = deserializeStoredObject(this, sdsKey, strT, sdslen(strT));
|
||||
sdsfree(strT);
|
||||
dictAdd(m_pdict, sdsdup(sdsKey), objNew);
|
||||
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
|
||||
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
sds strT = serializeStoredObject(itr.val());
|
||||
robj *objNew = deserializeStoredObject(strT, sdslen(strT));
|
||||
sdsfree(strT);
|
||||
dictAdd(m_pdict, sdsdup(sdsKey), objNew);
|
||||
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
|
||||
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
|
||||
dictAdd(m_pdict, sdsdup(sdsKey), nullptr);
|
||||
}
|
||||
*pde = dictFind(m_pdict, sdsKey);
|
||||
dictAdd(m_pdictTombstone, sdsdup(sdsKey), nullptr);
|
||||
}
|
||||
}
|
||||
else if (*pde != nullptr && dictGetVal(*pde) == nullptr)
|
||||
|
||||
if (*pde != nullptr && dictGetVal(*pde) == nullptr)
|
||||
{
|
||||
serverAssert(m_spstorage != nullptr);
|
||||
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
|
||||
robj *o = deserializeStoredObject(data, cb);
|
||||
robj *o = deserializeStoredObject(this, (const char*)dictGetKey(*pde), data, cb);
|
||||
serverAssert(o != nullptr);
|
||||
dictSetVal(m_pdict, *pde, o);
|
||||
});
|
||||
@ -2106,6 +2115,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
|
||||
{
|
||||
m_spstorage->clear();
|
||||
storeDatabase();
|
||||
m_fAllChanged = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -2182,20 +2192,25 @@ size_t redisDbPersistentData::size() const
|
||||
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
||||
}
|
||||
|
||||
void redisDbPersistentData::removeCachedValue(const char *key)
|
||||
bool redisDbPersistentData::removeCachedValue(const char *key)
|
||||
{
|
||||
serverAssert(m_spstorage != nullptr);
|
||||
// First ensure its not a pending key
|
||||
for (auto &spkey : m_vecchanged)
|
||||
{
|
||||
if (sdscmp(spkey.get(), (sds)key) == 0)
|
||||
return; // NOP
|
||||
return false; // NOP
|
||||
}
|
||||
|
||||
dictEntry *de = dictFind(m_pdict, key);
|
||||
serverAssert(de != nullptr);
|
||||
decrRefCount((robj*)dictGetVal(de));
|
||||
dictSetVal(m_pdict, de, nullptr);
|
||||
if (dictGetVal(de) != nullptr)
|
||||
{
|
||||
decrRefCount((robj*)dictGetVal(de));
|
||||
dictSetVal(m_pdict, de, nullptr);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void redisDbPersistentData::trackChanges()
|
||||
|
@ -242,33 +242,46 @@ struct visitFunctor
|
||||
int dbid;
|
||||
dict *dbdict;
|
||||
struct evictionPoolEntry *pool;
|
||||
int count;
|
||||
int count = 0;
|
||||
int tries = 0;
|
||||
|
||||
bool operator()(const expireEntry &e)
|
||||
{
|
||||
dictEntry *de = dictFind(dbdict, e.key());
|
||||
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
|
||||
++count;
|
||||
return count < g_pserver->maxmemory_samples;
|
||||
if (dictGetVal(de) != nullptr)
|
||||
{
|
||||
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
|
||||
++count;
|
||||
}
|
||||
++tries;
|
||||
return tries < g_pserver->maxmemory_samples;
|
||||
}
|
||||
};
|
||||
void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool)
|
||||
int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool)
|
||||
{
|
||||
if (setexpire != nullptr)
|
||||
{
|
||||
visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 };
|
||||
setexpire->random_visit(visitor);
|
||||
return visitor.count;
|
||||
}
|
||||
else
|
||||
{
|
||||
int returnCount = 0;
|
||||
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
|
||||
int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples);
|
||||
for (int j = 0; j < count; j++) {
|
||||
robj *o = (robj*)dictGetVal(samples[j]);
|
||||
serverAssert(o != nullptr); // BUG!!! We have to get the info we need here without permanently rehydrating the obj
|
||||
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
|
||||
// If the object is in second tier storage we don't need to evict it (since it alrady is)
|
||||
if (o != nullptr)
|
||||
{
|
||||
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
|
||||
++returnCount;
|
||||
}
|
||||
}
|
||||
return returnCount;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------------------
|
||||
@ -468,6 +481,7 @@ int freeMemoryIfNeeded(void) {
|
||||
size_t mem_reported, mem_tofree, mem_freed;
|
||||
mstime_t latency, eviction_latency;
|
||||
long long delta;
|
||||
int numtries = 0;
|
||||
int slaves = listLength(g_pserver->slaves);
|
||||
|
||||
/* When clients are paused the dataset should be static not just from the
|
||||
@ -506,16 +520,14 @@ int freeMemoryIfNeeded(void) {
|
||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
|
||||
{
|
||||
if ((keys = db->size()) != 0) {
|
||||
evictionPoolPopulate(i, db, nullptr, pool);
|
||||
total_keys += keys;
|
||||
total_keys += evictionPoolPopulate(i, db, nullptr, pool);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
keys = db->expireSize();
|
||||
if (keys != 0)
|
||||
evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool);
|
||||
total_keys += keys;
|
||||
total_keys += evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool);
|
||||
}
|
||||
}
|
||||
if (!total_keys) break; /* No keys to evict. */
|
||||
@ -561,9 +573,16 @@ int freeMemoryIfNeeded(void) {
|
||||
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
|
||||
{
|
||||
if (db->size() != 0) {
|
||||
auto itr = db->random();
|
||||
bestkey = itr.key();
|
||||
bestdbid = j;
|
||||
for (int i = 0; i < 16; ++i)
|
||||
{
|
||||
auto itr = db->random_threadsafe();
|
||||
if (itr.val() != nullptr)
|
||||
{
|
||||
bestkey = itr.key();
|
||||
bestdbid = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -587,7 +606,8 @@ int freeMemoryIfNeeded(void) {
|
||||
{
|
||||
// This key is in the storage so we only need to free the object
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
db->removeCachedValue(bestkey);
|
||||
if (!db->removeCachedValue(bestkey))
|
||||
keys_freed--; // didn't actuall free this one (we inc below)
|
||||
delta -= (long long) zmalloc_used_memory();
|
||||
mem_freed += delta;
|
||||
}
|
||||
@ -619,7 +639,9 @@ int freeMemoryIfNeeded(void) {
|
||||
keyobj, db->id);
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
keys_freed++;
|
||||
|
||||
if (delta != 0) // delta can be zero if a snapshot has a ref
|
||||
keys_freed++;
|
||||
|
||||
/* When the memory to free starts to be big enough, we may
|
||||
* start spending so much time here that is impossible to
|
||||
@ -641,8 +663,16 @@ int freeMemoryIfNeeded(void) {
|
||||
}
|
||||
}
|
||||
}
|
||||
++numtries;
|
||||
|
||||
if (!keys_freed) {
|
||||
if (keys_freed <= 0) {
|
||||
if (g_pserver->m_pstorageFactory != nullptr)
|
||||
{
|
||||
// If we have external storage failure to evict doesn't mean there are no
|
||||
// keys to free
|
||||
if (bestkey != nullptr && numtries < 16)
|
||||
continue;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("eviction-cycle",latency);
|
||||
goto cant_free; /* nothing to free... */
|
||||
|
@ -1509,7 +1509,7 @@ NULL
|
||||
|
||||
void redisObject::SetFExpires(bool fExpire)
|
||||
{
|
||||
serverAssert(this->refcount != OBJ_SHARED_REFCOUNT);
|
||||
serverAssert(this->refcount != OBJ_SHARED_REFCOUNT || fExpire == FExpires());
|
||||
if (fExpire)
|
||||
this->refcount.fetch_or(1U << 31, std::memory_order_relaxed);
|
||||
else
|
||||
@ -1558,6 +1558,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
|
||||
case OBJ_ENCODING_EMBSTR:
|
||||
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL);
|
||||
memcpy(newObject, data, cb);
|
||||
newObject->SetFExpires(false);
|
||||
newObject->setrefcount(1);
|
||||
return newObject;
|
||||
|
||||
@ -1566,6 +1567,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
|
||||
memcpy(newObject, data, sizeof(robj));
|
||||
newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj));
|
||||
memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj));
|
||||
newObject->SetFExpires(false);
|
||||
newObject->setrefcount(1);
|
||||
return newObject;
|
||||
}
|
||||
@ -1573,7 +1575,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
robj *deserializeStoredObject(const void *data, size_t cb)
|
||||
robj *deserializeStoredObjectCore(const void *data, size_t cb)
|
||||
{
|
||||
switch (((char*)data)[0])
|
||||
{
|
||||
@ -1610,6 +1612,13 @@ robj *deserializeStoredObject(const void *data, size_t cb)
|
||||
serverPanic("Unknown object type loading from storage");
|
||||
}
|
||||
|
||||
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb)
|
||||
{
|
||||
robj *o = deserializeStoredObjectCore(data, cb);
|
||||
o->SetFExpires(db->setexpire()->exists(key));
|
||||
return o;
|
||||
}
|
||||
|
||||
sds serializeStoredObject(robj_roptr o)
|
||||
{
|
||||
switch (o->type)
|
||||
|
@ -93,6 +93,12 @@ public:
|
||||
return end();
|
||||
}
|
||||
|
||||
bool exists(const T_KEY &key) const
|
||||
{
|
||||
auto itr = const_cast<semiorderedset<T,T_KEY,MEMMOVE_SAFE>*>(this)->find(key);
|
||||
return itr != this->end();
|
||||
}
|
||||
|
||||
setiter end() const
|
||||
{
|
||||
setiter itr(const_cast<semiorderedset<T,T_KEY,MEMMOVE_SAFE>*>(this));
|
||||
|
@ -1331,7 +1331,7 @@ public:
|
||||
dict *dictUnsafeKeyOnly() { return m_pdict; }
|
||||
|
||||
expireset *setexpireUnsafe() { return m_setexpire; }
|
||||
const expireset *setexpire() { return m_setexpire; }
|
||||
const expireset *setexpire() const { return m_setexpire; }
|
||||
|
||||
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
|
||||
void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
||||
@ -1339,7 +1339,7 @@ public:
|
||||
void consolidate_snapshot();
|
||||
|
||||
bool FStorageProvider() { return m_spstorage != nullptr; }
|
||||
void removeCachedValue(const char *key);
|
||||
bool removeCachedValue(const char *key);
|
||||
|
||||
private:
|
||||
void ensure(const char *key);
|
||||
@ -1410,7 +1410,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
|
||||
friend int removeExpire(redisDb *db, robj *key);
|
||||
friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when);
|
||||
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
|
||||
friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
|
||||
friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
|
||||
friend void activeDefragCycle(void);
|
||||
friend int freeMemoryIfNeeded(void);
|
||||
friend void activeExpireCycle(int);
|
||||
@ -2620,7 +2620,7 @@ int equalStringObjects(robj *a, robj *b);
|
||||
unsigned long long estimateObjectIdleTime(robj_roptr o);
|
||||
void trimStringObjectIfNeeded(robj *o);
|
||||
|
||||
robj *deserializeStoredObject(const void *data, size_t cb);
|
||||
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb);
|
||||
sds serializeStoredObject(robj_roptr o);
|
||||
|
||||
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
|
||||
|
@ -186,14 +186,16 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
||||
dictEntry *deExisting = dictFind(m_spdbSnapshotHOLDER->m_pdict, (const char*)dictGetKey(de));
|
||||
if (deExisting != nullptr)
|
||||
{
|
||||
decrRefCount((robj*)dictGetVal(deExisting));
|
||||
if (dictGetVal(deExisting) != nullptr)
|
||||
decrRefCount((robj*)dictGetVal(deExisting));
|
||||
dictSetVal(m_spdbSnapshotHOLDER->m_pdict, deExisting, dictGetVal(de));
|
||||
}
|
||||
else
|
||||
{
|
||||
dictAdd(m_spdbSnapshotHOLDER->m_pdict, sdsdup((sds)dictGetKey(de)), dictGetVal(de));
|
||||
}
|
||||
incrRefCount((robj*)dictGetVal(de));
|
||||
if (dictGetVal(de) != nullptr)
|
||||
incrRefCount((robj*)dictGetVal(de));
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
@ -279,7 +281,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
|
||||
if (o == nullptr && !fKeyOnly)
|
||||
{
|
||||
m_spstorage->retrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){
|
||||
o = deserializeStoredObject(data, cb);
|
||||
o = deserializeStoredObject(this, (const char*)dictGetKey(de), data, cb);
|
||||
});
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user