Add more test code, and fix bugs uncovered
Former-commit-id: 5362fa4b62f89cbc1e92e01c73a45c4e3718708b
This commit is contained in:
parent
bd4d372d45
commit
4fd552d8b6
@ -214,7 +214,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=keydb-pro-server
|
||||
REDIS_SENTINEL_NAME=keydb-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o keydbutils.o $(ASM_OBJ)
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o $(ASM_OBJ)
|
||||
REDIS_CLI_NAME=keydb-cli
|
||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
|
||||
REDIS_BENCHMARK_NAME=keydb-benchmark
|
||||
|
@ -30,6 +30,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "storage/rocksdbfactory.h"
|
||||
#include "storage/teststorageprovider.h"
|
||||
#include "cluster.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
@ -142,6 +143,7 @@ configYesNo configs_yesno[] = {
|
||||
{"replica-read-only","slave-read-only",&g_pserver->repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY},
|
||||
{"replica-ignore-maxmemory","slave-ignore-maxmemory",&g_pserver->repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY},
|
||||
{"multi-master",NULL,&g_pserver->enable_multimaster,false,CONFIG_DEFAULT_ENABLE_MULTIMASTER},
|
||||
{"delete-on-evict",NULL,&cserver.delete_on_evict,true,false},
|
||||
{NULL, NULL, 0, 0}
|
||||
};
|
||||
|
||||
@ -218,15 +220,19 @@ void queueLoadModule(sds path, sds *argv, int argc) {
|
||||
|
||||
static bool initializeStorageProvider(sds *argv, int argc, const char **err)
|
||||
{
|
||||
bool fResult = false;
|
||||
bool fTest = false;
|
||||
if (!strcasecmp(argv[0], "flash") && argc == 2)
|
||||
{
|
||||
// Create The Storage Factory (if necessary)
|
||||
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(argv[1], cserver.dbnum);
|
||||
fResult = true;
|
||||
}
|
||||
else if (!strcasecmp(argv[0], "test") && argc == 1)
|
||||
{
|
||||
g_pserver->m_pstorageFactory = new (MALLOC_LOCAL) TestStorageFactory();
|
||||
fTest = true;
|
||||
}
|
||||
|
||||
if (fResult)
|
||||
if (g_pserver->m_pstorageFactory != nullptr && !fTest)
|
||||
{
|
||||
// We need to set max memory to a sane default so keys are actually evicted properly
|
||||
if (g_pserver->maxmemory == 0 && g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
||||
@ -244,7 +250,7 @@ static bool initializeStorageProvider(sds *argv, int argc, const char **err)
|
||||
{
|
||||
*err = "Unknown storage provider";
|
||||
}
|
||||
return fResult;
|
||||
return g_pserver->m_pstorageFactory != nullptr;
|
||||
}
|
||||
|
||||
void loadServerConfigFromString(char *config) {
|
||||
|
46
src/db.cpp
46
src/db.cpp
@ -677,33 +677,33 @@ void randomkeyCommand(client *c) {
|
||||
|
||||
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
|
||||
{
|
||||
if (m_spstorage != nullptr)
|
||||
{
|
||||
bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{
|
||||
sds sdsKey = sdsnewlen(key, cchKey);
|
||||
dictEntry *de = dictFind(m_pdict, sdsKey);
|
||||
bool fEvict = (de == nullptr);
|
||||
ensure(sdsKey, &de);
|
||||
bool fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de));
|
||||
if (fEvict)
|
||||
removeCachedValue(sdsKey);
|
||||
sdsfree(sdsKey);
|
||||
return fContinue;
|
||||
});
|
||||
return fSawAll;
|
||||
}
|
||||
|
||||
dictIterator *di = dictGetSafeIterator(m_pdict);
|
||||
dictEntry *de = nullptr;
|
||||
bool fResult = true;
|
||||
while(fResult && ((de = dictNext(di)) != nullptr))
|
||||
{
|
||||
ensure((const char*)dictGetKey(de), &de);
|
||||
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
|
||||
fResult = false;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
if (m_spstorage != nullptr)
|
||||
{
|
||||
bool fSawAll = fResult && m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{
|
||||
sds sdsKey = sdsnewlen(key, cchKey);
|
||||
bool fContinue = true;
|
||||
if (dictFind(m_pdict, sdsKey) == nullptr)
|
||||
{
|
||||
ensure(sdsKey, &de);
|
||||
fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de));
|
||||
removeCachedValue(sdsKey);
|
||||
}
|
||||
sdsfree(sdsKey);
|
||||
return fContinue;
|
||||
});
|
||||
return fSawAll;
|
||||
}
|
||||
|
||||
if (fResult && m_pdbSnapshot != nullptr)
|
||||
{
|
||||
fResult = m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr){
|
||||
@ -1925,7 +1925,7 @@ void redisDbPersistentData::initialize()
|
||||
m_pdict = dictCreate(&dbDictType,this);
|
||||
m_pdictTombstone = dictCreate(&dbDictType,this);
|
||||
m_setexpire = new(MALLOC_LOCAL) expireset();
|
||||
m_fAllChanged = false;
|
||||
m_fAllChanged = 0;
|
||||
m_fTrackingChanges = 0;
|
||||
}
|
||||
|
||||
@ -1987,7 +1987,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
|
||||
{
|
||||
dictEmpty(m_pdict,callback);
|
||||
if (m_fTrackingChanges)
|
||||
m_fAllChanged = true;
|
||||
m_fAllChanged++;
|
||||
delete m_setexpire;
|
||||
m_setexpire = new (MALLOC_LOCAL) expireset();
|
||||
if (m_spstorage != nullptr)
|
||||
@ -2145,7 +2145,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
|
||||
{
|
||||
m_spstorage->clear();
|
||||
storeDatabase();
|
||||
m_fAllChanged = false;
|
||||
m_fAllChanged--;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -2237,9 +2237,11 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
||||
return true;
|
||||
}
|
||||
|
||||
void redisDbPersistentData::trackChanges()
|
||||
void redisDbPersistentData::trackChanges(bool fBulk)
|
||||
{
|
||||
m_fTrackingChanges++;
|
||||
if (fBulk)
|
||||
m_fAllChanged++;
|
||||
}
|
||||
|
||||
void redisDbPersistentData::removeAllCachedValues()
|
||||
@ -2249,7 +2251,7 @@ void redisDbPersistentData::removeAllCachedValues()
|
||||
{
|
||||
auto vec = processChanges();
|
||||
commitChanges(vec);
|
||||
trackChanges();
|
||||
trackChanges(false);
|
||||
}
|
||||
|
||||
dictEmpty(m_pdict, nullptr);
|
||||
|
@ -594,7 +594,7 @@ int freeMemoryIfNeeded(void) {
|
||||
if (bestkey) {
|
||||
db = g_pserver->db[bestdbid];
|
||||
|
||||
if (db->FStorageProvider())
|
||||
if (!cserver.delete_on_evict && db->FStorageProvider())
|
||||
{
|
||||
// This key is in the storage so we only need to free the object
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
|
12
src/rdb.cpp
12
src/rdb.cpp
@ -1993,6 +1993,11 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
robj *subexpireKey = nullptr;
|
||||
robj *key = nullptr;
|
||||
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
{
|
||||
g_pserver->db[idb]->trackChanges(true);
|
||||
}
|
||||
|
||||
rdb->update_cksum = rdbLoadProgressCallback;
|
||||
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
|
||||
if (rioRead(rdb,buf,9) == 0) goto eoferr;
|
||||
@ -2252,9 +2257,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
{
|
||||
auto vec = g_pserver->db[idb]->processChanges();
|
||||
g_pserver->db[idb]->commitChanges(vec);
|
||||
}
|
||||
return C_OK;
|
||||
|
||||
eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
// we don't need to commit changes, because we're about to exit()
|
||||
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
|
||||
return C_ERR; /* Just to avoid warning */
|
||||
|
@ -2290,7 +2290,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
||||
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
||||
aeAcquireLock();
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
g_pserver->db[idb]->trackChanges();
|
||||
g_pserver->db[idb]->trackChanges(false);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
|
@ -1318,7 +1318,7 @@ public:
|
||||
|
||||
void setStorageProvider(IStorage *pstorage);
|
||||
|
||||
void trackChanges();
|
||||
void trackChanges(bool fBulk);
|
||||
|
||||
// Process and commit changes for secondary storage. Note that process and commit are seperated
|
||||
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
||||
@ -1355,7 +1355,7 @@ private:
|
||||
dict *m_pdict = nullptr; /* The keyspace for this DB */
|
||||
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
|
||||
int m_fTrackingChanges = 0; // Note: Stack based
|
||||
bool m_fAllChanged = false;
|
||||
int m_fAllChanged = 0;
|
||||
std::vector<unique_sds_ptr> m_vecchanged;
|
||||
std::shared_ptr<IStorage> m_spstorage = nullptr;
|
||||
uint64_t mvccCheckpoint = 0;
|
||||
@ -1949,6 +1949,7 @@ struct redisServerConst {
|
||||
|
||||
sds license_key = nullptr;
|
||||
int trial_timeout = 20;
|
||||
int delete_on_evict = false; // Only valid when a storage provider is set
|
||||
};
|
||||
|
||||
struct redisServer {
|
||||
|
@ -256,17 +256,32 @@ dict_iter redisDbPersistentDataSnapshot::find_cached_threadsafe(const char *key)
|
||||
|
||||
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly) const
|
||||
{
|
||||
// Take the size so we can ensure we visited every element exactly once
|
||||
// use volatile to ensure it's not checked too late. This makes it more
|
||||
// likely we'll detect races (but it won't gurantee it)
|
||||
volatile size_t celem = size();
|
||||
|
||||
dictEntry *de = nullptr;
|
||||
bool fResult = true;
|
||||
|
||||
dictIterator *di = dictGetSafeIterator(m_pdict);
|
||||
while(fResult && ((de = dictNext(di)) != nullptr))
|
||||
{
|
||||
--celem;
|
||||
robj *o = fKeyOnly ? nullptr : (robj*)dictGetVal(de);
|
||||
if (!fn((const char*)dictGetKey(de), o))
|
||||
fResult = false;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
|
||||
if (m_spstorage != nullptr)
|
||||
{
|
||||
bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){
|
||||
bool fSawAll = fResult && m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){
|
||||
sds sdsKey = sdsnewlen(key, cchKey);
|
||||
dictEntry *de = dictFind(m_pdict, sdsKey);
|
||||
bool fContinue = false;
|
||||
if (de != nullptr)
|
||||
{
|
||||
fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de));
|
||||
}
|
||||
else
|
||||
bool fContinue = true;
|
||||
if (de == nullptr)
|
||||
{
|
||||
robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData);
|
||||
fContinue = fn(sdsKey, o);
|
||||
@ -280,34 +295,6 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
|
||||
return fSawAll;
|
||||
}
|
||||
|
||||
dictEntry *de = nullptr;
|
||||
bool fResult = true;
|
||||
|
||||
// Take the size so we can ensure we visited every element exactly once
|
||||
// use volatile to ensure it's not checked too late. This makes it more
|
||||
// likely we'll detect races (but it won't gurantee it)
|
||||
volatile size_t celem = size();
|
||||
|
||||
dictIterator *di = dictGetSafeIterator(m_pdict);
|
||||
while(fResult && ((de = dictNext(di)) != nullptr))
|
||||
{
|
||||
--celem;
|
||||
robj *o = (robj*)dictGetVal(de);
|
||||
if (o == nullptr && !fKeyOnly)
|
||||
{
|
||||
m_spstorage->retrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){
|
||||
o = deserializeStoredObject(this, (const char*)dictGetKey(de), data, cb);
|
||||
});
|
||||
}
|
||||
|
||||
if (!fn((const char*)dictGetKey(de), o))
|
||||
fResult = false;
|
||||
|
||||
if (o != nullptr && dictGetVal(de) == nullptr)
|
||||
decrRefCount(o);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
const redisDbPersistentDataSnapshot *psnapshot;
|
||||
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
|
||||
if (fResult && psnapshot != nullptr)
|
||||
|
@ -1,4 +1,4 @@
|
||||
start_server {tags {"flash"} overrides {"storage-provider flash ./rocks.db"}} {
|
||||
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no]] {
|
||||
|
||||
test { FLASH - GET works after eviction } {
|
||||
r set testkey foo
|
||||
|
Loading…
x
Reference in New Issue
Block a user