Flash expiration (#197)
Design Doc: https://docs.google.com/document/d/1NmnYGnHLdZp-KOUCUatX5iXpF-L3YK4VUc9Lm3Tqxpo/edit?usp=sharing
This commit is contained in:
parent
7d4f461562
commit
ace783a00e
@ -2,7 +2,7 @@
|
||||
|
||||
# make the build
|
||||
git submodule init && git submodule update
|
||||
make BUILD_TLS=yes -j$(nproc) KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror'
|
||||
make BUILD_TLS=yes ENABLE_FLASH=yes -j$(nproc) KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror'
|
||||
|
||||
# gen-cert
|
||||
./utils/gen-test-certs.sh
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include <functional>
|
||||
#include "sds.h"
|
||||
#include <string>
|
||||
|
||||
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"
|
||||
|
||||
@ -43,6 +44,11 @@ public:
|
||||
endWriteBatch();
|
||||
}
|
||||
|
||||
virtual std::vector<std::string> getExpirationCandidates(unsigned int count) = 0;
|
||||
virtual std::vector<std::string> getEvictionCandidates(unsigned int count) = 0;
|
||||
virtual void setExpire(const char *key, size_t cchKey, long long expire) = 0;
|
||||
virtual void removeExpire(const char *key, size_t cchKey, long long expire) = 0;
|
||||
|
||||
virtual void beginWriteBatch() {} // NOP
|
||||
virtual void endWriteBatch() {} // NOP
|
||||
|
||||
|
@ -84,19 +84,31 @@ void StorageCache::cacheKey(const char *rgch, size_t cch)
|
||||
|
||||
bool StorageCache::erase(sds key)
|
||||
{
|
||||
unsigned long long when = 0;
|
||||
m_spstorage->retrieve(key, sdslen(key), [&when](const char *, size_t, const void * data, size_t cbdata) {
|
||||
auto e = deserializeExpire((const char *)data, cbdata, nullptr);
|
||||
if (e != nullptr)
|
||||
when = e->when();
|
||||
});
|
||||
bool result = m_spstorage->erase(key, sdslen(key));
|
||||
std::unique_lock<fastlock> ul(m_lock);
|
||||
if (result && m_pdict != nullptr)
|
||||
if (result)
|
||||
{
|
||||
uint64_t hash = dictSdsHash(key);
|
||||
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
|
||||
serverAssert(de != nullptr);
|
||||
de->v.s64--;
|
||||
serverAssert(de->v.s64 >= 0);
|
||||
if (de->v.s64 == 0) {
|
||||
dictDelete(m_pdict, reinterpret_cast<void*>(hash));
|
||||
} else {
|
||||
m_collisionCount--;
|
||||
if (m_pdict != nullptr)
|
||||
{
|
||||
uint64_t hash = dictSdsHash(key);
|
||||
dictEntry *de = dictFind(m_pdict, reinterpret_cast<void*>(hash));
|
||||
serverAssert(de != nullptr);
|
||||
de->v.s64--;
|
||||
serverAssert(de->v.s64 >= 0);
|
||||
if (de->v.s64 == 0) {
|
||||
dictDelete(m_pdict, reinterpret_cast<void*>(hash));
|
||||
} else {
|
||||
m_collisionCount--;
|
||||
}
|
||||
}
|
||||
if (when != 0) {
|
||||
m_spstorage->removeExpire(key, sdslen(key), when);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@ -111,6 +123,9 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
|
||||
}
|
||||
ul.unlock();
|
||||
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
|
||||
auto e = deserializeExpire((const char *)data, cbdata, nullptr);
|
||||
if (e != nullptr)
|
||||
m_spstorage->setExpire(key, sdslen(key), e->when());
|
||||
}
|
||||
|
||||
long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
|
||||
@ -119,13 +134,18 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si
|
||||
std::vector<dictEntry*> vechashes;
|
||||
if (m_pdict != nullptr) {
|
||||
vechashes.reserve(celem);
|
||||
}
|
||||
|
||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||
if (m_pdict != nullptr) {
|
||||
dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry));
|
||||
de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]);
|
||||
de->v.u64 = 1;
|
||||
vechashes.push_back(de);
|
||||
}
|
||||
auto e = deserializeExpire(rgvals[ielem], rgcbvals[ielem], nullptr);
|
||||
if (e != nullptr)
|
||||
m_spstorage->setExpire(rgkeys[ielem], rgcbkeys[ielem], e->when());
|
||||
}
|
||||
|
||||
std::unique_lock<fastlock> ul(m_lock);
|
||||
|
@ -51,11 +51,18 @@ public:
|
||||
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
|
||||
bool enumerate_hashslot(IStorage::callback fn, unsigned int hashslot) const { return m_spstorage->enumerate_hashslot(fn, hashslot); }
|
||||
|
||||
std::vector<std::string> getExpirationCandidates(unsigned int count) { return m_spstorage->getExpirationCandidates(count); }
|
||||
std::vector<std::string> getEvictionCandidates(unsigned int count) { return m_spstorage->getEvictionCandidates(count); }
|
||||
void setExpire(const char *key, size_t cchKey, long long expire) { m_spstorage->setExpire(key, cchKey, expire); }
|
||||
void removeExpire(const char *key, size_t cchKey, long long expire) { m_spstorage->removeExpire(key, cchKey, expire); }
|
||||
|
||||
void beginWriteBatch();
|
||||
void endWriteBatch() { m_spstorage->endWriteBatch(); }
|
||||
void batch_lock() { return m_spstorage->batch_lock(); }
|
||||
void batch_unlock() { return m_spstorage->batch_unlock(); }
|
||||
|
||||
void flush() { m_spstorage->flush(); }
|
||||
|
||||
size_t count() const;
|
||||
|
||||
const StorageCache *clone();
|
||||
|
@ -2597,6 +2597,19 @@ static int updateMaxmemory(long long val, long long prev, const char **err) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int updateFlashMaxmemory(long long val, long long prev, const char **err) {
|
||||
UNUSED(prev);
|
||||
UNUSED(err);
|
||||
if (val && g_pserver->m_pstorageFactory) {
|
||||
size_t used = g_pserver->m_pstorageFactory->totalDiskspaceUsed();
|
||||
if ((unsigned long long)val < used) {
|
||||
serverLog(LL_WARNING,"WARNING: the new maxstorage value set via CONFIG SET (%llu) is smaller than the current storage usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxstorage, used);
|
||||
}
|
||||
performEvictions(false /*fPreSnapshot*/);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int updateGoodSlaves(long long val, long long prev, const char **err) {
|
||||
UNUSED(val);
|
||||
UNUSED(prev);
|
||||
@ -2940,7 +2953,7 @@ standardConfig configs[] = {
|
||||
|
||||
/* Unsigned Long Long configs */
|
||||
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
|
||||
createULongLongConfig("maxstorage", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxstorage, 0, MEMORY_CONFIG, NULL, NULL),
|
||||
createULongLongConfig("maxstorage", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxstorage, 0, MEMORY_CONFIG, NULL, updateFlashMaxmemory),
|
||||
|
||||
/* Size_t configs */
|
||||
createSizeTConfig("hash-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->hash_max_ziplist_entries, 512, INTEGER_CONFIG, NULL, NULL),
|
||||
|
@ -2875,10 +2875,10 @@ LNotFound:
|
||||
{
|
||||
dictAdd(m_pdict, sdsNewKey, o);
|
||||
|
||||
o->SetFExpires(spexpire != nullptr);
|
||||
if (spexpire != nullptr) {
|
||||
o->expire = std::move(*spexpire);
|
||||
}
|
||||
o->SetFExpires(spexpire != nullptr);
|
||||
g_pserver->stat_storage_provider_read_hits++;
|
||||
} else {
|
||||
sdsfree(sdsNewKey);
|
||||
@ -3249,8 +3249,8 @@ std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size
|
||||
if (subkey)
|
||||
sdsfree(subkey);
|
||||
}
|
||||
|
||||
*poffset = offset;
|
||||
if (poffset != nullptr)
|
||||
*poffset = offset;
|
||||
return spexpire;
|
||||
}
|
||||
|
||||
|
@ -308,7 +308,7 @@ void computeDatasetDigest(unsigned char *final) {
|
||||
mixDigest(final,&aux,sizeof(aux));
|
||||
|
||||
/* Iterate this DB writing every entry */
|
||||
db->iterate_threadsafe([final, db](const char *key, robj_roptr o)->bool {
|
||||
db->iterate_threadsafe([final](const char *key, robj_roptr o)->bool {
|
||||
unsigned char digest[20];
|
||||
robj *keyobj;
|
||||
|
||||
@ -932,6 +932,21 @@ NULL
|
||||
mallctl_string(c, c->argv+2, c->argc-2);
|
||||
return;
|
||||
#endif
|
||||
} else if(!strcasecmp(szFromObj(c->argv[1]),"flush-storage") && c->argc == 2) {
|
||||
if (g_pserver->m_pstorageFactory != nullptr) {
|
||||
for (int i = 0; i < cserver.dbnum; i++) {
|
||||
g_pserver->db[i]->getStorageCache()->flush();
|
||||
}
|
||||
addReply(c,shared.ok);
|
||||
} else {
|
||||
addReplyError(c, "Can't flush storage if no storage provider is set");
|
||||
}
|
||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"get-storage-usage") && c->argc == 2) {
|
||||
if (g_pserver->m_pstorageFactory != nullptr) {
|
||||
addReplyLongLong(c, g_pserver->m_pstorageFactory->totalDiskspaceUsed());
|
||||
} else {
|
||||
addReplyLongLong(c, 0);
|
||||
}
|
||||
} else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
|
140
src/evict.cpp
140
src/evict.cpp
@ -100,6 +100,36 @@ unsigned long long estimateObjectIdleTime(robj_roptr o) {
|
||||
}
|
||||
}
|
||||
|
||||
unsigned long long getIdle(robj *obj, const expireEntry *e) {
|
||||
unsigned long long idle;
|
||||
/* Calculate the idle time according to the policy. This is called
|
||||
* idle just because the code initially handled LRU, but is in fact
|
||||
* just a score where an higher score means better candidate. */
|
||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) {
|
||||
idle = (obj != nullptr) ? estimateObjectIdleTime(obj) : 0;
|
||||
} else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
/* When we use an LRU policy, we sort the keys by idle time
|
||||
* so that we expire keys starting from greater idle time.
|
||||
* However when the policy is an LFU one, we have a frequency
|
||||
* estimation, and we want to evict keys with lower frequency
|
||||
* first. So inside the pool we put objects using the inverted
|
||||
* frequency subtracting the actual frequency to the maximum
|
||||
* frequency of 255. */
|
||||
idle = 255-LFUDecrAndReturn(obj);
|
||||
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
|
||||
/* In this case the sooner the expire the better. */
|
||||
if (e != nullptr)
|
||||
idle = ULLONG_MAX - e->when();
|
||||
else
|
||||
idle = 0;
|
||||
} else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
|
||||
idle = ULLONG_MAX;
|
||||
} else {
|
||||
serverPanic("Unknown eviction policy in storage eviction");
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
|
||||
/* LRU approximation algorithm
|
||||
*
|
||||
* Redis uses an approximation of the LRU algorithm that runs in constant
|
||||
@ -137,28 +167,7 @@ void evictionPoolAlloc(void) {
|
||||
|
||||
void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool)
|
||||
{
|
||||
unsigned long long idle;
|
||||
|
||||
/* Calculate the idle time according to the policy. This is called
|
||||
* idle just because the code initially handled LRU, but is in fact
|
||||
* just a score where an higher score means better candidate. */
|
||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) {
|
||||
idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0;
|
||||
} else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
/* When we use an LRU policy, we sort the keys by idle time
|
||||
* so that we expire keys starting from greater idle time.
|
||||
* However when the policy is an LFU one, we have a frequency
|
||||
* estimation, and we want to evict keys with lower frequency
|
||||
* first. So inside the pool we put objects using the inverted
|
||||
* frequency subtracting the actual frequency to the maximum
|
||||
* frequency of 255. */
|
||||
idle = 255-LFUDecrAndReturn(o);
|
||||
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
|
||||
/* In this case the sooner the expire the better. */
|
||||
idle = ULLONG_MAX - e->when();
|
||||
} else {
|
||||
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
|
||||
}
|
||||
unsigned long long idle = getIdle(o,e);
|
||||
|
||||
/* Insert the element inside the pool.
|
||||
* First, find the first empty bucket or the first populated
|
||||
@ -600,6 +609,31 @@ static unsigned long evictionTimeLimitUs() {
|
||||
return ULONG_MAX; /* No limit to eviction time */
|
||||
}
|
||||
|
||||
void evict(redisDb *db, robj *keyobj) {
|
||||
mstime_t eviction_latency;
|
||||
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction);
|
||||
/* We compute the amount of memory freed by db*Delete() alone.
|
||||
* It is possible that actually the memory needed to propagate
|
||||
* the DEL in AOF and replication link is greater than the one
|
||||
* we are freeing removing the key, but we can't account for
|
||||
* that otherwise we would never exit the loop.
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space. */
|
||||
latencyStartMonitor(eviction_latency);
|
||||
if (g_pserver->lazyfree_lazy_eviction)
|
||||
dbAsyncDelete(db,keyobj);
|
||||
else
|
||||
dbSyncDelete(db,keyobj);
|
||||
latencyEndMonitor(eviction_latency);
|
||||
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
|
||||
|
||||
signalModifiedKey(NULL,db,keyobj);
|
||||
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
|
||||
keyobj, db->id);
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
|
||||
static void updateSysAvailableMemory() {
|
||||
if (g_pserver->force_eviction_percent) {
|
||||
g_pserver->cron_malloc_stats.sys_available = getMemAvailable();
|
||||
@ -637,7 +671,7 @@ int performEvictions(bool fPreSnapshot) {
|
||||
int keys_freed = 0;
|
||||
size_t mem_reported, mem_tofree;
|
||||
long long mem_freed; /* May be negative */
|
||||
mstime_t latency, eviction_latency;
|
||||
mstime_t latency;
|
||||
long long delta;
|
||||
int slaves = listLength(g_pserver->slaves);
|
||||
const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider();
|
||||
@ -662,6 +696,43 @@ int performEvictions(bool fPreSnapshot) {
|
||||
monotime evictionTimer;
|
||||
elapsedStart(&evictionTimer);
|
||||
|
||||
if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr) {
|
||||
while (g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage && elapsedUs(evictionTimer) < eviction_time_limit_us) {
|
||||
redisDb *db;
|
||||
std::vector<std::string> evictionPool;
|
||||
robj *bestkey = nullptr;
|
||||
redisDb *bestdb = nullptr;
|
||||
unsigned long long bestidle = 0;
|
||||
for (int i = 0; i < cserver.dbnum; i++) {
|
||||
db = g_pserver->db[i];
|
||||
evictionPool = db->getStorageCache()->getEvictionCandidates(g_pserver->maxmemory_samples);
|
||||
for (std::string key : evictionPool) {
|
||||
robj *keyobj = createStringObject(key.c_str(), key.size());
|
||||
robj *obj = db->find(szFromObj(keyobj));
|
||||
if (obj != nullptr) {
|
||||
expireEntry *e = db->getExpire(keyobj);
|
||||
unsigned long long idle = getIdle(obj, e);
|
||||
|
||||
if (bestkey == nullptr || bestidle < idle) {
|
||||
if (bestkey != nullptr)
|
||||
decrRefCount(bestkey);
|
||||
incrRefCount(keyobj);
|
||||
bestkey = keyobj;
|
||||
bestidle = idle;
|
||||
bestdb = db;
|
||||
}
|
||||
}
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
}
|
||||
if (bestkey) {
|
||||
evict(bestdb, bestkey);
|
||||
} else {
|
||||
break; //could not find a key to evict so stop now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr && g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage)
|
||||
goto cant_free_storage;
|
||||
|
||||
@ -776,7 +847,7 @@ int performEvictions(bool fPreSnapshot) {
|
||||
if (db->removeCachedValue(bestkey, &deT)) {
|
||||
mem_freed += splazy->addEntry(db->dictUnsafeKeyOnly(), deT);
|
||||
ckeysFailed = 0;
|
||||
g_pserver->stat_evictedkeys++;
|
||||
g_pserver->stat_evictedkeys++;
|
||||
}
|
||||
else {
|
||||
delta = 0;
|
||||
@ -788,30 +859,11 @@ int performEvictions(bool fPreSnapshot) {
|
||||
else
|
||||
{
|
||||
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
||||
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction);
|
||||
/* We compute the amount of memory freed by db*Delete() alone.
|
||||
* It is possible that actually the memory needed to propagate
|
||||
* the DEL in AOF and replication link is greater than the one
|
||||
* we are freeing removing the key, but we can't account for
|
||||
* that otherwise we would never exit the loop.
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space. */
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
latencyStartMonitor(eviction_latency);
|
||||
if (g_pserver->lazyfree_lazy_eviction)
|
||||
dbAsyncDelete(db,keyobj);
|
||||
else
|
||||
dbSyncDelete(db,keyobj);
|
||||
latencyEndMonitor(eviction_latency);
|
||||
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
|
||||
evict(db, keyobj);
|
||||
delta -= (long long) zmalloc_used_memory();
|
||||
mem_freed += delta;
|
||||
g_pserver->stat_evictedkeys++;
|
||||
signalModifiedKey(NULL,db,keyobj);
|
||||
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
|
||||
keyobj, db->id);
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
keys_freed++;
|
||||
|
||||
|
258
src/expire.cpp
258
src/expire.cpp
@ -396,126 +396,152 @@ void pexpireMemberAtCommand(client *c)
|
||||
* distribute the time evenly across DBs. */
|
||||
current_db++;
|
||||
|
||||
/* Continue to expire if at the end of the cycle there are still
|
||||
* a big percentage of keys to expire, compared to the number of keys
|
||||
* we scanned. The percentage, stored in config_cycle_acceptable_stale
|
||||
* is not fixed, but depends on the Redis configured "expire effort". */
|
||||
do {
|
||||
unsigned long num, slots;
|
||||
long long now, ttl_sum;
|
||||
int ttl_samples;
|
||||
iteration++;
|
||||
if (g_pserver->m_pstorageFactory == nullptr) {
|
||||
/* Continue to expire if at the end of the cycle there are still
|
||||
* a big percentage of keys to expire, compared to the number of keys
|
||||
* we scanned. The percentage, stored in config_cycle_acceptable_stale
|
||||
* is not fixed, but depends on the Redis configured "expire effort". */
|
||||
do {
|
||||
unsigned long num, slots;
|
||||
long long now, ttl_sum;
|
||||
int ttl_samples;
|
||||
iteration++;
|
||||
|
||||
/* If there is nothing to expire try next DB ASAP. */
|
||||
if (db->expireSize() == 0) {
|
||||
db->avg_ttl = 0;
|
||||
break;
|
||||
}
|
||||
num = dictSize(db->m_pdict);
|
||||
slots = dictSlots(db->m_pdict);
|
||||
now = mstime();
|
||||
|
||||
/* When there are less than 1% filled slots, sampling the key
|
||||
* space is expensive, so stop here waiting for better times...
|
||||
* The dictionary will be resized asap. */
|
||||
if (slots > DICT_HT_INITIAL_SIZE &&
|
||||
(num*100/slots < 1)) break;
|
||||
|
||||
/* The main collection cycle. Sample random keys among keys
|
||||
* with an expire set, checking for expired ones. */
|
||||
expired = 0;
|
||||
sampled = 0;
|
||||
ttl_sum = 0;
|
||||
ttl_samples = 0;
|
||||
|
||||
if (num > config_keys_per_loop)
|
||||
num = config_keys_per_loop;
|
||||
|
||||
/* Here we access the low level representation of the hash table
|
||||
* for speed concerns: this makes this code coupled with dict.c,
|
||||
* but it hardly changed in ten years.
|
||||
*
|
||||
* Note that certain places of the hash table may be empty,
|
||||
* so we want also a stop condition about the number of
|
||||
* buckets that we scanned. However scanning for free buckets
|
||||
* is very fast: we are in the cache line scanning a sequential
|
||||
* array of NULL pointers, so we can scan a lot more buckets
|
||||
* than keys in the same time. */
|
||||
long max_buckets = num*20;
|
||||
long checked_buckets = 0;
|
||||
|
||||
while (sampled < num && checked_buckets < max_buckets) {
|
||||
for (int table = 0; table < 2; table++) {
|
||||
if (table == 1 && !dictIsRehashing(db->m_pdict)) break;
|
||||
|
||||
unsigned long idx = db->expires_cursor;
|
||||
idx &= db->m_pdict->ht[table].sizemask;
|
||||
dictEntry *de = db->m_pdict->ht[table].table[idx];
|
||||
long long ttl;
|
||||
|
||||
/* Scan the current bucket of the current table. */
|
||||
checked_buckets++;
|
||||
while(de) {
|
||||
/* Get the next entry now since this entry may get
|
||||
* deleted. */
|
||||
dictEntry *e = de;
|
||||
robj *o = (robj*)dictGetVal(de);
|
||||
de = de->next;
|
||||
if (!o->FExpires())
|
||||
continue;
|
||||
|
||||
expireEntry *exp = &o->expire;
|
||||
|
||||
serverAssert(exp->when() > 0);
|
||||
ttl = exp->when()-now;
|
||||
size_t tried = 0;
|
||||
if (exp->when() <= now) {
|
||||
if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++;
|
||||
serverAssert(ttl <= 0);
|
||||
} else {
|
||||
serverAssert(ttl > 0);
|
||||
}
|
||||
if (ttl > 0) {
|
||||
/* We want the average TTL of keys yet
|
||||
* not expired. */
|
||||
ttl_sum += ttl;
|
||||
ttl_samples++;
|
||||
}
|
||||
sampled++;
|
||||
}
|
||||
}
|
||||
db->expires_cursor++;
|
||||
}
|
||||
total_expired += expired;
|
||||
total_sampled += sampled;
|
||||
|
||||
/* Update the average TTL stats for this database. */
|
||||
if (ttl_samples) {
|
||||
long long avg_ttl = ttl_sum/ttl_samples;
|
||||
|
||||
/* Do a simple running average with a few samples.
|
||||
* We just use the current estimate with a weight of 2%
|
||||
* and the previous estimate with a weight of 98%. */
|
||||
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
|
||||
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
|
||||
}
|
||||
|
||||
/* We can't block forever here even if there are many keys to
|
||||
* expire. So after a given amount of milliseconds return to the
|
||||
* caller waiting for the other active expire cycle. */
|
||||
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
|
||||
elapsed = ustime()-start;
|
||||
if (elapsed > timelimit) {
|
||||
timelimit_exit = 1;
|
||||
g_pserver->stat_expired_time_cap_reached_count++;
|
||||
/* If there is nothing to expire try next DB ASAP. */
|
||||
if (db->expireSize() == 0) {
|
||||
db->avg_ttl = 0;
|
||||
break;
|
||||
}
|
||||
num = dictSize(db->m_pdict);
|
||||
slots = dictSlots(db->m_pdict);
|
||||
now = mstime();
|
||||
|
||||
/* When there are less than 1% filled slots, sampling the key
|
||||
* space is expensive, so stop here waiting for better times...
|
||||
* The dictionary will be resized asap. */
|
||||
if (slots > DICT_HT_INITIAL_SIZE &&
|
||||
(num*100/slots < 1)) break;
|
||||
|
||||
/* The main collection cycle. Sample random keys among keys
|
||||
* with an expire set, checking for expired ones. */
|
||||
expired = 0;
|
||||
sampled = 0;
|
||||
ttl_sum = 0;
|
||||
ttl_samples = 0;
|
||||
|
||||
if (num > config_keys_per_loop)
|
||||
num = config_keys_per_loop;
|
||||
|
||||
/* Here we access the low level representation of the hash table
|
||||
* for speed concerns: this makes this code coupled with dict.c,
|
||||
* but it hardly changed in ten years.
|
||||
*
|
||||
* Note that certain places of the hash table may be empty,
|
||||
* so we want also a stop condition about the number of
|
||||
* buckets that we scanned. However scanning for free buckets
|
||||
* is very fast: we are in the cache line scanning a sequential
|
||||
* array of NULL pointers, so we can scan a lot more buckets
|
||||
* than keys in the same time. */
|
||||
long max_buckets = num*20;
|
||||
long checked_buckets = 0;
|
||||
|
||||
while (sampled < num && checked_buckets < max_buckets) {
|
||||
for (int table = 0; table < 2; table++) {
|
||||
if (table == 1 && !dictIsRehashing(db->m_pdict)) break;
|
||||
|
||||
unsigned long idx = db->expires_cursor;
|
||||
idx &= db->m_pdict->ht[table].sizemask;
|
||||
dictEntry *de = db->m_pdict->ht[table].table[idx];
|
||||
long long ttl;
|
||||
|
||||
/* Scan the current bucket of the current table. */
|
||||
checked_buckets++;
|
||||
while(de) {
|
||||
/* Get the next entry now since this entry may get
|
||||
* deleted. */
|
||||
dictEntry *e = de;
|
||||
robj *o = (robj*)dictGetVal(de);
|
||||
de = de->next;
|
||||
if (!o->FExpires())
|
||||
continue;
|
||||
|
||||
expireEntry *exp = &o->expire;
|
||||
|
||||
serverAssert(exp->when() > 0);
|
||||
ttl = exp->when()-now;
|
||||
size_t tried = 0;
|
||||
if (exp->when() <= now) {
|
||||
if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++;
|
||||
serverAssert(ttl <= 0);
|
||||
} else {
|
||||
serverAssert(ttl > 0);
|
||||
}
|
||||
if (ttl > 0) {
|
||||
/* We want the average TTL of keys yet
|
||||
* not expired. */
|
||||
ttl_sum += ttl;
|
||||
ttl_samples++;
|
||||
}
|
||||
sampled++;
|
||||
}
|
||||
}
|
||||
db->expires_cursor++;
|
||||
}
|
||||
total_expired += expired;
|
||||
total_sampled += sampled;
|
||||
|
||||
/* Update the average TTL stats for this database. */
|
||||
if (ttl_samples) {
|
||||
long long avg_ttl = ttl_sum/ttl_samples;
|
||||
|
||||
/* Do a simple running average with a few samples.
|
||||
* We just use the current estimate with a weight of 2%
|
||||
* and the previous estimate with a weight of 98%. */
|
||||
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
|
||||
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
|
||||
}
|
||||
|
||||
/* We can't block forever here even if there are many keys to
|
||||
* expire. So after a given amount of milliseconds return to the
|
||||
* caller waiting for the other active expire cycle. */
|
||||
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
|
||||
elapsed = ustime()-start;
|
||||
if (elapsed > timelimit) {
|
||||
timelimit_exit = 1;
|
||||
g_pserver->stat_expired_time_cap_reached_count++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* We don't repeat the cycle for the current database if there are
|
||||
* an acceptable amount of stale keys (logically expired but yet
|
||||
* not reclaimed). */
|
||||
} while (sampled == 0 ||
|
||||
(expired*100/sampled) > config_cycle_acceptable_stale);
|
||||
} else {
|
||||
long prev_expired;
|
||||
long long now = mstime();
|
||||
size_t tried = 0;
|
||||
std::vector<std::string> keys;
|
||||
do {
|
||||
prev_expired = total_expired;
|
||||
keys = db->getStorageCache()->getExpirationCandidates(ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP);
|
||||
for (std::string key : keys) {
|
||||
robj* keyobj = createStringObject(key.c_str(), key.size());
|
||||
db->find(szFromObj(keyobj));
|
||||
expireEntry *e = db->getExpire(keyobj);
|
||||
if (e != nullptr && e->when() < now)
|
||||
total_expired += activeExpireCycleExpire(db, szFromObj(keyobj), *e, now, tried);
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
total_sampled += keys.size();
|
||||
elapsed = ustime()-start;
|
||||
} while (keys.size() > 0 && (elapsed < timelimit) && (total_expired - prev_expired) > 0);
|
||||
|
||||
if (ustime()-start > timelimit) {
|
||||
timelimit_exit = 1;
|
||||
g_pserver->stat_expired_time_cap_reached_count++;
|
||||
}
|
||||
/* We don't repeat the cycle for the current database if there are
|
||||
* an acceptable amount of stale keys (logically expired but yet
|
||||
* not reclaimed). */
|
||||
} while (sampled == 0 ||
|
||||
(expired*100/sampled) > config_cycle_acceptable_stale);
|
||||
}
|
||||
}
|
||||
|
||||
elapsed = ustime()-start;
|
||||
|
@ -1871,6 +1871,9 @@ VersionCompareResult compareVersion(SymVer *pver)
|
||||
|| (pver->major == 0 && pver->minor == 0 && pver->build == 0))
|
||||
return VersionCompareResult::EqualVersion;
|
||||
|
||||
if (pver->major <= 6 && pver->minor <= 3 && pver->build <= 3)
|
||||
return VersionCompareResult::IncompatibleVersion;
|
||||
|
||||
for (int iver = 0; iver < 3; ++iver)
|
||||
{
|
||||
long verThis, verOther;
|
||||
|
@ -587,8 +587,12 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(c
|
||||
if (!fKeyOnly)
|
||||
{
|
||||
size_t offset = 0;
|
||||
deserializeExpire((const char*)data, cbData, &offset);
|
||||
std::unique_ptr<expireEntry> spexpire = deserializeExpire((const char*)data, cbData, &offset);
|
||||
o = deserializeStoredObject(reinterpret_cast<const char*>(data)+offset, cbData-offset);
|
||||
o->SetFExpires(spexpire != nullptr);
|
||||
if (spexpire != nullptr) {
|
||||
o->expire = std::move(*spexpire);
|
||||
}
|
||||
}
|
||||
fContinue = fn(sdsKey, o);
|
||||
if (o != nullptr)
|
||||
|
@ -33,8 +33,8 @@ std::string prefixKey(const char *key, size_t cchKey)
|
||||
return FInternalKey(key, cchKey) ? std::string(key, cchKey) : getPrefix(keyHashSlot(key, cchKey)) + std::string(key, cchKey);
|
||||
}
|
||||
|
||||
RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
|
||||
: m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
|
||||
RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spexpirecolfam, const rocksdb::Snapshot *psnapshot, size_t count)
|
||||
: m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_spexpirecolfamily(spexpirecolfam), m_count(count)
|
||||
{
|
||||
m_readOptionsTemplate = rocksdb::ReadOptions();
|
||||
m_readOptionsTemplate.verify_checksums = false;
|
||||
@ -211,11 +211,79 @@ bool RocksDBStorageProvider::enumerate_hashslot(callback fn, unsigned int hashsl
|
||||
return full_iter;
|
||||
}
|
||||
|
||||
void RocksDBStorageProvider::setExpire(const char *key, size_t cchKey, long long expire)
|
||||
{
|
||||
rocksdb::Status status;
|
||||
std::unique_lock<fastlock> l(m_lock);
|
||||
std::string prefix((const char *)&expire,sizeof(long long));
|
||||
std::string strKey(key, cchKey);
|
||||
if (m_spbatch != nullptr)
|
||||
status = m_spbatch->Put(m_spexpirecolfamily.get(), rocksdb::Slice(prefix + strKey), rocksdb::Slice(strKey));
|
||||
else
|
||||
status = m_spdb->Put(WriteOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(prefix + strKey), rocksdb::Slice(strKey));
|
||||
if (!status.ok())
|
||||
throw status.ToString();
|
||||
}
|
||||
|
||||
void RocksDBStorageProvider::removeExpire(const char *key, size_t cchKey, long long expire)
|
||||
{
|
||||
rocksdb::Status status;
|
||||
std::unique_lock<fastlock> l(m_lock);
|
||||
std::string prefix((const char *)&expire,sizeof(long long));
|
||||
std::string strKey(key, cchKey);
|
||||
std::string fullKey = prefix + strKey;
|
||||
if (!FExpireExists(fullKey))
|
||||
return;
|
||||
if (m_spbatch)
|
||||
status = m_spbatch->Delete(m_spexpirecolfamily.get(), rocksdb::Slice(fullKey));
|
||||
else
|
||||
status = m_spdb->Delete(WriteOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(fullKey));
|
||||
if (!status.ok())
|
||||
throw status.ToString();
|
||||
}
|
||||
|
||||
std::vector<std::string> RocksDBStorageProvider::getExpirationCandidates(unsigned int count)
|
||||
{
|
||||
std::vector<std::string> result;
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spexpirecolfamily.get()));
|
||||
for (it->SeekToFirst(); it->Valid() && result.size() < count; it->Next()) {
|
||||
if (FInternalKey(it->key().data(), it->key().size()))
|
||||
continue;
|
||||
result.emplace_back(it->value().data(), it->value().size());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::string randomHashSlot() {
|
||||
return getPrefix(genrand64_int63() % (1 << 16));
|
||||
}
|
||||
|
||||
std::vector<std::string> RocksDBStorageProvider::getEvictionCandidates(unsigned int count)
|
||||
{
|
||||
std::vector<std::string> result;
|
||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
|
||||
for (it->Seek(randomHashSlot()); it->Valid() && result.size() < count; it->Next()) {
|
||||
if (FInternalKey(it->key().data(), it->key().size()))
|
||||
continue;
|
||||
result.emplace_back(it->key().data() + 2, it->key().size() - 2);
|
||||
}
|
||||
} else {
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spexpirecolfamily.get()));
|
||||
for (it->SeekToFirst(); it->Valid() && result.size() < count; it->Next()) {
|
||||
if (FInternalKey(it->key().data(), it->key().size()))
|
||||
continue;
|
||||
result.emplace_back(it->value().data(), it->value().size());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
const IStorage *RocksDBStorageProvider::clone() const
|
||||
{
|
||||
std::unique_lock<fastlock> l(m_lock);
|
||||
const rocksdb::Snapshot *psnapshot = const_cast<RocksDBStorageProvider*>(this)->m_spdb->GetSnapshot();
|
||||
return new RocksDBStorageProvider(m_pfactory, const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot, m_count);
|
||||
return new RocksDBStorageProvider(m_pfactory, const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, const_cast<RocksDBStorageProvider*>(this)->m_spexpirecolfamily, psnapshot, m_count);
|
||||
}
|
||||
|
||||
RocksDBStorageProvider::~RocksDBStorageProvider()
|
||||
@ -268,6 +336,7 @@ void RocksDBStorageProvider::batch_unlock()
|
||||
void RocksDBStorageProvider::flush()
|
||||
{
|
||||
m_spdb->SyncWAL();
|
||||
m_spdb->Flush(rocksdb::FlushOptions());
|
||||
}
|
||||
|
||||
bool RocksDBStorageProvider::FKeyExists(std::string& key) const
|
||||
@ -277,3 +346,11 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const
|
||||
return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||
}
|
||||
|
||||
bool RocksDBStorageProvider::FExpireExists(std::string& key) const
|
||||
{
|
||||
rocksdb::PinnableSlice slice;
|
||||
if (m_spbatch)
|
||||
return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||
return m_spdb->Get(ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||
}
|
@ -10,6 +10,7 @@
|
||||
static const char count_key[] = INTERNAL_KEY_PREFIX "__keydb__count\1";
|
||||
static const char version_key[] = INTERNAL_KEY_PREFIX "__keydb__version\1";
|
||||
static const char meta_key[] = INTERNAL_KEY_PREFIX "__keydb__metadata\1";
|
||||
static const char last_expire_key[] = INTERNAL_KEY_PREFIX "__keydb__last_expire_time";
|
||||
class RocksDBStorageFactory;
|
||||
|
||||
class RocksDBStorageProvider : public IStorage
|
||||
@ -19,12 +20,13 @@ class RocksDBStorageProvider : public IStorage
|
||||
std::unique_ptr<rocksdb::WriteBatchWithIndex> m_spbatch;
|
||||
const rocksdb::Snapshot *m_psnapshot = nullptr;
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spexpirecolfamily;
|
||||
rocksdb::ReadOptions m_readOptionsTemplate;
|
||||
size_t m_count = 0;
|
||||
mutable fastlock m_lock {"RocksDBStorageProvider"};
|
||||
|
||||
public:
|
||||
RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
|
||||
RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spexpirecolfam, const rocksdb::Snapshot *psnapshot, size_t count);
|
||||
~RocksDBStorageProvider();
|
||||
|
||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override;
|
||||
@ -34,6 +36,11 @@ public:
|
||||
virtual bool enumerate(callback fn) const override;
|
||||
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override;
|
||||
|
||||
virtual std::vector<std::string> getExpirationCandidates(unsigned int count) override;
|
||||
virtual std::vector<std::string> getEvictionCandidates(unsigned int count) override;
|
||||
virtual void setExpire(const char *key, size_t cchKey, long long expire) override;
|
||||
virtual void removeExpire(const char *key, size_t cchKey, long long expire) override;
|
||||
|
||||
virtual const IStorage *clone() const override;
|
||||
|
||||
virtual void beginWriteBatch() override;
|
||||
@ -50,6 +57,7 @@ public:
|
||||
|
||||
protected:
|
||||
bool FKeyExists(std::string&) const;
|
||||
bool FExpireExists(std::string&) const;
|
||||
|
||||
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
|
||||
rocksdb::WriteOptions WriteOptions() const;
|
||||
|
@ -5,6 +5,7 @@ class RocksDBStorageFactory : public IStorageFactory
|
||||
{
|
||||
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
|
||||
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspcols;
|
||||
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspexpirecols;
|
||||
std::shared_ptr<rocksdb::SstFileManager> m_pfilemanager;
|
||||
std::string m_path;
|
||||
bool m_fCreatedTempFolder = false;
|
||||
|
@ -61,8 +61,8 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
|
||||
auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT);
|
||||
// RocksDB requires we know the count of col families before opening, if the user only wants to see less
|
||||
// we still have to make room for all column family handles regardless
|
||||
if (status.ok() && (int)vecT.size() > dbnum)
|
||||
dbnum = (int)vecT.size();
|
||||
if (status.ok() && (int)vecT.size()/2 > dbnum)
|
||||
dbnum = (int)vecT.size()/2;
|
||||
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> veccoldesc;
|
||||
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); // ignore default col family
|
||||
@ -79,6 +79,7 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
|
||||
rocksdb::ColumnFamilyOptions cf_options(options);
|
||||
cf_options.level_compaction_dynamic_level_bytes = true;
|
||||
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options));
|
||||
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb) + "_expires", cf_options));
|
||||
}
|
||||
|
||||
if (rgchConfig != nullptr)
|
||||
@ -100,22 +101,28 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
|
||||
m_spdb = std::shared_ptr<rocksdb::DB>(db);
|
||||
for (auto handle : handles)
|
||||
{
|
||||
std::string strVersion;
|
||||
auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion);
|
||||
if (!status.ok())
|
||||
{
|
||||
setVersion(handle);
|
||||
}
|
||||
else
|
||||
{
|
||||
SymVer ver = parseVersion(strVersion.c_str());
|
||||
auto cmp = compareVersion(&ver);
|
||||
if (cmp == NewerVersion)
|
||||
throw "Cannot load FLASH database created by newer version of KeyDB";
|
||||
if (cmp == OlderVersion)
|
||||
if (handle->GetName().size() > 7 && !strncmp(handle->GetName().substr(handle->GetName().size() - 7).c_str(), "expires", 7)) {
|
||||
m_vecspexpirecols.emplace_back(handle);
|
||||
} else {
|
||||
std::string strVersion;
|
||||
auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion);
|
||||
if (!status.ok())
|
||||
{
|
||||
setVersion(handle);
|
||||
}
|
||||
else
|
||||
{
|
||||
SymVer ver = parseVersion(strVersion.c_str());
|
||||
auto cmp = compareVersion(&ver);
|
||||
if (cmp == NewerVersion)
|
||||
throw "Cannot load FLASH database created by newer version of KeyDB";
|
||||
if (cmp == IncompatibleVersion)
|
||||
throw "Cannot load FLASH database from before 6.3.4";
|
||||
if (cmp == OlderVersion)
|
||||
setVersion(handle);
|
||||
}
|
||||
m_vecspcols.emplace_back(handle);
|
||||
}
|
||||
m_vecspcols.emplace_back(handle);
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,6 +163,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr
|
||||
{
|
||||
++db; // skip default col family
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> spexpirecolfamily(m_vecspexpirecols[db].release());
|
||||
size_t count = 0;
|
||||
bool fUnclean = false;
|
||||
|
||||
@ -192,7 +200,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr
|
||||
++count;
|
||||
}
|
||||
}
|
||||
return new RocksDBStorageProvider(this, m_spdb, spcolfamily, nullptr, count);
|
||||
return new RocksDBStorageProvider(this, m_spdb, spcolfamily, spexpirecolfamily, nullptr, count);
|
||||
}
|
||||
|
||||
const char *RocksDBStorageFactory::name() const
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "../IStorage.h"
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
class TestStorageFactory : public IStorageFactory
|
||||
{
|
||||
@ -28,6 +29,11 @@ public:
|
||||
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override;
|
||||
virtual size_t count() const override;
|
||||
|
||||
virtual std::vector<std::string> getExpirationCandidates(unsigned int) override { return std::vector<std::string>(); }
|
||||
virtual std::vector<std::string> getEvictionCandidates(unsigned int) override { return std::vector<std::string>(); }
|
||||
virtual void setExpire(const char *, size_t, long long) override {}
|
||||
virtual void removeExpire(const char *, size_t, long long) override {}
|
||||
|
||||
virtual void flush() override;
|
||||
|
||||
/* This is permitted to be a shallow clone */
|
||||
|
@ -7,6 +7,7 @@ enum VersionCompareResult
|
||||
EqualVersion,
|
||||
OlderVersion,
|
||||
NewerVersion,
|
||||
IncompatibleVersion,
|
||||
};
|
||||
|
||||
struct SymVer
|
||||
|
@ -74,8 +74,11 @@ if {$::flash_enabled} {
|
||||
r set testkey foo ex 1
|
||||
r flushall cache
|
||||
assert_equal {1} [r dbsize]
|
||||
after 1500
|
||||
assert_equal {0} [r dbsize]
|
||||
wait_for_condition 50 1000 {
|
||||
[r dbsize] == 0
|
||||
} else {
|
||||
fail "key is not expired"
|
||||
}
|
||||
}
|
||||
|
||||
test { SUBKEY EXPIRE persists after cache flush } {
|
||||
@ -140,17 +143,17 @@ if {$::flash_enabled} {
|
||||
r set $numkeys xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
incr numkeys
|
||||
if {[s used_memory]+1024 >= $limit} {
|
||||
break
|
||||
break
|
||||
}
|
||||
}
|
||||
# Add additional keys to force eviction
|
||||
# should still be under the limit for maxmemory, however all keys set should still exist between flash and memory
|
||||
# check same number of keys exist in addition to values of first and last keys
|
||||
set err 0
|
||||
set extra_keys [expr floor([expr ($limit * 0.4) / 1024])]
|
||||
set extra_keys [expr floor([expr ($limit * 0.4) / 1024])]
|
||||
for {set j 0} {$j < $extra_keys} {incr j} {
|
||||
catch {
|
||||
r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
} err
|
||||
assert {$err == {OK}}
|
||||
}
|
||||
@ -165,7 +168,49 @@ if {$::flash_enabled} {
|
||||
assert {[r get last] == {val}}
|
||||
r flushall
|
||||
}
|
||||
}
|
||||
|
||||
test "FLASH - is flash eviction working? (policy $policy)" {
|
||||
# Get the current memory limit and calculate a new limit.
|
||||
# Set limit to 100M.
|
||||
set used [s used_memory]
|
||||
set limit [expr {$used+60*1024*1024}]
|
||||
r config set maxmemory $limit
|
||||
r config set maxmemory-policy $policy
|
||||
# Now add keys equivalent to 1024b until the limit is almost reached.
|
||||
set numkeys 0
|
||||
r set first val
|
||||
while 1 {
|
||||
r set $numkeys xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
incr numkeys
|
||||
if {[s used_memory]+1024 >= $limit} {
|
||||
break
|
||||
}
|
||||
}
|
||||
# Add additional keys to force eviction
|
||||
# should still be under the limit for maxmemory, however all keys set should still exist between flash and memory
|
||||
# check same number of keys exist in addition to values of first and last keys
|
||||
set err 0
|
||||
set extra_keys [expr floor([expr ($limit * 0.4) / 1024])]
|
||||
for {set j 0} {$j < $extra_keys} {incr j} {
|
||||
catch {
|
||||
r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
|
||||
} err
|
||||
assert {$err == {OK}}
|
||||
}
|
||||
if {[log_file_matches [srv 0 stdout] "*Failed to evict*"]} {
|
||||
fail "Server did not evict cleanly (detected full flush)"
|
||||
}
|
||||
r set last val
|
||||
r debug flush-storage
|
||||
r config set maxstorage 1
|
||||
r config set maxmemory 1
|
||||
set dbsize [r dbsize]
|
||||
# after setting maxstorage and memory below used amount we should evict from storage provider
|
||||
assert {$dbsize < $numkeys+$extra_keys+2}
|
||||
r config set maxstorage 0
|
||||
r config set maxmemory 0
|
||||
r flushall
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
set testmodule [file normalize tests/modules/load.so]
|
||||
|
||||
if {$::flash_enabled} {
|
||||
start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master} databases 256 loadmodule $testmodule]] {
|
||||
start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master.load.test} databases 256 loadmodule $testmodule]] {
|
||||
test "Module is notified of keys loaded from flash" {
|
||||
r flushall
|
||||
r set foo bar
|
||||
|
Loading…
x
Reference in New Issue
Block a user