Cache count of keys in database

Former-commit-id: 7cd266030ad82b5fddd4668d666adc95e1bed475
This commit is contained in:
John Sully 2020-03-17 17:11:41 -04:00
parent 9edd2d2ae4
commit ebe0b918bc
5 changed files with 145 additions and 5 deletions

View File

@ -1646,6 +1646,62 @@ int clientsCronResizeQueryBuffer(client *c) {
return 0;
}
SymVer parseVersion(const char *version)
{
SymVer ver = {-1,-1,-1};
long versions[3] = {-1,-1,-1};
const char *start = version;
const char *end = nullptr;
for (int iver = 0; iver < 3; ++iver)
{
end = start;
while (*end != '\0' && *end != '.')
++end;
if (start >= end)
return ver;
if (!string2l(start, end - start, versions + iver))
return ver;
if (*end != '\0')
start = end+1;
else
break;
}
ver.major = versions[0];
ver.minor = versions[1];
ver.build = versions[2];
return ver;
}
VersionCompareResult compareVersion(SymVer *pver)
{
SymVer symVerThis = parseVersion(KEYDB_REAL_VERSION);
for (int iver = 0; iver < 3; ++iver)
{
long verThis, verOther;
switch (iver)
{
case 0:
verThis = symVerThis.major; verOther = pver->major;
break;
case 1:
verThis = symVerThis.minor; verOther = pver->minor;
break;
case 2:
verThis = symVerThis.build; verOther = pver->build;
}
if (verThis < verOther)
return VersionCompareResult::NewerVersion;
if (verThis > verOther)
return VersionCompareResult::OlderVersion;
}
return VersionCompareResult::EqualVerison;
}
/* This function is used in order to track clients using the biggest amount
* of memory in the latest few seconds. This way we can provide such information
* in the INFO output (clients section), without having to do an O(N) scan for
@ -5397,6 +5453,13 @@ int main(int argc, char **argv) {
std::set_terminate(OnTerminate);
{
SymVer version;
version = parseVersion(KEYDB_REAL_VERSION);
serverAssert(version.major >= 0 && version.minor >= 0 && version.build >= 0);
serverAssert(compareVersion(&version) == VersionCompareResult::EqualVerison);
}
#ifdef USE_MEMKIND
storage_init(NULL, 0);
#endif

View File

@ -99,6 +99,15 @@ const IStorage *RocksDBStorageProvider::clone() const
RocksDBStorageProvider::~RocksDBStorageProvider()
{
if (m_spbatch != nullptr)
endWriteBatch();
if (m_spdb != nullptr && m_psnapshot == nullptr)
{
insert(count_key, sizeof(count_key), &m_count, sizeof(m_count), false);
flush();
}
if (m_spdb != nullptr)
{
if (m_psnapshot != nullptr)

View File

@ -5,6 +5,9 @@
#include <rocksdb/db.h>
#include "../fastlock.h"
static const char count_key[] = "\0__keydb__count\1";
static const char version_key[] = "\0__keydb__version\1";
class RocksDBStorageProvider : public IStorage
{
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last

View File

@ -1,4 +1,5 @@
#include "rocksdb.h"
#include "../version.h"
#include <rocksdb/filter_policy.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/options_util.h>
@ -14,6 +15,9 @@ public:
virtual IStorage *create(int db) override;
virtual const char *name() const override;
private:
void setVersion(rocksdb::ColumnFamilyHandle*);
};
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum)
@ -64,9 +68,26 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum)
if (!status.ok())
throw status.ToString();
for (auto handle : handles)
m_vecspcols.emplace_back(handle);
m_spdb = std::shared_ptr<rocksdb::DB>(db);
for (auto handle : handles)
{
std::string strVersion;
auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion);
if (!status.ok())
{
setVersion(handle);
}
else
{
SymVer ver = parseVersion(strVersion.c_str());
auto cmp = compareVersion(&ver);
if (cmp == NewerVersion)
throw "Cannot load FLASH database created by newer version of KeyDB";
if (cmp == OlderVersion)
setVersion(handle);
}
m_vecspcols.emplace_back(handle);
}
}
RocksDBStorageFactory::~RocksDBStorageFactory()
@ -74,14 +95,33 @@ RocksDBStorageFactory::~RocksDBStorageFactory()
m_spdb->SyncWAL();
}
void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle)
{
auto status = m_spdb->Put(rocksdb::WriteOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), rocksdb::Slice(KEYDB_REAL_VERSION, strlen(KEYDB_REAL_VERSION)+1));
if (!status.ok())
throw status.ToString();
}
IStorage *RocksDBStorageFactory::create(int db)
{
++db; // skip default col family
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
size_t count = 0;
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
++count;
std::string value;
auto status = m_spdb->Get(rocksdb::ReadOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key)), &value);
if (status.ok() && value.size() == sizeof(size_t))
{
count = *reinterpret_cast<const size_t*>(value.data());
m_spdb->Delete(rocksdb::WriteOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key)));
}
else
{
printf("\tDatabase was not shutdown cleanly, recomputing metrics\n");
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
++count;
}
}
return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr, count);
}

View File

@ -1,3 +1,28 @@
#define KEYDB_REAL_VERSION "0.0.0"
extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config
enum VersionCompareResult
{
EqualVerison,
OlderVersion,
NewerVersion,
};
struct SymVer
{
long major;
long minor;
long build;
};
#ifdef __cplusplus
extern "C"
{
#endif
struct SymVer parseVersion(const char *version);
enum VersionCompareResult compareVersion(struct SymVer *pver);
#ifdef __cplusplus
}
#endif