From 5fb7d4a325d1d3ea9c9288bf0753dac2dadc0bce Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 17 Oct 2019 22:53:50 -0400 Subject: [PATCH] Dependencies for forkless RDB saving Former-commit-id: c12be594754ac8079f467e0fa9f8580bd65e3a55 --- src/aof.cpp | 7 +++- src/db.cpp | 95 +++++++++++++++++++++++++++++++++++++++++++++----- src/debug.cpp | 2 +- src/rdb-s3.cpp | 4 +-- src/rdb.cpp | 81 +++++++++++++++++++++++++++--------------- src/rdb.h | 8 ++--- src/server.cpp | 2 +- src/server.h | 10 +++++- 8 files changed, 162 insertions(+), 47 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 1d10474da..313654d59 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1411,7 +1411,12 @@ int rewriteAppendOnlyFile(char *filename) { if (g_pserver->aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { + std::vector vecpdb; + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + vecpdb.push_back(&g_pserver->db[idb]); + } + if (rdbSaveRio(&aof,vecpdb.data(),&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } diff --git a/src/db.cpp b/src/db.cpp index 49e2d127a..eaf3cec08 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -552,7 +552,7 @@ void flushallCommand(client *c) { int saved_dirty = g_pserver->dirty; rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(rsiptr); + rdbSave(nullptr, rsiptr); g_pserver->dirty = saved_dirty; } g_pserver->dirty++; @@ -1917,17 +1917,34 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val) dictSetVal(m_pdict, itr.de, val); } +void redisDbPersistentData::ensure(const char *key) +{ + dictEntry *de = dictFind(m_pdict, key); + ensure(de); +} + void redisDbPersistentData::ensure(dictEntry *de) { if (de != nullptr && dictGetVal(de) == nullptr) { - serverAssert(m_pstorage != nullptr); - sds key = (sds)dictGetKey(de); - m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(data, 
cb); - serverAssert(o != nullptr); - dictSetVal(m_pdict, de, o); - }); + if (m_spdbSnapshot != nullptr) + { + auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de)); + sds strT = serializeStoredObject(itr.val()); + robj *objNew = deserializeStoredObject(strT, sdslen(strT)); + sdsfree(strT); + dictSetVal(m_pdict, de, objNew); + } + else + { + serverAssert(m_pstorage != nullptr); + sds key = (sds)dictGetKey(de); + m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ + robj *o = deserializeStoredObject(data, cb); + serverAssert(o != nullptr); + dictSetVal(m_pdict, de, o); + }); + } } } @@ -1984,4 +2001,66 @@ void redisDbPersistentData::processChanges() } } m_setchanged.clear(); +} + +std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot() +{ /* Hands this DB's current hash tables to a newly created snapshot object, then rebuilds this DB's tables with entries that share the same key pointers but have null values. Asserts that no snapshot is already outstanding. Returns the snapshot, which is also retained in m_spdbSnapshot. */ + serverAssert(m_spdbSnapshot == nullptr); + auto spdb = std::make_shared<redisDbPersistentData>(); + spdb->initialize(); + + for (unsigned iht = 0; iht < 2; ++iht) + { + spdb->m_pdict->ht[iht] = m_pdict->ht[iht]; + if (m_pdict->ht[iht].size) + m_pdict->ht[iht].table = (dictEntry**)zcalloc(m_pdict->ht[iht].size*sizeof(dictEntry*), MALLOC_SHARED); + else + m_pdict->ht[iht].table = nullptr; + + for (size_t idx = 0; idx < m_pdict->ht[iht].size; ++idx) + { + const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx]; + dictEntry **pdeDst = &m_pdict->ht[iht].table[idx]; + while (deSrc != nullptr) /* walk the whole bucket chain, not only its head; with a plain 'if' every chained (colliding) key was missing from the rebuilt table */ + { + *pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED); + (*pdeDst)->key = deSrc->key; + (*pdeDst)->v.val = nullptr; + (*pdeDst)->next = nullptr; + pdeDst = &(*pdeDst)->next; + deSrc = deSrc->next; + } + } + } + + m_spdbSnapshot = std::move(spdb); + return m_spdbSnapshot; +} + +void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) +{ + serverAssert(m_spdbSnapshot.get() == psnapshot); + + dictIterator *di = dictGetIterator(m_pdict); + dictEntry *de; + while ((de = dictNext(di)) != NULL) + { + if (dictGetVal(de) == nullptr) + { + dictEntry *deSnapshot = 
dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de)); + if (deSnapshot != nullptr) + { + dictSetVal(m_pdict, de, dictGetVal(deSnapshot)); + } + } + } + dictReleaseIterator(di); + m_spdbSnapshot = nullptr; +} + +redisDbPersistentData::~redisDbPersistentData() +{ + dictRelease(m_pdict); + if (m_setexpire) + delete m_setexpire; } \ No newline at end of file diff --git a/src/debug.cpp b/src/debug.cpp index dc09e1e25..db2052dd7 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -359,7 +359,7 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"reload")) { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(rsiptr) != C_OK) { + if (rdbSave(nullptr, rsiptr) != C_OK) { addReply(c,shared.err); return; } diff --git a/src/rdb-s3.cpp b/src/rdb-s3.cpp index 39be87f83..3b85a5f14 100644 --- a/src/rdb-s3.cpp +++ b/src/rdb-s3.cpp @@ -6,7 +6,7 @@ extern "C" { #include /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi) +int rdbSaveS3(char *s3bucket, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) { int status = EXIT_FAILURE; int fd[2]; @@ -40,7 +40,7 @@ int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi) return C_ERR; } - if (rdbSaveFp(fp, rsi) != C_OK) + if (rdbSaveFp(fp, rgpdb, rsi) != C_OK) { fclose(fp); return C_ERR; diff --git a/src/rdb.cpp b/src/rdb.cpp index 268a3c556..aa87fa813 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -42,6 +42,8 @@ #include #include #include +#include +#include #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) @@ -1142,7 +1144,7 @@ int saveKey(rio *rdb, redisDbPersistentData *db, int flags, size_t *processed, c * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. 
*/ -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi) { dictEntry *de; dictIterator *di = NULL; char magic[10]; @@ -1157,7 +1159,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; for (j = 0; j < cserver.dbnum; j++) { - redisDbPersistentData *db = static_cast(g_pserver->db+j); + redisDbPersistentData *db = rgpdb[j]; if (db->size() == 0) continue; /* Write the SELECT DB opcode */ @@ -1229,7 +1231,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { +int rdbSaveRioWithEOFMark(rio *rdb, redisDbPersistentData **rgpdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); @@ -1237,7 +1239,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(rdb,rgpdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; return C_OK; @@ -1247,7 +1249,7 @@ werr: /* Write error. 
*/ return C_ERR; } -int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi) +int rdbSaveFp(FILE *fp, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) { int error = 0; rio rdb; @@ -1257,26 +1259,36 @@ int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi) if (g_pserver->rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { + if (rdbSaveRio(&rdb,rgpdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { errno = error; return C_ERR; } return C_OK; } -int rdbSave(rdbSaveInfo *rsi) +int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) { + std::vector vecdb; + if (rgpdb == nullptr) + { + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + vecdb.push_back(&g_pserver->db[idb]); + } + rgpdb = vecdb.data(); + } + int err = C_OK; if (g_pserver->rdb_filename != NULL) - err = rdbSaveFile(g_pserver->rdb_filename, rsi); + err = rdbSaveFile(g_pserver->rdb_filename, rgpdb, rsi); if (err == C_OK && g_pserver->rdb_s3bucketpath != NULL) - err = rdbSaveS3(g_pserver->rdb_s3bucketpath, rsi); + err = rdbSaveS3(g_pserver->rdb_s3bucketpath, rgpdb, rsi); return err; } /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSaveFile(char *filename, rdbSaveInfo *rsi) { +int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. 
*/ FILE *fp; @@ -1294,7 +1306,7 @@ int rdbSaveFile(char *filename, rdbSaveInfo *rsi) { return C_ERR; } - if (rdbSaveFp(fp, rsi) == C_ERR){ + if (rdbSaveFp(fp, rgpdb, rsi) == C_ERR){ goto werr; } @@ -1331,6 +1343,24 @@ werr: return C_ERR; } +int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) +{ + int retval = rdbSave(rgpdb, rsi); + if (retval == C_OK) { + size_t private_dirty = zmalloc_get_private_dirty(-1); + + if (private_dirty) { + serverLog(LL_NOTICE, + "RDB: %zu MB of memory used by copy-on-write", + private_dirty/(1024*1024)); + } + + g_pserver->child_info_data.cow_size = private_dirty; + sendChildInfo(CHILD_INFO_TYPE_RDB); + } + return (retval == C_OK) ? 0 : 1; +} + int rdbSaveBackground(rdbSaveInfo *rsi) { pid_t childpid; long long start; @@ -1343,25 +1373,11 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { start = ustime(); if ((childpid = fork()) == 0) { - int retval; - /* Child */ closeListeningSockets(0); redisSetProcTitle("keydb-rdb-bgsave"); - retval = rdbSave(rsi); - if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - g_pserver->child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); - } - exitFromChild((retval == C_OK) ? 
0 : 1); + int rval = rdbSaveThread(nullptr, rsi); + exitFromChild(rval); } else { /* Parent */ g_pserver->stat_fork_time = ustime()-start; @@ -1379,6 +1395,7 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { g_pserver->rdb_child_pid = childpid; g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK; updateDictResizePolicy(); + return C_OK; } return C_OK; /* unreached */ @@ -2431,7 +2448,13 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { closeListeningSockets(0); redisSetProcTitle("keydb-rdb-to-slaves"); - retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); + std::vector vecpdb; + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + vecpdb.push_back(&g_pserver->db[idb]); + } + + retval = rdbSaveRioWithEOFMark(&slave_sockets,vecpdb.data(),NULL,rsi); if (retval == C_OK && rioFlush(&slave_sockets) == 0) retval = C_ERR; @@ -2539,7 +2562,7 @@ void saveCommand(client *c) { } rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(rsiptr) == C_OK) { + if (rdbSave(nullptr, rsiptr) == C_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); diff --git a/src/rdb.h b/src/rdb.h index edf43d422..b2d9a9218 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -140,10 +140,10 @@ int rdbLoadFile(const char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); -int rdbSave(rdbSaveInfo *rsi); -int rdbSaveFile(char *filename, rdbSaveInfo *rsi); -int rdbSaveFp(FILE *pf, rdbSaveInfo *rsi); -int rdbSaveS3(char *path, rdbSaveInfo *rsi); +int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); +int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); +int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); +int rdbSaveS3(char *path, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key); size_t rdbSavedObjectLen(robj *o); diff --git 
a/src/server.cpp b/src/server.cpp index ba8bfb4d6..1a17f2796 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3773,7 +3773,7 @@ int prepareForShutdown(int flags) { /* Snapshotting. Perform a SYNC SAVE and exit */ rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); - if (rdbSave(rsiptr) != C_OK) { + if (rdbSave(nullptr, rsiptr) != C_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background diff --git a/src/server.h b/src/server.h index 7bb0517e0..26e17325a 100644 --- a/src/server.h +++ b/src/server.h @@ -1083,6 +1083,8 @@ public: class redisDbPersistentData { public: + ~redisDbPersistentData(); + static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2); size_t slots() const { return dictSlots(m_pdict); } @@ -1159,7 +1161,11 @@ public: expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } + std::shared_ptr createSnapshot(); + void endSnapshot(const redisDbPersistentData *psnapshot); + private: + void ensure(const char *key); void ensure(dictEntry *de); void storeDatabase(); void storeKey(const char *key, size_t cchKey, robj *o); @@ -1173,6 +1179,8 @@ private: // Expire expireset *m_setexpire; + + std::shared_ptr m_spdbSnapshot; }; /* Redis database representation. There are multiple databases identified @@ -2387,7 +2395,7 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */