Add support for nested snapshots

Former-commit-id: 43456c7807152062d59b2a90597b6204f637f5cd
This commit is contained in:
John Sully 2019-11-22 20:53:36 -05:00
parent 219cd22fd6
commit fda0f82d64
8 changed files with 151 additions and 52 deletions

View File

@ -1412,7 +1412,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (g_pserver->aof_use_rdb_preamble) {
int error;
std::vector<redisDbPersistentData*> vecpdb;
std::vector<const redisDbPersistentData*> vecpdb;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
vecpdb.push_back(&g_pserver->db[idb]);

View File

@ -669,7 +669,43 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
return fResult;
}
bool redisDbPersistentData::iterate(std::function<bool(const char*)> fn)
// Visits every key visible through this database: first the entries stored directly in
// m_pdict, then (recursively, via m_pdbSnapshot) the snapshot's entries that are neither
// present in m_pdict nor recorded in m_pdictTombstone.
//
//  fn       Callback invoked as fn(key, value). Returning false aborts the walk.
//  returns  true if every invoked callback returned true; false as soon as one returns false.
//
// NOTE(review): the walk uses dictGetSafeIterator/dictFind on the live dicts -- confirm which
// guarantees callers on other threads actually rely on.
bool redisDbPersistentData::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const
{
    dictIterator *di = dictGetSafeIterator(m_pdict);
    dictEntry *de = nullptr;
    bool fResult = true;

    while((de = dictNext(di)) != nullptr)
    {
        if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
        {
            fResult = false;
            break;
        }
    }
    dictReleaseIterator(di);

    // Only descend into the snapshot while the caller still wants more. Walking it
    // unconditionally would keep invoking fn after it asked to stop, and would overwrite
    // the "aborted" result with whatever the snapshot walk returned.
    if (fResult && m_pdbSnapshot != nullptr)
    {
        fResult = m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
            // Before passing off to the user we need to make sure it's not already in
            // the current set, and not deleted
            dictEntry *deCurrent = dictFind(m_pdict, key);
            if (deCurrent != nullptr)
                return true;    // overridden here: already reported by the loop above

            dictEntry *deTombstone = dictFind(m_pdictTombstone, key);
            if (deTombstone != nullptr)
                return true;    // deleted since the snapshot was taken

            // Alright it's a key in the user keyspace, pass it off
            return fn(key, o);
        });
    }

    return fResult;
}
bool redisDbPersistentData::iterate(std::function<bool(const char*)> fn) const
{
dictIterator *di = dictGetSafeIterator(m_pdict);
dictEntry *de = nullptr;
@ -704,7 +740,7 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*)> fn)
return fResult;
}
void keysCommandCore(client *c, redisDbPersistentData *db, sds pattern)
void keysCommandCore(client *c, const redisDbPersistentData *db, sds pattern)
{
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
@ -737,7 +773,7 @@ int prepareClientToWrite(client *c, bool fAsync);
void keysCommand(client *c) {
sds pattern = szFromObj(c->argv[1]);
redisDbPersistentData *snapshot = nullptr;
const redisDbPersistentData *snapshot = nullptr;
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
snapshot = c->db->createSnapshot(c->mvccCheckpoint);
if (snapshot != nullptr)
@ -1428,6 +1464,11 @@ expireEntry *redisDbPersistentData::getExpire(robj_roptr key) {
return itrExpire.operator->();
}
// Read-only lookup of the expire entry for `key`.
// Delegates to the non-const getExpire overload by casting away constness on `this`;
// the pointer that overload yields is handed back as pointer-to-const.
const expireEntry *redisDbPersistentData::getExpire(robj_roptr key) const
{
    redisDbPersistentData *pdbMutable = const_cast<redisDbPersistentData*>(this);
    return pdbMutable->getExpire(key);
}
/* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
@ -1892,6 +1933,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
void redisDbPersistentData::initialize()
{
m_pdbSnapshot = nullptr;
m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType,this);
m_setexpire = new(MALLOC_LOCAL) expireset();
@ -2120,45 +2162,85 @@ void redisDbPersistentData::processChanges()
m_setchanged.clear();
}
// Takes a reference on a snapshot of this database and returns a pointer to it.
// Asserts that the global locks are held. When a snapshot is already held and
// mvccCheckpoint is not greater than that snapshot's mvccCheckpoint, the held snapshot's
// reference count is incremented and it is returned. Otherwise a new snapshot object takes
// over the current m_pdict, m_pdictTombstone and (when non-null) m_setexpire, this object is
// given freshly created ones, and the previously held snapshot (if any) is moved underneath
// the new snapshot.
// NOTE(review): this listing appears to be a rendered diff in which some lines are shown in
// both their old and new form (e.g. the two signatures below) -- confirm against the actual
// source file before relying on line-by-line details.
redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint)
const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint)
{
serverAssert(GlobalLocksAcquired());
if (m_spdbSnapshotHOLDER != nullptr)
{
// Re-use the held snapshot when the requested checkpoint is not greater than the snapshot's
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
{
++m_snapshotRefcount;
m_spdbSnapshotHOLDER->m_refCount++;
return m_spdbSnapshotHOLDER.get();
}
return nullptr;
serverLog(LL_WARNING, "Nested snapshot created");
}
auto spdb = std::make_unique<redisDbPersistentData>();
auto spdb = std::unique_ptr<redisDbPersistentData>(new (MALLOC_LOCAL) redisDbPersistentData());
spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0;
// The snapshot object takes over the dictionaries currently used by this database
spdb->m_pdict = m_pdict;
spdb->m_pdictTombstone = m_pdictTombstone;
spdb->m_pdict->iterators++; // fake an iterator so it doesn't rehash
// Any snapshot held so far is moved underneath the new snapshot
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1;
if (m_setexpire != nullptr)
spdb->m_setexpire = m_setexpire;
// This database continues with newly created, empty containers
m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType, this);
m_setexpire = new (MALLOC_LOCAL) expireset();
// Ownership of the new snapshot moves to this object; m_pdbSnapshot is a raw pointer to it
m_spdbSnapshotHOLDER = std::move(spdb);
m_pdbSnapshot = m_spdbSnapshotHOLDER.get();
++m_snapshotRefcount;
// Finally we need to take a ref on all our children snapshots. This ensures they aren't free'd before we are
redisDbPersistentData *pdbSnapshotNext = m_pdbSnapshot->m_spdbSnapshotHOLDER.get();
while (pdbSnapshotNext != nullptr)
{
pdbSnapshotNext->m_refCount++;
pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
}
return m_pdbSnapshot;
}
// Calls endSnapshot() on every snapshot chained beneath `psnapshot`, deepest first.
// `psnapshot` itself is deliberately left alone: the caller is the one releasing it.
void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentData *psnapshot)
{
    // Collect the chain top-down so it can be unwound from the far end
    std::vector<redisDbPersistentData*> vecChildren;
    for (redisDbPersistentData *pchild = psnapshot->m_spdbSnapshotHOLDER.get();
         pchild != nullptr;
         pchild = pchild->m_spdbSnapshotHOLDER.get())
    {
        vecChildren.push_back(pchild);
    }

    // Drain from the back, i.e. in the reverse of the order they were gathered
    while (!vecChildren.empty())
    {
        redisDbPersistentData *pdeepest = vecChildren.back();
        vecChildren.pop_back();
        endSnapshot(pdeepest);
    }
}
void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
{
if (!GlobalLocksAcquired())
serverLog(LL_WARNING, "Global locks not acquired");
serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot);
// Note: This function is dependent on GlobalLocksAcquired(), but rdb background saving has a weird case where
// a separate thread holds the lock for it. Yes that's pretty crazy and should be fixed somehow...
--m_snapshotRefcount;
if (m_snapshotRefcount > 0)
if (m_spdbSnapshotHOLDER.get() != psnapshot)
{
serverAssert(m_spdbSnapshotHOLDER != nullptr);
m_spdbSnapshotHOLDER->endSnapshot(psnapshot);
return;
}
m_spdbSnapshotHOLDER->m_refCount--;
if (m_spdbSnapshotHOLDER->m_refCount > 0)
return;
serverAssert(m_spdbSnapshotHOLDER->m_refCount == 0);
// Alright we're ready to be free'd, but first dump all the refs on our child snapshots
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
m_spdbSnapshotHOLDER->m_pdict->iterators--;
@ -2166,7 +2248,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
{
// the database was cleared so we don't need to recover the snapshot
dictEmpty(m_pdictTombstone, nullptr);
m_spdbSnapshotHOLDER = nullptr;
m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
return;
}
@ -2220,12 +2302,25 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
std::swap(m_setexpire, m_spdbSnapshotHOLDER->m_setexpire);
// Finally free the snapshot
m_spdbSnapshotHOLDER = nullptr;
m_pdbSnapshot = nullptr;
if (m_pdbSnapshot != nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
{
m_pdbSnapshot = m_spdbSnapshotHOLDER->m_pdbSnapshot;
m_spdbSnapshotHOLDER->m_pdbSnapshot = nullptr;
}
else
{
m_pdbSnapshot = nullptr;
}
m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
serverAssert(m_pdbSnapshot == m_spdbSnapshotHOLDER.get() || m_pdbSnapshot == nullptr);
}
redisDbPersistentData::~redisDbPersistentData()
{
serverAssert(m_spdbSnapshotHOLDER == nullptr);
serverAssert(m_pdbSnapshot == nullptr);
serverAssert(m_refCount == 0);
dictRelease(m_pdict);
if (m_pdictTombstone)
dictRelease(m_pdictTombstone);

View File

@ -347,7 +347,7 @@ uint8_t LFULogIncr(uint8_t counter) {
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long LFUDecrAndReturn(robj_roptr o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = g_pserver->lfu_decay_time ? LFUTimeElapsed(ldt) / g_pserver->lfu_decay_time : 0;

View File

@ -6,7 +6,7 @@ extern "C" {
#include <sys/wait.h>
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveS3(char *s3bucket, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSaveS3(char *s3bucket, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
int status = EXIT_FAILURE;
int fd[2];

View File

@ -755,7 +755,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
/* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) {
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj_roptr key) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
@ -970,7 +970,7 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) {
RedisModuleIO io;
moduleValue *mv = (moduleValue*)ptrFromObj(o);
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb,key);
moduleInitIOContext(io,mt,rdb,key.unsafe_robjcast());
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@ -1034,7 +1034,7 @@ size_t rdbSavedObjectLen(robj *o) {
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEntry *pexpire) {
int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU;
@ -1115,12 +1115,12 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
int saveKey(rio *rdb, redisDbPersistentData *db, int flags, size_t *processed, const char *keystr, robj *o)
int saveKey(rio *rdb, const redisDbPersistentData *db, int flags, size_t *processed, const char *keystr, robj_roptr o)
{
robj key;
initStaticStringObject(key,(char*)keystr);
expireEntry *pexpire = db->getExpire(&key);
const expireEntry *pexpire = db->getExpire(&key);
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
return 0;
@ -1145,7 +1145,7 @@ int saveKey(rio *rdb, redisDbPersistentData *db, int flags, size_t *processed, c
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) {
int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) {
dictEntry *de;
dictIterator *di = NULL;
char magic[10];
@ -1161,7 +1161,7 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < cserver.dbnum; j++) {
redisDbPersistentData *db = rgpdb[j];
const redisDbPersistentData *db = rgpdb[j];
if (db->size() == 0) continue;
/* Write the SELECT DB opcode */
@ -1181,7 +1181,7 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r
/* Iterate this DB writing every entry */
size_t ckeysExpired = 0;
bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool {
bool fSavedAll = db->iterate_threadsafe([&](const char *keystr, robj_roptr o)->bool {
if (o->FExpires())
++ckeysExpired;
@ -1239,7 +1239,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(rio *rdb, const redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@ -1257,7 +1257,7 @@ werr: /* Write error. */
return C_ERR;
}
int rdbSaveFp(FILE *fp, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSaveFp(FILE *fp, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
int error = 0;
rio rdb;
@ -1274,9 +1274,9 @@ int rdbSaveFp(FILE *fp, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
return C_OK;
}
int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
std::vector<redisDbPersistentData*> vecdb;
std::vector<const redisDbPersistentData*> vecdb;
if (rgpdb == nullptr)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
@ -1296,7 +1296,7 @@ int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) {
int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@ -1360,7 +1360,7 @@ werr:
struct rdbSaveThreadArgs
{
rdbSaveInfo rsi;
redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length
const redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length
};
void *rdbSaveThread(void *vargs)
@ -2453,7 +2453,7 @@ struct rdbSaveSocketThreadArgs
int *fds;
int numfds;
uint64_t *clientids;
redisDbPersistentData *rgpdb[1];
const redisDbPersistentData *rgpdb[1];
};
void *rdbSaveToSlavesSocketsThread(void *vargs)
{

View File

@ -140,16 +140,15 @@ int rdbLoadFile(const char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(int childpid);
int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSave(const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, const redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key);
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj_roptr key);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key, uint64_t mvcc_tstamp);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
robj *rdbLoadStringObject(rio *rdb);
ssize_t rdbSaveStringObject(rio *rdb, robj_roptr obj);
ssize_t rdbSaveRawString(rio *rdb, const unsigned char *s, size_t len);

View File

@ -1183,6 +1183,7 @@ void dictSdsNOPDestructor(void *, void *) {}
void dictDbKeyDestructor(void *privdata, void *key)
{
DICT_NOTUSED(privdata);
sdsfree((sds)key);
}

View File

@ -935,7 +935,7 @@ public:
bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); }
const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; }
const subexpireEntry &operator[](size_t idx) const { return m_vecexpireEntries[idx]; }
size_t size() const noexcept { return m_vecexpireEntries.size(); }
};
@ -951,11 +951,11 @@ public:
class iter
{
friend class expireEntry;
expireEntry *m_pentry = nullptr;
const expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(expireEntry *pentry, size_t idx)
iter(const expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
@ -1035,6 +1035,7 @@ public:
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; }
bool operator==(const char *key) const noexcept
@ -1087,8 +1088,8 @@ public:
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() { return iter(this, 0); }
iter end()
iter begin() const { return iter(this, 0); }
iter end() const
{
if (FFat())
return iter(this, u.m_pfatentry->size());
@ -1103,7 +1104,7 @@ public:
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
bool FGetPrimaryExpire(long long *pwhen)
bool FGetPrimaryExpire(long long *pwhen) const
{
*pwhen = -1;
for (auto itr : *this)
@ -1280,10 +1281,12 @@ public:
void emptyDbAsync();
// Note: If you do not need the obj then use the objless iterator version. It's faster
bool iterate(std::function<bool(const char*, robj*)> fn);
bool iterate(std::function<bool(const char*)> fn);
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
bool iterate(std::function<bool(const char*)> fn) const;
void setExpire(robj *key, robj *subkey, long long when);
void setExpire(expireEntry &&e);
expireEntry *getExpire(robj_roptr key);
const expireEntry *getExpire(robj_roptr key) const;
void initialize();
void trackChanges() { m_fTrackingChanges++; }
@ -1296,7 +1299,7 @@ public:
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint);
const redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint);
void endSnapshot(const redisDbPersistentData *psnapshot);
private:
@ -1304,6 +1307,7 @@ private:
void ensure(const char *key, dictEntry **de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
void recursiveFreeSnapshots(redisDbPersistentData *psnapshot);
// Keyspace
dict *m_pdict = nullptr; /* The keyspace for this DB */
@ -1322,7 +1326,7 @@ private:
// in a snapshot
redisDbPersistentData *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentData> m_spdbSnapshotHOLDER;
int m_snapshotRefcount = 0;
int m_refCount = 0;
};
/* Redis database representation. There are multiple databases identified
@ -2555,7 +2559,7 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, const redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */
@ -2885,7 +2889,7 @@ void evictionPoolAlloc(void);
#define LFU_INIT_VAL 5
unsigned long LFUGetTimeInMinutes(void);
uint8_t LFULogIncr(uint8_t value);
unsigned long LFUDecrAndReturn(robj *o);
unsigned long LFUDecrAndReturn(robj_roptr o);
/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);