From b752ad4ceb4d27d75fc9ae2ab4acece77d854fae Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 28 Apr 2020 20:48:46 -0400 Subject: [PATCH] Fix failure to count keys in cluster slots when reloading a FLASH database Former-commit-id: f6dd863e51f91620f184ff80f08cfe518d29c87f --- src/IStorage.h | 4 +++- src/cluster.cpp | 2 +- src/db.cpp | 27 ++++++++++++++++++++++----- src/server.cpp | 6 ++++++ src/server.h | 4 ++-- src/storage/rocksdbfactory.cpp | 20 ++++++++++++++++---- src/storage/teststorageprovider.cpp | 2 +- src/storage/teststorageprovider.h | 2 +- 8 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index 016960350..3ac6d9c59 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -4,8 +4,10 @@ class IStorageFactory { public: + typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey); + virtual ~IStorageFactory() {} - virtual class IStorage *create(int db) = 0; + virtual class IStorage *create(int db, key_load_iterator itr) = 0; virtual const char *name() const = 0; }; diff --git a/src/cluster.cpp b/src/cluster.cpp index 21edb3920..8709ccf95 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -739,7 +739,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * However if the key contains the {...} pattern, only the part between * { and } is hashed. This may be useful in the future to force certain * keys to be in the same node (assuming no resharding is in progress). */ -unsigned int keyHashSlot(char *key, int keylen) { +unsigned int keyHashSlot(const char *key, int keylen) { int s, e; /* start-end indexes of { and } */ for (s = 0; s < keylen; s++) diff --git a/src/db.cpp b/src/db.cpp index b909bf310..87fcb6210 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2053,11 +2053,11 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) * a fast way a key that belongs to a specified hash slot. This is useful * while rehashing the cluster and in other conditions when we need to * understand if we have keys for a given hash slot. */ -void slotToKeyUpdateKey(robj *key, int add) { +void slotToKeyUpdateKeyCore(const char *rgch, size_t keylen, int add) +{ serverAssert(GlobalLocksAcquired()); - size_t keylen = sdslen(szFromObj(key)); - unsigned int hashslot = keyHashSlot(szFromObj(key),keylen); + unsigned int hashslot = keyHashSlot(rgch,(int)keylen); unsigned char buf[64]; unsigned char *indexed = buf; @@ -2065,7 +2065,7 @@ void slotToKeyUpdateKey(robj *key, int add) { if (keylen+2 > 64) indexed = (unsigned char*)zmalloc(keylen+2, MALLOC_SHARED); indexed[0] = (hashslot >> 8) & 0xff; indexed[1] = hashslot & 0xff; - memcpy(indexed+2,ptrFromObj(key),keylen); + memcpy(indexed+2,rgch,keylen); int fModified = false; if (add) { fModified = raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL); @@ -2076,6 +2076,11 @@ void slotToKeyUpdateKey(robj *key, int add) { if (indexed != buf) zfree(indexed); } +void slotToKeyUpdateKey(robj *key, int add) { + size_t keylen = sdslen(szFromObj(key)); + slotToKeyUpdateKeyCore(szFromObj(key), keylen, add); +} + void slotToKeyAdd(robj *key) { slotToKeyUpdateKey(key,1); } @@ -2160,6 +2165,11 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage) m_spstorage = std::unique_ptr(pstorage); } +void clusterStorageLoadCallback(const char *rgchkey, size_t cch) +{ + slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/); +} + void redisDb::initialize(int id) { redisDbPersistentData::initialize(); @@ -2172,8 +2182,15 @@ void redisDb::initialize(int id) this->last_expire_set = 0; this->defrag_later = listCreate(); listSetFreeMethod(this->defrag_later,(void (*)(const void*))sdsfree); +} + +void redisDb::storageProviderInitialize() +{ if (g_pserver->m_pstorageFactory != nullptr) - this->setStorageProvider(g_pserver->m_pstorageFactory->create(id)); + { + IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr; + this->setStorageProvider(g_pserver->m_pstorageFactory->create(id, itr)); + } } bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew) diff --git a/src/server.cpp b/src/server.cpp index de049077f..35b5e3531 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3278,6 +3278,12 @@ void initServer(void) { scriptingInit(1); slowlogInit(); latencyMonitorInit(); + + /* We have to initialize storage providers after the cluster has been initialized */ + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->storageProviderInitialize(); + } } /* Some steps in server initialization need to be done last (after modules diff --git a/src/server.h b/src/server.h index 0c7d0f63b..111017e19 100644 --- a/src/server.h +++ b/src/server.h @@ -1434,11 +1434,11 @@ struct redisDb : public redisDbPersistentDataSnapshot {} void initialize(int id); + void storageProviderInitialize(); virtual ~redisDb(); void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); - bool FKeyExpires(const char *key); size_t clear(bool fAsync, void(callback)(void*)); @@ -3102,7 +3102,7 @@ int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys /* Cluster */ void clusterInit(void); extern "C" unsigned short crc16(const char *buf, int len); -unsigned int keyHashSlot(char *key, int keylen); +unsigned int keyHashSlot(const char *key, int keylen); void clusterCron(void); void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index ca4d86c23..803c1e783 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -13,7 +13,7 @@ public: RocksDBStorageFactory(const char *dbfile, int dbnum); ~RocksDBStorageFactory(); - virtual IStorage *create(int db) override; + virtual IStorage *create(int db, key_load_iterator iter) override; virtual const char *name() const override; private: @@ -102,11 +102,12 @@ void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle) throw status.ToString(); } -IStorage *RocksDBStorageFactory::create(int db) +IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter) { ++db; // skip default col family std::shared_ptr spcolfamily(m_vecspcols[db].release()); size_t count = 0; + bool fUnclean = false; std::string value; auto status = m_spdb->Get(rocksdb::ReadOptions(), spcolfamily.get(), rocksdb::Slice(count_key, sizeof(count_key)), &value); @@ -117,11 +118,22 @@ IStorage *RocksDBStorageFactory::create(int db) } else { - printf("\tDatabase was not shutdown cleanly, recomputing metrics\n"); - std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get())); + fUnclean = true; + } + + if (fUnclean || iter != nullptr) + { + count = 0; + if (fUnclean) + printf("\tDatabase was not shutdown cleanly, recomputing metrics\n"); + auto opts = rocksdb::ReadOptions(); + opts.tailing = true; + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(opts, spcolfamily.get())); for (it->SeekToFirst(); it->Valid(); it->Next()) { if (FInternalKey(it->key().data(), it->key().size())) continue; + if (iter != nullptr) + iter(it->key().data(), it->key().size()); ++count; } } diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index c0f0b53e8..6995a8b10 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -1,7 +1,7 @@ #include "teststorageprovider.h" #include "../server.h" -IStorage *TestStorageFactory::create(int) +IStorage *TestStorageFactory::create(int, key_load_iterator) { return new (MALLOC_LOCAL) TestStorageProvider(); } diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 876571444..c39095acf 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -4,7 +4,7 @@ class TestStorageFactory : public IStorageFactory { - virtual class IStorage *create(int db) override; + virtual class IStorage *create(int db, key_load_iterator itr) override; virtual const char *name() const override; };