diff --git a/src/server.cpp b/src/server.cpp index f7ac7545e..f59288a3a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1646,6 +1646,62 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } +SymVer parseVersion(const char *version) +{ + SymVer ver = {-1,-1,-1}; + long versions[3] = {-1,-1,-1}; + const char *start = version; + const char *end = nullptr; + + for (int iver = 0; iver < 3; ++iver) + { + end = start; + while (*end != '\0' && *end != '.') + ++end; + + if (start >= end) + return ver; + + if (!string2l(start, end - start, versions + iver)) + return ver; + if (*end != '\0') + start = end+1; + else + break; + } + ver.major = versions[0]; + ver.minor = versions[1]; + ver.build = versions[2]; + + return ver; +} + +VersionCompareResult compareVersion(SymVer *pver) +{ + SymVer symVerThis = parseVersion(KEYDB_REAL_VERSION); + for (int iver = 0; iver < 3; ++iver) + { + long verThis, verOther; + switch (iver) + { + case 0: + verThis = symVerThis.major; verOther = pver->major; + break; + case 1: + verThis = symVerThis.minor; verOther = pver->minor; + break; + case 2: + verThis = symVerThis.build; verOther = pver->build; + } + + if (verThis < verOther) + return VersionCompareResult::NewerVersion; + if (verThis > verOther) + return VersionCompareResult::OlderVersion; + } + return VersionCompareResult::EqualVerison; +} + /* This function is used in order to track clients using the biggest amount * of memory in the latest few seconds. This way we can provide such information * in the INFO output (clients section), without having to do an O(N) scan for @@ -5397,6 +5453,13 @@ int main(int argc, char **argv) { std::set_terminate(OnTerminate); + { + SymVer version; + version = parseVersion(KEYDB_REAL_VERSION); + serverAssert(version.major >= 0 && version.minor >= 0 && version.build >= 0); + serverAssert(compareVersion(&version) == VersionCompareResult::EqualVerison); + } + #ifdef USE_MEMKIND storage_init(NULL, 0); #endif diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 386ae7397..dd88a86a8 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -99,6 +99,15 @@ const IStorage *RocksDBStorageProvider::clone() const RocksDBStorageProvider::~RocksDBStorageProvider() { + if (m_spbatch != nullptr) + endWriteBatch(); + + if (m_spdb != nullptr && m_psnapshot == nullptr) + { + insert(count_key, sizeof(count_key), &m_count, sizeof(m_count), false); + flush(); + } + if (m_spdb != nullptr) { if (m_psnapshot != nullptr) diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index b5d3308fd..21e57db92 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -5,6 +5,9 @@ #include #include "../fastlock.h" +static const char count_key[] = "\0__keydb__count\1"; +static const char version_key[] = "\0__keydb__version\1"; + class RocksDBStorageProvider : public IStorage { std::shared_ptr m_spdb; // Note: This must be first so it is deleted last diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 8c5cacef2..f02701130 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -1,4 +1,5 @@ #include "rocksdb.h" +#include "../version.h" #include #include #include @@ -14,6 +15,9 @@ public: virtual IStorage *create(int db) override; virtual const char *name() const override; + +private: + void setVersion(rocksdb::ColumnFamilyHandle*); }; IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum) @@ -64,9 +68,26 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum) if (!status.ok()) throw status.ToString(); - for (auto handle : handles) - m_vecspcols.emplace_back(handle); m_spdb = std::shared_ptr(db); + for (auto handle : handles) + { + std::string strVersion; + auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion); + if (!status.ok()) + { + setVersion(handle); + } + else + { + SymVer ver = parseVersion(strVersion.c_str()); + auto cmp = compareVersion(&ver); + if (cmp == NewerVersion) + throw "Cannot load FLASH database created by newer version of KeyDB"; + if (cmp == OlderVersion) + setVersion(handle); + } + m_vecspcols.emplace_back(handle); + } } RocksDBStorageFactory::~RocksDBStorageFactory() @@ -74,14 +95,33 @@ RocksDBStorageFactory::~RocksDBStorageFactory() m_spdb->SyncWAL(); } +void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle) +{ + auto status = m_spdb->Put(rocksdb::WriteOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), rocksdb::Slice(KEYDB_REAL_VERSION, strlen(KEYDB_REAL_VERSION)+1)); + if (!status.ok()) + throw status.ToString(); +} + IStorage *RocksDBStorageFactory::create(int db) { ++db; // skip default col family std::shared_ptr spcolfamily(m_vecspcols[db].release()); size_t count = 0; - std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get())); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - ++count; + + std::string value; + auto status = m_spdb->Get(rocksdb::ReadOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key)), &value); + if (status.ok() && value.size() == sizeof(size_t)) + { + count = *reinterpret_cast(value.data()); + m_spdb->Delete(rocksdb::WriteOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key))); + } + else + { + printf("\tDatabase was not shutdown cleanly, recomputing metrics\n"); + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get())); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + ++count; + } } return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr, count); } diff --git a/src/version.h b/src/version.h index db32931f1..793078590 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,28 @@ #define KEYDB_REAL_VERSION "0.0.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config +enum VersionCompareResult +{ + EqualVerison, + OlderVersion, + NewerVersion, +}; + +struct SymVer +{ + long major; + long minor; + long build; +}; + +#ifdef __cplusplus +extern "C" +{ +#endif + +struct SymVer parseVersion(const char *version); +enum VersionCompareResult compareVersion(struct SymVer *pver); + +#ifdef __cplusplus +} +#endif \ No newline at end of file