From 9df7197221b31f72a351237a905fe0adda1c8fa9 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 20 Oct 2019 23:54:05 -0400 Subject: [PATCH] Forkless background save. NOT OPTIMIZED Former-commit-id: bd9d8e05b0430efd226be773c0530828f1f6b428 --- src/cluster.cpp | 19 +++ src/compactvector.h | 12 +- src/db.cpp | 58 +++++-- src/lazyfree.cpp | 2 +- src/networking.cpp | 2 + src/object.cpp | 50 ++++-- src/rax.c | 2 +- src/rdb.cpp | 389 ++++++++++++++++++++++++++------------------ src/rdb.h | 2 +- src/replication.cpp | 5 +- src/server.cpp | 55 ++++++- src/server.h | 46 +++++- 12 files changed, 433 insertions(+), 209 deletions(-) diff --git a/src/cluster.cpp b/src/cluster.cpp index 497105595..af1eb5971 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4794,6 +4794,7 @@ NULL /* ----------------------------------------------------------------------------- * DUMP, RESTORE and MIGRATE commands * -------------------------------------------------------------------------- */ +ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val); /* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ @@ -4806,6 +4807,9 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) { rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key)); + char szT[32]; + snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); + serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -4941,6 +4945,21 @@ void restoreCommand(client *c) { addReplyError(c,"Bad data format"); return; } + if (rdbLoadType(&payload) == RDB_OPCODE_AUX) + { + robj *auxkey, *auxval; + if ((auxkey = rdbLoadStringObject(&payload)) == NULL) goto eoferr; + if ((auxval = rdbLoadStringObject(&payload)) == NULL) { + decrRefCount(auxkey); + goto eoferr; + } + if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { + obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + } + decrRefCount(auxkey); + decrRefCount(auxval); + } +eoferr: /* Remove the old key if needed. */ if (replace) dbDelete(c->db,c->argv[1]); diff --git a/src/compactvector.h b/src/compactvector.h index daa8ad9fc..4571f9528 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -30,7 +30,16 @@ public: zfree(m_data); } - compactvector(compactvector &) noexcept = delete; + compactvector(const compactvector &src) + { + m_celem = src.m_celem; + m_max = src.m_max; + m_data = (T*)zmalloc(sizeof(T) * m_max, MALLOC_LOCAL); + for (size_t ielem = 0; ielem < m_celem; ++ielem) + { + new (m_data+ielem) T(src[ielem]); + } + } compactvector(compactvector &&src) noexcept { @@ -42,7 +51,6 @@ public: src.m_max = 0; } - compactvector &operator=(const compactvector&) noexcept = delete; compactvector &operator=(compactvector &&src) noexcept { zfree(m_data); diff --git a/src/db.cpp b/src/db.cpp index b41d4f28c..e5ad0328b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -197,7 +197,8 @@ bool dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); bool fInserted = db->insert(copy, val); - val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); + if (g_pserver->fActiveReplica) + val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); if (fInserted) { @@ -635,6 +636,7 @@ bool redisDbPersistentData::iterate(std::function fn) bool fResult = true; while((de = dictNext(di)) != nullptr) { + ensure(de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) { fResult = false; @@ -1798,7 +1800,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) { void redisDbPersistentData::initialize() { - m_pdict = dictCreate(&dbDictType,NULL); + m_pdict = dictCreate(&dbDictType,this); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = false; m_fTrackingChanges = 0; @@ -1862,12 +1864,17 @@ void redisDbPersistentData::clear(void(callback)(void*)) db1->m_fAllChanged = db2->m_fAllChanged; db1->m_setexpire = db2->m_setexpire; db1->m_pstorage = db2->m_pstorage; + db1->m_spdbSnapshot = db2->m_spdbSnapshot; db2->m_pdict = aux.m_pdict; db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fAllChanged = aux.m_fAllChanged; db2->m_setexpire = aux.m_setexpire; db2->m_pstorage = aux.m_pstorage; + db2->m_spdbSnapshot = aux.m_spdbSnapshot; + + db1->m_pdict->privdata = static_cast(db1); + db2->m_pdict->privdata = static_cast(db2); } void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) @@ -1930,10 +1937,20 @@ void redisDbPersistentData::ensure(dictEntry *de) if (m_spdbSnapshot != nullptr) { auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de)); - sds strT = serializeStoredObject(itr.val()); - robj *objNew = deserializeStoredObject(strT, sdslen(strT)); - sdsfree(strT); - dictSetVal(m_pdict, de, objNew); + serverAssert(itr != m_spdbSnapshot->end()); + if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + dictSetVal(m_pdict, de, itr.val()); + } + else + { + sds strT = serializeStoredObject(itr.val()); + robj *objNew = deserializeStoredObject(strT, sdslen(strT)); + sdsfree(strT); + dictSetVal(m_pdict, de, objNew); + serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); + serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); + } } else { @@ -2005,9 +2022,16 @@ void redisDbPersistentData::processChanges() std::shared_ptr redisDbPersistentData::createSnapshot() { + serverAssert(GlobalLocksAcquired()); serverAssert(m_spdbSnapshot == nullptr); auto spdb = std::make_shared(); - spdb->initialize(); + spdb->m_pdict = dictCreate(&dbDictType,spdb.get()); + spdb->m_fAllChanged = false; + spdb->m_fTrackingChanges = 0; + spdb->m_pdict->rehashidx = m_pdict->rehashidx; + spdb->m_pdict->iterators++; // fake an iterator so it doesn't rehash + if (m_setexpire != nullptr) + spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); for (unsigned iht = 0; iht < 2; ++iht) { @@ -2021,7 +2045,7 @@ std::shared_ptr redisDbPersistentData::createSnapshot() { const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx]; dictEntry **pdeDst = &m_pdict->ht[iht].table[idx]; - if (deSrc != nullptr) + while (deSrc != nullptr) { *pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED); (*pdeDst)->key = deSrc->key; @@ -2039,28 +2063,38 @@ std::shared_ptr redisDbPersistentData::createSnapshot() void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) { + serverAssert(GlobalLocksAcquired()); serverAssert(m_spdbSnapshot.get() == psnapshot); dictIterator *di = dictGetIterator(m_pdict); dictEntry *de; while ((de = dictNext(di)) != NULL) { + dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de)); if (dictGetVal(de) == nullptr) { - dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de)); if (deSnapshot != nullptr) { - dictSetVal(m_pdict, de, dictGetVal(deSnapshot)); + de->v.val = deSnapshot->v.val; + deSnapshot->v.val = nullptr; } } + if (deSnapshot && (dictGetKey(deSnapshot) == dictGetKey(de))) + { + // The key is owned by the parent snapshot, so we modify the DB key dtor + // to ensure the key is not free'd during the delete + m_spdbSnapshot->m_pdict->type = &dbSnapshotDictType; + dictDelete(m_spdbSnapshot->m_pdict, dictGetKey(de)); + m_spdbSnapshot->m_pdict->type = &dbDictType; + } } dictReleaseIterator(di); + m_spdbSnapshot->m_pdict->iterators--; m_spdbSnapshot = nullptr; } redisDbPersistentData::~redisDbPersistentData() { dictRelease(m_pdict); - if (m_setexpire) - delete m_setexpire; + delete m_setexpire; } \ No newline at end of file diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 247c5b961..58da000ee 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -115,7 +115,7 @@ void redisDbPersistentData::emptyDbAsync() { dict *oldht1 = m_pdict; auto *set = m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); - m_pdict = dictCreate(&dbDictType,NULL); + m_pdict = dictCreate(&dbDictType,this); if (m_pstorage != nullptr) m_pstorage->clear(); if (m_fTrackingChanges) diff --git a/src/networking.cpp b/src/networking.cpp index 636e95c62..7e88be1d0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1678,6 +1678,8 @@ int handleClientsWithPendingWrites(int iel) { std::unique_locklock)> lock(c->lock); + AeLocker locker; + locker.arm(c); /* Try to write buffers to the client socket. */ if (writeToClient(c->fd,c,0) == C_ERR) { diff --git a/src/object.cpp b/src/object.cpp index 9c296a99c..bd0c908df 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1494,24 +1494,23 @@ void redisObject::setrefcount(unsigned ref) refcount.store(ref, std::memory_order_relaxed); } -sds serializeStoredStringObject(robj_roptr o) +sds serializeStoredStringObject(sds str, robj_roptr o) { - sds str = sdsempty(); - sdscatlen(str, &(*o), sizeof(robj)); + str = sdscatlen(str, &(*o), sizeof(robj)); switch (o->encoding) { case OBJ_ENCODING_RAW: - sdscat(str, szFromObj(o)); + str = sdscatsds(str, (sds)szFromObj(o)); break; case OBJ_ENCODING_INT: break; //nop case OBJ_ENCODING_EMBSTR: - size_t cch = sdslen(szFromObj(o)); + size_t cch = sdslen(szFromObj(o)) + sizeof(struct sdshdr8); if (cch > sizeof(redisObject::m_ptr)) { - sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr)); + str = sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr)); } break; } @@ -1525,10 +1524,14 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) robj *newObject = nullptr; switch (oT->encoding) { - case OBJ_ENCODING_EMBSTR: case OBJ_ENCODING_INT: - newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); + serverAssert(cb == sizeof(robj)); + [[fallthrough]]; + case OBJ_ENCODING_EMBSTR: + newObject = (robj*)zmalloc(cb+1, MALLOC_LOCAL); + ((char*)newObject)[cb] = '\0'; memcpy(newObject, data, cb); + newObject->setrefcount(1); return newObject; case OBJ_ENCODING_RAW: @@ -1536,6 +1539,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) memcpy(newObject, data, sizeof(robj)); newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); + newObject->setrefcount(1); return newObject; } serverPanic("Unknown string object encoding from storage"); @@ -1544,11 +1548,10 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) robj *deserializeStoredObject(const void *data, size_t cb) { - const robj *oT = (const robj*)data; - switch (oT->type) + switch (((char*)data)[0]) { - case OBJ_STRING: - return deserializeStoredStringObject((char*)data, cb); + //case RDB_TYPE_STRING: + // return deserializeStoredStringObject(((char*)data)+1, cb-1); default: rio payload; @@ -1560,6 +1563,21 @@ robj *deserializeStoredObject(const void *data, size_t cb) { serverPanic("Bad data format"); } + if (rdbLoadType(&payload) == RDB_OPCODE_AUX) + { + robj *auxkey, *auxval; + if ((auxkey = rdbLoadStringObject(&payload)) == NULL) goto eoferr; + if ((auxval = rdbLoadStringObject(&payload)) == NULL) { + decrRefCount(auxkey); + goto eoferr; + } + if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { + obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + } + decrRefCount(auxkey); + decrRefCount(auxval); + } + eoferr: return obj; } serverPanic("Unknown object type loading from storage"); @@ -1569,8 +1587,12 @@ sds serializeStoredObject(robj_roptr o) { switch (o->type) { - case OBJ_STRING: - return serializeStoredStringObject(o); + //case OBJ_STRING: + //{ + // sds sdsT = sdsnewlen(nullptr, 1); + // sdsT[0] = RDB_TYPE_STRING; + // return serializeStoredStringObject(sdsT, o); + //} default: rio rdb; diff --git a/src/rax.c b/src/rax.c index b3c263dc4..c5e879ff1 100644 --- a/src/rax.c +++ b/src/rax.c @@ -705,7 +705,7 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void ** nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+ sizeof(raxNode*); if (h->iskey && !h->isnull) nodesize += sizeof(void*); - trimmed = rax_malloc(nodesize); + trimmed = zcalloc(nodesize, MALLOC_LOCAL);//rax_malloc(nodesize); } if (postfixlen) { diff --git a/src/rdb.cpp b/src/rdb.cpp index f872f49ee..f3f53ebe6 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -44,6 +44,7 @@ #include #include #include +#include "aelocker.h" #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) @@ -1105,9 +1106,9 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { if (rsi) { if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) == -1) return -1; - if (rdbSaveAuxFieldStrStr(rdb,"repl-id",g_pserver->replid) + if (rdbSaveAuxFieldStrStr(rdb,"repl-id",rsi->repl_id) == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",g_pserver->master_repl_offset) + if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; } if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; @@ -1156,6 +1157,7 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; for (j = 0; j < cserver.dbnum; j++) { @@ -1179,13 +1181,13 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r /* Iterate this DB writing every entry */ size_t ckeysExpired = 0; - bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool{ + bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool { if (o->FExpires()) ++ckeysExpired; if (!saveKey(rdb, db, flags, &processed, keystr, o)) return false; - return true; + return !g_pserver->rdbThreadVars.fRdbThreadCancel; }); if (!fSavedAll) goto werr; @@ -1196,16 +1198,22 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r * the script cache as well: on successful PSYNC after a restart, we need * to be able to process any EVALSHA inside the replication backlog the * master will send us. */ + { + AeLocker lock; + lock.arm(nullptr); if (rsi && dictSize(g_pserver->lua_scripts)) { di = dictGetIterator(g_pserver->lua_scripts); while((de = dictNext(di)) != NULL) { robj *body = (robj*)dictGetVal(de); if (rdbSaveAuxField(rdb,"lua",3,szFromObj(body),sdslen(szFromObj(body))) == -1) goto werr; + if (g_pserver->rdbThreadVars.fRdbThreadCancel) + goto werr; } dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ } + } // AeLocker end scope /* EOF opcode */ if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; @@ -1293,7 +1301,7 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; - snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); + snprintf(tmpfile,256,"temp-%d.rdb", g_pserver->rdbThreadVars.tmpfileNum); fp = fopen(tmpfile,"w"); if (!fp) { char *cwdp = getcwd(cwd,MAXPATHLEN); @@ -1331,9 +1339,12 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) } serverLog(LL_NOTICE,"DB saved on disk"); - g_pserver->dirty = 0; - g_pserver->lastsave = time(NULL); - g_pserver->lastbgsave_status = C_OK; + if (serverTL != nullptr) + { + g_pserver->dirty = 0; + g_pserver->lastsave = time(NULL); + g_pserver->lastbgsave_status = C_OK; + } return C_OK; werr: @@ -1343,9 +1354,17 @@ werr: return C_ERR; } -int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) +struct rdbSaveThreadArgs { - int retval = rdbSave(rgpdb, rsi); + rdbSaveInfo rsi; + redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length +}; + +void *rdbSaveThread(void *vargs) +{ + rdbSaveThreadArgs *args = reinterpret_cast(vargs); + serverAssert(serverTL == nullptr); + int retval = rdbSave(args->rgpdb, &args->rsi); if (retval == C_OK) { size_t private_dirty = zmalloc_get_private_dirty(-1); @@ -1358,11 +1377,41 @@ int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) g_pserver->child_info_data.cow_size = private_dirty; sendChildInfo(CHILD_INFO_TYPE_RDB); } - return (retval == C_OK) ? 0 : 1; + + aeAcquireLock(); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); + aeReleaseLock(); + zfree(args); + return (retval == C_OK) ? (void*)0 : (void*)1; +} + +int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) +{ + rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentData*)), MALLOC_LOCAL); + rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT; + if (rsi == nullptr) + rsi = &rsiT; + memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); + memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); + args->rsi.master_repl_offset = g_pserver->master_repl_offset; + + for (int idb = 0; idb < cserver.dbnum; ++idb) + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get(); + + g_pserver->rdbThreadVars.tmpfileNum++; + g_pserver->rdbThreadVars.fRdbThreadCancel = false; + if (pthread_create(&child, NULL, rdbSaveThread, args)) { + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); + zfree(args); + return C_ERR; + } + return C_OK; } int rdbSaveBackground(rdbSaveInfo *rsi) { - pid_t childpid; + pthread_t child; long long start; if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR; @@ -1372,39 +1421,32 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { openChildInfoPipe(); start = ustime(); - if ((childpid = fork()) == 0) { - /* Child */ - closeListeningSockets(0); - redisSetProcTitle("keydb-rdb-bgsave"); - int rval = rdbSaveThread(nullptr, rsi); - exitFromChild(rval); - } else { - /* Parent */ - g_pserver->stat_fork_time = ustime()-start; - g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); - if (childpid == -1) { - closeChildInfoPipe(); - g_pserver->lastbgsave_status = C_ERR; - serverLog(LL_WARNING,"Can't save in background: fork: %s", - strerror(errno)); - return C_ERR; - } - serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); - g_pserver->rdb_save_time_start = time(NULL); - g_pserver->rdb_child_pid = childpid; - g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK; - updateDictResizePolicy(); - return C_OK; + + g_pserver->stat_fork_time = ustime()-start; + g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); + if (launchRdbSaveThread(child, rsi) != C_OK) { + closeChildInfoPipe(); + g_pserver->lastbgsave_status = C_ERR; + serverLog(LL_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + return C_ERR; } - return C_OK; /* unreached */ + serverLog(LL_NOTICE,"Background saving started"); + g_pserver->rdb_save_time_start = time(NULL); + g_pserver->rdbThreadVars.fRdbThreadActive = true; + g_pserver->rdbThreadVars.rdb_child_thread = child; + g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK; + updateDictResizePolicy(); + + return C_OK; } -void rdbRemoveTempFile(pid_t childpid) { +void rdbRemoveTempFile(int tmpfileNum) { char tmpfile[256]; - snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) childpid); + snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", tmpfileNum); unlink(tmpfile); } @@ -2249,7 +2291,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { serverLog(LL_WARNING, "Background saving terminated by signal %d", bysignal); latencyStartMonitor(latency); - rdbRemoveTempFile(g_pserver->rdb_child_pid); + rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum); latencyEndMonitor(latency); latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency); /* SIGUSR1 is whitelisted, so we have a way to kill a child without @@ -2257,7 +2299,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { if (bysignal != SIGUSR1) g_pserver->lastbgsave_status = C_ERR; } - g_pserver->rdb_child_pid = -1; + g_pserver->rdbThreadVars.fRdbThreadActive = false; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_save_time_last = time(NULL)-g_pserver->rdb_save_time_start; g_pserver->rdb_save_time_start = -1; @@ -2282,7 +2324,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { serverLog(LL_WARNING, "Background transfer terminated by signal %d", bysignal); } - g_pserver->rdb_child_pid = -1; + g_pserver->rdbThreadVars.fRdbThreadActive = false; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_save_time_start = -1; @@ -2381,24 +2423,112 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { * the child did not exit for an error, but because we wanted), and performs * the cleanup needed. */ void killRDBChild(void) { - kill(g_pserver->rdb_child_pid,SIGUSR1); - rdbRemoveTempFile(g_pserver->rdb_child_pid); + g_pserver->rdbThreadVars.fRdbThreadCancel = true; + void *rval; + pthread_join(g_pserver->rdbThreadVars.rdb_child_thread,&rval); + g_pserver->rdbThreadVars.fRdbThreadActive = false; + g_pserver->rdbThreadVars.fRdbThreadCancel = false; + rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum); closeChildInfoPipe(); updateDictResizePolicy(); } +struct rdbSaveSocketThreadArgs +{ + rdbSaveInfo rsi; + int *fds; + int numfds; + uint64_t *clientids; + redisDbPersistentData *rgpdb[1]; +}; +void *rdbSaveToSlavesSocketsThread(void *vargs) +{ + /* Child */ + serverAssert(serverTL == nullptr); + rdbSaveSocketThreadArgs *args = (rdbSaveSocketThreadArgs*)vargs; + int retval; + rio slave_sockets; + + rioInitWithFdset(&slave_sockets,args->fds,args->numfds); + zfree(args->fds); + args->fds = nullptr; + + retval = rdbSaveRioWithEOFMark(&slave_sockets,args->rgpdb,NULL,&args->rsi); + if (retval == C_OK && rioFlush(&slave_sockets) == 0) + retval = C_ERR; + + if (retval == C_OK) { + size_t private_dirty = zmalloc_get_private_dirty(-1); + + if (private_dirty) { + serverLog(LL_NOTICE, + "RDB: %zu MB of memory used by copy-on-write", + private_dirty/(1024*1024)); + } + + g_pserver->child_info_data.cow_size = private_dirty; + sendChildInfo(CHILD_INFO_TYPE_RDB); + + /* If we are returning OK, at least one replica was served + * with the RDB file as expected, so we need to send a report + * to the parent via the pipe. The format of the message is: + * + * ... + * + * len, replica IDs, and replica errors, are all uint64_t integers, + * so basically the reply is composed of 64 bits for the len field + * plus 2 additional 64 bit integers for each entry, for a total + * of 'len' entries. + * + * The 'id' represents the replica's client ID, so that the master + * can match the report with a specific replica, and 'error' is + * set to 0 if the replication process terminated with a success + * or the error code if an error occurred. */ + void *msg = zmalloc(sizeof(uint64_t)*(1+2*args->numfds), MALLOC_LOCAL); + uint64_t *len = (uint64_t*)msg; + uint64_t *ids = len+1; + int j, msglen; + + *len = args->numfds; + for (j = 0; j < args->numfds; j++) { + *ids++ = args->clientids[j]; + *ids++ = slave_sockets.io.fdset.state[j]; + } + + /* Write the message to the parent. If we have no good slaves or + * we are unable to transfer the message to the parent, we exit + * with an error so that the parent will abort the replication + * process with all the childre that were waiting. */ + msglen = sizeof(uint64_t)*(1+2*args->numfds); + if (*len == 0 || + write(g_pserver->rdb_pipe_write_result_to_parent,msg,msglen) + != msglen) + { + retval = C_ERR; + } + zfree(msg); + } + aeAcquireLock(); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); + aeReleaseLock(); + zfree(args->clientids); + zfree(args); + rioFreeFdset(&slave_sockets); + + return (retval == C_OK) ? (void*)0 : (void*)1; +} + /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { serverAssert(GlobalLocksAcquired()); - int *fds; - uint64_t *clientids; - int numfds; listNode *ln; listIter li; - pid_t childpid; + pthread_t child; long long start; int pipefds[2]; + rdbSaveSocketThreadArgs *args = nullptr; if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR; @@ -2409,22 +2539,27 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { g_pserver->rdb_pipe_read_result_from_child = pipefds[0]; g_pserver->rdb_pipe_write_result_to_parent = pipefds[1]; + args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentData*)*(cserver.dbnum-1), MALLOC_LOCAL); + /* Collect the file descriptors of the slaves we want to transfer * the RDB to, which are i WAIT_BGSAVE_START state. */ - fds = (int*)zmalloc(sizeof(int)*listLength(g_pserver->slaves), MALLOC_LOCAL); + args->fds = (int*)zmalloc(sizeof(int)*listLength(g_pserver->slaves), MALLOC_LOCAL); /* We also allocate an array of corresponding client IDs. This will * be useful for the child process in order to build the report * (sent via unix pipe) that will be sent to the parent. */ - clientids = (uint64_t*)zmalloc(sizeof(uint64_t)*listLength(g_pserver->slaves), MALLOC_LOCAL); - numfds = 0; + args->clientids = (uint64_t*)zmalloc(sizeof(uint64_t)*listLength(g_pserver->slaves), MALLOC_LOCAL); + args->numfds = 0; + memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); + memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); + args->rsi.master_repl_offset = g_pserver->master_repl_offset; listRewind(g_pserver->slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - clientids[numfds] = replica->id; - fds[numfds++] = replica->fd; + args->clientids[args->numfds] = replica->id; + args->fds[args->numfds++] = replica->fd; replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); /* Put the socket in blocking mode to simplify RDB transfer. * We'll restore it when the children returns (since duped socket @@ -2437,121 +2572,52 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { /* Create the child process. */ openChildInfoPipe(); start = ustime(); - if ((childpid = fork()) == 0) { - /* Child */ - int retval; - rio slave_sockets; - rioInitWithFdset(&slave_sockets,fds,numfds); - zfree(fds); + for (int idb = 0; idb < cserver.dbnum; ++idb) + args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get(); - closeListeningSockets(0); - redisSetProcTitle("keydb-rdb-to-slaves"); + g_pserver->rdbThreadVars.tmpfileNum++; + g_pserver->rdbThreadVars.fRdbThreadCancel = false; + if (pthread_create(&child, nullptr, rdbSaveToSlavesSocketsThread, args)) { + serverLog(LL_WARNING,"Can't save in background: fork: %s", + strerror(errno)); - std::vector vecpdb; - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - vecpdb.push_back(&g_pserver->db[idb]); - } + /* Undo the state change. The caller will perform cleanup on + * all the slaves in BGSAVE_START state, but an early call to + * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ + listRewind(g_pserver->slaves,&li); + while((ln = listNext(&li))) { + client *replica = (client*)ln->value; + int j; - retval = rdbSaveRioWithEOFMark(&slave_sockets,vecpdb.data(),NULL,rsi); - if (retval == C_OK && rioFlush(&slave_sockets) == 0) - retval = C_ERR; - - if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - g_pserver->child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); - - /* If we are returning OK, at least one replica was served - * with the RDB file as expected, so we need to send a report - * to the parent via the pipe. The format of the message is: - * - * ... - * - * len, replica IDs, and replica errors, are all uint64_t integers, - * so basically the reply is composed of 64 bits for the len field - * plus 2 additional 64 bit integers for each entry, for a total - * of 'len' entries. - * - * The 'id' represents the replica's client ID, so that the master - * can match the report with a specific replica, and 'error' is - * set to 0 if the replication process terminated with a success - * or the error code if an error occurred. */ - void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds), MALLOC_LOCAL); - uint64_t *len = (uint64_t*)msg; - uint64_t *ids = len+1; - int j, msglen; - - *len = numfds; - for (j = 0; j < numfds; j++) { - *ids++ = clientids[j]; - *ids++ = slave_sockets.io.fdset.state[j]; - } - - /* Write the message to the parent. If we have no good slaves or - * we are unable to transfer the message to the parent, we exit - * with an error so that the parent will abort the replication - * process with all the childre that were waiting. */ - msglen = sizeof(uint64_t)*(1+2*numfds); - if (*len == 0 || - write(g_pserver->rdb_pipe_write_result_to_parent,msg,msglen) - != msglen) - { - retval = C_ERR; - } - zfree(msg); - } - zfree(clientids); - rioFreeFdset(&slave_sockets); - exitFromChild((retval == C_OK) ? 0 : 1); - } else { - /* Parent */ - if (childpid == -1) { - serverLog(LL_WARNING,"Can't save in background: fork: %s", - strerror(errno)); - - /* Undo the state change. The caller will perform cleanup on - * all the slaves in BGSAVE_START state, but an early call to - * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */ - listRewind(g_pserver->slaves,&li); - while((ln = listNext(&li))) { - client *replica = (client*)ln->value; - int j; - - for (j = 0; j < numfds; j++) { - if (replica->id == clientids[j]) { - replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - break; - } + for (j = 0; j < args->numfds; j++) { + if (replica->id == args->clientids[j]) { + replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START; + break; } } - close(pipefds[0]); - close(pipefds[1]); - closeChildInfoPipe(); - } else { - g_pserver->stat_fork_time = ustime()-start; - g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); - - serverLog(LL_NOTICE,"Background RDB transfer started by pid %d", - childpid); - g_pserver->rdb_save_time_start = time(NULL); - g_pserver->rdb_child_pid = childpid; - g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; - updateDictResizePolicy(); } - zfree(clientids); - zfree(fds); - return (childpid == -1) ? C_ERR : C_OK; + close(pipefds[0]); + close(pipefds[1]); + closeChildInfoPipe(); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].endSnapshot(args->rgpdb[idb]); + zfree(args->clientids); + zfree(args->fds); + zfree(args); + return C_ERR; } + + g_pserver->stat_fork_time = ustime()-start; + g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); + + serverLog(LL_NOTICE,"Background RDB transfer started"); + g_pserver->rdb_save_time_start = time(NULL); + g_pserver->rdbThreadVars.rdb_child_thread = child; + g_pserver->rdbThreadVars.fRdbThreadActive = true; + g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; + updateDictResizePolicy(); return C_OK; /* Unreached. */ } @@ -2619,6 +2685,9 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; *rsi = rsi_init; + memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); + rsi->master_repl_offset = g_pserver->master_repl_offset; + /* If the instance is a master, we can populate the replication info * only when repl_backlog is not NULL. If the repl_backlog is NULL, * it means that the instance isn't in any replication chains. In this diff --git a/src/rdb.h b/src/rdb.h index b2d9a9218..4ba92ae00 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -139,7 +139,7 @@ int rdbLoad(rdbSaveInfo *rsi); int rdbLoadFile(const char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); -void rdbRemoveTempFile(pid_t childpid); +void rdbRemoveTempFile(int childpid); int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); diff --git a/src/replication.cpp b/src/replication.cpp index 748f3d891..f43080ba2 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1514,9 +1514,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_NOTICE, "Replica is about to load the RDB file received from the " "master, but there is a pending RDB child running. " - "Killing process %ld and removing its temp file to avoid " - "any race", - (long) g_pserver->rdb_child_pid); + "Cancelling RDB the save and removing its temp file to avoid " + "any race"); killRDBChild(); } diff --git a/src/server.cpp b/src/server.cpp index 9cc5752fa..6d46d7350 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1179,6 +1179,21 @@ int dictSdsKeyCompare(void *privdata, const void *key1, return memcmp(key1, key2, l1) == 0; } +void dictSdsNOPDestructor(void *, void *) {} + +void dictDbKeyDestructor(void *privdata, void *key) +{ + redisDbPersistentData *owner = (redisDbPersistentData*)privdata; + serverAssert(owner != nullptr); + if (owner->m_spdbSnapshot != nullptr) + { + dictEntry *deSnapshot = dictFind(owner->m_spdbSnapshot->m_pdict, key); + if (deSnapshot && (key == dictGetKey(deSnapshot))) + return; // don't free, it's now owned by the snapshot + } + sdsfree((sds)key); +} + /* A case insensitive version used for the command lookup table and other * places where case insensitive non binary-safe comparison is needed. */ int dictSdsKeyCaseCompare(void *privdata, const void *key1, @@ -1314,10 +1329,19 @@ dictType dbDictType = { NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ + dictDbKeyDestructor, /* key destructor */ dictObjectDestructor /* val destructor */ }; +dictType dbSnapshotDictType = { + dictSdsHash, + NULL, + NULL, + dictSdsKeyCompare, + dictSdsNOPDestructor, + dictObjectDestructor, +}; + /* g_pserver->lua_scripts sha (as sds string) -> scripts (as robj) cache. */ dictType shaScriptObjectDictType = { dictSdsCaseHash, /* hash function */ @@ -1934,7 +1958,24 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int statloc; pid_t pid; - if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { + if (g_pserver->FRdbSaveInProgress()) + { + void *rval = nullptr; + if (pthread_tryjoin_np(g_pserver->rdbThreadVars.rdb_child_thread, &rval)) + { + if (errno != EBUSY && errno != EAGAIN) + serverLog(LL_WARNING, "Error joining the background RDB save thread: %s\n", strerror(errno)); + } + else + { + int exitcode = (int)reinterpret_cast(rval); + backgroundSaveDoneHandler(exitcode,0); + if (exitcode == 0) receiveChildInfo(); + updateDictResizePolicy(); + closeChildInfoPipe(); + } + } + else if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; @@ -1942,13 +1983,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " - "rdb_child_pid = %d, aof_child_pid = %d", + "aof_child_pid = %d", strerror(errno), - (int) g_pserver->rdb_child_pid, (int) g_pserver->aof_child_pid); - } else if (pid == g_pserver->rdb_child_pid) { - backgroundSaveDoneHandler(exitcode,bysignal); - if (!bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == g_pserver->aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); @@ -2971,7 +3008,7 @@ void initServer(void) { listSetFreeMethod(g_pserver->pubsub_patterns,freePubsubPattern); listSetMatchMethod(g_pserver->pubsub_patterns,listMatchPubsubPattern); g_pserver->cronloops = 0; - g_pserver->rdb_child_pid = -1; + g_pserver->rdbThreadVars.fRdbThreadActive = false; g_pserver->aof_child_pid = -1; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_bgsave_scheduled = 0; @@ -4757,7 +4794,7 @@ static void sigShutdownHandler(int sig) { * on disk. */ if (g_pserver->shutdown_asap && sig == SIGINT) { serverLogFromHandler(LL_WARNING, "You insist... exiting now."); - rdbRemoveTempFile(getpid()); + rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum); exit(1); /* Exit with an error since this was not a clean shutdown. */ } else if (g_pserver->loading) { serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now."); diff --git a/src/server.h b/src/server.h index fbe57ff1c..13f27a949 100644 --- a/src/server.h +++ b/src/server.h @@ -791,6 +791,16 @@ public: : when(when), spsubkey(subkey, sdsfree) {} + subexpireEntry(const subexpireEntry &e) + : when(e.when), spsubkey(nullptr, sdsfree) + { + if (e.spsubkey) + spsubkey = std::unique_ptr((const char*)sdsdup((sds)e.spsubkey.get()), sdsfree); + } + + subexpireEntry(subexpireEntry &&e) = default; + subexpireEntry& operator=(subexpireEntry &&e) = default; + bool operator<(long long when) const noexcept { return this->when < when; } bool operator<(const subexpireEntry &se) { return this->when < se.when; } }; @@ -803,6 +813,10 @@ public: expireEntryFat(sds keyPrimary) : m_keyPrimary(keyPrimary) {} + + expireEntryFat(const expireEntryFat &e) = default; + expireEntryFat(expireEntryFat &&e) = default; + long long when() const noexcept { return m_vecexpireEntries.front().when; } const char *key() const noexcept { return m_keyPrimary; } @@ -904,6 +918,14 @@ public: m_when = LLONG_MIN; } + expireEntry(const expireEntry &e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + if (e.FFat()) + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); + } + expireEntry(expireEntry &&e) { u.m_key = e.u.m_key; @@ -1082,6 +1104,7 @@ public: class redisDbPersistentData { + friend void dictDbKeyDestructor(void *privdata, void *key); public: ~redisDbPersistentData(); @@ -1131,6 +1154,9 @@ public: return m_setexpire->find(key); } + dict_iter end() { return dict_iter(nullptr); } + dict_const_iter end() const { return dict_const_iter(nullptr); } + void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); } @@ -1171,14 +1197,14 @@ private: void storeKey(const char *key, size_t cchKey, robj *o); // Keyspace - dict *m_pdict; /* The keyspace for this DB */ + dict *m_pdict = nullptr; /* The keyspace for this DB */ int m_fTrackingChanges = 0; // Note: Stack based bool m_fAllChanged = false; std::set m_setchanged; IStorage *m_pstorage = nullptr; // Expire - expireset *m_setexpire; + expireset *m_setexpire = nullptr; std::shared_ptr m_spdbSnapshot; }; @@ -1212,8 +1238,6 @@ typedef struct redisDb : public redisDbPersistentData {} void initialize(int id); - const_iter end() { return const_iter(nullptr); } - void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); @@ -1564,6 +1588,9 @@ typedef struct rdbSaveInfo { char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ long long repl_offset; /* Replication offset. */ int fForceSetKey; + + /* Used In Save */ + long long master_repl_offset; } rdbSaveInfo; #define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} @@ -1821,7 +1848,13 @@ struct redisServer { /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ - pid_t rdb_child_pid; /* PID of RDB saving child */ + struct _rdbThreadVars + { + bool fRdbThreadActive = false; + volatile bool fRdbThreadCancel = false; + pthread_t rdb_child_thread; /* PID of RDB saving child */ + int tmpfileNum = 0; + } rdbThreadVars; struct saveparam *saveparams; /* Save points array for RDB */ int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ @@ -2001,7 +2034,7 @@ struct redisServer { /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ - bool FRdbSaveInProgress() const { return rdb_child_pid != -1; } + bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; typedef struct pubsubPattern { @@ -2107,6 +2140,7 @@ extern dictType zsetDictType; extern dictType clusterNodesDictType; extern dictType clusterNodesBlackListDictType; extern dictType dbDictType; +extern dictType dbSnapshotDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern dictType hashDictType;