From 551b30fb778b105391cbf91e68ab132a8e6ca51e Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 24 Nov 2019 17:53:06 -0500 Subject: [PATCH] Restrict the snapshot API Former-commit-id: cbafb88f267a9480cdbde6a88e9e1992e8a85971 --- src/aof.cpp | 2 +- src/db.cpp | 29 ++++++++++------- src/rdb-s3.cpp | 2 +- src/rdb.cpp | 24 +++++++------- src/rdb.h | 8 ++--- src/server.h | 85 ++++++++++++++++++++++++++++++++++++++------------ 6 files changed, 100 insertions(+), 50 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 85e535d3e..6fed4be3f 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1412,7 +1412,7 @@ int rewriteAppendOnlyFile(char *filename) { if (g_pserver->aof_use_rdb_preamble) { int error; - std::vector vecpdb; + std::vector vecpdb; for (int idb = 0; idb < cserver.dbnum; ++idb) { vecpdb.push_back(&g_pserver->db[idb]); diff --git a/src/db.cpp b/src/db.cpp index 0151073ed..2ee8445fe 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -669,7 +669,7 @@ bool redisDbPersistentData::iterate(std::function fn) return fResult; } -bool redisDbPersistentData::iterate_threadsafe(std::function fn) const +bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn) const { dictEntry *de = nullptr; bool fResult = true; @@ -707,7 +707,7 @@ bool redisDbPersistentData::iterate_threadsafe(std::functionargv[1]); - const redisDbPersistentData *snapshot = nullptr; + const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); if (snapshot != nullptr) @@ -2141,14 +2141,14 @@ void redisDbPersistentData::processChanges() m_setchanged.clear(); } -const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) +const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { serverAssert(GlobalLocksAcquired()); serverAssert(m_refCount == 0); // do not call this on a snapshot // First see if we have too many levels and can bail out of this to reduce load int levels = 1; - redisDbPersistentData *psnapshot = m_spdbSnapshotHOLDER.get(); + redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get(); while (psnapshot != nullptr) { ++levels; @@ -2166,7 +2166,7 @@ const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvcc } serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels); } - auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentData()); + auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; @@ -2201,24 +2201,24 @@ const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvcc return m_pdbSnapshot; } -void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentData *psnapshot) +void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot) { - std::vector m_stackSnapshots; + std::vector stackSnapshots; // gather a stack of snapshots, we do this so we can free them in reverse // Note: we don't touch the incoming psnapshot since the parent is free'ing that one while ((psnapshot = psnapshot->m_spdbSnapshotHOLDER.get()) != nullptr) { - m_stackSnapshots.push_back(psnapshot); + stackSnapshots.push_back(psnapshot); } - for (auto itr = m_stackSnapshots.rbegin(); itr != m_stackSnapshots.rend(); ++itr) + for (auto itr = stackSnapshots.rbegin(); itr != stackSnapshots.rend(); ++itr) { endSnapshot(*itr); } } -void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) +void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) { // Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where // a seperate thread holds the lock for it. Yes that's pretty crazy and should be fixed somehow... @@ -2362,7 +2362,7 @@ dict_iter redisDbPersistentData::random() return dict_iter(de); } -dict_iter redisDbPersistentData::random_threadsafe() const +dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const { if (size() == 0) return dict_iter(nullptr); @@ -2379,4 +2379,9 @@ dict_iter redisDbPersistentData::random_threadsafe() const serverAssert(dictSize(m_pdict) > 0); dictEntry *de = dictGetRandomKey(m_pdict); return dict_iter(de); +} + +size_t redisDbPersistentData::size() const +{ + return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); } \ No newline at end of file diff --git a/src/rdb-s3.cpp b/src/rdb-s3.cpp index 6590da03c..29fc1c0d5 100644 --- a/src/rdb-s3.cpp +++ b/src/rdb-s3.cpp @@ -6,7 +6,7 @@ extern "C" { #include /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSaveS3(char *s3bucket, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) +int rdbSaveS3(char *s3bucket, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi) { int status = EXIT_FAILURE; int fd[2]; diff --git a/src/rdb.cpp b/src/rdb.cpp index c4d728aca..f100c5843 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1115,7 +1115,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { return 1; } -int saveKey(rio *rdb, const redisDbPersistentData *db, int flags, size_t *processed, const char *keystr, robj_roptr o) +int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o) { robj key; @@ -1145,7 +1145,7 @@ int saveKey(rio *rdb, const redisDbPersistentData *db, int flags, size_t *proces * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, int flags, rdbSaveInfo *rsi) { dictEntry *de; dictIterator *di = NULL; char magic[10]; @@ -1161,7 +1161,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int fl if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; for (j = 0; j < cserver.dbnum; j++) { - const redisDbPersistentData *db = rgpdb[j]; + const redisDbPersistentDataSnapshot *db = rgpdb[j]; if (db->size() == 0) continue; /* Write the SELECT DB opcode */ @@ -1239,7 +1239,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(rio *rdb, const redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) { +int rdbSaveRioWithEOFMark(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); @@ -1257,7 +1257,7 @@ werr: /* Write error. */ return C_ERR; } -int rdbSaveFp(FILE *fp, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) +int rdbSaveFp(FILE *fp, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi) { int error = 0; rio rdb; @@ -1274,9 +1274,9 @@ int rdbSaveFp(FILE *fp, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) return C_OK; } -int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) +int rdbSave(const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi) { - std::vector vecdb; + std::vector vecdb; if (rgpdb == nullptr) { for (int idb = 0; idb < cserver.dbnum; ++idb) @@ -1296,7 +1296,7 @@ int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) } /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) { +int rdbSaveFile(char *filename, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; @@ -1360,7 +1360,7 @@ werr: struct rdbSaveThreadArgs { rdbSaveInfo rsi; - const redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length + const redisDbPersistentDataSnapshot *rgpdb[1]; // NOTE: Variable Length }; void *rdbSaveThread(void *vargs) @@ -1394,7 +1394,7 @@ void *rdbSaveThread(void *vargs) int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) { - rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentData*)), MALLOC_LOCAL); + rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT; if (rsi == nullptr) rsi = &rsiT; @@ -2453,7 +2453,7 @@ struct rdbSaveSocketThreadArgs int *fds; int numfds; uint64_t *clientids; - const redisDbPersistentData *rgpdb[1]; + const redisDbPersistentDataSnapshot *rgpdb[1]; }; void *rdbSaveToSlavesSocketsThread(void *vargs) { @@ -2557,7 +2557,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { g_pserver->rdb_pipe_read_result_from_child = pipefds[0]; g_pserver->rdb_pipe_write_result_to_parent = pipefds[1]; - args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentData*)*(cserver.dbnum-1), MALLOC_LOCAL); + args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentDataSnapshot*)*(cserver.dbnum-1), MALLOC_LOCAL); /* Collect the file descriptors of the slaves we want to transfer * the RDB to, which are i WAIT_BGSAVE_START state. */ diff --git a/src/rdb.h b/src/rdb.h index 0ab373963..4249df2f1 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -140,10 +140,10 @@ int rdbLoadFile(const char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(int childpid); -int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); -int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); -int rdbSaveFp(FILE *pf, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); -int rdbSaveS3(char *path, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); +int rdbSave(const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi); +int rdbSaveFile(char *filename, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi); +int rdbSaveFp(FILE *pf, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi); +int rdbSaveS3(char *path, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj_roptr key); size_t rdbSavedObjectLen(robj *o); diff --git a/src/server.h b/src/server.h index 1f883e66d..d5bb4469b 100644 --- a/src/server.h +++ b/src/server.h @@ -1188,9 +1188,11 @@ public: operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; } }; +class redisDbPersistentDataSnapshot; class redisDbPersistentData { friend void dictDbKeyDestructor(void *privdata, void *key); + friend class redisDbPersistentDataSnapshot; public: ~redisDbPersistentData(); @@ -1200,10 +1202,7 @@ public: static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2); size_t slots() const { return dictSlots(m_pdict); } - size_t size() const - { - return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); - } + size_t size() const; void expand(uint64_t slots) { dictExpand(m_pdict, slots); } void trackkey(robj_roptr o) @@ -1260,7 +1259,6 @@ public: void emptyDbAsync(); // Note: If you do not need the obj then use the objless iterator version. It's faster bool iterate(std::function fn); - bool iterate_threadsafe(std::function fn) const; void setExpire(robj *key, robj *subkey, long long when); void setExpire(expireEntry &&e); expireEntry *getExpire(robj_roptr key); @@ -1277,23 +1275,15 @@ public: expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } - const redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); - void endSnapshot(const redisDbPersistentData *psnapshot); + const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); + void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot); private: void ensure(const char *key); void ensure(const char *key, dictEntry **de); void storeDatabase(); void storeKey(const char *key, size_t cchKey, robj *o); - void recursiveFreeSnapshots(redisDbPersistentData *psnapshot); - - // These do not call ENSURE and so may have a NULL object - dict_iter random_threadsafe() const; - dict_iter find_threadsafe(const char *key) const - { - dictEntry *de = dictFind(m_pdict, key); - return dict_iter(de); - } + void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot); // Keyspace dict *m_pdict = nullptr; /* The keyspace for this DB */ @@ -1310,15 +1300,37 @@ private: // These two pointers are the same, UNLESS the database has been cleared. // in which case m_pdbSnapshot is NULL and we continue as though we weren' // in a snapshot - const redisDbPersistentData *m_pdbSnapshot = nullptr; - std::unique_ptr m_spdbSnapshotHOLDER; + const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; + std::unique_ptr m_spdbSnapshotHOLDER; int m_refCount = 0; }; +class redisDbPersistentDataSnapshot : protected redisDbPersistentData +{ + friend class redisDbPersistentData; +public: + bool iterate_threadsafe(std::function fn) const; + using redisDbPersistentData::createSnapshot; + using redisDbPersistentData::endSnapshot; + using redisDbPersistentData::end; + + dict_iter random_threadsafe() const; + dict_iter find_threadsafe(const char *key) const + { + dictEntry *de = dictFind(m_pdict, key); + return dict_iter(de); + } + + // These need to be fixed + using redisDbPersistentData::size; + using redisDbPersistentData::expireSize; + using redisDbPersistentData::getExpire; +}; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ -typedef struct redisDb : public redisDbPersistentData +typedef struct redisDb : public redisDbPersistentDataSnapshot { // Legacy C API, Do not add more friend void tryResizeHashTables(int); @@ -1350,6 +1362,39 @@ typedef struct redisDb : public redisDbPersistentData bool FKeyExpires(const char *key); size_t clear(bool fAsync, void(callback)(void*)); + // Import methods from redisDbPersistentData hidden by redisDbPersistentDataSnapshot + using redisDbPersistentData::slots; + using redisDbPersistentData::size; + using redisDbPersistentData::expand; + using redisDbPersistentData::trackkey; + using redisDbPersistentData::find; + using redisDbPersistentData::random; + using redisDbPersistentData::random_expire; + using redisDbPersistentData::findExpire; + using redisDbPersistentData::end; + using redisDbPersistentData::getStats; + using redisDbPersistentData::getExpireStats; + using redisDbPersistentData::insert; + using redisDbPersistentData::tryResize; + using redisDbPersistentData::incrementallyRehash; + using redisDbPersistentData::updateValue; + using redisDbPersistentData::syncDelete; + using redisDbPersistentData::asyncDelete; + using redisDbPersistentData::expireSize; + using redisDbPersistentData::removeExpire; + using redisDbPersistentData::removeSubkeyExpire; + using redisDbPersistentData::clear; + using redisDbPersistentData::emptyDbAsync; + using redisDbPersistentData::iterate; + using redisDbPersistentData::setExpire; + using redisDbPersistentData::getExpire; + using redisDbPersistentData::trackChanges; + using redisDbPersistentData::processChanges; + using redisDbPersistentData::setexpireUnsafe; + using redisDbPersistentData::setexpire; + using redisDbPersistentData::createSnapshot; + using redisDbPersistentData::endSnapshot; + public: expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ @@ -2545,7 +2590,7 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */