Final design of forkless background save. expires NYI

Former-commit-id: e2dc24b441bf52b181c820c853e0bc7524254f3f
This commit is contained in:
John Sully 2019-11-18 19:47:12 -05:00
parent 331f38b94e
commit b51ece810f
5 changed files with 153 additions and 92 deletions

View File

@ -371,6 +371,8 @@ bool redisDbPersistentData::syncDelete(robj *key)
if (itr != nullptr && itr.val()->FExpires())
removeExpire(key, itr);
if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) {
if (m_pdbSnapshot != nullptr)
dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr);
if (g_pserver->cluster_enabled) slotToKeyDel(key);
return 1;
} else {
@ -636,7 +638,7 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
bool fResult = true;
while((de = dictNext(di)) != nullptr)
{
ensure(de);
ensure((const char*)dictGetKey(de), &de);
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
{
fResult = false;
@ -1793,6 +1795,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
void redisDbPersistentData::initialize()
{
m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType,this);
m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = false;
m_fTrackingChanges = 0;
@ -1846,24 +1849,27 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_pstorage != nullptr)
m_pstorage->clear();
m_pdbSnapshot = nullptr;
}
/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2)
{
redisDbPersistentData aux = *db1;
redisDbPersistentData aux = std::move(*db1);
db1->m_pdict = db2->m_pdict;
db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire;
db1->m_pstorage = db2->m_pstorage;
db1->m_spdbSnapshot = db2->m_spdbSnapshot;
db1->m_pdbSnapshot = db2->m_pdbSnapshot;
db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER);
db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire;
db2->m_pstorage = aux.m_pstorage;
db2->m_spdbSnapshot = aux.m_spdbSnapshot;
db2->m_pdbSnapshot = aux.m_pdbSnapshot;
db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER);
db1->m_pdict->privdata = static_cast<redisDbPersistentData*>(db1);
db2->m_pdict->privdata = static_cast<redisDbPersistentData*>(db2);
@ -1919,41 +1925,45 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
void redisDbPersistentData::ensure(const char *key)
{
dictEntry *de = dictFind(m_pdict, key);
ensure(de);
ensure(key, &de);
}
void redisDbPersistentData::ensure(dictEntry *de)
void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
{
if (de != nullptr && dictGetVal(de) == nullptr)
serverAssert(sdsKey != nullptr);
if (*pde == nullptr && m_pdbSnapshot != nullptr)
{
if (m_spdbSnapshot != nullptr)
dictEntry *deTombstone = dictFind(m_pdictTombstone, sdsKey);
if (deTombstone == nullptr)
{
auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de));
serverAssert(itr != m_spdbSnapshot->end());
auto itr = m_pdbSnapshot->find(sdsKey);
if (itr == m_pdbSnapshot->end())
return; // not found
if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
dictSetVal(m_pdict, de, itr.val());
dictAdd(m_pdict, sdsdup(sdsKey), itr.val());
}
else
{
sds strT = serializeStoredObject(itr.val());
robj *objNew = deserializeStoredObject(strT, sdslen(strT));
sdsfree(strT);
dictSetVal(m_pdict, de, objNew);
dictAdd(m_pdict, sdsdup(sdsKey), objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
}
*pde = dictFind(m_pdict, sdsKey);
}
else
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(de);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, de, o);
});
}
}
else if (*pde != nullptr && dictGetVal(*pde) == nullptr)
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(*pde);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, *pde, o);
});
}
}
@ -2012,81 +2022,100 @@ void redisDbPersistentData::processChanges()
m_setchanged.clear();
}
std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot()
redisDbPersistentData *redisDbPersistentData::createSnapshot()
{
serverAssert(GlobalLocksAcquired());
serverAssert(m_spdbSnapshot == nullptr);
auto spdb = std::make_shared<redisDbPersistentData>();
spdb->m_pdict = dictCreate(&dbDictType,spdb.get());
serverAssert(m_spdbSnapshotHOLDER == nullptr);
auto spdb = std::make_unique<redisDbPersistentData>();
spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0;
spdb->m_pdict->rehashidx = m_pdict->rehashidx;
spdb->m_pdict = m_pdict;
spdb->m_pdict->iterators++; // fake an iterator so it doesn't rehash
if (m_setexpire != nullptr)
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
spdb->m_setexpire = m_setexpire;
for (unsigned iht = 0; iht < 2; ++iht)
{
spdb->m_pdict->ht[iht] = m_pdict->ht[iht];
if (m_pdict->ht[iht].size)
m_pdict->ht[iht].table = (dictEntry**)zcalloc(m_pdict->ht[iht].size*sizeof(dictEntry*), MALLOC_SHARED);
else
m_pdict->ht[iht].table = nullptr;
for (size_t idx = 0; idx < m_pdict->ht[iht].size; ++idx)
{
const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx];
dictEntry **pdeDst = &m_pdict->ht[iht].table[idx];
while (deSrc != nullptr)
{
*pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED);
(*pdeDst)->key = deSrc->key;
(*pdeDst)->v.val = nullptr;
(*pdeDst)->next = nullptr;
pdeDst = &(*pdeDst)->next;
deSrc = deSrc->next;
}
}
}
m_pdict = dictCreate(&dbDictType,this);
m_setexpire = new (MALLOC_LOCAL) expireset();
m_spdbSnapshot = std::move(spdb);
return m_spdbSnapshot;
m_spdbSnapshotHOLDER = std::move(spdb);
m_pdbSnapshot = m_spdbSnapshotHOLDER.get();
return m_pdbSnapshot;
}
void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
{
serverAssert(GlobalLocksAcquired());
serverAssert(m_spdbSnapshot.get() == psnapshot);
if (!GlobalLocksAcquired())
serverLog(LL_WARNING, "Global locks not acquired");
serverAssert(m_spdbSnapshotHOLDER.get() == psnapshot);
m_spdbSnapshotHOLDER->m_pdict->iterators--;
dictIterator *di = dictGetIterator(m_pdict);
if (m_pdbSnapshot == nullptr)
{
// the database was cleared so we don't need to recover the snapshot
dictEmpty(m_pdictTombstone, nullptr);
m_spdbSnapshotHOLDER = nullptr;
return;
}
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
dictIterator *di = dictGetIterator(m_pdictTombstone);
dictEntry *de;
while ((de = dictNext(di)) != NULL)
{
dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de));
if (dictGetVal(de) == nullptr)
dictEntry *deSnapshot = dictFind(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de));
if (deSnapshot == nullptr)
continue; // sometimes we delete things that were never in the snapshot
robj *obj = (robj*)dictGetVal(deSnapshot);
const char *key = (const char*)dictGetKey(deSnapshot);
if (obj == nullptr || obj->FExpires())
{
if (deSnapshot != nullptr)
auto itrExpire = m_spdbSnapshotHOLDER->m_setexpire->find(key);
if (itrExpire != m_spdbSnapshotHOLDER->m_setexpire->end())
{
de->v.val = deSnapshot->v.val;
deSnapshot->v.val = nullptr;
m_spdbSnapshotHOLDER->m_setexpire->erase(itrExpire); // Note: normally we would have to set obj::fexpire false but we're deleting it anyways...
}
}
if (deSnapshot && (dictGetKey(deSnapshot) == dictGetKey(de)))
{
// The key is owned by the parent snapshot, so we modify the DB key dtor
// to ensure the key is not free'd during the delete
m_spdbSnapshot->m_pdict->type = &dbSnapshotDictType;
dictDelete(m_spdbSnapshot->m_pdict, dictGetKey(de));
m_spdbSnapshot->m_pdict->type = &dbDictType;
}
dictDelete(m_spdbSnapshotHOLDER->m_pdict, key);
}
dictReleaseIterator(di);
m_spdbSnapshot->m_pdict->iterators--;
m_spdbSnapshot = nullptr;
dictEmpty(m_pdictTombstone, nullptr);
// Stage 2 Move all new keys to the snapshot DB
di = dictGetIterator(m_pdict);
while ((de = dictNext(di)) != NULL)
{
dictEntry *deExisting = dictFind(m_spdbSnapshotHOLDER->m_pdict, (const char*)dictGetKey(de));
if (deExisting != nullptr)
{
decrRefCount((robj*)dictGetVal(deExisting));
dictSetVal(m_spdbSnapshotHOLDER->m_pdict, deExisting, dictGetVal(de));
}
else
{
dictAdd(m_spdbSnapshotHOLDER->m_pdict, sdsdup((sds)dictGetKey(de)), dictGetVal(de));
}
incrRefCount((robj*)dictGetVal(de));
}
dictReleaseIterator(di);
// Stage 3 swap the databases with the snapshot
std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
// Stage 4 merge all expires
// TODO
std::swap(m_setexpire, m_spdbSnapshotHOLDER->m_setexpire);
// Finally free the snapshot
m_spdbSnapshotHOLDER = nullptr;
m_pdbSnapshot = nullptr;
}
redisDbPersistentData::~redisDbPersistentData()
{
dictRelease(m_pdict);
if (m_pdictTombstone)
dictRelease(m_pdictTombstone);
delete m_setexpire;
}

View File

@ -56,6 +56,8 @@ bool redisDbPersistentData::asyncDelete(robj *key) {
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key));
if (m_pdbSnapshot != nullptr)
dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr);
if (de) {
robj *val = (robj*)dictGetVal(de);
if (val->FExpires())

View File

@ -1348,7 +1348,10 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (g_pserver->rdbThreadVars.fRdbThreadCancel)
serverLog(LL_WARNING, "Background save cancelled");
else
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
@ -1378,10 +1381,13 @@ void *rdbSaveThread(void *vargs)
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
aeAcquireLock();
// If we were told to cancel the requesting thread holds the lock for us
if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
aeReleaseLock();
if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
aeReleaseLock();
zfree(args);
return (retval == C_OK) ? (void*)0 : (void*)1;
}
@ -1397,7 +1403,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get();
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot();
g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
@ -2423,6 +2429,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
* the child did not exit for an error, but because we wanted), and performs
* the cleanup needed. */
void killRDBChild(void) {
serverAssert(GlobalLocksAcquired());
g_pserver->rdbThreadVars.fRdbThreadCancel = true;
void *rval;
pthread_join(g_pserver->rdbThreadVars.rdb_child_thread,&rval);
@ -2508,10 +2515,14 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
}
zfree(msg);
}
aeAcquireLock();
// If we were told to cancel the requesting thread is holding the lock for us
if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
aeReleaseLock();
if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
aeReleaseLock();
zfree(args->clientids);
zfree(args);
rioFreeFdset(&slave_sockets);
@ -2574,7 +2585,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
start = ustime();
for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get();
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot();
g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;

View File

@ -1183,14 +1183,6 @@ void dictSdsNOPDestructor(void *, void *) {}
void dictDbKeyDestructor(void *privdata, void *key)
{
redisDbPersistentData *owner = (redisDbPersistentData*)privdata;
serverAssert(owner != nullptr);
if (owner->m_spdbSnapshot != nullptr)
{
dictEntry *deSnapshot = dictFind(owner->m_spdbSnapshot->m_pdict, key);
if (deSnapshot && (key == dictGetKey(deSnapshot)))
return; // don't free, it's now owned by the snapshot
}
sdsfree((sds)key);
}

View File

@ -1108,10 +1108,16 @@ class redisDbPersistentData
public:
~redisDbPersistentData();
redisDbPersistentData() = default;
redisDbPersistentData(redisDbPersistentData &&) = default;
static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2);
size_t slots() const { return dictSlots(m_pdict); }
size_t size() const { return dictSize(m_pdict); }
size_t size() const
{
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
}
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
void trackkey(robj_roptr o)
@ -1128,7 +1134,7 @@ public:
dict_iter find(const char *key)
{
dictEntry *de = dictFind(m_pdict, key);
ensure(de);
ensure(key, &de);
return dict_iter(de);
}
@ -1139,8 +1145,24 @@ public:
dict_iter random()
{
if (size() == 0)
return dict_iter(nullptr);
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0)
{
dict_iter iter(nullptr);
double pctInSnapshot = (double)m_pdbSnapshot->size() / (size() + m_pdbSnapshot->size());
double randval = (double)rand()/RAND_MAX;
if (randval <= pctInSnapshot)
{
iter = m_pdbSnapshot->random();
ensure(iter.key());
dictEntry *de = dictFind(m_pdict, iter.key());
return dict_iter(de);
}
}
dictEntry *de = dictGetRandomKey(m_pdict);
ensure(de);
if (de != nullptr)
ensure((const char*)dictGetKey(de), &de);
return dict_iter(de);
}
@ -1187,17 +1209,18 @@ public:
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
std::shared_ptr<redisDbPersistentData> createSnapshot();
redisDbPersistentData *createSnapshot();
void endSnapshot(const redisDbPersistentData *psnapshot);
private:
void ensure(const char *key);
void ensure(dictEntry *de);
void ensure(const char *key, dictEntry **de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
// Keyspace
dict *m_pdict = nullptr; /* The keyspace for this DB */
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
int m_fTrackingChanges = 0; // Note: Stack based
bool m_fAllChanged = false;
std::set<std::string> m_setchanged;
@ -1206,7 +1229,11 @@ private:
// Expire
expireset *m_setexpire = nullptr;
std::shared_ptr<redisDbPersistentData> m_spdbSnapshot;
// These two pointers are the same, UNLESS the database has been cleared.
// in which case m_pdbSnapshot is NULL and we continue as though we weren'
// in a snapshot
redisDbPersistentData *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentData> m_spdbSnapshotHOLDER;
};
/* Redis database representation. There are multiple databases identified