From 8f8f9b7a461bc0f77dc40c7ecf1b6dddd8585920 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 8 Dec 2019 20:06:22 -0500 Subject: [PATCH] Flash running well Former-commit-id: 9cf393eb9fb69bcc7550cd8b83e1568b3f360310 --- src/IStorage.h | 3 +++ src/db.cpp | 57 +++++++++++++++++++++-------------------- src/server.cpp | 24 +++++++++++------ src/server.h | 13 +++++++--- src/storage/rocksdb.cpp | 43 ++++++++++++++++++++++++++----- 5 files changed, 95 insertions(+), 45 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index 48aab51d9..975e7f0cf 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -14,6 +14,9 @@ public: virtual size_t clear() = 0; virtual void enumerate(callback fn) const = 0; + virtual void beginWriteBatch() {} // NOP + virtual void endWriteBatch() {} // NOP + /* This is permitted to be a shallow clone */ virtual const IStorage *clone() const = 0; }; diff --git a/src/db.cpp b/src/db.cpp index 1a3257809..c9d43bda4 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1918,8 +1918,8 @@ void redisDb::initialize(int id) this->avg_ttl = 0; this->last_expire_set = 0; this->defrag_later = listCreate(); - //if (id == 0) - // this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); + if (id == 0) + this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); } bool redisDbPersistentData::insert(char *key, robj *o) @@ -1967,29 +1967,6 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_pdbSnapshot = nullptr; } -/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) -{ - redisDbPersistentData aux = std::move(*db1); - db1->m_pdict = db2->m_pdict; - db1->m_fTrackingChanges = db2->m_fTrackingChanges; - db1->m_fAllChanged = db2->m_fAllChanged; - db1->m_setexpire = db2->m_setexpire; - db1->m_spstorage = std::move(db2->m_spstorage); - db1->m_pdbSnapshot = db2->m_pdbSnapshot; - db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER); - - db2->m_pdict = aux.m_pdict; - db2->m_fTrackingChanges 
= aux.m_fTrackingChanges; - db2->m_fAllChanged = aux.m_fAllChanged; - db2->m_setexpire = aux.m_setexpire; - db2->m_spstorage = std::move(aux.m_spstorage); - db2->m_pdbSnapshot = aux.m_pdbSnapshot; - db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER); - - db1->m_pdict->privdata = static_cast(db1); - db2->m_pdict->privdata = static_cast(db2); -} - void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) { /* Reuse the sds from the main dict in the expire dict */ @@ -2101,15 +2078,18 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -void redisDbPersistentData::processChanges() +redisDbPersistentData::changelist redisDbPersistentData::processChanges() { serverAssert(GlobalLocksAcquired()); --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); + changelist vecRet; + fastlock_lock(&m_lockStorage); if (m_spstorage != nullptr) { + m_spstorage->beginWriteBatch(); if (m_fTrackingChanges >= 0) { if (m_fAllChanged) @@ -2119,12 +2099,13 @@ void redisDbPersistentData::processChanges() } else { - for (auto &key : m_vecchanged) + for (unique_sds_ptr &key : m_vecchanged) { robj *o = find(key.get()); if (o != nullptr) { - storeKey(key.get(), sdslen(key.get()), o); + sds temp = serializeStoredObject(o); + vecRet.emplace_back(std::move(key), unique_sds_ptr(temp)); } else { @@ -2135,6 +2116,19 @@ void redisDbPersistentData::processChanges() m_vecchanged.clear(); } } + + return vecRet; +} + +void redisDbPersistentData::commitChanges(const changelist &vec) +{ + for (auto &pair : vec) + { + m_spstorage->insert(pair.first.get(), sdslen(pair.first.get()), pair.second.get(), sdslen(pair.second.get())); + } + if (m_spstorage != nullptr) + m_spstorage->endWriteBatch(); + fastlock_unlock(&m_lockStorage); } redisDbPersistentData::~redisDbPersistentData() @@ -2181,6 +2175,13 @@ size_t redisDbPersistentData::size() const void redisDbPersistentData::removeCachedValue(const char *key) { serverAssert(m_spstorage != nullptr); + // 
First ensure its not a pending key + for (auto &spkey : m_vecchanged) + { + if (sdscmp(spkey.get(), (sds)key) == 0) + return; // NOP + } + dictEntry *de = dictFind(m_pdict, key); serverAssert(de != nullptr); decrRefCount((robj*)dictGetVal(de)); diff --git a/src/server.cpp b/src/server.cpp index 34aea10de..7b6ebaf47 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2128,7 +2128,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData * main loop of the event driven library, that is, before to sleep * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { - UNUSED(eventLoop); + int iel = ielFromEventLoop(eventLoop); /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), @@ -2166,33 +2166,42 @@ void beforeSleep(struct aeEventLoop *eventLoop) { moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) + if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { - processUnblockedClients(IDX_EVENT_LOOP_MAIN); + processUnblockedClients(iel); } /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); static thread_local bool fFirstRun = true; + // note: we also copy the DB pointer in case a DB swap is done while the lock is released + std::vector> vecchanges; if (!fFirstRun) { for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb]->processChanges(); + { + auto vec = g_pserver->db[idb]->processChanges(); + vecchanges.emplace_back(g_pserver->db[idb], std::move(vec)); + } } else { fFirstRun = false; } + aeReleaseLock(); + for (auto &pair : vecchanges) + pair.first->commitChanges(pair.second); + /* Handle writes with pending output buffers. 
*/ - handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); + handleClientsWithPendingWrites(iel); if (serverTL->gcEpoch != 0) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); serverTL->gcEpoch = 0; aeAcquireLock(); /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); + freeClientsInAsyncFreeQueue(iel); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -5121,9 +5130,8 @@ void *workerThreadMain(void *parg) serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run - int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; - aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); + aeSetBeforeSleepProc(el, beforeSleep, 0); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); aeMain(el); aeDeleteEventLoop(el); diff --git a/src/server.h b/src/server.h index a7aa8dc9b..35e1fe094 100644 --- a/src/server.h +++ b/src/server.h @@ -1250,8 +1250,6 @@ public: redisDbPersistentData() = default; redisDbPersistentData(redisDbPersistentData &&) = default; - static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2); - size_t slots() const { return dictSlots(m_pdict); } size_t size() const; void expand(uint64_t slots) { dictExpand(m_pdict, slots); } @@ -1319,7 +1317,14 @@ public: void setStorageProvider(IStorage *pstorage); void trackChanges(); - void processChanges(); + + // Process and commit changes for secondary storage. Note that process and commit are separated + // to allow you to release the global lock before committing.
To prevent deadlocks you *must* + // either release the global lock or keep the same global lock between the two functions as + // a second lock is kept to ensure writes to secondary storage are ordered + typedef std::vector<std::pair<unique_sds_ptr, unique_sds_ptr>> changelist; + changelist processChanges(); + void commitChanges(const changelist &vec); // This should only be used if you look at the key, we do not fixup // objects stored elsewhere @@ -1361,6 +1366,7 @@ private: const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; std::unique_ptr m_spdbSnapshotHOLDER; int m_refCount = 0; + fastlock m_lockStorage { "storage" }; }; class redisDbPersistentDataSnapshot : protected redisDbPersistentData @@ -1452,6 +1458,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::getExpire; using redisDbPersistentData::trackChanges; using redisDbPersistentData::processChanges; + using redisDbPersistentData::commitChanges; using redisDbPersistentData::setexpireUnsafe; using redisDbPersistentData::setexpire; using redisDbPersistentData::createSnapshot; diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index de3174dab..4d352eef2 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -6,6 +6,7 @@ class RocksDBStorageProvider : public IStorage { std::shared_ptr m_spdb; + std::unique_ptr m_spbatch; const rocksdb::Snapshot *m_psnapshot = nullptr; rocksdb::ReadOptions m_readOptionsTemplate; @@ -21,12 +22,16 @@ public: virtual const IStorage *clone() const override; + virtual void beginWriteBatch() override; + virtual void endWriteBatch() override; + size_t count() const; protected: RocksDBStorageProvider(std::shared_ptr &spdb); const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } + rocksdb::WriteOptions WriteOptions() const; }; IStorage *create_rocksdb_storage(const char *dbfile) @@ -57,16 +62,24 @@ RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr &spd void RocksDBStorageProvider::insert(const char *key,
size_t cchKey, void *data, size_t cb) { - auto status = m_spdb->Put(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); + rocksdb::Status status; + if (m_spbatch != nullptr) + status = m_spbatch->Put(rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); + else + status = m_spdb->Put(WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); if (!status.ok()) - throw status; + throw status.ToString(); } void RocksDBStorageProvider::erase(const char *key, size_t cchKey) { - auto status = m_spdb->Delete(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey)); + rocksdb::Status status; + if (m_spbatch != nullptr) + status = m_spbatch->Delete(rocksdb::Slice(key, cchKey)); + else + status = m_spdb->Delete(WriteOptions(), rocksdb::Slice(key, cchKey)); if (!status.ok()) - throw status; + throw status.ToString(); } void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const @@ -74,7 +87,7 @@ void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback f std::string value; auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value); if (!status.ok()) - throw status; + throw status.ToString(); fn(key, cchKey, value.data(), value.size()); } @@ -83,7 +96,7 @@ size_t RocksDBStorageProvider::clear() size_t celem = count(); auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily()); if (!status.ok()) - throw status; + throw status.ToString(); return celem; } @@ -119,4 +132,22 @@ RocksDBStorageProvider::~RocksDBStorageProvider() if (m_psnapshot != nullptr) m_spdb->ReleaseSnapshot(m_psnapshot); } +} + +rocksdb::WriteOptions RocksDBStorageProvider::WriteOptions() const +{ + auto opt = rocksdb::WriteOptions(); + opt.disableWAL = true; + return opt; +} + +void RocksDBStorageProvider::beginWriteBatch() +{ + m_spbatch = std::make_unique(); +} + +void RocksDBStorageProvider::endWriteBatch() +{ + m_spdb->Write(WriteOptions(), 
m_spbatch.get()); + m_spbatch = nullptr; } \ No newline at end of file