Fix failure to count keys in cluster slots when reloading a FLASH database

Former-commit-id: f6dd863e51f91620f184ff80f08cfe518d29c87f
This commit is contained in:
John Sully 2020-04-28 20:48:46 -04:00
parent 5bf79833c5
commit b752ad4ceb
8 changed files with 52 additions and 15 deletions

View File

@ -4,8 +4,10 @@
class IStorageFactory
{
public:
typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey);
virtual ~IStorageFactory() {}
virtual class IStorage *create(int db) = 0;
virtual class IStorage *create(int db, key_load_iterator itr) = 0;
virtual const char *name() const = 0;
};

View File

@ -739,7 +739,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
* However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
unsigned int keyHashSlot(const char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)

View File

@ -2053,11 +2053,11 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
void slotToKeyUpdateKey(robj *key, int add) {
void slotToKeyUpdateKeyCore(const char *rgch, size_t keylen, int add)
{
serverAssert(GlobalLocksAcquired());
size_t keylen = sdslen(szFromObj(key));
unsigned int hashslot = keyHashSlot(szFromObj(key),keylen);
unsigned int hashslot = keyHashSlot(rgch,(int)keylen);
unsigned char buf[64];
unsigned char *indexed = buf;
@ -2065,7 +2065,7 @@ void slotToKeyUpdateKey(robj *key, int add) {
if (keylen+2 > 64) indexed = (unsigned char*)zmalloc(keylen+2, MALLOC_SHARED);
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
memcpy(indexed+2,ptrFromObj(key),keylen);
memcpy(indexed+2,rgch,keylen);
int fModified = false;
if (add) {
fModified = raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
@ -2076,6 +2076,11 @@ void slotToKeyUpdateKey(robj *key, int add) {
if (indexed != buf) zfree(indexed);
}
void slotToKeyUpdateKey(robj *key, int add) {
size_t keylen = sdslen(szFromObj(key));
slotToKeyUpdateKeyCore(szFromObj(key), keylen, add);
}
void slotToKeyAdd(robj *key) {
slotToKeyUpdateKey(key,1);
}
@ -2160,6 +2165,11 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage)
m_spstorage = std::unique_ptr<IStorage>(pstorage);
}
void clusterStorageLoadCallback(const char *rgchkey, size_t cch)
{
slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/);
}
void redisDb::initialize(int id)
{
redisDbPersistentData::initialize();
@ -2172,8 +2182,15 @@ void redisDb::initialize(int id)
this->last_expire_set = 0;
this->defrag_later = listCreate();
listSetFreeMethod(this->defrag_later,(void (*)(const void*))sdsfree);
}
void redisDb::storageProviderInitialize()
{
if (g_pserver->m_pstorageFactory != nullptr)
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id));
{
IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr;
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id, itr));
}
}
bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)

View File

@ -3278,6 +3278,12 @@ void initServer(void) {
scriptingInit(1);
slowlogInit();
latencyMonitorInit();
/* We have to initialize storage providers after the cluster has been initialized */
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->storageProviderInitialize();
}
}
/* Some steps in server initialization need to be done last (after modules

View File

@ -1434,11 +1434,11 @@ struct redisDb : public redisDbPersistentDataSnapshot
{}
void initialize(int id);
void storageProviderInitialize();
virtual ~redisDb();
void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
bool FKeyExpires(const char *key);
size_t clear(bool fAsync, void(callback)(void*));
@ -3102,7 +3102,7 @@ int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys
/* Cluster */
void clusterInit(void);
extern "C" unsigned short crc16(const char *buf, int len);
unsigned int keyHashSlot(char *key, int keylen);
unsigned int keyHashSlot(const char *key, int keylen);
void clusterCron(void);
void clusterPropagatePublish(robj *channel, robj *message);
void migrateCloseTimedoutSockets(void);

View File

@ -13,7 +13,7 @@ public:
RocksDBStorageFactory(const char *dbfile, int dbnum);
~RocksDBStorageFactory();
virtual IStorage *create(int db) override;
virtual IStorage *create(int db, key_load_iterator iter) override;
virtual const char *name() const override;
private:
@ -102,11 +102,12 @@ void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle)
throw status.ToString();
}
IStorage *RocksDBStorageFactory::create(int db)
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter)
{
++db; // skip default col family
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
size_t count = 0;
bool fUnclean = false;
std::string value;
auto status = m_spdb->Get(rocksdb::ReadOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key)), &value);
@ -117,11 +118,22 @@ IStorage *RocksDBStorageFactory::create(int db)
}
else
{
printf("\tDatabase was not shutdown cleanly, recomputing metrics\n");
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get()));
fUnclean = true;
}
if (fUnclean || iter != nullptr)
{
count = 0;
if (fUnclean)
printf("\tDatabase was not shutdown cleanly, recomputing metrics\n");
auto opts = rocksdb::ReadOptions();
opts.tailing = true;
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(opts, spcolfamily.get()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
if (FInternalKey(it->key().data(), it->key().size()))
continue;
if (iter != nullptr)
iter(it->key().data(), it->key().size());
++count;
}
}

View File

@ -1,7 +1,7 @@
#include "teststorageprovider.h"
#include "../server.h"
IStorage *TestStorageFactory::create(int)
IStorage *TestStorageFactory::create(int, key_load_iterator)
{
return new (MALLOC_LOCAL) TestStorageProvider();
}

View File

@ -4,7 +4,7 @@
class TestStorageFactory : public IStorageFactory
{
virtual class IStorage *create(int db) override;
virtual class IStorage *create(int db, key_load_iterator itr) override;
virtual const char *name() const override;
};