6.0.15 Merge branch 'keydbpro' into PRO_RELEASE_6
Former-commit-id: 05140de5a46d05e39815472f96be526868800f30
This commit is contained in:
commit
636ea4db9f
2
deps/Makefile
vendored
2
deps/Makefile
vendored
@ -94,6 +94,6 @@ jemalloc: .make-prerequisites
|
|||||||
|
|
||||||
rocksdb: .make-prerequisites
|
rocksdb: .make-prerequisites
|
||||||
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
|
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
|
||||||
cd rocksdb && PORTABLE=1 FORCE_SSE42=1 $(MAKE) static_lib
|
cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib
|
||||||
|
|
||||||
.PHONY: rocksdb
|
.PHONY: rocksdb
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
void StorageCache::clear()
|
void StorageCache::clear()
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
if (m_setkeys != nullptr)
|
if (m_setkeys != nullptr)
|
||||||
m_setkeys->clear();
|
m_setkeys->clear();
|
||||||
m_spstorage->clear();
|
m_spstorage->clear();
|
||||||
@ -24,6 +25,7 @@ void StorageCache::cacheKey(const char *rgch, size_t cch)
|
|||||||
bool StorageCache::erase(sds key)
|
bool StorageCache::erase(sds key)
|
||||||
{
|
{
|
||||||
bool result = m_spstorage->erase(key, sdslen(key));
|
bool result = m_spstorage->erase(key, sdslen(key));
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
if (result && m_setkeys != nullptr)
|
if (result && m_setkeys != nullptr)
|
||||||
{
|
{
|
||||||
auto itr = m_setkeys->find(sdsview(key));
|
auto itr = m_setkeys->find(sdsview(key));
|
||||||
@ -35,15 +37,18 @@ bool StorageCache::erase(sds key)
|
|||||||
|
|
||||||
void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
|
void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
if (!fOverwrite && m_setkeys != nullptr)
|
if (!fOverwrite && m_setkeys != nullptr)
|
||||||
{
|
{
|
||||||
cacheKey(key);
|
cacheKey(key);
|
||||||
}
|
}
|
||||||
|
ul.unlock();
|
||||||
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
|
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
const StorageCache *StorageCache::clone()
|
const StorageCache *StorageCache::clone()
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
// Clones never clone the cache
|
// Clones never clone the cache
|
||||||
StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone()));
|
StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone()));
|
||||||
return cacheNew;
|
return cacheNew;
|
||||||
@ -51,6 +56,7 @@ const StorageCache *StorageCache::clone()
|
|||||||
|
|
||||||
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const
|
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
if (m_setkeys != nullptr)
|
if (m_setkeys != nullptr)
|
||||||
{
|
{
|
||||||
auto itr = m_setkeys->find(sdsview(key));
|
auto itr = m_setkeys->find(sdsview(key));
|
||||||
@ -59,11 +65,13 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey
|
|||||||
if (cachedKey != nullptr)
|
if (cachedKey != nullptr)
|
||||||
*cachedKey = sdsdupshared(itr->get());
|
*cachedKey = sdsdupshared(itr->get());
|
||||||
}
|
}
|
||||||
|
ul.unlock();
|
||||||
m_spstorage->retrieve(key, sdslen(key), fn);
|
m_spstorage->retrieve(key, sdslen(key), fn);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t StorageCache::count() const
|
size_t StorageCache::count() const
|
||||||
{
|
{
|
||||||
|
std::unique_lock<fastlock> ul(m_lock);
|
||||||
size_t count = m_spstorage->count();
|
size_t count = m_spstorage->count();
|
||||||
if (m_setkeys != nullptr)
|
if (m_setkeys != nullptr)
|
||||||
serverAssert(count == m_setkeys->size());
|
serverAssert(count == m_setkeys->size());
|
||||||
|
@ -5,6 +5,7 @@ class StorageCache
|
|||||||
{
|
{
|
||||||
std::shared_ptr<IStorage> m_spstorage;
|
std::shared_ptr<IStorage> m_spstorage;
|
||||||
std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys;
|
std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys;
|
||||||
|
mutable fastlock m_lock {"StorageCache"};
|
||||||
|
|
||||||
StorageCache(IStorage *storage)
|
StorageCache(IStorage *storage)
|
||||||
: m_spstorage(storage)
|
: m_spstorage(storage)
|
||||||
@ -32,7 +33,7 @@ public:
|
|||||||
StorageCache *cache = new StorageCache(nullptr);
|
StorageCache *cache = new StorageCache(nullptr);
|
||||||
if (pfactory->FSlow())
|
if (pfactory->FSlow())
|
||||||
{
|
{
|
||||||
cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>();
|
cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>(20);
|
||||||
}
|
}
|
||||||
load_iter_data data = {cache, fn, privdata};
|
load_iter_data data = {cache, fn, privdata};
|
||||||
cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
|
cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
|
||||||
|
@ -972,8 +972,8 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
|
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
{
|
{
|
||||||
g_pserver->db[idb]->processChanges(false);
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
g_pserver->db[idb]->commitChanges();
|
g_pserver->db[idb]->commitChanges();
|
||||||
}
|
}
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
freeFakeClient(fakeClient);
|
freeFakeClient(fakeClient);
|
||||||
|
@ -2181,7 +2181,7 @@ static int updateMaxmemory(long long val, long long prev, const char **err) {
|
|||||||
if ((unsigned long long)val < used) {
|
if ((unsigned long long)val < used) {
|
||||||
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used);
|
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used);
|
||||||
}
|
}
|
||||||
freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/);
|
freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false /*fPreSnapshot*/);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
111
src/db.cpp
111
src/db.cpp
@ -101,7 +101,12 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
|
|||||||
}
|
}
|
||||||
static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
|
static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
|
||||||
serverAssert((flags & LOOKUP_UPDATEMVCC) == 0);
|
serverAssert((flags & LOOKUP_UPDATEMVCC) == 0);
|
||||||
robj_roptr val = db->find(szFromObj(key));
|
robj_roptr val;
|
||||||
|
if (g_pserver->m_pstorageFactory)
|
||||||
|
val = db->find(szFromObj(key)).val();
|
||||||
|
else
|
||||||
|
val = db->find_cached_threadsafe(szFromObj(key)).val();
|
||||||
|
|
||||||
if (val != nullptr) {
|
if (val != nullptr) {
|
||||||
lookupKeyUpdateObj(val.unsafe_robjcast(), flags);
|
lookupKeyUpdateObj(val.unsafe_robjcast(), flags);
|
||||||
return val;
|
return val;
|
||||||
@ -896,7 +901,10 @@ void keysCommand(client *c) {
|
|||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
|
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
db->endSnapshot(snapshot);
|
|
||||||
|
locker.disarm();
|
||||||
|
lock.unlock();
|
||||||
|
db->endSnapshotAsync(snapshot);
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -1044,7 +1052,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
// Do an async version
|
// Do an async version
|
||||||
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
||||||
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
||||||
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
|
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
|
||||||
if (snapshot != nullptr)
|
if (snapshot != nullptr)
|
||||||
{
|
{
|
||||||
aeEventLoop *el = serverTL->el;
|
aeEventLoop *el = serverTL->el;
|
||||||
@ -1082,9 +1090,16 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
|
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
|
mstime_t timeScanFilter;
|
||||||
|
latencyStartMonitor(timeScanFilter);
|
||||||
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
|
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
|
||||||
|
latencyEndMonitor(timeScanFilter);
|
||||||
|
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
|
||||||
|
|
||||||
db->endSnapshot(snapshot);
|
locker.disarm();
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
db->endSnapshotAsync(snapshot);
|
||||||
listSetFreeMethod(keys,decrRefCountVoid);
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
listRelease(keys);
|
listRelease(keys);
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
@ -2323,10 +2338,12 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)
|
|||||||
serverAssert(FImplies(fAssumeNew, res == DICT_OK));
|
serverAssert(FImplies(fAssumeNew, res == DICT_OK));
|
||||||
if (res == DICT_OK)
|
if (res == DICT_OK)
|
||||||
{
|
{
|
||||||
|
#ifdef CHECKED_BUILD
|
||||||
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
|
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
|
||||||
{
|
{
|
||||||
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
|
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
trackkey(key, false /* fUpdate */);
|
trackkey(key, false /* fUpdate */);
|
||||||
}
|
}
|
||||||
return (res == DICT_OK);
|
return (res == DICT_OK);
|
||||||
@ -2546,7 +2563,7 @@ void redisDbPersistentData::storeDatabase()
|
|||||||
sdsfree(temp);
|
sdsfree(temp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentData::processChanges(bool fSnapshot)
|
bool redisDbPersistentData::processChanges(bool fSnapshot)
|
||||||
{
|
{
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
@ -2555,6 +2572,8 @@ void redisDbPersistentData::processChanges(bool fSnapshot)
|
|||||||
|
|
||||||
if (m_spstorage != nullptr)
|
if (m_spstorage != nullptr)
|
||||||
{
|
{
|
||||||
|
if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0)
|
||||||
|
return false;
|
||||||
m_spstorage->beginWriteBatch();
|
m_spstorage->beginWriteBatch();
|
||||||
serverAssert(m_pdbSnapshotStorageFlush == nullptr);
|
serverAssert(m_pdbSnapshotStorageFlush == nullptr);
|
||||||
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100)
|
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100)
|
||||||
@ -2586,6 +2605,7 @@ void redisDbPersistentData::processChanges(bool fSnapshot)
|
|||||||
m_setchanged.clear();
|
m_setchanged.clear();
|
||||||
m_cnewKeysPending = 0;
|
m_cnewKeysPending = 0;
|
||||||
}
|
}
|
||||||
|
return (m_spstorage != nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
|
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
|
||||||
@ -2688,8 +2708,8 @@ void redisDbPersistentData::removeAllCachedValues()
|
|||||||
// First we have to flush the tracked changes
|
// First we have to flush the tracked changes
|
||||||
if (m_fTrackingChanges)
|
if (m_fTrackingChanges)
|
||||||
{
|
{
|
||||||
processChanges(false);
|
if (processChanges(false))
|
||||||
commitChanges();
|
commitChanges();
|
||||||
trackChanges(false);
|
trackChanges(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2802,3 +2822,80 @@ int dbnumFromDb(redisDb *db)
|
|||||||
}
|
}
|
||||||
serverPanic("invalid database pointer");
|
serverPanic("invalid database pointer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c)
|
||||||
|
{
|
||||||
|
if (m_spstorage == nullptr)
|
||||||
|
return;
|
||||||
|
|
||||||
|
std::vector<robj*> veckeys;
|
||||||
|
lock.arm(c);
|
||||||
|
int numkeys = 0;
|
||||||
|
int *keys = getKeysFromCommand(c->cmd, c->argv, c->argc, &numkeys);
|
||||||
|
for (int ikey = 0; ikey < numkeys; ++ikey)
|
||||||
|
{
|
||||||
|
robj *objKey = c->argv[keys[ikey]];
|
||||||
|
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
|
||||||
|
veckeys.push_back(objKey);
|
||||||
|
}
|
||||||
|
lock.disarm();
|
||||||
|
|
||||||
|
getKeysFreeResult(keys);
|
||||||
|
|
||||||
|
std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
|
||||||
|
for (robj *objKey : veckeys)
|
||||||
|
{
|
||||||
|
sds sharedKey = nullptr;
|
||||||
|
std::unique_ptr<expireEntry> spexpire;
|
||||||
|
robj *o = nullptr;
|
||||||
|
m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
|
||||||
|
size_t offset = 0;
|
||||||
|
spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset);
|
||||||
|
o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast<const char*>(data) + offset, cb - offset);
|
||||||
|
serverAssert(o != nullptr);
|
||||||
|
}, &sharedKey);
|
||||||
|
|
||||||
|
if (sharedKey == nullptr)
|
||||||
|
sharedKey = sdsdupshared(szFromObj(objKey));
|
||||||
|
|
||||||
|
vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
|
||||||
|
}
|
||||||
|
|
||||||
|
lock.arm(c);
|
||||||
|
for (auto &tuple : vecInserts)
|
||||||
|
{
|
||||||
|
sds sharedKey = std::get<0>(tuple);
|
||||||
|
robj *o = std::get<1>(tuple);
|
||||||
|
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
|
||||||
|
|
||||||
|
if (o != nullptr)
|
||||||
|
{
|
||||||
|
if (this->find_cached_threadsafe(sharedKey) != nullptr)
|
||||||
|
{
|
||||||
|
// While unlocked this was already ensured
|
||||||
|
decrRefCount(o);
|
||||||
|
sdsfree(sharedKey);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dictAdd(m_pdict, sharedKey, o);
|
||||||
|
o->SetFExpires(spexpire != nullptr);
|
||||||
|
|
||||||
|
if (spexpire != nullptr)
|
||||||
|
{
|
||||||
|
auto itr = m_setexpire->find(sharedKey);
|
||||||
|
if (itr != m_setexpire->end())
|
||||||
|
m_setexpire->erase(itr);
|
||||||
|
m_setexpire->insert(std::move(*spexpire));
|
||||||
|
serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end());
|
||||||
|
}
|
||||||
|
serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (sharedKey != nullptr)
|
||||||
|
sdsfree(sharedKey); // BUG but don't bother crashing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -425,7 +425,7 @@ size_t freeMemoryGetNotCountedMemory(void) {
|
|||||||
* limit.
|
* limit.
|
||||||
* (Populated both for C_ERR and C_OK)
|
* (Populated both for C_ERR and C_OK)
|
||||||
*/
|
*/
|
||||||
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot) {
|
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle, bool fPreSnapshot) {
|
||||||
size_t mem_reported, mem_used, mem_tofree;
|
size_t mem_reported, mem_used, mem_tofree;
|
||||||
|
|
||||||
/* Check if we are over the memory usage limit. If we are not, no need
|
/* Check if we are over the memory usage limit. If we are not, no need
|
||||||
@ -464,7 +464,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
|||||||
|
|
||||||
/* Compute how much memory we need to free. */
|
/* Compute how much memory we need to free. */
|
||||||
mem_tofree = mem_used - maxmemory;
|
mem_tofree = mem_used - maxmemory;
|
||||||
if (g_pserver->m_pstorageFactory)
|
if (g_pserver->m_pstorageFactory && !fQuickCycle)
|
||||||
{
|
{
|
||||||
mem_tofree += static_cast<size_t>(maxmemory * 0.05); // if we have a storage provider be much more aggressive
|
mem_tofree += static_cast<size_t>(maxmemory * 0.05); // if we have a storage provider be much more aggressive
|
||||||
}
|
}
|
||||||
@ -484,7 +484,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
|||||||
* were over the limit, but the attempt to free memory was successful.
|
* were over the limit, but the attempt to free memory was successful.
|
||||||
* Otehrwise if we are over the memory limit, but not enough memory
|
* Otehrwise if we are over the memory limit, but not enough memory
|
||||||
* was freed to return back under the limit, the function returns C_ERR. */
|
* was freed to return back under the limit, the function returns C_ERR. */
|
||||||
int freeMemoryIfNeeded(bool fPreSnapshot) {
|
int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
/* By default replicas should ignore maxmemory
|
/* By default replicas should ignore maxmemory
|
||||||
@ -498,12 +498,13 @@ int freeMemoryIfNeeded(bool fPreSnapshot) {
|
|||||||
const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider();
|
const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider();
|
||||||
int result = C_ERR;
|
int result = C_ERR;
|
||||||
int ckeysFailed = 0;
|
int ckeysFailed = 0;
|
||||||
|
int keys_freed = 0;
|
||||||
|
|
||||||
/* When clients are paused the dataset should be static not just from the
|
/* When clients are paused the dataset should be static not just from the
|
||||||
* POV of clients not being able to write, but also from the POV of
|
* POV of clients not being able to write, but also from the POV of
|
||||||
* expires and evictions of keys not being performed. */
|
* expires and evictions of keys not being performed. */
|
||||||
if (clientsArePaused()) return C_OK;
|
if (clientsArePaused()) return C_OK;
|
||||||
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fPreSnapshot) == C_OK)
|
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fQuickCycle,fPreSnapshot) == C_OK)
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
mem_freed = 0;
|
mem_freed = 0;
|
||||||
@ -513,7 +514,7 @@ int freeMemoryIfNeeded(bool fPreSnapshot) {
|
|||||||
goto cant_free; /* We need to free memory, but policy forbids. */
|
goto cant_free; /* We need to free memory, but policy forbids. */
|
||||||
|
|
||||||
while (mem_freed < mem_tofree) {
|
while (mem_freed < mem_tofree) {
|
||||||
int j, k, i, keys_freed = 0;
|
int j, k, i;
|
||||||
static unsigned int next_db = 0;
|
static unsigned int next_db = 0;
|
||||||
sds bestkey = NULL;
|
sds bestkey = NULL;
|
||||||
int bestdbid;
|
int bestdbid;
|
||||||
@ -737,7 +738,7 @@ cant_free:
|
|||||||
* - Nor we are loading data right now.
|
* - Nor we are loading data right now.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
int freeMemoryIfNeededAndSafe(bool fPreSnapshot) {
|
int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot) {
|
||||||
if (g_pserver->shutdown_asap || g_pserver->lua_timedout || g_pserver->loading) return C_OK;
|
if (g_pserver->shutdown_asap || g_pserver->lua_timedout || g_pserver->loading) return C_OK;
|
||||||
return freeMemoryIfNeeded(fPreSnapshot);
|
return freeMemoryIfNeeded(fQuickCycle, fPreSnapshot);
|
||||||
}
|
}
|
||||||
|
10
src/rdb.cpp
10
src/rdb.cpp
@ -2520,11 +2520,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
{
|
{
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
{
|
{
|
||||||
g_pserver->db[idb]->processChanges(false);
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
g_pserver->db[idb]->commitChanges();
|
g_pserver->db[idb]->commitChanges();
|
||||||
g_pserver->db[idb]->trackChanges(false);
|
g_pserver->db[idb]->trackChanges(false);
|
||||||
}
|
}
|
||||||
freeMemoryIfNeeded(false /* fPreSnapshot*/);
|
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2596,8 +2596,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
|
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
{
|
{
|
||||||
g_pserver->db[idb]->processChanges(false);
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
g_pserver->db[idb]->commitChanges();
|
g_pserver->db[idb]->commitChanges();
|
||||||
}
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
|
@ -1473,7 +1473,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
int delhook = 0, err;
|
int delhook = 0, err;
|
||||||
|
|
||||||
if (g_pserver->m_pstorageFactory != nullptr)
|
if (g_pserver->m_pstorageFactory != nullptr)
|
||||||
freeMemoryIfNeededAndSafe(true);
|
freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true);
|
||||||
|
|
||||||
/* When we replicate whole scripts, we want the same PRNG sequence at
|
/* When we replicate whole scripts, we want the same PRNG sequence at
|
||||||
* every call so that our PRNG is not affected by external state. */
|
* every call so that our PRNG is not affected by external state. */
|
||||||
|
@ -49,8 +49,11 @@ class semiorderedset
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
semiorderedset()
|
semiorderedset(size_t bitsStart = 0)
|
||||||
{
|
{
|
||||||
|
if (bitsStart < bits_min)
|
||||||
|
bitsStart = bits_min;
|
||||||
|
bits = bitsStart;
|
||||||
m_data.resize((1ULL << bits));
|
m_data.resize((1ULL << bits));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2014,8 +2014,8 @@ void flushStorageWeak()
|
|||||||
latencyStartMonitor(storage_process_latency);
|
latencyStartMonitor(storage_process_latency);
|
||||||
std::vector<redisDb*> vecdb;
|
std::vector<redisDb*> vecdb;
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
vecdb.push_back(g_pserver->db[idb]);
|
if (g_pserver->db[idb]->processChanges(true))
|
||||||
g_pserver->db[idb]->processChanges(true);
|
vecdb.push_back(g_pserver->db[idb]);
|
||||||
}
|
}
|
||||||
latencyEndMonitor(storage_process_latency);
|
latencyEndMonitor(storage_process_latency);
|
||||||
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
|
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
|
||||||
@ -2453,8 +2453,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
mstime_t storage_process_latency;
|
mstime_t storage_process_latency;
|
||||||
latencyStartMonitor(storage_process_latency);
|
latencyStartMonitor(storage_process_latency);
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
vecdb.push_back(g_pserver->db[idb]);
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
g_pserver->db[idb]->processChanges(false);
|
vecdb.push_back(g_pserver->db[idb]);
|
||||||
}
|
}
|
||||||
latencyEndMonitor(storage_process_latency);
|
latencyEndMonitor(storage_process_latency);
|
||||||
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
|
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
|
||||||
@ -3936,6 +3936,9 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!locker.isArmed())
|
||||||
|
c->db->prefetchKeysAsync(locker, c);
|
||||||
|
|
||||||
/* Check if the user is authenticated. This check is skipped in case
|
/* Check if the user is authenticated. This check is skipped in case
|
||||||
* the default user is flagged as "nopass" and is active. */
|
* the default user is flagged as "nopass" and is active. */
|
||||||
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
|
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
|
||||||
@ -4007,7 +4010,7 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
|
|||||||
* propagation of DELs due to eviction. */
|
* propagation of DELs due to eviction. */
|
||||||
if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
|
if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
int out_of_memory = freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/) == C_ERR;
|
int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR;
|
||||||
/* freeMemoryIfNeeded may flush replica output buffers. This may result
|
/* freeMemoryIfNeeded may flush replica output buffers. This may result
|
||||||
* into a replica, that may be the active client, to be freed. */
|
* into a replica, that may be the active client, to be freed. */
|
||||||
if (serverTL->current_client == NULL) return C_ERR;
|
if (serverTL->current_client == NULL) return C_ERR;
|
||||||
@ -4268,8 +4271,8 @@ int prepareForShutdown(int flags) {
|
|||||||
|
|
||||||
// Also Dump To FLASH if Applicable
|
// Also Dump To FLASH if Applicable
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
g_pserver->db[idb]->processChanges(false);
|
if (g_pserver->db[idb]->processChanges(false))
|
||||||
g_pserver->db[idb]->commitChanges();
|
g_pserver->db[idb]->commitChanges();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
src/server.h
13
src/server.h
@ -1329,7 +1329,7 @@ public:
|
|||||||
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
||||||
// either release the global lock or keep the same global lock between the two functions as
|
// either release the global lock or keep the same global lock between the two functions as
|
||||||
// a second look is kept to ensure writes to secondary storage are ordered
|
// a second look is kept to ensure writes to secondary storage are ordered
|
||||||
void processChanges(bool fSnapshot);
|
bool processChanges(bool fSnapshot);
|
||||||
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
|
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
|
||||||
|
|
||||||
// This should only be used if you look at the key, we do not fixup
|
// This should only be used if you look at the key, we do not fixup
|
||||||
@ -1350,6 +1350,8 @@ public:
|
|||||||
bool removeCachedValue(const char *key);
|
bool removeCachedValue(const char *key);
|
||||||
void removeAllCachedValues();
|
void removeAllCachedValues();
|
||||||
|
|
||||||
|
void prefetchKeysAsync(class AeLocker &locker, client *c);
|
||||||
|
|
||||||
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||||
|
|
||||||
dict_iter find_cached_threadsafe(const char *key) const;
|
dict_iter find_cached_threadsafe(const char *key) const;
|
||||||
@ -1462,7 +1464,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
|
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
|
||||||
friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
|
friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
|
||||||
friend void activeDefragCycle(void);
|
friend void activeDefragCycle(void);
|
||||||
friend int freeMemoryIfNeeded(bool);
|
friend int freeMemoryIfNeeded(bool, bool);
|
||||||
friend void activeExpireCycle(int);
|
friend void activeExpireCycle(int);
|
||||||
friend void expireSlaveKeys(void);
|
friend void expireSlaveKeys(void);
|
||||||
|
|
||||||
@ -1518,6 +1520,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::removeAllCachedValues;
|
using redisDbPersistentData::removeAllCachedValues;
|
||||||
using redisDbPersistentData::dictUnsafeKeyOnly;
|
using redisDbPersistentData::dictUnsafeKeyOnly;
|
||||||
using redisDbPersistentData::resortExpire;
|
using redisDbPersistentData::resortExpire;
|
||||||
|
using redisDbPersistentData::prefetchKeysAsync;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
expireset::setiter expireitr;
|
expireset::setiter expireitr;
|
||||||
@ -2997,10 +3000,10 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec);
|
|||||||
int zslLexValueLteMax(sds value, zlexrangespec *spec);
|
int zslLexValueLteMax(sds value, zlexrangespec *spec);
|
||||||
|
|
||||||
/* Core functions */
|
/* Core functions */
|
||||||
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot=false);
|
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle = false, bool fPreSnapshot=false);
|
||||||
size_t freeMemoryGetNotCountedMemory();
|
size_t freeMemoryGetNotCountedMemory();
|
||||||
int freeMemoryIfNeeded(bool fPreSnapshot);
|
int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot);
|
||||||
int freeMemoryIfNeededAndSafe(bool fPreSnapshot);
|
int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot);
|
||||||
int processCommand(client *c, int callFlags, class AeLocker &locker);
|
int processCommand(client *c, int callFlags, class AeLocker &locker);
|
||||||
void setupSignalHandlers(void);
|
void setupSignalHandlers(void);
|
||||||
struct redisCommand *lookupCommand(sds name);
|
struct redisCommand *lookupCommand(sds name);
|
||||||
|
@ -8,7 +8,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
|
|||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
serverAssert(m_refCount == 0); // do not call this on a snapshot
|
serverAssert(m_refCount == 0); // do not call this on a snapshot
|
||||||
|
|
||||||
if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional)
|
if (freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true /*fPreSnapshot*/) != C_OK && fOptional)
|
||||||
return nullptr; // can't create snapshot due to OOM
|
return nullptr; // can't create snapshot due to OOM
|
||||||
|
|
||||||
int levels = 1;
|
int levels = 1;
|
||||||
@ -182,7 +182,8 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
|
|||||||
// have some internal heuristics to do a synchronous endSnapshot if it makes sense
|
// have some internal heuristics to do a synchronous endSnapshot if it makes sense
|
||||||
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
|
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
|
||||||
{
|
{
|
||||||
aeAcquireLock();
|
mstime_t latency;
|
||||||
|
aeAcquireLock(); latencyStartMonitor(latency);
|
||||||
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
|
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
|
||||||
{
|
{
|
||||||
// Free a stale async snapshot so consolidate_children can clean it up later
|
// Free a stale async snapshot so consolidate_children can clean it up later
|
||||||
@ -198,6 +199,8 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
|||||||
{
|
{
|
||||||
// For small snapshots it makes more sense just to merge it directly
|
// For small snapshots it makes more sense just to merge it directly
|
||||||
endSnapshot(psnapshot);
|
endSnapshot(psnapshot);
|
||||||
|
latencyEndMonitor(latency);
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-synchronous-path", latency);
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -206,19 +209,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
|||||||
auto psnapshotT = createSnapshot(LLONG_MAX, false);
|
auto psnapshotT = createSnapshot(LLONG_MAX, false);
|
||||||
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
|
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
|
||||||
psnapshot = nullptr;
|
psnapshot = nullptr;
|
||||||
aeReleaseLock();
|
aeReleaseLock(); latencyEndMonitor(latency);
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency);
|
||||||
|
|
||||||
// do the expensive work of merging snapshots outside the ref
|
// do the expensive work of merging snapshots outside the ref
|
||||||
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it
|
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it
|
||||||
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
|
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
|
||||||
|
|
||||||
// Final Cleanup
|
// Final Cleanup
|
||||||
aeAcquireLock();
|
aeAcquireLock(); latencyStartMonitor(latency);
|
||||||
if (m_pdbSnapshotASYNC == nullptr)
|
if (m_pdbSnapshotASYNC == nullptr)
|
||||||
m_pdbSnapshotASYNC = psnapshotT;
|
m_pdbSnapshotASYNC = psnapshotT;
|
||||||
else
|
else
|
||||||
endSnapshot(psnapshotT); // finally clean up our temp snapshot
|
endSnapshot(psnapshotT); // finally clean up our temp snapshot
|
||||||
aeReleaseLock();
|
aeReleaseLock(); latencyEndMonitor(latency);
|
||||||
|
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
||||||
@ -232,15 +238,14 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
|||||||
|
|
||||||
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
size_t freed = 0;
|
|
||||||
while ((de = dictNext(di)) != nullptr)
|
while ((de = dictNext(di)) != nullptr)
|
||||||
{
|
{
|
||||||
dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de));
|
dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de));
|
||||||
if (deObj != nullptr && dictGetVal(deObj) != nullptr)
|
if (deObj != nullptr && dictGetVal(deObj) != nullptr)
|
||||||
{
|
{
|
||||||
decrRefCount((robj*)dictGetVal(deObj));
|
decrRefCount((robj*)dictGetVal(deObj));
|
||||||
deObj->v.val = nullptr;
|
void *ptrSet = nullptr;
|
||||||
++freed;
|
__atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
@ -262,6 +267,9 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mstime_t latency_endsnapshot;
|
||||||
|
latencyStartMonitor(latency_endsnapshot);
|
||||||
|
|
||||||
// Alright we're ready to be free'd, but first dump all the refs on our child snapshots
|
// Alright we're ready to be free'd, but first dump all the refs on our child snapshots
|
||||||
if (m_spdbSnapshotHOLDER->m_refCount == 1)
|
if (m_spdbSnapshotHOLDER->m_refCount == 1)
|
||||||
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
|
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
|
||||||
@ -288,9 +296,6 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mstime_t latency_endsnapshot;
|
|
||||||
latencyStartMonitor(latency_endsnapshot);
|
|
||||||
|
|
||||||
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
|
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
|
||||||
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -363,7 +368,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
|||||||
latencyEndMonitor(latency_endsnapshot);
|
latencyEndMonitor(latency_endsnapshot);
|
||||||
latencyAddSampleIfNeeded("end-mvcc-snapshot", latency_endsnapshot);
|
latencyAddSampleIfNeeded("end-mvcc-snapshot", latency_endsnapshot);
|
||||||
|
|
||||||
freeMemoryIfNeededAndSafe(false);
|
freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const
|
dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const
|
||||||
@ -459,7 +464,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
|
|||||||
// Take the size so we can ensure we visited every element exactly once
|
// Take the size so we can ensure we visited every element exactly once
|
||||||
// use volatile to ensure it's not checked too late. This makes it more
|
// use volatile to ensure it's not checked too late. This makes it more
|
||||||
// likely we'll detect races (but it won't gurantee it)
|
// likely we'll detect races (but it won't gurantee it)
|
||||||
volatile size_t celem = size();
|
volatile ssize_t celem = (ssize_t)size();
|
||||||
|
|
||||||
dictEntry *de = nullptr;
|
dictEntry *de = nullptr;
|
||||||
bool fResult = true;
|
bool fResult = true;
|
||||||
@ -564,9 +569,11 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
|
|||||||
|
|
||||||
volatile size_t skipped = 0;
|
volatile size_t skipped = 0;
|
||||||
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
||||||
if (o != nullptr) {
|
if (o != nullptr || !m_spstorage) {
|
||||||
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
||||||
incrRefCount(o);
|
if (o != nullptr) {
|
||||||
|
incrRefCount(o);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
++skipped;
|
++skipped;
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
#ifndef __STORAGE_H__
|
#ifndef __STORAGE_H__
|
||||||
#define __STORAGE_H__
|
#define __STORAGE_H__
|
||||||
|
|
||||||
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 // Note: also defined in object.c - should always match
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
#define KEYDB_REAL_VERSION "6.0.14"
|
#define KEYDB_REAL_VERSION "6.0.15"
|
||||||
|
|
||||||
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
|
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
|
||||||
|
|
||||||
enum VersionCompareResult
|
enum VersionCompareResult
|
||||||
|
Loading…
x
Reference in New Issue
Block a user