Flash running well

Former-commit-id: 9cf393eb9fb69bcc7550cd8b83e1568b3f360310
This commit is contained in:
John Sully 2019-12-08 20:06:22 -05:00
parent 8851ddfd4a
commit 8f8f9b7a46
5 changed files with 95 additions and 45 deletions

View File

@ -14,6 +14,9 @@ public:
virtual size_t clear() = 0; virtual size_t clear() = 0;
virtual void enumerate(callback fn) const = 0; virtual void enumerate(callback fn) const = 0;
virtual void beginWriteBatch() {} // NOP
virtual void endWriteBatch() {} // NOP
/* This is permitted to be a shallow clone */ /* This is permitted to be a shallow clone */
virtual const IStorage *clone() const = 0; virtual const IStorage *clone() const = 0;
}; };

View File

@ -1918,8 +1918,8 @@ void redisDb::initialize(int id)
this->avg_ttl = 0; this->avg_ttl = 0;
this->last_expire_set = 0; this->last_expire_set = 0;
this->defrag_later = listCreate(); this->defrag_later = listCreate();
//if (id == 0) if (id == 0)
// this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db"));
} }
bool redisDbPersistentData::insert(char *key, robj *o) bool redisDbPersistentData::insert(char *key, robj *o)
@ -1967,29 +1967,6 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_pdbSnapshot = nullptr; m_pdbSnapshot = nullptr;
} }
/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2)
{
redisDbPersistentData aux = std::move(*db1);
db1->m_pdict = db2->m_pdict;
db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire;
db1->m_spstorage = std::move(db2->m_spstorage);
db1->m_pdbSnapshot = db2->m_pdbSnapshot;
db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER);
db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire;
db2->m_spstorage = std::move(aux.m_spstorage);
db2->m_pdbSnapshot = aux.m_pdbSnapshot;
db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER);
db1->m_pdict->privdata = static_cast<redisDbPersistentData*>(db1);
db2->m_pdict->privdata = static_cast<redisDbPersistentData*>(db2);
}
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
{ {
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
@ -2101,15 +2078,18 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di); dictReleaseIterator(di);
} }
void redisDbPersistentData::processChanges() redisDbPersistentData::changelist redisDbPersistentData::processChanges()
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
--m_fTrackingChanges; --m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0); serverAssert(m_fTrackingChanges >= 0);
changelist vecRet;
fastlock_lock(&m_lockStorage);
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
{ {
m_spstorage->beginWriteBatch();
if (m_fTrackingChanges >= 0) if (m_fTrackingChanges >= 0)
{ {
if (m_fAllChanged) if (m_fAllChanged)
@ -2119,12 +2099,13 @@ void redisDbPersistentData::processChanges()
} }
else else
{ {
for (auto &key : m_vecchanged) for (unique_sds_ptr &key : m_vecchanged)
{ {
robj *o = find(key.get()); robj *o = find(key.get());
if (o != nullptr) if (o != nullptr)
{ {
storeKey(key.get(), sdslen(key.get()), o); sds temp = serializeStoredObject(o);
vecRet.emplace_back(std::move(key), unique_sds_ptr(temp));
} }
else else
{ {
@ -2135,6 +2116,19 @@ void redisDbPersistentData::processChanges()
m_vecchanged.clear(); m_vecchanged.clear();
} }
} }
return vecRet;
}
void redisDbPersistentData::commitChanges(const changelist &vec)
{
for (auto &pair : vec)
{
m_spstorage->insert(pair.first.get(), sdslen(pair.first.get()), pair.second.get(), sdslen(pair.second.get()));
}
if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
fastlock_unlock(&m_lockStorage);
} }
redisDbPersistentData::~redisDbPersistentData() redisDbPersistentData::~redisDbPersistentData()
@ -2181,6 +2175,13 @@ size_t redisDbPersistentData::size() const
void redisDbPersistentData::removeCachedValue(const char *key) void redisDbPersistentData::removeCachedValue(const char *key)
{ {
serverAssert(m_spstorage != nullptr); serverAssert(m_spstorage != nullptr);
// First ensure its not a pending key
for (auto &spkey : m_vecchanged)
{
if (sdscmp(spkey.get(), (sds)key) == 0)
return; // NOP
}
dictEntry *de = dictFind(m_pdict, key); dictEntry *de = dictFind(m_pdict, key);
serverAssert(de != nullptr); serverAssert(de != nullptr);
decrRefCount((robj*)dictGetVal(de)); decrRefCount((robj*)dictGetVal(de));

View File

@ -2128,7 +2128,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
* main loop of the event driven library, that is, before to sleep * main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */ * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) { void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); int iel = ielFromEventLoop(eventLoop);
/* Call the Redis Cluster before sleep function. Note that this function /* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa), * may change the state of Redis Cluster (from ok to fail or vice versa),
@ -2166,33 +2166,42 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients))
{ {
processUnblockedClients(IDX_EVENT_LOOP_MAIN); processUnblockedClients(iel);
} }
/* Write the AOF buffer on disk */ /* Write the AOF buffer on disk */
flushAppendOnlyFile(0); flushAppendOnlyFile(0);
static thread_local bool fFirstRun = true; static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<std::pair<redisDb*, redisDbPersistentData::changelist>> vecchanges;
if (!fFirstRun) { if (!fFirstRun) {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->processChanges(); {
auto vec = g_pserver->db[idb]->processChanges();
vecchanges.emplace_back(g_pserver->db[idb], std::move(vec));
}
} }
else { else {
fFirstRun = false; fFirstRun = false;
} }
aeReleaseLock(); aeReleaseLock();
for (auto &pair : vecchanges)
pair.first->commitChanges(pair.second);
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); handleClientsWithPendingWrites(iel);
if (serverTL->gcEpoch != 0) if (serverTL->gcEpoch != 0)
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0; serverTL->gcEpoch = 0;
aeAcquireLock(); aeAcquireLock();
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); freeClientsInAsyncFreeQueue(iel);
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
@ -5121,9 +5130,8 @@ void *workerThreadMain(void *parg)
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetBeforeSleepProc(el, beforeSleep, 0);
aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
aeMain(el); aeMain(el);
aeDeleteEventLoop(el); aeDeleteEventLoop(el);

View File

@ -1250,8 +1250,6 @@ public:
redisDbPersistentData() = default; redisDbPersistentData() = default;
redisDbPersistentData(redisDbPersistentData &&) = default; redisDbPersistentData(redisDbPersistentData &&) = default;
static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2);
size_t slots() const { return dictSlots(m_pdict); } size_t slots() const { return dictSlots(m_pdict); }
size_t size() const; size_t size() const;
void expand(uint64_t slots) { dictExpand(m_pdict, slots); } void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
@ -1319,7 +1317,14 @@ public:
void setStorageProvider(IStorage *pstorage); void setStorageProvider(IStorage *pstorage);
void trackChanges(); void trackChanges();
void processChanges();
// Process and commit changes for secondary storage. Note that process and commit are seperated
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions as
// a second look is kept to ensure writes to secondary storage are ordered
typedef std::vector<std::pair<unique_sds_ptr, unique_sds_ptr>> changelist;
changelist processChanges();
void commitChanges(const changelist &vec);
// This should only be used if you look at the key, we do not fixup // This should only be used if you look at the key, we do not fixup
// objects stored elsewhere // objects stored elsewhere
@ -1361,6 +1366,7 @@ private:
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER; std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
int m_refCount = 0; int m_refCount = 0;
fastlock m_lockStorage { "storage" };
}; };
class redisDbPersistentDataSnapshot : protected redisDbPersistentData class redisDbPersistentDataSnapshot : protected redisDbPersistentData
@ -1452,6 +1458,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::getExpire; using redisDbPersistentData::getExpire;
using redisDbPersistentData::trackChanges; using redisDbPersistentData::trackChanges;
using redisDbPersistentData::processChanges; using redisDbPersistentData::processChanges;
using redisDbPersistentData::commitChanges;
using redisDbPersistentData::setexpireUnsafe; using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire; using redisDbPersistentData::setexpire;
using redisDbPersistentData::createSnapshot; using redisDbPersistentData::createSnapshot;

View File

@ -6,6 +6,7 @@
class RocksDBStorageProvider : public IStorage class RocksDBStorageProvider : public IStorage
{ {
std::shared_ptr<rocksdb::DB> m_spdb; std::shared_ptr<rocksdb::DB> m_spdb;
std::unique_ptr<rocksdb::WriteBatch> m_spbatch;
const rocksdb::Snapshot *m_psnapshot = nullptr; const rocksdb::Snapshot *m_psnapshot = nullptr;
rocksdb::ReadOptions m_readOptionsTemplate; rocksdb::ReadOptions m_readOptionsTemplate;
@ -21,12 +22,16 @@ public:
virtual const IStorage *clone() const override; virtual const IStorage *clone() const override;
virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
size_t count() const; size_t count() const;
protected: protected:
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb); RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb);
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
rocksdb::WriteOptions WriteOptions() const;
}; };
IStorage *create_rocksdb_storage(const char *dbfile) IStorage *create_rocksdb_storage(const char *dbfile)
@ -57,16 +62,24 @@ RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spd
void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb) void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb)
{ {
auto status = m_spdb->Put(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); rocksdb::Status status;
if (m_spbatch != nullptr)
status = m_spbatch->Put(rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
else
status = m_spdb->Put(WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
if (!status.ok()) if (!status.ok())
throw status; throw status.ToString();
} }
void RocksDBStorageProvider::erase(const char *key, size_t cchKey) void RocksDBStorageProvider::erase(const char *key, size_t cchKey)
{ {
auto status = m_spdb->Delete(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey)); rocksdb::Status status;
if (m_spbatch != nullptr)
status = m_spbatch->Delete(rocksdb::Slice(key, cchKey));
else
status = m_spdb->Delete(WriteOptions(), rocksdb::Slice(key, cchKey));
if (!status.ok()) if (!status.ok())
throw status; throw status.ToString();
} }
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const
@ -74,7 +87,7 @@ void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback f
std::string value; std::string value;
auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value); auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value);
if (!status.ok()) if (!status.ok())
throw status; throw status.ToString();
fn(key, cchKey, value.data(), value.size()); fn(key, cchKey, value.data(), value.size());
} }
@ -83,7 +96,7 @@ size_t RocksDBStorageProvider::clear()
size_t celem = count(); size_t celem = count();
auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily()); auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily());
if (!status.ok()) if (!status.ok())
throw status; throw status.ToString();
return celem; return celem;
} }
@ -119,4 +132,22 @@ RocksDBStorageProvider::~RocksDBStorageProvider()
if (m_psnapshot != nullptr) if (m_psnapshot != nullptr)
m_spdb->ReleaseSnapshot(m_psnapshot); m_spdb->ReleaseSnapshot(m_psnapshot);
} }
}
rocksdb::WriteOptions RocksDBStorageProvider::WriteOptions() const
{
auto opt = rocksdb::WriteOptions();
opt.disableWAL = true;
return opt;
}
void RocksDBStorageProvider::beginWriteBatch()
{
m_spbatch = std::make_unique<rocksdb::WriteBatch>();
}
void RocksDBStorageProvider::endWriteBatch()
{
m_spdb->Write(WriteOptions(), m_spbatch.get());
m_spbatch = nullptr;
} }