From ace783a00e89dae0090e89c755e28b29769a5ed8 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Fri, 8 Sep 2023 16:25:53 -0400 Subject: [PATCH] Flash expiration (#197) Design Doc: https://docs.google.com/document/d/1NmnYGnHLdZp-KOUCUatX5iXpF-L3YK4VUc9Lm3Tqxpo/edit?usp=sharing --- machamp_scripts/build.sh | 2 +- src/IStorage.h | 6 + src/StorageCache.cpp | 42 +++-- src/StorageCache.h | 7 + src/config.cpp | 15 +- src/db.cpp | 6 +- src/debug.cpp | 17 +- src/evict.cpp | 140 ++++++++++----- src/expire.cpp | 258 +++++++++++++++------------ src/server.cpp | 3 + src/snapshot.cpp | 6 +- src/storage/rocksdb.cpp | 83 ++++++++- src/storage/rocksdb.h | 10 +- src/storage/rocksdbfactor_internal.h | 1 + src/storage/rocksdbfactory.cpp | 44 +++-- src/storage/teststorageprovider.h | 6 + src/version.h | 1 + tests/unit/flash.tcl | 57 +++++- tests/unit/moduleapi/load.tcl | 2 +- 19 files changed, 499 insertions(+), 207 deletions(-) diff --git a/machamp_scripts/build.sh b/machamp_scripts/build.sh index e05a9b3bd..a395f26ee 100755 --- a/machamp_scripts/build.sh +++ b/machamp_scripts/build.sh @@ -2,7 +2,7 @@ # make the build git submodule init && git submodule update -make BUILD_TLS=yes -j$(nproc) KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' +make BUILD_TLS=yes ENABLE_FLASH=yes -j$(nproc) KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' # gen-cert ./utils/gen-test-certs.sh \ No newline at end of file diff --git a/src/IStorage.h b/src/IStorage.h index dc608d490..1e3542391 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -1,6 +1,7 @@ #pragma once #include #include "sds.h" +#include #define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" @@ -43,6 +44,11 @@ public: endWriteBatch(); } + virtual std::vector getExpirationCandidates(unsigned int count) = 0; + virtual std::vector getEvictionCandidates(unsigned int count) = 0; + virtual void setExpire(const char *key, size_t cchKey, long long expire) = 0; + virtual void removeExpire(const char *key, size_t cchKey, long long expire) = 0; + virtual void beginWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 91d4b3657..ba7910399 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -84,19 +84,31 @@ void StorageCache::cacheKey(const char *rgch, size_t cch) bool StorageCache::erase(sds key) { + unsigned long long when = 0; + m_spstorage->retrieve(key, sdslen(key), [&when](const char *, size_t, const void * data, size_t cbdata) { + auto e = deserializeExpire((const char *)data, cbdata, nullptr); + if (e != nullptr) + when = e->when(); + }); bool result = m_spstorage->erase(key, sdslen(key)); std::unique_lock ul(m_lock); - if (result && m_pdict != nullptr) + if (result) { - uint64_t hash = dictSdsHash(key); - dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); - serverAssert(de != nullptr); - de->v.s64--; - serverAssert(de->v.s64 >= 0); - if (de->v.s64 == 0) { - dictDelete(m_pdict, reinterpret_cast(hash)); - } else { - m_collisionCount--; + if (m_pdict != nullptr) + { + uint64_t hash = dictSdsHash(key); + dictEntry *de = dictFind(m_pdict, reinterpret_cast(hash)); + serverAssert(de != nullptr); + de->v.s64--; + serverAssert(de->v.s64 >= 0); + if (de->v.s64 == 0) { + dictDelete(m_pdict, reinterpret_cast(hash)); + } else { + m_collisionCount--; + } + } + if (when != 0) { + m_spstorage->removeExpire(key, sdslen(key), when); } } return result; @@ -111,6 +123,9 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr } ul.unlock(); m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); + auto e = deserializeExpire((const char *)data, cbdata, nullptr); + if (e != nullptr) + m_spstorage->setExpire(key, sdslen(key), e->when()); } long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing); @@ -119,13 +134,18 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si std::vector vechashes; if (m_pdict != nullptr) { vechashes.reserve(celem); + } - for (size_t ielem = 0; ielem < celem; ++ielem) { + for (size_t ielem = 0; ielem < celem; ++ielem) { + if (m_pdict != nullptr) { dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry)); de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]); de->v.u64 = 1; vechashes.push_back(de); } + auto e = deserializeExpire(rgvals[ielem], rgcbvals[ielem], nullptr); + if (e != nullptr) + m_spstorage->setExpire(rgkeys[ielem], rgcbkeys[ielem], e->when()); } std::unique_lock ul(m_lock); diff --git a/src/StorageCache.h b/src/StorageCache.h index 4f3c1a374..829b41f08 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -51,11 +51,18 @@ public: bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } bool enumerate_hashslot(IStorage::callback fn, unsigned int hashslot) const { return m_spstorage->enumerate_hashslot(fn, hashslot); } + std::vector getExpirationCandidates(unsigned int count) { return m_spstorage->getExpirationCandidates(count); } + std::vector getEvictionCandidates(unsigned int count) { return m_spstorage->getEvictionCandidates(count); } + void setExpire(const char *key, size_t cchKey, long long expire) { m_spstorage->setExpire(key, cchKey, expire); } + void removeExpire(const char *key, size_t cchKey, long long expire) { m_spstorage->removeExpire(key, cchKey, expire); } + void beginWriteBatch(); void endWriteBatch() { m_spstorage->endWriteBatch(); } void batch_lock() { return m_spstorage->batch_lock(); } void batch_unlock() { return m_spstorage->batch_unlock(); } + void flush() { m_spstorage->flush(); } + size_t count() const; const StorageCache *clone(); diff --git a/src/config.cpp b/src/config.cpp index f898e9e74..0e8d54a58 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2597,6 +2597,19 @@ static int updateMaxmemory(long long val, long long prev, const char **err) { return 1; } +static int updateFlashMaxmemory(long long val, long long prev, const char **err) { + UNUSED(prev); + UNUSED(err); + if (val && g_pserver->m_pstorageFactory) { + size_t used = g_pserver->m_pstorageFactory->totalDiskspaceUsed(); + if ((unsigned long long)val < used) { + serverLog(LL_WARNING,"WARNING: the new maxstorage value set via CONFIG SET (%llu) is smaller than the current storage usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxstorage, used); + } + performEvictions(false /*fPreSnapshot*/); + } + return 1; +} + static int updateGoodSlaves(long long val, long long prev, const char **err) { UNUSED(val); UNUSED(prev); @@ -2940,7 +2953,7 @@ standardConfig configs[] = { /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), - createULongLongConfig("maxstorage", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxstorage, 0, MEMORY_CONFIG, NULL, NULL), + createULongLongConfig("maxstorage", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxstorage, 0, MEMORY_CONFIG, NULL, updateFlashMaxmemory), /* Size_t configs */ createSizeTConfig("hash-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->hash_max_ziplist_entries, 512, INTEGER_CONFIG, NULL, NULL), diff --git a/src/db.cpp b/src/db.cpp index d351fde58..67e0926e7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2875,10 +2875,10 @@ LNotFound: { dictAdd(m_pdict, sdsNewKey, o); + o->SetFExpires(spexpire != nullptr); if (spexpire != nullptr) { o->expire = std::move(*spexpire); } - o->SetFExpires(spexpire != nullptr); g_pserver->stat_storage_provider_read_hits++; } else { sdsfree(sdsNewKey); @@ -3249,8 +3249,8 @@ std::unique_ptr deserializeExpire(const char *str, size_t cch, size if (subkey) sdsfree(subkey); } - - *poffset = offset; + if (poffset != nullptr) + *poffset = offset; return spexpire; } diff --git a/src/debug.cpp b/src/debug.cpp index 00b9482dd..817728ee8 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -308,7 +308,7 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(final,&aux,sizeof(aux)); /* Iterate this DB writing every entry */ - db->iterate_threadsafe([final, db](const char *key, robj_roptr o)->bool { + db->iterate_threadsafe([final](const char *key, robj_roptr o)->bool { unsigned char digest[20]; robj *keyobj; @@ -932,6 +932,21 @@ NULL mallctl_string(c, c->argv+2, c->argc-2); return; #endif + } else if(!strcasecmp(szFromObj(c->argv[1]),"flush-storage") && c->argc == 2) { + if (g_pserver->m_pstorageFactory != nullptr) { + for (int i = 0; i < cserver.dbnum; i++) { + g_pserver->db[i]->getStorageCache()->flush(); + } + addReply(c,shared.ok); + } else { + addReplyError(c, "Can't flush storage if no storage provider is set"); + } + } else if (!strcasecmp(szFromObj(c->argv[1]),"get-storage-usage") && c->argc == 2) { + if (g_pserver->m_pstorageFactory != nullptr) { + addReplyLongLong(c, g_pserver->m_pstorageFactory->totalDiskspaceUsed()); + } else { + addReplyLongLong(c, 0); + } } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/evict.cpp b/src/evict.cpp index 2503cca15..432b807d6 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -100,6 +100,36 @@ unsigned long long estimateObjectIdleTime(robj_roptr o) { } } +unsigned long long getIdle(robj *obj, const expireEntry *e) { + unsigned long long idle; + /* Calculate the idle time according to the policy. This is called + * idle just because the code initially handled LRU, but is in fact + * just a score where an higher score means better candidate. */ + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { + idle = (obj != nullptr) ? estimateObjectIdleTime(obj) : 0; + } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { + /* When we use an LRU policy, we sort the keys by idle time + * so that we expire keys starting from greater idle time. + * However when the policy is an LFU one, we have a frequency + * estimation, and we want to evict keys with lower frequency + * first. So inside the pool we put objects using the inverted + * frequency subtracting the actual frequency to the maximum + * frequency of 255. */ + idle = 255-LFUDecrAndReturn(obj); + } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { + /* In this case the sooner the expire the better. */ + if (e != nullptr) + idle = ULLONG_MAX - e->when(); + else + idle = 0; + } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { + idle = ULLONG_MAX; + } else { + serverPanic("Unknown eviction policy in storage eviction"); + } + return idle; +} + /* LRU approximation algorithm * * Redis uses an approximation of the LRU algorithm that runs in constant @@ -137,28 +167,7 @@ void evictionPoolAlloc(void) { void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool) { - unsigned long long idle; - - /* Calculate the idle time according to the policy. This is called - * idle just because the code initially handled LRU, but is in fact - * just a score where an higher score means better candidate. */ - if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { - idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0; - } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { - /* When we use an LRU policy, we sort the keys by idle time - * so that we expire keys starting from greater idle time. - * However when the policy is an LFU one, we have a frequency - * estimation, and we want to evict keys with lower frequency - * first. So inside the pool we put objects using the inverted - * frequency subtracting the actual frequency to the maximum - * frequency of 255. */ - idle = 255-LFUDecrAndReturn(o); - } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { - /* In this case the sooner the expire the better. */ - idle = ULLONG_MAX - e->when(); - } else { - serverPanic("Unknown eviction policy in evictionPoolPopulate()"); - } + unsigned long long idle = getIdle(o,e); /* Insert the element inside the pool. * First, find the first empty bucket or the first populated @@ -600,6 +609,31 @@ static unsigned long evictionTimeLimitUs() { return ULONG_MAX; /* No limit to eviction time */ } +void evict(redisDb *db, robj *keyobj) { + mstime_t eviction_latency; + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); + /* We compute the amount of memory freed by db*Delete() alone. + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. */ + latencyStartMonitor(eviction_latency); + if (g_pserver->lazyfree_lazy_eviction) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + latencyEndMonitor(eviction_latency); + latencyAddSampleIfNeeded("eviction-del",eviction_latency); + + signalModifiedKey(NULL,db,keyobj); + notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", + keyobj, db->id); + decrRefCount(keyobj); +} + static void updateSysAvailableMemory() { if (g_pserver->force_eviction_percent) { g_pserver->cron_malloc_stats.sys_available = getMemAvailable(); @@ -637,7 +671,7 @@ int performEvictions(bool fPreSnapshot) { int keys_freed = 0; size_t mem_reported, mem_tofree; long long mem_freed; /* May be negative */ - mstime_t latency, eviction_latency; + mstime_t latency; long long delta; int slaves = listLength(g_pserver->slaves); const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider(); @@ -662,6 +696,43 @@ int performEvictions(bool fPreSnapshot) { monotime evictionTimer; elapsedStart(&evictionTimer); + if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr) { + while (g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage && elapsedUs(evictionTimer) < eviction_time_limit_us) { + redisDb *db; + std::vector evictionPool; + robj *bestkey = nullptr; + redisDb *bestdb = nullptr; + unsigned long long bestidle = 0; + for (int i = 0; i < cserver.dbnum; i++) { + db = g_pserver->db[i]; + evictionPool = db->getStorageCache()->getEvictionCandidates(g_pserver->maxmemory_samples); + for (std::string key : evictionPool) { + robj *keyobj = createStringObject(key.c_str(), key.size()); + robj *obj = db->find(szFromObj(keyobj)); + if (obj != nullptr) { + expireEntry *e = db->getExpire(keyobj); + unsigned long long idle = getIdle(obj, e); + + if (bestkey == nullptr || bestidle < idle) { + if (bestkey != nullptr) + decrRefCount(bestkey); + incrRefCount(keyobj); + bestkey = keyobj; + bestidle = idle; + bestdb = db; + } + } + decrRefCount(keyobj); + } + } + if (bestkey) { + evict(bestdb, bestkey); + } else { + break; //could not find a key to evict so stop now + } + } + } + if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr && g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage) goto cant_free_storage; @@ -776,7 +847,7 @@ int performEvictions(bool fPreSnapshot) { if (db->removeCachedValue(bestkey, &deT)) { mem_freed += splazy->addEntry(db->dictUnsafeKeyOnly(), deT); ckeysFailed = 0; - g_pserver->stat_evictedkeys++; + g_pserver->stat_evictedkeys++; } else { delta = 0; @@ -788,30 +859,11 @@ int performEvictions(bool fPreSnapshot) { else { robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); - /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. */ delta = (long long) zmalloc_used_memory(); - latencyStartMonitor(eviction_latency); - if (g_pserver->lazyfree_lazy_eviction) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - latencyEndMonitor(eviction_latency); - latencyAddSampleIfNeeded("eviction-del",eviction_latency); + evict(db, keyobj); delta -= (long long) zmalloc_used_memory(); mem_freed += delta; g_pserver->stat_evictedkeys++; - signalModifiedKey(NULL,db,keyobj); - notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", - keyobj, db->id); - decrRefCount(keyobj); } keys_freed++; diff --git a/src/expire.cpp b/src/expire.cpp index 4bf81878a..8d711cedf 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -396,126 +396,152 @@ void pexpireMemberAtCommand(client *c) * distribute the time evenly across DBs. */ current_db++; - /* Continue to expire if at the end of the cycle there are still - * a big percentage of keys to expire, compared to the number of keys - * we scanned. The percentage, stored in config_cycle_acceptable_stale - * is not fixed, but depends on the Redis configured "expire effort". */ - do { - unsigned long num, slots; - long long now, ttl_sum; - int ttl_samples; - iteration++; + if (g_pserver->m_pstorageFactory == nullptr) { + /* Continue to expire if at the end of the cycle there are still + * a big percentage of keys to expire, compared to the number of keys + * we scanned. The percentage, stored in config_cycle_acceptable_stale + * is not fixed, but depends on the Redis configured "expire effort". */ + do { + unsigned long num, slots; + long long now, ttl_sum; + int ttl_samples; + iteration++; - /* If there is nothing to expire try next DB ASAP. */ - if (db->expireSize() == 0) { - db->avg_ttl = 0; - break; - } - num = dictSize(db->m_pdict); - slots = dictSlots(db->m_pdict); - now = mstime(); - - /* When there are less than 1% filled slots, sampling the key - * space is expensive, so stop here waiting for better times... - * The dictionary will be resized asap. */ - if (slots > DICT_HT_INITIAL_SIZE && - (num*100/slots < 1)) break; - - /* The main collection cycle. Sample random keys among keys - * with an expire set, checking for expired ones. */ - expired = 0; - sampled = 0; - ttl_sum = 0; - ttl_samples = 0; - - if (num > config_keys_per_loop) - num = config_keys_per_loop; - - /* Here we access the low level representation of the hash table - * for speed concerns: this makes this code coupled with dict.c, - * but it hardly changed in ten years. - * - * Note that certain places of the hash table may be empty, - * so we want also a stop condition about the number of - * buckets that we scanned. However scanning for free buckets - * is very fast: we are in the cache line scanning a sequential - * array of NULL pointers, so we can scan a lot more buckets - * than keys in the same time. */ - long max_buckets = num*20; - long checked_buckets = 0; - - while (sampled < num && checked_buckets < max_buckets) { - for (int table = 0; table < 2; table++) { - if (table == 1 && !dictIsRehashing(db->m_pdict)) break; - - unsigned long idx = db->expires_cursor; - idx &= db->m_pdict->ht[table].sizemask; - dictEntry *de = db->m_pdict->ht[table].table[idx]; - long long ttl; - - /* Scan the current bucket of the current table. */ - checked_buckets++; - while(de) { - /* Get the next entry now since this entry may get - * deleted. */ - dictEntry *e = de; - robj *o = (robj*)dictGetVal(de); - de = de->next; - if (!o->FExpires()) - continue; - - expireEntry *exp = &o->expire; - - serverAssert(exp->when() > 0); - ttl = exp->when()-now; - size_t tried = 0; - if (exp->when() <= now) { - if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++; - serverAssert(ttl <= 0); - } else { - serverAssert(ttl > 0); - } - if (ttl > 0) { - /* We want the average TTL of keys yet - * not expired. */ - ttl_sum += ttl; - ttl_samples++; - } - sampled++; - } - } - db->expires_cursor++; - } - total_expired += expired; - total_sampled += sampled; - - /* Update the average TTL stats for this database. */ - if (ttl_samples) { - long long avg_ttl = ttl_sum/ttl_samples; - - /* Do a simple running average with a few samples. - * We just use the current estimate with a weight of 2% - * and the previous estimate with a weight of 98%. */ - if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; - db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); - } - - /* We can't block forever here even if there are many keys to - * expire. So after a given amount of milliseconds return to the - * caller waiting for the other active expire cycle. */ - if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ - elapsed = ustime()-start; - if (elapsed > timelimit) { - timelimit_exit = 1; - g_pserver->stat_expired_time_cap_reached_count++; + /* If there is nothing to expire try next DB ASAP. */ + if (db->expireSize() == 0) { + db->avg_ttl = 0; break; } + num = dictSize(db->m_pdict); + slots = dictSlots(db->m_pdict); + now = mstime(); + + /* When there are less than 1% filled slots, sampling the key + * space is expensive, so stop here waiting for better times... + * The dictionary will be resized asap. */ + if (slots > DICT_HT_INITIAL_SIZE && + (num*100/slots < 1)) break; + + /* The main collection cycle. Sample random keys among keys + * with an expire set, checking for expired ones. */ + expired = 0; + sampled = 0; + ttl_sum = 0; + ttl_samples = 0; + + if (num > config_keys_per_loop) + num = config_keys_per_loop; + + /* Here we access the low level representation of the hash table + * for speed concerns: this makes this code coupled with dict.c, + * but it hardly changed in ten years. + * + * Note that certain places of the hash table may be empty, + * so we want also a stop condition about the number of + * buckets that we scanned. However scanning for free buckets + * is very fast: we are in the cache line scanning a sequential + * array of NULL pointers, so we can scan a lot more buckets + * than keys in the same time. */ + long max_buckets = num*20; + long checked_buckets = 0; + + while (sampled < num && checked_buckets < max_buckets) { + for (int table = 0; table < 2; table++) { + if (table == 1 && !dictIsRehashing(db->m_pdict)) break; + + unsigned long idx = db->expires_cursor; + idx &= db->m_pdict->ht[table].sizemask; + dictEntry *de = db->m_pdict->ht[table].table[idx]; + long long ttl; + + /* Scan the current bucket of the current table. */ + checked_buckets++; + while(de) { + /* Get the next entry now since this entry may get + * deleted. */ + dictEntry *e = de; + robj *o = (robj*)dictGetVal(de); + de = de->next; + if (!o->FExpires()) + continue; + + expireEntry *exp = &o->expire; + + serverAssert(exp->when() > 0); + ttl = exp->when()-now; + size_t tried = 0; + if (exp->when() <= now) { + if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++; + serverAssert(ttl <= 0); + } else { + serverAssert(ttl > 0); + } + if (ttl > 0) { + /* We want the average TTL of keys yet + * not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + sampled++; + } + } + db->expires_cursor++; + } + total_expired += expired; + total_sampled += sampled; + + /* Update the average TTL stats for this database. */ + if (ttl_samples) { + long long avg_ttl = ttl_sum/ttl_samples; + + /* Do a simple running average with a few samples. + * We just use the current estimate with a weight of 2% + * and the previous estimate with a weight of 98%. */ + if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; + db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); + } + + /* We can't block forever here even if there are many keys to + * expire. So after a given amount of milliseconds return to the + * caller waiting for the other active expire cycle. */ + if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ + elapsed = ustime()-start; + if (elapsed > timelimit) { + timelimit_exit = 1; + g_pserver->stat_expired_time_cap_reached_count++; + break; + } + } + /* We don't repeat the cycle for the current database if there are + * an acceptable amount of stale keys (logically expired but yet + * not reclaimed). */ + } while (sampled == 0 || + (expired*100/sampled) > config_cycle_acceptable_stale); + } else { + long prev_expired; + long long now = mstime(); + size_t tried = 0; + std::vector keys; + do { + prev_expired = total_expired; + keys = db->getStorageCache()->getExpirationCandidates(ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP); + for (std::string key : keys) { + robj* keyobj = createStringObject(key.c_str(), key.size()); + db->find(szFromObj(keyobj)); + expireEntry *e = db->getExpire(keyobj); + if (e != nullptr && e->when() < now) + total_expired += activeExpireCycleExpire(db, szFromObj(keyobj), *e, now, tried); + decrRefCount(keyobj); + } + total_sampled += keys.size(); + elapsed = ustime()-start; + } while (keys.size() > 0 && (elapsed < timelimit) && (total_expired - prev_expired) > 0); + + if (ustime()-start > timelimit) { + timelimit_exit = 1; + g_pserver->stat_expired_time_cap_reached_count++; } - /* We don't repeat the cycle for the current database if there are - * an acceptable amount of stale keys (logically expired but yet - * not reclaimed). */ - } while (sampled == 0 || - (expired*100/sampled) > config_cycle_acceptable_stale); + } } elapsed = ustime()-start; diff --git a/src/server.cpp b/src/server.cpp index 983aa3a05..f4ad87c65 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1870,6 +1870,9 @@ VersionCompareResult compareVersion(SymVer *pver) if ((symVerThis.major == 0 && symVerThis.minor == 0 && symVerThis.build == 0) || (pver->major == 0 && pver->minor == 0 && pver->build == 0)) return VersionCompareResult::EqualVersion; + + if (pver->major <= 6 && pver->minor <= 3 && pver->build <= 3) + return VersionCompareResult::IncompatibleVersion; for (int iver = 0; iver < 3; ++iver) { diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 4f158803d..94956ebc2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -587,8 +587,12 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function spexpire = deserializeExpire((const char*)data, cbData, &offset); o = deserializeStoredObject(reinterpret_cast(data)+offset, cbData-offset); + o->SetFExpires(spexpire != nullptr); + if (spexpire != nullptr) { + o->expire = std::move(*spexpire); + } } fContinue = fn(sdsKey, o); if (o != nullptr) diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 76eaa133a..f433947ba 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -33,8 +33,8 @@ std::string prefixKey(const char *key, size_t cchKey) return FInternalKey(key, cchKey) ? std::string(key, cchKey) : getPrefix(keyHashSlot(key, cchKey)) + std::string(key, cchKey); } -RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count) - : m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count) +RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr &spdb, std::shared_ptr &spcolfam, std::shared_ptr &spexpirecolfam, const rocksdb::Snapshot *psnapshot, size_t count) + : m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_spexpirecolfamily(spexpirecolfam), m_count(count) { m_readOptionsTemplate = rocksdb::ReadOptions(); m_readOptionsTemplate.verify_checksums = false; @@ -211,11 +211,79 @@ bool RocksDBStorageProvider::enumerate_hashslot(callback fn, unsigned int hashsl return full_iter; } +void RocksDBStorageProvider::setExpire(const char *key, size_t cchKey, long long expire) +{ + rocksdb::Status status; + std::unique_lock l(m_lock); + std::string prefix((const char *)&expire,sizeof(long long)); + std::string strKey(key, cchKey); + if (m_spbatch != nullptr) + status = m_spbatch->Put(m_spexpirecolfamily.get(), rocksdb::Slice(prefix + strKey), rocksdb::Slice(strKey)); + else + status = m_spdb->Put(WriteOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(prefix + strKey), rocksdb::Slice(strKey)); + if (!status.ok()) + throw status.ToString(); +} + +void RocksDBStorageProvider::removeExpire(const char *key, size_t cchKey, long long expire) +{ + rocksdb::Status status; + std::unique_lock l(m_lock); + std::string prefix((const char *)&expire,sizeof(long long)); + std::string strKey(key, cchKey); + std::string fullKey = prefix + strKey; + if (!FExpireExists(fullKey)) + return; + if (m_spbatch) + status = m_spbatch->Delete(m_spexpirecolfamily.get(), rocksdb::Slice(fullKey)); + else + status = m_spdb->Delete(WriteOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(fullKey)); + if (!status.ok()) + throw status.ToString(); +} + +std::vector RocksDBStorageProvider::getExpirationCandidates(unsigned int count) +{ + std::vector result; + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spexpirecolfamily.get())); + for (it->SeekToFirst(); it->Valid() && result.size() < count; it->Next()) { + if (FInternalKey(it->key().data(), it->key().size())) + continue; + result.emplace_back(it->value().data(), it->value().size()); + } + return result; +} + +std::string randomHashSlot() { + return getPrefix(genrand64_int63() % (1 << 16)); +} + +std::vector RocksDBStorageProvider::getEvictionCandidates(unsigned int count) +{ + std::vector result; + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get())); + for (it->Seek(randomHashSlot()); it->Valid() && result.size() < count; it->Next()) { + if (FInternalKey(it->key().data(), it->key().size())) + continue; + result.emplace_back(it->key().data() + 2, it->key().size() - 2); + } + } else { + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spexpirecolfamily.get())); + for (it->SeekToFirst(); it->Valid() && result.size() < count; it->Next()) { + if (FInternalKey(it->key().data(), it->key().size())) + continue; + result.emplace_back(it->value().data(), it->value().size()); + } + } + return result; +} + const IStorage *RocksDBStorageProvider::clone() const { std::unique_lock l(m_lock); const rocksdb::Snapshot *psnapshot = const_cast(this)->m_spdb->GetSnapshot(); - return new RocksDBStorageProvider(m_pfactory, const_cast(this)->m_spdb, const_cast(this)->m_spcolfamily, psnapshot, m_count); + return new RocksDBStorageProvider(m_pfactory, const_cast(this)->m_spdb, const_cast(this)->m_spcolfamily, const_cast(this)->m_spexpirecolfamily, psnapshot, m_count); } RocksDBStorageProvider::~RocksDBStorageProvider() @@ -268,6 +336,7 @@ void RocksDBStorageProvider::batch_unlock() void RocksDBStorageProvider::flush() { m_spdb->SyncWAL(); + m_spdb->Flush(rocksdb::FlushOptions()); } bool RocksDBStorageProvider::FKeyExists(std::string& key) const @@ -276,4 +345,12 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const if (m_spbatch) return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok(); return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok(); +} + +bool RocksDBStorageProvider::FExpireExists(std::string& key) const +{ + rocksdb::PinnableSlice slice; + if (m_spbatch) + return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok(); + return m_spdb->Get(ReadOptions(), m_spexpirecolfamily.get(), rocksdb::Slice(key), &slice).ok(); } \ No newline at end of file diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index b78788eb2..dd6196a55 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -10,6 +10,7 @@ static const char count_key[] = INTERNAL_KEY_PREFIX "__keydb__count\1"; static const char version_key[] = INTERNAL_KEY_PREFIX "__keydb__version\1"; static const char meta_key[] = INTERNAL_KEY_PREFIX "__keydb__metadata\1"; +static const char last_expire_key[] = INTERNAL_KEY_PREFIX "__keydb__last_expire_time"; class RocksDBStorageFactory; class RocksDBStorageProvider : public IStorage @@ -19,12 +20,13 @@ class RocksDBStorageProvider : public IStorage std::unique_ptr m_spbatch; const rocksdb::Snapshot *m_psnapshot = nullptr; std::shared_ptr m_spcolfamily; + std::shared_ptr m_spexpirecolfamily; rocksdb::ReadOptions m_readOptionsTemplate; size_t m_count = 0; mutable fastlock m_lock {"RocksDBStorageProvider"}; public: - RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count); + RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr &spdb, std::shared_ptr &spcolfam, std::shared_ptr &spexpirecolfam, const rocksdb::Snapshot *psnapshot, size_t count); ~RocksDBStorageProvider(); virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override; @@ -34,6 +36,11 @@ public: virtual bool enumerate(callback fn) const override; virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override; + virtual std::vector getExpirationCandidates(unsigned int count) override; + virtual std::vector getEvictionCandidates(unsigned int count) override; + virtual void setExpire(const char *key, size_t cchKey, long long expire) override; + virtual void removeExpire(const char *key, size_t cchKey, long long expire) override; + virtual const IStorage *clone() const override; virtual void beginWriteBatch() override; @@ -50,6 +57,7 @@ public: protected: bool FKeyExists(std::string&) const; + bool FExpireExists(std::string&) const; const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } rocksdb::WriteOptions WriteOptions() const; diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index dc27f6987..ff545d6ba 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -5,6 +5,7 @@ class RocksDBStorageFactory : public IStorageFactory { std::shared_ptr m_spdb; // Note: This must be first so it is deleted last std::vector> m_vecspcols; + std::vector> m_vecspexpirecols; std::shared_ptr m_pfilemanager; std::string m_path; bool m_fCreatedTempFolder = false; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 5c3beeb4b..7087a0136 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -61,8 +61,8 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT); // RocksDB requires we know the count of col families before opening, if the user only wants to see less // we still have to make room for all column family handles regardless - if (status.ok() && (int)vecT.size() > dbnum) - dbnum = (int)vecT.size(); + if (status.ok() && (int)vecT.size()/2 > dbnum) + dbnum = (int)vecT.size()/2; std::vector veccoldesc; veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); // ignore default col family @@ -79,6 +79,7 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons rocksdb::ColumnFamilyOptions cf_options(options); cf_options.level_compaction_dynamic_level_bytes = true; veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options)); + veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb) + "_expires", cf_options)); } if (rgchConfig != nullptr) @@ -100,23 +101,29 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons m_spdb = std::shared_ptr(db); for (auto handle : handles) { - std::string strVersion; - auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion); - if (!status.ok()) - { - setVersion(handle); - } - else - { - SymVer ver = parseVersion(strVersion.c_str()); - auto cmp = compareVersion(&ver); - if (cmp == NewerVersion) - throw "Cannot load FLASH database created by newer version of KeyDB"; - if (cmp == OlderVersion) + if (handle->GetName().size() > 7 && !strncmp(handle->GetName().substr(handle->GetName().size() - 7).c_str(), "expires", 7)) { + m_vecspexpirecols.emplace_back(handle); + } else { + std::string strVersion; + auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion); + if (!status.ok()) + { setVersion(handle); + } + else + { + SymVer ver = parseVersion(strVersion.c_str()); + auto cmp = compareVersion(&ver); + if (cmp == NewerVersion) + throw "Cannot load FLASH database created by newer version of KeyDB"; + if (cmp == IncompatibleVersion) + throw "Cannot load FLASH database from before 6.3.4"; + if (cmp == OlderVersion) + setVersion(handle); + } + m_vecspcols.emplace_back(handle); } - m_vecspcols.emplace_back(handle); - } + } } RocksDBStorageFactory::~RocksDBStorageFactory() @@ -156,6 +163,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr { ++db; // skip default col family std::shared_ptr spcolfamily(m_vecspcols[db].release()); + std::shared_ptr spexpirecolfamily(m_vecspexpirecols[db].release()); size_t count = 0; bool fUnclean = false; @@ -192,7 +200,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr ++count; } } - return new RocksDBStorageProvider(this, m_spdb, spcolfamily, nullptr, count); + return new RocksDBStorageProvider(this, m_spdb, spcolfamily, spexpirecolfamily, nullptr, count); } const char *RocksDBStorageFactory::name() const diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index cb8c384f1..f95dbbe6a 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -1,6 +1,7 @@ #include "../IStorage.h" #include #include +#include class TestStorageFactory : public IStorageFactory { @@ -28,6 +29,11 @@ public: virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override; virtual size_t count() const override; + virtual std::vector getExpirationCandidates(unsigned int) override { return std::vector(); } + virtual std::vector getEvictionCandidates(unsigned int) override { return std::vector(); } + virtual void setExpire(const char *, size_t, long long) override {} + virtual void removeExpire(const char *, size_t, long long) override {} + virtual void flush() override; /* This is permitted to be a shallow clone */ diff --git a/src/version.h b/src/version.h index a747c45a6..26c498813 100644 --- a/src/version.h +++ b/src/version.h @@ -7,6 +7,7 @@ enum VersionCompareResult EqualVersion, OlderVersion, NewerVersion, + IncompatibleVersion, }; struct SymVer diff --git a/tests/unit/flash.tcl b/tests/unit/flash.tcl index e66248fb2..52154f804 100644 --- a/tests/unit/flash.tcl +++ b/tests/unit/flash.tcl @@ -74,8 +74,11 @@ if {$::flash_enabled} { r set testkey foo ex 1 r flushall cache assert_equal {1} [r dbsize] - after 1500 - assert_equal {0} [r dbsize] + wait_for_condition 50 1000 { + [r dbsize] == 0 + } else { + fail "key is not expired" + } } test { SUBKEY EXPIRE persists after cache flush } { @@ -140,17 +143,17 @@ if {$::flash_enabled} { r set $numkeys xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx incr numkeys if {[s used_memory]+1024 >= $limit} { - break + break } } # Add additional keys to force eviction # should still be under the limit for maxmemory, however all keys set should still exist between flash and memory # check same number of keys exist in addition to values of first and last keys set err 0 - set extra_keys [expr floor([expr ($limit * 0.4) / 1024])] + set extra_keys [expr floor([expr ($limit * 0.4) / 1024])] for {set j 0} {$j < $extra_keys} {incr j} { catch { - r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx } err assert {$err == {OK}} } @@ -165,7 +168,49 @@ if {$::flash_enabled} { assert {[r get last] == {val}} r flushall } - } + test "FLASH - is flash eviction working? (policy $policy)" { + # Get the current memory limit and calculate a new limit. + # Set limit to 100M. + set used [s used_memory] + set limit [expr {$used+60*1024*1024}] + r config set maxmemory $limit + r config set maxmemory-policy $policy + # Now add keys equivalent to 1024b until the limit is almost reached. + set numkeys 0 + r set first val + while 1 { + r set $numkeys xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + incr numkeys + if {[s used_memory]+1024 >= $limit} { + break + } + } + # Add additional keys to force eviction + # should still be under the limit for maxmemory, however all keys set should still exist between flash and memory + # check same number of keys exist in addition to values of first and last keys + set err 0 + set extra_keys [expr floor([expr ($limit * 0.4) / 1024])] + for {set j 0} {$j < $extra_keys} {incr j} { + catch { + r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + } err + assert {$err == {OK}} + } + if {[log_file_matches [srv 0 stdout] "*Failed to evict*"]} { + fail "Server did not evict cleanly (detected full flush)" + } + r set last val + r debug flush-storage + r config set maxstorage 1 + r config set maxmemory 1 + set dbsize [r dbsize] + # after setting maxstorage and memory below used amount we should evict from storage provider + assert {$dbsize < $numkeys+$extra_keys+2} + r config set maxstorage 0 + r config set maxmemory 0 + r flushall + } + } } } diff --git a/tests/unit/moduleapi/load.tcl b/tests/unit/moduleapi/load.tcl index 853b9aebb..12ca22402 100644 --- a/tests/unit/moduleapi/load.tcl +++ b/tests/unit/moduleapi/load.tcl @@ -1,7 +1,7 @@ set testmodule [file normalize tests/modules/load.so] if {$::flash_enabled} { - start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master} databases 256 loadmodule $testmodule]] { + start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master.load.test} databases 256 loadmodule $testmodule]] { test "Module is notified of keys loaded from flash" { r flushall r set foo bar