Flash slot to key (#653)
* add hash prefixing to rocksdb * don't need to offset internal keys * remove hashslot prefix when loading from rocksdb * don't prefix internal keys * implement slot to keys with rocksdb * fix compile * add hashslot enum to test storage provider * add --flash option for tests * forgot return statement in getKeysInSlot * check for count mismatch * forgot ; * add assert to ensure correctness * fix warnings * add option to show logs of 1 server at a time * add fixed length prefix extractor * add machamp * switch machamp to main branch * add machamp script
This commit is contained in:
parent
16e8469316
commit
3336c4f43e
53
build.yaml
Normal file
53
build.yaml
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
# Doc: https://wiki.sc-corp.net/pages/viewpage.action?pageId=121500284
|
||||||
|
version: 1
|
||||||
|
machamp:
|
||||||
|
keydb-build:
|
||||||
|
# Optional - build counter is linked to the build def
|
||||||
|
tag_template: 0.0.%build.counter%
|
||||||
|
# Optional - value in seconds before a build is terminated, default is 3600 seconds
|
||||||
|
timeout: 3600
|
||||||
|
# Optional - update ghe or not, default to true
|
||||||
|
update_ghe: true
|
||||||
|
code_coverage: false
|
||||||
|
# Required
|
||||||
|
steps:
|
||||||
|
make-build:
|
||||||
|
type: cmd
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./machamp_scripts/build.sh
|
||||||
|
tls-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./runtest --clients 4 --verbose
|
||||||
|
cluster-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./runtest-cluster
|
||||||
|
sentinel-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./runtest-sentinel
|
||||||
|
module-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./runtest-moduleapi
|
||||||
|
rotation-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
|
command: ./runtest-rotation
|
||||||
|
keyproxy-test:
|
||||||
|
type: cmd
|
||||||
|
parent: make-build
|
||||||
|
command: ./runtest-keyproxy --tls
|
||||||
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
16
ci.yaml
Normal file
16
ci.yaml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
# Doc: https://wiki.sc-corp.net/display/TOOL/ci.yaml+User+Guide
|
||||||
|
version: 1
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
- workflows:
|
||||||
|
# All builds that use machamp should use the defined `backend_workflow`
|
||||||
|
- workflow_type: backend_workflow
|
||||||
|
# references a build defined in build.yaml
|
||||||
|
build_name: keydb-build
|
||||||
|
arch_types: ["amd64", "arm64"]
|
||||||
|
push:
|
||||||
|
- branches: [main]
|
||||||
|
workflows:
|
||||||
|
- workflow_type: backend_workflow
|
||||||
|
build_name: keydb-build
|
||||||
|
arch_types: ["amd64", "arm64"]
|
8
machamp_scripts/build.sh
Executable file
8
machamp_scripts/build.sh
Executable file
@ -0,0 +1,8 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# make the build
|
||||||
|
git submodule init && git submodule update
|
||||||
|
make BUILD_TLS=yes -j$(nproc) KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror'
|
||||||
|
|
||||||
|
# gen-cert
|
||||||
|
./utils/gen-test-certs.sh
|
@ -31,6 +31,7 @@ public:
|
|||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
|
||||||
virtual size_t clear() = 0;
|
virtual size_t clear() = 0;
|
||||||
virtual bool enumerate(callback fn) const = 0;
|
virtual bool enumerate(callback fn) const = 0;
|
||||||
|
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const = 0;
|
||||||
virtual size_t count() const = 0;
|
virtual size_t count() const = 0;
|
||||||
|
|
||||||
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
|
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
|
||||||
|
@ -49,6 +49,7 @@ public:
|
|||||||
void expand(uint64_t slots);
|
void expand(uint64_t slots);
|
||||||
|
|
||||||
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
|
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
|
||||||
|
bool enumerate_hashslot(IStorage::callback fn, unsigned int hashslot) const { return m_spstorage->enumerate_hashslot(fn, hashslot); }
|
||||||
|
|
||||||
void beginWriteBatch();
|
void beginWriteBatch();
|
||||||
void endWriteBatch() { m_spstorage->endWriteBatch(); }
|
void endWriteBatch() { m_spstorage->endWriteBatch(); }
|
||||||
|
27
src/db.cpp
27
src/db.cpp
@ -2490,10 +2490,12 @@ void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add) {
|
|||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
unsigned int hashslot = keyHashSlot(key,keylen);
|
unsigned int hashslot = keyHashSlot(key,keylen);
|
||||||
|
g_pserver->cluster->slots_keys_count[hashslot] += add ? 1 : -1;
|
||||||
|
|
||||||
|
if (g_pserver->m_pstorageFactory == nullptr) {
|
||||||
unsigned char buf[64];
|
unsigned char buf[64];
|
||||||
unsigned char *indexed = buf;
|
unsigned char *indexed = buf;
|
||||||
|
|
||||||
g_pserver->cluster->slots_keys_count[hashslot] += add ? 1 : -1;
|
|
||||||
if (keylen+2 > 64) indexed = (unsigned char*)zmalloc(keylen+2, MALLOC_SHARED);
|
if (keylen+2 > 64) indexed = (unsigned char*)zmalloc(keylen+2, MALLOC_SHARED);
|
||||||
indexed[0] = (hashslot >> 8) & 0xff;
|
indexed[0] = (hashslot >> 8) & 0xff;
|
||||||
indexed[1] = hashslot & 0xff;
|
indexed[1] = hashslot & 0xff;
|
||||||
@ -2509,6 +2511,7 @@ void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add) {
|
|||||||
// the key.
|
// the key.
|
||||||
serverAssert(fModified || g_pserver->db[0]->snapshot_depth() > 0);
|
serverAssert(fModified || g_pserver->db[0]->snapshot_depth() > 0);
|
||||||
if (indexed != buf) zfree(indexed);
|
if (indexed != buf) zfree(indexed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void slotToKeyAdd(sds key) {
|
void slotToKeyAdd(sds key) {
|
||||||
@ -2544,6 +2547,14 @@ void slotToKeyFlush(int async) {
|
|||||||
* New objects are returned to represent keys, it's up to the caller to
|
* New objects are returned to represent keys, it's up to the caller to
|
||||||
* decrement the reference count to release the keys names. */
|
* decrement the reference count to release the keys names. */
|
||||||
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
|
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
|
||||||
|
if (g_pserver->m_pstorageFactory != nullptr) {
|
||||||
|
int j = 0;
|
||||||
|
g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool {
|
||||||
|
keys[j++] = createStringObject(key, cchKey);
|
||||||
|
return --count;
|
||||||
|
}, hashslot);
|
||||||
|
return j;
|
||||||
|
} else {
|
||||||
raxIterator iter;
|
raxIterator iter;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
unsigned char indexed[2];
|
unsigned char indexed[2];
|
||||||
@ -2558,13 +2569,24 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
|
|||||||
}
|
}
|
||||||
raxStop(&iter);
|
raxStop(&iter);
|
||||||
return j;
|
return j;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove all the keys in the specified hash slot.
|
/* Remove all the keys in the specified hash slot.
|
||||||
* The number of removed items is returned. */
|
* The number of removed items is returned. */
|
||||||
unsigned int delKeysInSlot(unsigned int hashslot) {
|
unsigned int delKeysInSlot(unsigned int hashslot) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
if (g_pserver->m_pstorageFactory != nullptr) {
|
||||||
|
int j = 0;
|
||||||
|
g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool {
|
||||||
|
robj *keyobj = createStringObject(key, cchKey);
|
||||||
|
dbDelete(g_pserver->db[0], keyobj);
|
||||||
|
decrRefCount(keyobj);
|
||||||
|
j++;
|
||||||
|
return true;
|
||||||
|
}, hashslot);
|
||||||
|
return j;
|
||||||
|
} else {
|
||||||
raxIterator iter;
|
raxIterator iter;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
unsigned char indexed[2];
|
unsigned char indexed[2];
|
||||||
@ -2585,6 +2607,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
|
|||||||
}
|
}
|
||||||
raxStop(&iter);
|
raxStop(&iter);
|
||||||
return j;
|
return j;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned int countKeysInSlot(unsigned int hashslot) {
|
unsigned int countKeysInSlot(unsigned int hashslot) {
|
||||||
|
@ -1209,6 +1209,7 @@ public:
|
|||||||
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||||
|
|
||||||
std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
|
std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
|
||||||
|
std::shared_ptr<StorageCache> getStorageCache() { return m_spstorage; }
|
||||||
void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
|
void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
|
||||||
|
|
||||||
dict_iter find_cached_threadsafe(const char *key) const;
|
dict_iter find_cached_threadsafe(const char *key) const;
|
||||||
@ -1370,6 +1371,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::FRehashing;
|
using redisDbPersistentData::FRehashing;
|
||||||
using redisDbPersistentData::FTrackingChanges;
|
using redisDbPersistentData::FTrackingChanges;
|
||||||
using redisDbPersistentData::CloneStorageCache;
|
using redisDbPersistentData::CloneStorageCache;
|
||||||
|
using redisDbPersistentData::getStorageCache;
|
||||||
using redisDbPersistentData::bulkDirectStorageInsert;
|
using redisDbPersistentData::bulkDirectStorageInsert;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
@ -3,6 +3,8 @@
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include "../server.h"
|
||||||
|
#include "../cluster.h"
|
||||||
#include "rocksdbfactor_internal.h"
|
#include "rocksdbfactor_internal.h"
|
||||||
|
|
||||||
static const char keyprefix[] = INTERNAL_KEY_PREFIX;
|
static const char keyprefix[] = INTERNAL_KEY_PREFIX;
|
||||||
@ -20,6 +22,17 @@ bool FInternalKey(const char *key, size_t cch)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string getPrefix(unsigned int hashslot)
|
||||||
|
{
|
||||||
|
char *hash_char = (char *)&hashslot;
|
||||||
|
return std::string(hash_char + (sizeof(unsigned int) - 2), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string prefixKey(const char *key, size_t cchKey)
|
||||||
|
{
|
||||||
|
return FInternalKey(key, cchKey) ? std::string(key, cchKey) : getPrefix(keyHashSlot(key, cchKey)) + std::string(key, cchKey);
|
||||||
|
}
|
||||||
|
|
||||||
RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
|
RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
|
||||||
: m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
|
: m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
|
||||||
{
|
{
|
||||||
@ -32,10 +45,11 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
|
|||||||
{
|
{
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
std::unique_lock<fastlock> l(m_lock);
|
std::unique_lock<fastlock> l(m_lock);
|
||||||
|
std::string prefixed_key = prefixKey(key, cchKey);
|
||||||
if (m_spbatch != nullptr)
|
if (m_spbatch != nullptr)
|
||||||
status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(prefixed_key), rocksdb::Slice((const char*)data, cb));
|
||||||
else
|
else
|
||||||
status = m_spdb->Put(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
status = m_spdb->Put(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixed_key), rocksdb::Slice((const char*)data, cb));
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
throw status.ToString();
|
throw status.ToString();
|
||||||
|
|
||||||
@ -59,7 +73,8 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **
|
|||||||
// Insert rows into the SST file, note that inserted keys must be
|
// Insert rows into the SST file, note that inserted keys must be
|
||||||
// strictly increasing (based on options.comparator)
|
// strictly increasing (based on options.comparator)
|
||||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||||
s = sst_file_writer.Put(rocksdb::Slice(rgkeys[ielem], rgcbkeys[ielem]), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
|
std::string prefixed_key = prefixKey(rgkeys[ielem], rgcbkeys[ielem]);
|
||||||
|
s = sst_file_writer.Put(rocksdb::Slice(prefixed_key), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
unlink(file_path.c_str());
|
unlink(file_path.c_str());
|
||||||
goto LFallback;
|
goto LFallback;
|
||||||
@ -87,7 +102,8 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **
|
|||||||
LFallback:
|
LFallback:
|
||||||
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
|
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
|
||||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||||
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], rgcbkeys[ielem]), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
|
std::string prefixed_key = prefixKey(rgkeys[ielem], rgcbkeys[ielem]);
|
||||||
|
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(prefixed_key), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
|
||||||
}
|
}
|
||||||
m_spdb->Write(WriteOptions(), spbatch.get());
|
m_spdb->Write(WriteOptions(), spbatch.get());
|
||||||
}
|
}
|
||||||
@ -100,15 +116,16 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
|||||||
{
|
{
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
std::unique_lock<fastlock> l(m_lock);
|
std::unique_lock<fastlock> l(m_lock);
|
||||||
if (!FKeyExists(key, cchKey))
|
std::string prefixed_key = prefixKey(key, cchKey);
|
||||||
|
if (!FKeyExists(prefixed_key))
|
||||||
return false;
|
return false;
|
||||||
if (m_spbatch != nullptr)
|
if (m_spbatch != nullptr)
|
||||||
{
|
{
|
||||||
status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
|
status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(prefixed_key));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
|
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixed_key));
|
||||||
}
|
}
|
||||||
if (status.ok())
|
if (status.ok())
|
||||||
--m_count;
|
--m_count;
|
||||||
@ -118,7 +135,8 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
|||||||
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
|
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
|
||||||
{
|
{
|
||||||
rocksdb::PinnableSlice slice;
|
rocksdb::PinnableSlice slice;
|
||||||
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &slice);
|
std::string prefixed_key = prefixKey(key, cchKey);
|
||||||
|
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixed_key), &slice);
|
||||||
if (status.ok())
|
if (status.ok())
|
||||||
fn(key, cchKey, slice.data(), slice.size());
|
fn(key, cchKey, slice.data(), slice.size());
|
||||||
}
|
}
|
||||||
@ -154,7 +172,7 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
|
|||||||
if (FInternalKey(it->key().data(), it->key().size()))
|
if (FInternalKey(it->key().data(), it->key().size()))
|
||||||
continue;
|
continue;
|
||||||
++count;
|
++count;
|
||||||
bool fContinue = fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
|
bool fContinue = fn(it->key().data()+2, it->key().size()-2, it->value().data(), it->value().size());
|
||||||
if (!fContinue)
|
if (!fContinue)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -168,6 +186,31 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
|
|||||||
return !it->Valid();
|
return !it->Valid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RocksDBStorageProvider::enumerate_hashslot(callback fn, unsigned int hashslot) const
|
||||||
|
{
|
||||||
|
std::string prefix = getPrefix(hashslot);
|
||||||
|
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
|
||||||
|
size_t count = 0;
|
||||||
|
for (it->Seek(prefix.c_str()); it->Valid(); it->Next()) {
|
||||||
|
if (FInternalKey(it->key().data(), it->key().size()))
|
||||||
|
continue;
|
||||||
|
if (strncmp(it->key().data(),prefix.c_str(),2) != 0)
|
||||||
|
break;
|
||||||
|
++count;
|
||||||
|
bool fContinue = fn(it->key().data()+2, it->key().size()-2, it->value().data(), it->value().size());
|
||||||
|
if (!fContinue)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
bool full_iter = !it->Valid() || (strncmp(it->key().data(),prefix.c_str(),2) != 0);
|
||||||
|
if (full_iter && count != g_pserver->cluster->slots_keys_count[hashslot])
|
||||||
|
{
|
||||||
|
printf("WARNING: rocksdb hashslot count mismatch");
|
||||||
|
}
|
||||||
|
assert(!full_iter || count == g_pserver->cluster->slots_keys_count[hashslot]);
|
||||||
|
assert(it->status().ok()); // Check for any errors found during the scan
|
||||||
|
return full_iter;
|
||||||
|
}
|
||||||
|
|
||||||
const IStorage *RocksDBStorageProvider::clone() const
|
const IStorage *RocksDBStorageProvider::clone() const
|
||||||
{
|
{
|
||||||
std::unique_lock<fastlock> l(m_lock);
|
std::unique_lock<fastlock> l(m_lock);
|
||||||
@ -227,10 +270,10 @@ void RocksDBStorageProvider::flush()
|
|||||||
m_spdb->SyncWAL();
|
m_spdb->SyncWAL();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const
|
bool RocksDBStorageProvider::FKeyExists(std::string& key) const
|
||||||
{
|
{
|
||||||
rocksdb::PinnableSlice slice;
|
rocksdb::PinnableSlice slice;
|
||||||
if (m_spbatch)
|
if (m_spbatch)
|
||||||
return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
|
return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||||
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
|
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key), &slice).ok();
|
||||||
}
|
}
|
@ -32,6 +32,7 @@ public:
|
|||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
||||||
virtual size_t clear() override;
|
virtual size_t clear() override;
|
||||||
virtual bool enumerate(callback fn) const override;
|
virtual bool enumerate(callback fn) const override;
|
||||||
|
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override;
|
||||||
|
|
||||||
virtual const IStorage *clone() const override;
|
virtual const IStorage *clone() const override;
|
||||||
|
|
||||||
@ -48,7 +49,7 @@ public:
|
|||||||
size_t count() const override;
|
size_t count() const override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool FKeyExists(const char *key, size_t cchKey) const;
|
bool FKeyExists(std::string&) const;
|
||||||
|
|
||||||
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
|
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
|
||||||
rocksdb::WriteOptions WriteOptions() const;
|
rocksdb::WriteOptions WriteOptions() const;
|
||||||
|
@ -71,6 +71,7 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
|
|||||||
rocksdb::DB *db = nullptr;
|
rocksdb::DB *db = nullptr;
|
||||||
|
|
||||||
auto options = RocksDbOptions();
|
auto options = RocksDbOptions();
|
||||||
|
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(2));
|
||||||
|
|
||||||
for (int idb = 0; idb < dbnum; ++idb)
|
for (int idb = 0; idb < dbnum; ++idb)
|
||||||
{
|
{
|
||||||
@ -186,7 +187,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr
|
|||||||
printf("\tDatabase %d was not shutdown cleanly, recomputing metrics\n", db);
|
printf("\tDatabase %d was not shutdown cleanly, recomputing metrics\n", db);
|
||||||
fFirstRealKey = false;
|
fFirstRealKey = false;
|
||||||
if (iter != nullptr)
|
if (iter != nullptr)
|
||||||
iter(it->key().data(), it->key().size(), privdata);
|
iter(it->key().data()+2, it->key().size()-2, privdata);
|
||||||
++count;
|
++count;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,21 @@ bool TestStorageProvider::enumerate(callback fn) const
|
|||||||
return fAll;
|
return fAll;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool TestStorageProvider::enumerate_hashslot(callback fn, unsigned int hashslot) const
|
||||||
|
{
|
||||||
|
bool fAll = true;
|
||||||
|
for (auto &pair : m_map)
|
||||||
|
{
|
||||||
|
if (keyHashSlot(pair.first.data(), pair.first.size()) == hashslot)
|
||||||
|
if (!fn(pair.first.data(), pair.first.size(), pair.second.data(), pair.second.size()))
|
||||||
|
{
|
||||||
|
fAll = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fAll;
|
||||||
|
}
|
||||||
|
|
||||||
size_t TestStorageProvider::count() const
|
size_t TestStorageProvider::count() const
|
||||||
{
|
{
|
||||||
return m_map.size();
|
return m_map.size();
|
||||||
|
@ -24,6 +24,7 @@ public:
|
|||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
||||||
virtual size_t clear() override;
|
virtual size_t clear() override;
|
||||||
virtual bool enumerate(callback fn) const override;
|
virtual bool enumerate(callback fn) const override;
|
||||||
|
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const override;
|
||||||
virtual size_t count() const override;
|
virtual size_t count() const override;
|
||||||
|
|
||||||
virtual void flush() override;
|
virtual void flush() override;
|
||||||
|
@ -21,6 +21,7 @@ set ::tls 0
|
|||||||
set ::pause_on_error 0
|
set ::pause_on_error 0
|
||||||
set ::dont_clean 0
|
set ::dont_clean 0
|
||||||
set ::simulate_error 0
|
set ::simulate_error 0
|
||||||
|
set ::flash 0
|
||||||
set ::failed 0
|
set ::failed 0
|
||||||
set ::sentinel_instances {}
|
set ::sentinel_instances {}
|
||||||
set ::redis_instances {}
|
set ::redis_instances {}
|
||||||
@ -81,6 +82,10 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
|||||||
set cfg [open $cfgfile w]
|
set cfg [open $cfgfile w]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if {$::flash} {
|
||||||
|
puts $cfg "storage-provider flash ./flash_$base_port"
|
||||||
|
}
|
||||||
|
|
||||||
if {$::tls} {
|
if {$::tls} {
|
||||||
puts $cfg "tls-port $port"
|
puts $cfg "tls-port $port"
|
||||||
puts $cfg "tls-replication yes"
|
puts $cfg "tls-replication yes"
|
||||||
@ -266,6 +271,8 @@ proc parse_options {} {
|
|||||||
set val2 [lindex $::argv [expr $j+2]]
|
set val2 [lindex $::argv [expr $j+2]]
|
||||||
dict set ::global_config $val $val2
|
dict set ::global_config $val $val2
|
||||||
incr j 2
|
incr j 2
|
||||||
|
} elseif {$opt eq {--flash}} {
|
||||||
|
set ::flash 1
|
||||||
} elseif {$opt eq "--help"} {
|
} elseif {$opt eq "--help"} {
|
||||||
puts "--single <pattern> Only runs tests specified by pattern."
|
puts "--single <pattern> Only runs tests specified by pattern."
|
||||||
puts "--dont-clean Keep log files on exit."
|
puts "--dont-clean Keep log files on exit."
|
||||||
@ -275,6 +282,7 @@ proc parse_options {} {
|
|||||||
puts "--tls Run tests in TLS mode."
|
puts "--tls Run tests in TLS mode."
|
||||||
puts "--host <host> Use hostname instead of 127.0.0.1."
|
puts "--host <host> Use hostname instead of 127.0.0.1."
|
||||||
puts "--config <k> <v> Extra config argument(s)."
|
puts "--config <k> <v> Extra config argument(s)."
|
||||||
|
puts "--flash Run the whole suite with flash enabled"
|
||||||
puts "--help Shows this help."
|
puts "--help Shows this help."
|
||||||
exit 0
|
exit 0
|
||||||
} else {
|
} else {
|
||||||
@ -301,12 +309,16 @@ proc pause_on_error {} {
|
|||||||
break
|
break
|
||||||
} elseif {$cmd eq {show-keydb-logs}} {
|
} elseif {$cmd eq {show-keydb-logs}} {
|
||||||
set count 10
|
set count 10
|
||||||
|
set instance {}
|
||||||
if {[lindex $argv 1] ne {}} {set count [lindex $argv 1]}
|
if {[lindex $argv 1] ne {}} {set count [lindex $argv 1]}
|
||||||
|
if {[lindex $argv 2] ne {}} {set instance [lindex $argv 2]}
|
||||||
foreach_redis_id id {
|
foreach_redis_id id {
|
||||||
|
if {$instance eq $id || $instance eq {}} {
|
||||||
puts "=== KeyDB $id ===="
|
puts "=== KeyDB $id ===="
|
||||||
puts [exec tail -$count redis_$id/log.txt]
|
puts [exec tail -$count redis_$id/log.txt]
|
||||||
puts "---------------------\n"
|
puts "---------------------\n"
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} elseif {$cmd eq {show-sentinel-logs}} {
|
} elseif {$cmd eq {show-sentinel-logs}} {
|
||||||
set count 10
|
set count 10
|
||||||
if {[lindex $argv 1] ne {}} {set count [lindex $argv 1]}
|
if {[lindex $argv 1] ne {}} {set count [lindex $argv 1]}
|
||||||
@ -350,7 +362,7 @@ proc pause_on_error {} {
|
|||||||
} elseif {$cmd eq {help}} {
|
} elseif {$cmd eq {help}} {
|
||||||
puts "ls List Sentinel and KeyDB instances."
|
puts "ls List Sentinel and KeyDB instances."
|
||||||
puts "show-sentinel-logs \[N\] Show latest N lines of logs."
|
puts "show-sentinel-logs \[N\] Show latest N lines of logs."
|
||||||
puts "show-keydb-logs \[N\] Show latest N lines of logs."
|
puts "show-keydb-logs \[N\] \[id\] Show latest N lines of logs of server id."
|
||||||
puts "S <id> cmd ... arg Call command in Sentinel <id>."
|
puts "S <id> cmd ... arg Call command in Sentinel <id>."
|
||||||
puts "R <id> cmd ... arg Call command in KeyDB <id>."
|
puts "R <id> cmd ... arg Call command in KeyDB <id>."
|
||||||
puts "SI <id> <field> Show Sentinel <id> INFO <field>."
|
puts "SI <id> <field> Show Sentinel <id> INFO <field>."
|
||||||
|
Loading…
x
Reference in New Issue
Block a user