Restrict the snapshot API

Former-commit-id: cbafb88f267a9480cdbde6a88e9e1992e8a85971
This commit is contained in:
John Sully 2019-11-24 17:53:06 -05:00
parent 9a75557651
commit 551b30fb77
6 changed files with 100 additions and 50 deletions

View File

@ -1412,7 +1412,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (g_pserver->aof_use_rdb_preamble) {
int error;
std::vector<const redisDbPersistentData*> vecpdb;
std::vector<const redisDbPersistentDataSnapshot*> vecpdb;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
vecpdb.push_back(&g_pserver->db[idb]);

View File

@ -669,7 +669,7 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
return fResult;
}
bool redisDbPersistentData::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const
{
dictEntry *de = nullptr;
bool fResult = true;
@ -707,7 +707,7 @@ bool redisDbPersistentData::iterate_threadsafe(std::function<bool(const char*, r
client *createFakeClient(void);
void freeFakeClient(client *);
void keysCommandCore(client *cIn, const redisDbPersistentData *db, sds pattern)
void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds pattern)
{
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
@ -752,7 +752,7 @@ int prepareClientToWrite(client *c, bool fAsync);
void keysCommand(client *c) {
sds pattern = szFromObj(c->argv[1]);
const redisDbPersistentData *snapshot = nullptr;
const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
if (snapshot != nullptr)
@ -2141,14 +2141,14 @@ void redisDbPersistentData::processChanges()
m_setchanged.clear();
}
const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
{
serverAssert(GlobalLocksAcquired());
serverAssert(m_refCount == 0); // do not call this on a snapshot
// First see if we have too many levels and can bail out of this to reduce load
int levels = 1;
redisDbPersistentData *psnapshot = m_spdbSnapshotHOLDER.get();
redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
while (psnapshot != nullptr)
{
++levels;
@ -2166,7 +2166,7 @@ const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvcc
}
serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels);
}
auto spdb = std::unique_ptr<redisDbPersistentData>(new (MALLOC_LOCAL) redisDbPersistentData());
auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0;
@ -2201,24 +2201,24 @@ const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvcc
return m_pdbSnapshot;
}
void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentData *psnapshot)
void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot)
{
std::vector<redisDbPersistentData*> m_stackSnapshots;
std::vector<redisDbPersistentDataSnapshot*> stackSnapshots;
// gather a stack of snapshots, we do this so we can free them in reverse
// Note: we don't touch the incoming psnapshot since the parent is free'ing that one
while ((psnapshot = psnapshot->m_spdbSnapshotHOLDER.get()) != nullptr)
{
m_stackSnapshots.push_back(psnapshot);
stackSnapshots.push_back(psnapshot);
}
for (auto itr = m_stackSnapshots.rbegin(); itr != m_stackSnapshots.rend(); ++itr)
for (auto itr = stackSnapshots.rbegin(); itr != stackSnapshots.rend(); ++itr)
{
endSnapshot(*itr);
}
}
void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
{
// Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where
// a seperate thread holds the lock for it. Yes that's pretty crazy and should be fixed somehow...
@ -2362,7 +2362,7 @@ dict_iter redisDbPersistentData::random()
return dict_iter(de);
}
dict_iter redisDbPersistentData::random_threadsafe() const
dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const
{
if (size() == 0)
return dict_iter(nullptr);
@ -2379,4 +2379,9 @@ dict_iter redisDbPersistentData::random_threadsafe() const
serverAssert(dictSize(m_pdict) > 0);
dictEntry *de = dictGetRandomKey(m_pdict);
return dict_iter(de);
}
size_t redisDbPersistentData::size() const
{
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
}

View File

@ -6,7 +6,7 @@ extern "C" {
#include <sys/wait.h>
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveS3(char *s3bucket, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSaveS3(char *s3bucket, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi)
{
int status = EXIT_FAILURE;
int fd[2];

View File

@ -1115,7 +1115,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
int saveKey(rio *rdb, const redisDbPersistentData *db, int flags, size_t *processed, const char *keystr, robj_roptr o)
int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o)
{
robj key;
@ -1145,7 +1145,7 @@ int saveKey(rio *rdb, const redisDbPersistentData *db, int flags, size_t *proces
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) {
int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, int flags, rdbSaveInfo *rsi) {
dictEntry *de;
dictIterator *di = NULL;
char magic[10];
@ -1161,7 +1161,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int fl
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < cserver.dbnum; j++) {
const redisDbPersistentData *db = rgpdb[j];
const redisDbPersistentDataSnapshot *db = rgpdb[j];
if (db->size() == 0) continue;
/* Write the SELECT DB opcode */
@ -1239,7 +1239,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, const redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@ -1257,7 +1257,7 @@ werr: /* Write error. */
return C_ERR;
}
int rdbSaveFp(FILE *fp, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSaveFp(FILE *fp, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi)
{
int error = 0;
rio rdb;
@ -1274,9 +1274,9 @@ int rdbSaveFp(FILE *fp, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
return C_OK;
}
int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSave(const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi)
{
std::vector<const redisDbPersistentData*> vecdb;
std::vector<const redisDbPersistentDataSnapshot*> vecdb;
if (rgpdb == nullptr)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
@ -1296,7 +1296,7 @@ int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) {
int rdbSaveFile(char *filename, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@ -1360,7 +1360,7 @@ werr:
struct rdbSaveThreadArgs
{
rdbSaveInfo rsi;
const redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length
const redisDbPersistentDataSnapshot *rgpdb[1]; // NOTE: Variable Length
};
void *rdbSaveThread(void *vargs)
@ -1394,7 +1394,7 @@ void *rdbSaveThread(void *vargs)
int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
{
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentData*)), MALLOC_LOCAL);
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT;
if (rsi == nullptr)
rsi = &rsiT;
@ -2453,7 +2453,7 @@ struct rdbSaveSocketThreadArgs
int *fds;
int numfds;
uint64_t *clientids;
const redisDbPersistentData *rgpdb[1];
const redisDbPersistentDataSnapshot *rgpdb[1];
};
void *rdbSaveToSlavesSocketsThread(void *vargs)
{
@ -2557,7 +2557,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdb_pipe_read_result_from_child = pipefds[0];
g_pserver->rdb_pipe_write_result_to_parent = pipefds[1];
args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentData*)*(cserver.dbnum-1), MALLOC_LOCAL);
args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentDataSnapshot*)*(cserver.dbnum-1), MALLOC_LOCAL);
/* Collect the file descriptors of the slaves we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */

View File

@ -140,10 +140,10 @@ int rdbLoadFile(const char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(int childpid);
int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSave(const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, const redisDbPersistentDataSnapshot **rgpdb, rdbSaveInfo *rsi);
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj_roptr key);
size_t rdbSavedObjectLen(robj *o);

View File

@ -1188,9 +1188,11 @@ public:
operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; }
};
class redisDbPersistentDataSnapshot;
class redisDbPersistentData
{
friend void dictDbKeyDestructor(void *privdata, void *key);
friend class redisDbPersistentDataSnapshot;
public:
~redisDbPersistentData();
@ -1200,10 +1202,7 @@ public:
static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2);
size_t slots() const { return dictSlots(m_pdict); }
size_t size() const
{
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
}
size_t size() const;
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
void trackkey(robj_roptr o)
@ -1260,7 +1259,6 @@ public:
void emptyDbAsync();
// Note: If you do not need the obj then use the objless iterator version. It's faster
bool iterate(std::function<bool(const char*, robj*)> fn);
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
void setExpire(robj *key, robj *subkey, long long when);
void setExpire(expireEntry &&e);
expireEntry *getExpire(robj_roptr key);
@ -1277,23 +1275,15 @@ public:
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
const redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
void endSnapshot(const redisDbPersistentData *psnapshot);
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
private:
void ensure(const char *key);
void ensure(const char *key, dictEntry **de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
void recursiveFreeSnapshots(redisDbPersistentData *psnapshot);
// These do not call ENSURE and so may have a NULL object
dict_iter random_threadsafe() const;
dict_iter find_threadsafe(const char *key) const
{
dictEntry *de = dictFind(m_pdict, key);
return dict_iter(de);
}
void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot);
// Keyspace
dict *m_pdict = nullptr; /* The keyspace for this DB */
@ -1310,15 +1300,37 @@ private:
// These two pointers are the same, UNLESS the database has been cleared.
// in which case m_pdbSnapshot is NULL and we continue as though we weren'
// in a snapshot
const redisDbPersistentData *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentData> m_spdbSnapshotHOLDER;
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
int m_refCount = 0;
};
class redisDbPersistentDataSnapshot : protected redisDbPersistentData
{
friend class redisDbPersistentData;
public:
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
using redisDbPersistentData::createSnapshot;
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::end;
dict_iter random_threadsafe() const;
dict_iter find_threadsafe(const char *key) const
{
dictEntry *de = dictFind(m_pdict, key);
return dict_iter(de);
}
// These need to be fixed
using redisDbPersistentData::size;
using redisDbPersistentData::expireSize;
using redisDbPersistentData::getExpire;
};
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb : public redisDbPersistentData
typedef struct redisDb : public redisDbPersistentDataSnapshot
{
// Legacy C API, Do not add more
friend void tryResizeHashTables(int);
@ -1350,6 +1362,39 @@ typedef struct redisDb : public redisDbPersistentData
bool FKeyExpires(const char *key);
size_t clear(bool fAsync, void(callback)(void*));
// Import methods from redisDbPersistentData hidden by redisDbPersistentDataSnapshot
using redisDbPersistentData::slots;
using redisDbPersistentData::size;
using redisDbPersistentData::expand;
using redisDbPersistentData::trackkey;
using redisDbPersistentData::find;
using redisDbPersistentData::random;
using redisDbPersistentData::random_expire;
using redisDbPersistentData::findExpire;
using redisDbPersistentData::end;
using redisDbPersistentData::getStats;
using redisDbPersistentData::getExpireStats;
using redisDbPersistentData::insert;
using redisDbPersistentData::tryResize;
using redisDbPersistentData::incrementallyRehash;
using redisDbPersistentData::updateValue;
using redisDbPersistentData::syncDelete;
using redisDbPersistentData::asyncDelete;
using redisDbPersistentData::expireSize;
using redisDbPersistentData::removeExpire;
using redisDbPersistentData::removeSubkeyExpire;
using redisDbPersistentData::clear;
using redisDbPersistentData::emptyDbAsync;
using redisDbPersistentData::iterate;
using redisDbPersistentData::setExpire;
using redisDbPersistentData::getExpire;
using redisDbPersistentData::trackChanges;
using redisDbPersistentData::processChanges;
using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire;
using redisDbPersistentData::createSnapshot;
using redisDbPersistentData::endSnapshot;
public:
expireset::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
@ -2545,7 +2590,7 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */