Dependencies for forkless RDB saving

Former-commit-id: c12be594754ac8079f467e0fa9f8580bd65e3a55
This commit is contained in:
John Sully 2019-10-17 22:53:50 -04:00
parent 6ee58a3b21
commit 48787c8a03
8 changed files with 162 additions and 47 deletions

View File

@ -1411,7 +1411,12 @@ int rewriteAppendOnlyFile(char *filename) {
if (g_pserver->aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
std::vector<redisDbPersistentData*> vecpdb;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
vecpdb.push_back(&g_pserver->db[idb]);
}
if (rdbSaveRio(&aof,vecpdb.data(),&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}

View File

@ -552,7 +552,7 @@ void flushallCommand(client *c) {
int saved_dirty = g_pserver->dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(rsiptr);
rdbSave(nullptr, rsiptr);
g_pserver->dirty = saved_dirty;
}
g_pserver->dirty++;
@ -1917,17 +1917,34 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
dictSetVal(m_pdict, itr.de, val);
}
void redisDbPersistentData::ensure(const char *key)
{
dictEntry *de = dictFind(m_pdict, key);
ensure(de);
}
void redisDbPersistentData::ensure(dictEntry *de)
{
if (de != nullptr && dictGetVal(de) == nullptr)
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(de);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, de, o);
});
if (m_spdbSnapshot != nullptr)
{
auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de));
sds strT = serializeStoredObject(itr.val());
robj *objNew = deserializeStoredObject(strT, sdslen(strT));
sdsfree(strT);
dictSetVal(m_pdict, de, objNew);
}
else
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(de);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, de, o);
});
}
}
}
@ -1984,4 +2001,66 @@ void redisDbPersistentData::processChanges()
}
}
m_setchanged.clear();
}
std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot()
{
serverAssert(m_spdbSnapshot == nullptr);
auto spdb = std::make_shared<redisDbPersistentData>();
spdb->initialize();
for (unsigned iht = 0; iht < 2; ++iht)
{
spdb->m_pdict->ht[iht] = m_pdict->ht[iht];
if (m_pdict->ht[iht].size)
m_pdict->ht[iht].table = (dictEntry**)zcalloc(m_pdict->ht[iht].size*sizeof(dictEntry*), MALLOC_SHARED);
else
m_pdict->ht[iht].table = nullptr;
for (size_t idx = 0; idx < m_pdict->ht[iht].size; ++idx)
{
const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx];
dictEntry **pdeDst = &m_pdict->ht[iht].table[idx];
if (deSrc != nullptr)
{
*pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED);
(*pdeDst)->key = deSrc->key;
(*pdeDst)->v.val = nullptr;
(*pdeDst)->next = nullptr;
pdeDst = &(*pdeDst)->next;
deSrc = deSrc->next;
}
}
}
m_spdbSnapshot = std::move(spdb);
return m_spdbSnapshot;
}
void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
{
serverAssert(m_spdbSnapshot.get() == psnapshot);
dictIterator *di = dictGetIterator(m_pdict);
dictEntry *de;
while ((de = dictNext(di)) != NULL)
{
if (dictGetVal(de) == nullptr)
{
dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de));
if (deSnapshot != nullptr)
{
dictSetVal(m_pdict, de, dictGetVal(deSnapshot));
}
}
}
dictReleaseIterator(di);
m_spdbSnapshot = nullptr;
}
redisDbPersistentData::~redisDbPersistentData()
{
dictRelease(m_pdict);
if (m_setexpire)
delete m_setexpire;
}

View File

@ -359,7 +359,7 @@ NULL
} else if (!strcasecmp(szFromObj(c->argv[1]),"reload")) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(rsiptr) != C_OK) {
if (rdbSave(nullptr, rsiptr) != C_OK) {
addReply(c,shared.err);
return;
}

View File

@ -6,7 +6,7 @@ extern "C" {
#include <sys/wait.h>
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
int rdbSaveS3(char *s3bucket, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
int status = EXIT_FAILURE;
int fd[2];
@ -40,7 +40,7 @@ int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi)
return C_ERR;
}
if (rdbSaveFp(fp, rsi) != C_OK)
if (rdbSaveFp(fp, rgpdb, rsi) != C_OK)
{
fclose(fp);
return C_ERR;

View File

@ -42,6 +42,8 @@
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <thread>
#include <future>
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
@ -1142,7 +1144,7 @@ int saveKey(rio *rdb, redisDbPersistentData *db, int flags, size_t *processed, c
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) {
dictEntry *de;
dictIterator *di = NULL;
char magic[10];
@ -1157,7 +1159,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < cserver.dbnum; j++) {
redisDbPersistentData *db = static_cast<redisDbPersistentData*>(g_pserver->db+j);
redisDbPersistentData *db = rgpdb[j];
if (db->size() == 0) continue;
/* Write the SELECT DB opcode */
@ -1229,7 +1231,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(rio *rdb, redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@ -1237,7 +1239,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rdbSaveRio(rdb,rgpdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@ -1247,7 +1249,7 @@ werr: /* Write error. */
return C_ERR;
}
int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi)
int rdbSaveFp(FILE *fp, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
int error = 0;
rio rdb;
@ -1257,26 +1259,36 @@ int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi)
if (g_pserver->rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
if (rdbSaveRio(&rdb,rgpdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
return C_ERR;
}
return C_OK;
}
int rdbSave(rdbSaveInfo *rsi)
int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
std::vector<redisDbPersistentData*> vecdb;
if (rgpdb == nullptr)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
vecdb.push_back(&g_pserver->db[idb]);
}
rgpdb = vecdb.data();
}
int err = C_OK;
if (g_pserver->rdb_filename != NULL)
err = rdbSaveFile(g_pserver->rdb_filename, rsi);
err = rdbSaveFile(g_pserver->rdb_filename, rgpdb, rsi);
if (err == C_OK && g_pserver->rdb_s3bucketpath != NULL)
err = rdbSaveS3(g_pserver->rdb_s3bucketpath, rsi);
err = rdbSaveS3(g_pserver->rdb_s3bucketpath, rgpdb, rsi);
return err;
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSaveFile(char *filename, rdbSaveInfo *rsi) {
int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@ -1294,7 +1306,7 @@ int rdbSaveFile(char *filename, rdbSaveInfo *rsi) {
return C_ERR;
}
if (rdbSaveFp(fp, rsi) == C_ERR){
if (rdbSaveFp(fp, rgpdb, rsi) == C_ERR){
goto werr;
}
@ -1331,6 +1343,24 @@ werr:
return C_ERR;
}
int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
{
int retval = rdbSave(rgpdb, rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
return (retval == C_OK) ? 0 : 1;
}
int rdbSaveBackground(rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
@ -1343,25 +1373,11 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("keydb-rdb-bgsave");
retval = rdbSave(rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
int rval = rdbSaveThread(nullptr, rsi);
exitFromChild(rval);
} else {
/* Parent */
g_pserver->stat_fork_time = ustime()-start;
@ -1379,6 +1395,7 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
g_pserver->rdb_child_pid = childpid;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
@ -2431,7 +2448,13 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
closeListeningSockets(0);
redisSetProcTitle("keydb-rdb-to-slaves");
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
std::vector<redisDbPersistentData*> vecpdb;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
vecpdb.push_back(&g_pserver->db[idb]);
}
retval = rdbSaveRioWithEOFMark(&slave_sockets,vecpdb.data(),NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;
@ -2539,7 +2562,7 @@ void saveCommand(client *c) {
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(rsiptr) == C_OK) {
if (rdbSave(nullptr, rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);

View File

@ -140,10 +140,10 @@ int rdbLoadFile(const char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, rdbSaveInfo *rsi);
int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveS3(char *path, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key);
size_t rdbSavedObjectLen(robj *o);

View File

@ -3773,7 +3773,7 @@ int prepareForShutdown(int flags) {
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(rsiptr) != C_OK) {
if (rdbSave(nullptr, rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background

View File

@ -1083,6 +1083,8 @@ public:
class redisDbPersistentData
{
public:
~redisDbPersistentData();
static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2);
size_t slots() const { return dictSlots(m_pdict); }
@ -1159,7 +1161,11 @@ public:
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
std::shared_ptr<redisDbPersistentData> createSnapshot();
void endSnapshot(const redisDbPersistentData *psnapshot);
private:
void ensure(const char *key);
void ensure(dictEntry *de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o);
@ -1173,6 +1179,8 @@ private:
// Expire
expireset *m_setexpire;
std::shared_ptr<redisDbPersistentData> m_spdbSnapshot;
};
/* Redis database representation. There are multiple databases identified
@ -2387,7 +2395,7 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */