Forkless background save. NOT OPTIMIZED

Former-commit-id: bd9d8e05b0430efd226be773c0530828f1f6b428
This commit is contained in:
John Sully 2019-10-20 23:54:05 -04:00
parent 764a05fedd
commit 9df7197221
12 changed files with 433 additions and 209 deletions

View File

@ -4794,6 +4794,7 @@ NULL
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands * DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val);
/* Generates a DUMP-format representation of the object 'o', adding it to the /* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */ * io stream pointed by 'rio'. This function can't fail. */
@ -4806,6 +4807,9 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) {
rioInitWithBuffer(payload,sdsempty()); rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key)); serverAssert(rdbSaveObject(payload,o,key));
char szT[32];
snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp);
serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1);
/* Write the footer, this is how it looks like: /* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+ * ----------------+---------------------+---------------+
@ -4941,6 +4945,21 @@ void restoreCommand(client *c) {
addReplyError(c,"Bad data format"); addReplyError(c,"Bad data format");
return; return;
} }
if (rdbLoadType(&payload) == RDB_OPCODE_AUX)
{
robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(&payload)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(&payload)) == NULL) {
decrRefCount(auxkey);
goto eoferr;
}
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
}
decrRefCount(auxkey);
decrRefCount(auxval);
}
eoferr:
/* Remove the old key if needed. */ /* Remove the old key if needed. */
if (replace) dbDelete(c->db,c->argv[1]); if (replace) dbDelete(c->db,c->argv[1]);

View File

@ -30,7 +30,16 @@ public:
zfree(m_data); zfree(m_data);
} }
compactvector(compactvector &) noexcept = delete; compactvector(const compactvector &src)
{
m_celem = src.m_celem;
m_max = src.m_max;
m_data = (T*)zmalloc(sizeof(T) * m_max, MALLOC_LOCAL);
for (size_t ielem = 0; ielem < m_celem; ++ielem)
{
new (m_data+ielem) T(src[ielem]);
}
}
compactvector(compactvector &&src) noexcept compactvector(compactvector &&src) noexcept
{ {
@ -42,7 +51,6 @@ public:
src.m_max = 0; src.m_max = 0;
} }
compactvector &operator=(const compactvector&) noexcept = delete;
compactvector &operator=(compactvector &&src) noexcept compactvector &operator=(compactvector &&src) noexcept
{ {
zfree(m_data); zfree(m_data);

View File

@ -197,7 +197,8 @@ bool dbAddCore(redisDb *db, robj *key, robj *val) {
serverAssert(!val->FExpires()); serverAssert(!val->FExpires());
sds copy = sdsdup(szFromObj(key)); sds copy = sdsdup(szFromObj(key));
bool fInserted = db->insert(copy, val); bool fInserted = db->insert(copy, val);
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); if (g_pserver->fActiveReplica)
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
if (fInserted) if (fInserted)
{ {
@ -635,6 +636,7 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
bool fResult = true; bool fResult = true;
while((de = dictNext(di)) != nullptr) while((de = dictNext(di)) != nullptr)
{ {
ensure(de);
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
{ {
fResult = false; fResult = false;
@ -1798,7 +1800,7 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
void redisDbPersistentData::initialize() void redisDbPersistentData::initialize()
{ {
m_pdict = dictCreate(&dbDictType,NULL); m_pdict = dictCreate(&dbDictType,this);
m_setexpire = new(MALLOC_LOCAL) expireset(); m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = false; m_fAllChanged = false;
m_fTrackingChanges = 0; m_fTrackingChanges = 0;
@ -1862,12 +1864,17 @@ void redisDbPersistentData::clear(void(callback)(void*))
db1->m_fAllChanged = db2->m_fAllChanged; db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire; db1->m_setexpire = db2->m_setexpire;
db1->m_pstorage = db2->m_pstorage; db1->m_pstorage = db2->m_pstorage;
db1->m_spdbSnapshot = db2->m_spdbSnapshot;
db2->m_pdict = aux.m_pdict; db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged; db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire; db2->m_setexpire = aux.m_setexpire;
db2->m_pstorage = aux.m_pstorage; db2->m_pstorage = aux.m_pstorage;
db2->m_spdbSnapshot = aux.m_spdbSnapshot;
db1->m_pdict->privdata = static_cast<redisDbPersistentData*>(db1);
db2->m_pdict->privdata = static_cast<redisDbPersistentData*>(db2);
} }
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
@ -1930,10 +1937,20 @@ void redisDbPersistentData::ensure(dictEntry *de)
if (m_spdbSnapshot != nullptr) if (m_spdbSnapshot != nullptr)
{ {
auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de)); auto itr = m_spdbSnapshot->find((const char*)dictGetKey(de));
sds strT = serializeStoredObject(itr.val()); serverAssert(itr != m_spdbSnapshot->end());
robj *objNew = deserializeStoredObject(strT, sdslen(strT)); if (itr.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
sdsfree(strT); {
dictSetVal(m_pdict, de, objNew); dictSetVal(m_pdict, de, itr.val());
}
else
{
sds strT = serializeStoredObject(itr.val());
robj *objNew = deserializeStoredObject(strT, sdslen(strT));
sdsfree(strT);
dictSetVal(m_pdict, de, objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
}
} }
else else
{ {
@ -2005,9 +2022,16 @@ void redisDbPersistentData::processChanges()
std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot() std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot()
{ {
serverAssert(GlobalLocksAcquired());
serverAssert(m_spdbSnapshot == nullptr); serverAssert(m_spdbSnapshot == nullptr);
auto spdb = std::make_shared<redisDbPersistentData>(); auto spdb = std::make_shared<redisDbPersistentData>();
spdb->initialize(); spdb->m_pdict = dictCreate(&dbDictType,spdb.get());
spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0;
spdb->m_pdict->rehashidx = m_pdict->rehashidx;
spdb->m_pdict->iterators++; // fake an iterator so it doesn't rehash
if (m_setexpire != nullptr)
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
for (unsigned iht = 0; iht < 2; ++iht) for (unsigned iht = 0; iht < 2; ++iht)
{ {
@ -2021,7 +2045,7 @@ std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot()
{ {
const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx]; const dictEntry *deSrc = spdb->m_pdict->ht[iht].table[idx];
dictEntry **pdeDst = &m_pdict->ht[iht].table[idx]; dictEntry **pdeDst = &m_pdict->ht[iht].table[idx];
if (deSrc != nullptr) while (deSrc != nullptr)
{ {
*pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED); *pdeDst = (dictEntry*)zmalloc(sizeof(dictEntry), MALLOC_SHARED);
(*pdeDst)->key = deSrc->key; (*pdeDst)->key = deSrc->key;
@ -2039,28 +2063,38 @@ std::shared_ptr<redisDbPersistentData> redisDbPersistentData::createSnapshot()
void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot) void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
{ {
serverAssert(GlobalLocksAcquired());
serverAssert(m_spdbSnapshot.get() == psnapshot); serverAssert(m_spdbSnapshot.get() == psnapshot);
dictIterator *di = dictGetIterator(m_pdict); dictIterator *di = dictGetIterator(m_pdict);
dictEntry *de; dictEntry *de;
while ((de = dictNext(di)) != NULL) while ((de = dictNext(di)) != NULL)
{ {
dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de));
if (dictGetVal(de) == nullptr) if (dictGetVal(de) == nullptr)
{ {
dictEntry *deSnapshot = dictFind(m_spdbSnapshot->m_pdict, dictGetKey(de));
if (deSnapshot != nullptr) if (deSnapshot != nullptr)
{ {
dictSetVal(m_pdict, de, dictGetVal(deSnapshot)); de->v.val = deSnapshot->v.val;
deSnapshot->v.val = nullptr;
} }
} }
if (deSnapshot && (dictGetKey(deSnapshot) == dictGetKey(de)))
{
// The key is owned by the parent snapshot, so we modify the DB key dtor
// to ensure the key is not free'd during the delete
m_spdbSnapshot->m_pdict->type = &dbSnapshotDictType;
dictDelete(m_spdbSnapshot->m_pdict, dictGetKey(de));
m_spdbSnapshot->m_pdict->type = &dbDictType;
}
} }
dictReleaseIterator(di); dictReleaseIterator(di);
m_spdbSnapshot->m_pdict->iterators--;
m_spdbSnapshot = nullptr; m_spdbSnapshot = nullptr;
} }
redisDbPersistentData::~redisDbPersistentData() redisDbPersistentData::~redisDbPersistentData()
{ {
dictRelease(m_pdict); dictRelease(m_pdict);
if (m_setexpire) delete m_setexpire;
delete m_setexpire;
} }

View File

@ -115,7 +115,7 @@ void redisDbPersistentData::emptyDbAsync() {
dict *oldht1 = m_pdict; dict *oldht1 = m_pdict;
auto *set = m_setexpire; auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset(); m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,NULL); m_pdict = dictCreate(&dbDictType,this);
if (m_pstorage != nullptr) if (m_pstorage != nullptr)
m_pstorage->clear(); m_pstorage->clear();
if (m_fTrackingChanges) if (m_fTrackingChanges)

View File

@ -1678,6 +1678,8 @@ int handleClientsWithPendingWrites(int iel) {
std::unique_lock<decltype(c->lock)> lock(c->lock); std::unique_lock<decltype(c->lock)> lock(c->lock);
AeLocker locker;
locker.arm(c);
/* Try to write buffers to the client socket. */ /* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) if (writeToClient(c->fd,c,0) == C_ERR)
{ {

View File

@ -1494,24 +1494,23 @@ void redisObject::setrefcount(unsigned ref)
refcount.store(ref, std::memory_order_relaxed); refcount.store(ref, std::memory_order_relaxed);
} }
sds serializeStoredStringObject(robj_roptr o) sds serializeStoredStringObject(sds str, robj_roptr o)
{ {
sds str = sdsempty(); str = sdscatlen(str, &(*o), sizeof(robj));
sdscatlen(str, &(*o), sizeof(robj));
switch (o->encoding) switch (o->encoding)
{ {
case OBJ_ENCODING_RAW: case OBJ_ENCODING_RAW:
sdscat(str, szFromObj(o)); str = sdscatsds(str, (sds)szFromObj(o));
break; break;
case OBJ_ENCODING_INT: case OBJ_ENCODING_INT:
break; //nop break; //nop
case OBJ_ENCODING_EMBSTR: case OBJ_ENCODING_EMBSTR:
size_t cch = sdslen(szFromObj(o)); size_t cch = sdslen(szFromObj(o)) + sizeof(struct sdshdr8);
if (cch > sizeof(redisObject::m_ptr)) if (cch > sizeof(redisObject::m_ptr))
{ {
sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr)); str = sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr));
} }
break; break;
} }
@ -1525,10 +1524,14 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
robj *newObject = nullptr; robj *newObject = nullptr;
switch (oT->encoding) switch (oT->encoding)
{ {
case OBJ_ENCODING_EMBSTR:
case OBJ_ENCODING_INT: case OBJ_ENCODING_INT:
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); serverAssert(cb == sizeof(robj));
[[fallthrough]];
case OBJ_ENCODING_EMBSTR:
newObject = (robj*)zmalloc(cb+1, MALLOC_LOCAL);
((char*)newObject)[cb] = '\0';
memcpy(newObject, data, cb); memcpy(newObject, data, cb);
newObject->setrefcount(1);
return newObject; return newObject;
case OBJ_ENCODING_RAW: case OBJ_ENCODING_RAW:
@ -1536,6 +1539,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
memcpy(newObject, data, sizeof(robj)); memcpy(newObject, data, sizeof(robj));
newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj));
memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj));
newObject->setrefcount(1);
return newObject; return newObject;
} }
serverPanic("Unknown string object encoding from storage"); serverPanic("Unknown string object encoding from storage");
@ -1544,11 +1548,10 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
robj *deserializeStoredObject(const void *data, size_t cb) robj *deserializeStoredObject(const void *data, size_t cb)
{ {
const robj *oT = (const robj*)data; switch (((char*)data)[0])
switch (oT->type)
{ {
case OBJ_STRING: //case RDB_TYPE_STRING:
return deserializeStoredStringObject((char*)data, cb); // return deserializeStoredStringObject(((char*)data)+1, cb-1);
default: default:
rio payload; rio payload;
@ -1560,6 +1563,21 @@ robj *deserializeStoredObject(const void *data, size_t cb)
{ {
serverPanic("Bad data format"); serverPanic("Bad data format");
} }
if (rdbLoadType(&payload) == RDB_OPCODE_AUX)
{
robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(&payload)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(&payload)) == NULL) {
decrRefCount(auxkey);
goto eoferr;
}
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
}
decrRefCount(auxkey);
decrRefCount(auxval);
}
eoferr:
return obj; return obj;
} }
serverPanic("Unknown object type loading from storage"); serverPanic("Unknown object type loading from storage");
@ -1569,8 +1587,12 @@ sds serializeStoredObject(robj_roptr o)
{ {
switch (o->type) switch (o->type)
{ {
case OBJ_STRING: //case OBJ_STRING:
return serializeStoredStringObject(o); //{
// sds sdsT = sdsnewlen(nullptr, 1);
// sdsT[0] = RDB_TYPE_STRING;
// return serializeStoredStringObject(sdsT, o);
//}
default: default:
rio rdb; rio rdb;

View File

@ -705,7 +705,7 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+ nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
sizeof(raxNode*); sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*); if (h->iskey && !h->isnull) nodesize += sizeof(void*);
trimmed = rax_malloc(nodesize); trimmed = zcalloc(nodesize, MALLOC_LOCAL);//rax_malloc(nodesize);
} }
if (postfixlen) { if (postfixlen) {

View File

@ -44,6 +44,7 @@
#include <sys/param.h> #include <sys/param.h>
#include <thread> #include <thread>
#include <future> #include <future>
#include "aelocker.h"
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
@ -1105,9 +1106,9 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
if (rsi) { if (rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1) return -1; == -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",g_pserver->replid) if (rdbSaveAuxFieldStrStr(rdb,"repl-id",rsi->repl_id)
== -1) return -1; == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",g_pserver->master_repl_offset) if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
== -1) return -1; == -1) return -1;
} }
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
@ -1156,6 +1157,7 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r
rdb->update_cksum = rioGenericUpdateChecksum; rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < cserver.dbnum; j++) { for (j = 0; j < cserver.dbnum; j++) {
@ -1179,13 +1181,13 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r
/* Iterate this DB writing every entry */ /* Iterate this DB writing every entry */
size_t ckeysExpired = 0; size_t ckeysExpired = 0;
bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool{ bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool {
if (o->FExpires()) if (o->FExpires())
++ckeysExpired; ++ckeysExpired;
if (!saveKey(rdb, db, flags, &processed, keystr, o)) if (!saveKey(rdb, db, flags, &processed, keystr, o))
return false; return false;
return true; return !g_pserver->rdbThreadVars.fRdbThreadCancel;
}); });
if (!fSavedAll) if (!fSavedAll)
goto werr; goto werr;
@ -1196,16 +1198,22 @@ int rdbSaveRio(rio *rdb, redisDbPersistentData **rgpdb, int *error, int flags, r
* the script cache as well: on successful PSYNC after a restart, we need * the script cache as well: on successful PSYNC after a restart, we need
* to be able to process any EVALSHA inside the replication backlog the * to be able to process any EVALSHA inside the replication backlog the
* master will send us. */ * master will send us. */
{
AeLocker lock;
lock.arm(nullptr);
if (rsi && dictSize(g_pserver->lua_scripts)) { if (rsi && dictSize(g_pserver->lua_scripts)) {
di = dictGetIterator(g_pserver->lua_scripts); di = dictGetIterator(g_pserver->lua_scripts);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
robj *body = (robj*)dictGetVal(de); robj *body = (robj*)dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,szFromObj(body),sdslen(szFromObj(body))) == -1) if (rdbSaveAuxField(rdb,"lua",3,szFromObj(body),sdslen(szFromObj(body))) == -1)
goto werr; goto werr;
if (g_pserver->rdbThreadVars.fRdbThreadCancel)
goto werr;
} }
dictReleaseIterator(di); dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */ di = NULL; /* So that we don't release it again on error. */
} }
} // AeLocker end scope
/* EOF opcode */ /* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@ -1293,7 +1301,7 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp; FILE *fp;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); snprintf(tmpfile,256,"temp-%d.rdb", g_pserver->rdbThreadVars.tmpfileNum);
fp = fopen(tmpfile,"w"); fp = fopen(tmpfile,"w");
if (!fp) { if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN); char *cwdp = getcwd(cwd,MAXPATHLEN);
@ -1331,9 +1339,12 @@ int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
} }
serverLog(LL_NOTICE,"DB saved on disk"); serverLog(LL_NOTICE,"DB saved on disk");
g_pserver->dirty = 0; if (serverTL != nullptr)
g_pserver->lastsave = time(NULL); {
g_pserver->lastbgsave_status = C_OK; g_pserver->dirty = 0;
g_pserver->lastsave = time(NULL);
g_pserver->lastbgsave_status = C_OK;
}
return C_OK; return C_OK;
werr: werr:
@ -1343,9 +1354,17 @@ werr:
return C_ERR; return C_ERR;
} }
int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi) struct rdbSaveThreadArgs
{ {
int retval = rdbSave(rgpdb, rsi); rdbSaveInfo rsi;
redisDbPersistentData *rgpdb[1]; // NOTE: Variable Length
};
void *rdbSaveThread(void *vargs)
{
rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
serverAssert(serverTL == nullptr);
int retval = rdbSave(args->rgpdb, &args->rsi);
if (retval == C_OK) { if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1); size_t private_dirty = zmalloc_get_private_dirty(-1);
@ -1358,11 +1377,41 @@ int rdbSaveThread(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi)
g_pserver->child_info_data.cow_size = private_dirty; g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB); sendChildInfo(CHILD_INFO_TYPE_RDB);
} }
return (retval == C_OK) ? 0 : 1;
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
aeReleaseLock();
zfree(args);
return (retval == C_OK) ? (void*)0 : (void*)1;
}
/* Start a background RDB save on a new thread.
 *
 * child: receives the handle of the created thread.
 * rsi:   save info to hand to the thread; may be nullptr, in which case
 *        RDB_SAVE_INFO_INIT defaults are used.
 *
 * The thread receives a heap-allocated rdbSaveThreadArgs holding a private
 * copy of the save info (with repl_id / master_repl_offset taken from
 * g_pserver at launch time) and one snapshot pointer per database.
 * On success the thread owns 'args' and releases it itself.
 *
 * Returns C_OK on success. On failure the snapshots are ended, 'args' is
 * freed, errno is set to the error code returned by pthread_create() and
 * C_ERR is returned. */
int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
{
    /* rdbSaveThreadArgs already has room for one rgpdb slot, so only
     * dbnum-1 extra pointers are appended. */
    rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentData*)), MALLOC_LOCAL);
    rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT;
    if (rsi == nullptr)
        rsi = &rsiT;
    memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo));
    memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
    args->rsi.master_repl_offset = g_pserver->master_repl_offset;

    /* NOTE(review): only the raw pointer is kept here; the shared_ptr returned
     * by createSnapshot() is dropped. Presumably the database keeps its own
     * reference (endSnapshot() compares against m_spdbSnapshot) - confirm. */
    for (int idb = 0; idb < cserver.dbnum; ++idb)
        args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get();

    g_pserver->rdbThreadVars.tmpfileNum++;
    g_pserver->rdbThreadVars.fRdbThreadCancel = false;

    /* pthread_create() reports failure through its return value and does not
     * set errno, while callers log strerror(errno) on C_ERR. */
    const int err = pthread_create(&child, NULL, rdbSaveThread, args);
    if (err != 0) {
        for (int idb = 0; idb < cserver.dbnum; ++idb)
            g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
        zfree(args);
        /* Set last so the cleanup above cannot overwrite it. */
        errno = err;
        return C_ERR;
    }
    return C_OK;
}
int rdbSaveBackground(rdbSaveInfo *rsi) { int rdbSaveBackground(rdbSaveInfo *rsi) {
pid_t childpid; pthread_t child;
long long start; long long start;
if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR; if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR;
@ -1372,39 +1421,32 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
openChildInfoPipe(); openChildInfoPipe();
start = ustime(); start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
closeListeningSockets(0);
redisSetProcTitle("keydb-rdb-bgsave");
int rval = rdbSaveThread(nullptr, rsi);
exitFromChild(rval);
} else {
/* Parent */
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
g_pserver->lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdb_child_pid = childpid;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
if (launchRdbSaveThread(child, rsi) != C_OK) {
closeChildInfoPipe();
g_pserver->lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
} }
return C_OK; /* unreached */ serverLog(LL_NOTICE,"Background saving started");
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdbThreadVars.fRdbThreadActive = true;
g_pserver->rdbThreadVars.rdb_child_thread = child;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
} }
void rdbRemoveTempFile(pid_t childpid) { void rdbRemoveTempFile(int tmpfileNum) {
char tmpfile[256]; char tmpfile[256];
snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) childpid); snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", tmpfileNum);
unlink(tmpfile); unlink(tmpfile);
} }
@ -2249,7 +2291,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Background saving terminated by signal %d", bysignal); "Background saving terminated by signal %d", bysignal);
latencyStartMonitor(latency); latencyStartMonitor(latency);
rdbRemoveTempFile(g_pserver->rdb_child_pid); rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum);
latencyEndMonitor(latency); latencyEndMonitor(latency);
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency); latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without /* SIGUSR1 is whitelisted, so we have a way to kill a child without
@ -2257,7 +2299,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
if (bysignal != SIGUSR1) if (bysignal != SIGUSR1)
g_pserver->lastbgsave_status = C_ERR; g_pserver->lastbgsave_status = C_ERR;
} }
g_pserver->rdb_child_pid = -1; g_pserver->rdbThreadVars.fRdbThreadActive = false;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE;
g_pserver->rdb_save_time_last = time(NULL)-g_pserver->rdb_save_time_start; g_pserver->rdb_save_time_last = time(NULL)-g_pserver->rdb_save_time_start;
g_pserver->rdb_save_time_start = -1; g_pserver->rdb_save_time_start = -1;
@ -2282,7 +2324,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Background transfer terminated by signal %d", bysignal); "Background transfer terminated by signal %d", bysignal);
} }
g_pserver->rdb_child_pid = -1; g_pserver->rdbThreadVars.fRdbThreadActive = false;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE;
g_pserver->rdb_save_time_start = -1; g_pserver->rdb_save_time_start = -1;
@ -2381,24 +2423,112 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
* the child did not exit for an error, but because we wanted), and performs * the child did not exit for an error, but because we wanted), and performs
* the cleanup needed. */ * the cleanup needed. */
void killRDBChild(void) { void killRDBChild(void) {
kill(g_pserver->rdb_child_pid,SIGUSR1); g_pserver->rdbThreadVars.fRdbThreadCancel = true;
rdbRemoveTempFile(g_pserver->rdb_child_pid); void *rval;
pthread_join(g_pserver->rdbThreadVars.rdb_child_thread,&rval);
g_pserver->rdbThreadVars.fRdbThreadActive = false;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum);
closeChildInfoPipe(); closeChildInfoPipe();
updateDictResizePolicy(); updateDictResizePolicy();
} }
/* Arguments passed to rdbSaveToSlavesSocketsThread() via pthread_create().
 * The block is allocated in rdbSaveToSlavesSockets() and is freed by the
 * thread itself when it finishes. */
struct rdbSaveSocketThreadArgs
{
/* Copy of the caller's save info; repl_id and master_repl_offset are
 * overwritten from g_pserver when the arguments are built. */
rdbSaveInfo rsi;
/* Heap array of replica socket descriptors; the thread frees it after
 * initialising its rio and sets the pointer to nullptr. */
int *fds;
/* Number of valid entries in both 'fds' and 'clientids'. */
int numfds;
/* Heap array of replica client IDs, parallel to 'fds'; freed by the thread. */
uint64_t *clientids;
/* NOTE: variable length. The allocation adds room for cserver.dbnum-1
 * further pointers, one snapshot pointer per database. */
redisDbPersistentData *rgpdb[1];
};
/* Thread entry point that writes an RDB payload (with EOF mark) to the
 * replica sockets listed in the rdbSaveSocketThreadArgs passed as 'vargs'.
 *
 * On success it sends the child info and writes a per-replica result report
 * to g_pserver->rdb_pipe_write_result_to_parent. In all cases it then ends
 * the per-database snapshots while holding the lock taken by aeAcquireLock(),
 * and frees args->clientids and 'args' itself.
 *
 * Returns (void*)0 on success and (void*)1 on failure. */
void *rdbSaveToSlavesSocketsThread(void *vargs)
{
/* This body was previously run in a forked child; it now asserts that the
 * calling thread has no serverTL set. */
serverAssert(serverTL == nullptr);
rdbSaveSocketThreadArgs *args = (rdbSaveSocketThreadArgs*)vargs;
int retval;
rio slave_sockets;
rioInitWithFdset(&slave_sockets,args->fds,args->numfds);
/* NOTE(review): the fd array is released right after initialising the rio;
 * presumably rioInitWithFdset() keeps its own copy - confirm. */
zfree(args->fds);
args->fds = nullptr;
retval = rdbSaveRioWithEOFMark(&slave_sockets,args->rgpdb,NULL,&args->rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
/* If we are returning OK, at least one replica was served
 * with the RDB file as expected, so we need to send a report
 * to the parent via the pipe. The format of the message is:
 *
 * <len> <replica[0].id> <replica[0].error> ...
 *
 * len, replica IDs, and replica errors, are all uint64_t integers,
 * so basically the reply is composed of 64 bits for the len field
 * plus 2 additional 64 bit integers for each entry, for a total
 * of 'len' entries.
 *
 * The 'id' represents the replica's client ID, so that the master
 * can match the report with a specific replica, and 'error' is
 * set to 0 if the replication process terminated with a success
 * or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*args->numfds), MALLOC_LOCAL);
uint64_t *len = (uint64_t*)msg;
uint64_t *ids = len+1;
int j, msglen;
*len = args->numfds;
for (j = 0; j < args->numfds; j++) {
*ids++ = args->clientids[j];
*ids++ = slave_sockets.io.fdset.state[j];
}
/* Write the message to the parent. If we have no good slaves or
 * we are unable to transfer the message to the parent, we return
 * an error so that the parent will abort the replication
 * process with all the children that were waiting. */
msglen = sizeof(uint64_t)*(1+2*args->numfds);
if (*len == 0 ||
write(g_pserver->rdb_pipe_write_result_to_parent,msg,msglen)
!= msglen)
{
retval = C_ERR;
}
zfree(msg);
}
/* Snapshot teardown is done for every database regardless of retval, and
 * only between aeAcquireLock() and aeReleaseLock(). */
aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
aeReleaseLock();
zfree(args->clientids);
zfree(args);
rioFreeFdset(&slave_sockets);
return (retval == C_OK) ? (void*)0 : (void*)1;
}
/* Spawn an RDB child that writes the RDB to the sockets of the slaves /* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
int *fds;
uint64_t *clientids;
int numfds;
listNode *ln; listNode *ln;
listIter li; listIter li;
pid_t childpid; pthread_t child;
long long start; long long start;
int pipefds[2]; int pipefds[2];
rdbSaveSocketThreadArgs *args = nullptr;
if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR; if (g_pserver->aof_child_pid != -1 || g_pserver->FRdbSaveInProgress()) return C_ERR;
@ -2409,22 +2539,27 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdb_pipe_read_result_from_child = pipefds[0]; g_pserver->rdb_pipe_read_result_from_child = pipefds[0];
g_pserver->rdb_pipe_write_result_to_parent = pipefds[1]; g_pserver->rdb_pipe_write_result_to_parent = pipefds[1];
args = (rdbSaveSocketThreadArgs*)zmalloc(sizeof(rdbSaveSocketThreadArgs) + sizeof(redisDbPersistentData*)*(cserver.dbnum-1), MALLOC_LOCAL);
/* Collect the file descriptors of the slaves we want to transfer /* Collect the file descriptors of the slaves we want to transfer
* the RDB to, which are in WAIT_BGSAVE_START state. */ * the RDB to, which are in WAIT_BGSAVE_START state. */
fds = (int*)zmalloc(sizeof(int)*listLength(g_pserver->slaves), MALLOC_LOCAL); args->fds = (int*)zmalloc(sizeof(int)*listLength(g_pserver->slaves), MALLOC_LOCAL);
/* We also allocate an array of corresponding client IDs. This will /* We also allocate an array of corresponding client IDs. This will
* be useful for the child process in order to build the report * be useful for the child process in order to build the report
* (sent via unix pipe) that will be sent to the parent. */ * (sent via unix pipe) that will be sent to the parent. */
clientids = (uint64_t*)zmalloc(sizeof(uint64_t)*listLength(g_pserver->slaves), MALLOC_LOCAL); args->clientids = (uint64_t*)zmalloc(sizeof(uint64_t)*listLength(g_pserver->slaves), MALLOC_LOCAL);
numfds = 0; args->numfds = 0;
memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo));
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *replica = (client*)ln->value; client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numfds] = replica->id; args->clientids[args->numfds] = replica->id;
fds[numfds++] = replica->fd; args->fds[args->numfds++] = replica->fd;
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
/* Put the socket in blocking mode to simplify RDB transfer. /* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the child returns (since duped socket * We'll restore it when the child returns (since duped socket
@ -2437,121 +2572,52 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
/* Create the child process. */ /* Create the child process. */
openChildInfoPipe(); openChildInfoPipe();
start = ustime(); start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
int retval;
rio slave_sockets;
rioInitWithFdset(&slave_sockets,fds,numfds); for (int idb = 0; idb < cserver.dbnum; ++idb)
zfree(fds); args->rgpdb[idb] = g_pserver->db[idb].createSnapshot().get();
closeListeningSockets(0); g_pserver->rdbThreadVars.tmpfileNum++;
redisSetProcTitle("keydb-rdb-to-slaves"); g_pserver->rdbThreadVars.fRdbThreadCancel = false;
if (pthread_create(&child, nullptr, rdbSaveToSlavesSocketsThread, args)) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
std::vector<redisDbPersistentData*> vecpdb; /* Undo the state change. The caller will perform cleanup on
for (int idb = 0; idb < cserver.dbnum; ++idb) * all the slaves in BGSAVE_START state, but an early call to
{ * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
vecpdb.push_back(&g_pserver->db[idb]); listRewind(g_pserver->slaves,&li);
} while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
int j;
retval = rdbSaveRioWithEOFMark(&slave_sockets,vecpdb.data(),NULL,rsi); for (j = 0; j < args->numfds; j++) {
if (retval == C_OK && rioFlush(&slave_sockets) == 0) if (replica->id == args->clientids[j]) {
retval = C_ERR; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break;
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
/* If we are returning OK, at least one replica was served
* with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is:
*
* <len> <replica[0].id> <replica[0].error> ...
*
* len, replica IDs, and replica errors, are all uint64_t integers,
* so basically the reply is composed of 64 bits for the len field
* plus 2 additional 64 bit integers for each entry, for a total
* of 'len' entries.
*
* The 'id' represents the replica's client ID, so that the master
* can match the report with a specific replica, and 'error' is
* set to 0 if the replication process terminated with a success
* or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds), MALLOC_LOCAL);
uint64_t *len = (uint64_t*)msg;
uint64_t *ids = len+1;
int j, msglen;
*len = numfds;
for (j = 0; j < numfds; j++) {
*ids++ = clientids[j];
*ids++ = slave_sockets.io.fdset.state[j];
}
/* Write the message to the parent. If we have no good slaves or
* we are unable to transfer the message to the parent, we exit
* with an error so that the parent will abort the replication
* process with all the childre that were waiting. */
msglen = sizeof(uint64_t)*(1+2*numfds);
if (*len == 0 ||
write(g_pserver->rdb_pipe_write_result_to_parent,msg,msglen)
!= msglen)
{
retval = C_ERR;
}
zfree(msg);
}
zfree(clientids);
rioFreeFdset(&slave_sockets);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
int j;
for (j = 0; j < numfds; j++) {
if (replica->id == clientids[j]) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break;
}
} }
} }
close(pipefds[0]);
close(pipefds[1]);
closeChildInfoPipe();
} else {
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
childpid);
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdb_child_pid = childpid;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
updateDictResizePolicy();
} }
zfree(clientids); close(pipefds[0]);
zfree(fds); close(pipefds[1]);
return (childpid == -1) ? C_ERR : C_OK; closeChildInfoPipe();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb].endSnapshot(args->rgpdb[idb]);
zfree(args->clientids);
zfree(args->fds);
zfree(args);
return C_ERR;
} }
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
serverLog(LL_NOTICE,"Background RDB transfer started");
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdbThreadVars.rdb_child_thread = child;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
updateDictResizePolicy();
return C_OK; /* Unreached. */ return C_OK; /* Unreached. */
} }
@ -2619,6 +2685,9 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
*rsi = rsi_init; *rsi = rsi_init;
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
rsi->master_repl_offset = g_pserver->master_repl_offset;
/* If the instance is a master, we can populate the replication info /* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL, * only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this * it means that the instance isn't in any replication chains. In this

View File

@ -139,7 +139,7 @@ int rdbLoad(rdbSaveInfo *rsi);
int rdbLoadFile(const char *filename, rdbSaveInfo *rsi); int rdbLoadFile(const char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid); void rdbRemoveTempFile(int childpid);
int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbSave(redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbSaveFile(char *filename, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);
int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi); int rdbSaveFp(FILE *pf, redisDbPersistentData **rgpdb, rdbSaveInfo *rsi);

View File

@ -1514,9 +1514,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_NOTICE, serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the " "Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. " "master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid " "Cancelling RDB the save and removing its temp file to avoid "
"any race", "any race");
(long) g_pserver->rdb_child_pid);
killRDBChild(); killRDBChild();
} }

View File

@ -1179,6 +1179,21 @@ int dictSdsKeyCompare(void *privdata, const void *key1,
return memcmp(key1, key2, l1) == 0; return memcmp(key1, key2, l1) == 0;
} }
/* Destructor callback that deliberately does nothing: both arguments are
 * ignored and nothing is freed. It is installed as the key destructor of
 * dbSnapshotDictType. */
void dictSdsNOPDestructor(void *, void *)
{
    /* intentionally empty */
}
void dictDbKeyDestructor(void *privdata, void *key)
{
redisDbPersistentData *owner = (redisDbPersistentData*)privdata;
serverAssert(owner != nullptr);
if (owner->m_spdbSnapshot != nullptr)
{
dictEntry *deSnapshot = dictFind(owner->m_spdbSnapshot->m_pdict, key);
if (deSnapshot && (key == dictGetKey(deSnapshot)))
return; // don't free, it's now owned by the snapshot
}
sdsfree((sds)key);
}
/* A case insensitive version used for the command lookup table and other /* A case insensitive version used for the command lookup table and other
* places where case insensitive non binary-safe comparison is needed. */ * places where case insensitive non binary-safe comparison is needed. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1, int dictSdsKeyCaseCompare(void *privdata, const void *key1,
@ -1314,10 +1329,19 @@ dictType dbDictType = {
NULL, /* key dup */ NULL, /* key dup */
NULL, /* val dup */ NULL, /* val dup */
dictSdsKeyCompare, /* key compare */ dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */ dictDbKeyDestructor, /* key destructor */
dictObjectDestructor /* val destructor */ dictObjectDestructor /* val destructor */
}; };
/* Dict type that hashes and compares sds keys like dbDictType, but whose key
 * destructor is a no-op: removing an entry releases the value through
 * dictObjectDestructor and leaves the key string untouched.
 * NOTE(review): presumably used for the snapshot keyspace dict
 * (m_spdbSnapshot->m_pdict) -- confirm at the dict creation site. */
dictType dbSnapshotDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsNOPDestructor, /* key destructor (no-op, key is not freed) */
dictObjectDestructor, /* val destructor */
};
/* g_pserver->lua_scripts sha (as sds string) -> scripts (as robj) cache. */ /* g_pserver->lua_scripts sha (as sds string) -> scripts (as robj) cache. */
dictType shaScriptObjectDictType = { dictType shaScriptObjectDictType = {
dictSdsCaseHash, /* hash function */ dictSdsCaseHash, /* hash function */
@ -1934,7 +1958,24 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int statloc; int statloc;
pid_t pid; pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { if (g_pserver->FRdbSaveInProgress())
{
void *rval = nullptr;
if (pthread_tryjoin_np(g_pserver->rdbThreadVars.rdb_child_thread, &rval))
{
if (errno != EBUSY && errno != EAGAIN)
serverLog(LL_WARNING, "Error joining the background RDB save thread: %s\n", strerror(errno));
}
else
{
int exitcode = (int)reinterpret_cast<ptrdiff_t>(rval);
backgroundSaveDoneHandler(exitcode,0);
if (exitcode == 0) receiveChildInfo();
updateDictResizePolicy();
closeChildInfoPipe();
}
}
else if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc); int exitcode = WEXITSTATUS(statloc);
int bysignal = 0; int bysignal = 0;
@ -1942,13 +1983,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (pid == -1) { if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. " serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d", "aof_child_pid = %d",
strerror(errno), strerror(errno),
(int) g_pserver->rdb_child_pid,
(int) g_pserver->aof_child_pid); (int) g_pserver->aof_child_pid);
} else if (pid == g_pserver->rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == g_pserver->aof_child_pid) { } else if (pid == g_pserver->aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal); backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo(); if (!bysignal && exitcode == 0) receiveChildInfo();
@ -2971,7 +3008,7 @@ void initServer(void) {
listSetFreeMethod(g_pserver->pubsub_patterns,freePubsubPattern); listSetFreeMethod(g_pserver->pubsub_patterns,freePubsubPattern);
listSetMatchMethod(g_pserver->pubsub_patterns,listMatchPubsubPattern); listSetMatchMethod(g_pserver->pubsub_patterns,listMatchPubsubPattern);
g_pserver->cronloops = 0; g_pserver->cronloops = 0;
g_pserver->rdb_child_pid = -1; g_pserver->rdbThreadVars.fRdbThreadActive = false;
g_pserver->aof_child_pid = -1; g_pserver->aof_child_pid = -1;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE; g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE;
g_pserver->rdb_bgsave_scheduled = 0; g_pserver->rdb_bgsave_scheduled = 0;
@ -4757,7 +4794,7 @@ static void sigShutdownHandler(int sig) {
* on disk. */ * on disk. */
if (g_pserver->shutdown_asap && sig == SIGINT) { if (g_pserver->shutdown_asap && sig == SIGINT) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now."); serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(getpid()); rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum);
exit(1); /* Exit with an error since this was not a clean shutdown. */ exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (g_pserver->loading) { } else if (g_pserver->loading) {
serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now."); serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");

View File

@ -791,6 +791,16 @@ public:
: when(when), spsubkey(subkey, sdsfree) : when(when), spsubkey(subkey, sdsfree)
{} {}
subexpireEntry(const subexpireEntry &e)
: when(e.when), spsubkey(nullptr, sdsfree)
{
if (e.spsubkey)
spsubkey = std::unique_ptr<const char, void(*)(const char*)>((const char*)sdsdup((sds)e.spsubkey.get()), sdsfree);
}
// Move construction and move assignment use the compiler-generated versions.
// They are spelled out because the user-declared copy constructor above would
// otherwise suppress the implicit move operations.
subexpireEntry(subexpireEntry &&e) = default;
subexpireEntry& operator=(subexpireEntry &&e) = default;
bool operator<(long long when) const noexcept { return this->when < when; } bool operator<(long long when) const noexcept { return this->when < when; }
bool operator<(const subexpireEntry &se) { return this->when < se.when; } bool operator<(const subexpireEntry &se) { return this->when < se.when; }
}; };
@ -803,6 +813,10 @@ public:
expireEntryFat(sds keyPrimary) expireEntryFat(sds keyPrimary)
: m_keyPrimary(keyPrimary) : m_keyPrimary(keyPrimary)
{} {}
// Copy and move construction use the compiler-generated member-wise versions.
// NOTE(review): a member-wise copy duplicates the m_keyPrimary pointer, not
// the string it points to -- confirm the copy is not expected to own its key.
expireEntryFat(const expireEntryFat &e) = default;
expireEntryFat(expireEntryFat &&e) = default;
long long when() const noexcept { return m_vecexpireEntries.front().when; } long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return m_keyPrimary; } const char *key() const noexcept { return m_keyPrimary; }
@ -904,6 +918,14 @@ public:
m_when = LLONG_MIN; m_when = LLONG_MIN;
} }
expireEntry(const expireEntry &e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
if (e.FFat())
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry);
}
expireEntry(expireEntry &&e) expireEntry(expireEntry &&e)
{ {
u.m_key = e.u.m_key; u.m_key = e.u.m_key;
@ -1082,6 +1104,7 @@ public:
class redisDbPersistentData class redisDbPersistentData
{ {
friend void dictDbKeyDestructor(void *privdata, void *key);
public: public:
~redisDbPersistentData(); ~redisDbPersistentData();
@ -1131,6 +1154,9 @@ public:
return m_setexpire->find(key); return m_setexpire->find(key);
} }
// Past-the-end sentinel: an iterator constructed from a null entry pointer.
dict_iter end()
{
    return dict_iter(nullptr);
}
// Const overload of the sentinel above.
dict_const_iter end() const
{
    return dict_const_iter(nullptr);
}
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); }
void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); } void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); }
@ -1171,14 +1197,14 @@ private:
void storeKey(const char *key, size_t cchKey, robj *o); void storeKey(const char *key, size_t cchKey, robj *o);
// Keyspace // Keyspace
dict *m_pdict; /* The keyspace for this DB */ dict *m_pdict = nullptr; /* The keyspace for this DB */
int m_fTrackingChanges = 0; // Note: Stack based int m_fTrackingChanges = 0; // Note: Stack based
bool m_fAllChanged = false; bool m_fAllChanged = false;
std::set<std::string> m_setchanged; std::set<std::string> m_setchanged;
IStorage *m_pstorage = nullptr; IStorage *m_pstorage = nullptr;
// Expire // Expire
expireset *m_setexpire; expireset *m_setexpire = nullptr;
std::shared_ptr<redisDbPersistentData> m_spdbSnapshot; std::shared_ptr<redisDbPersistentData> m_spdbSnapshot;
}; };
@ -1212,8 +1238,6 @@ typedef struct redisDb : public redisDbPersistentData
{} {}
void initialize(int id); void initialize(int id);
const_iter end() { return const_iter(nullptr); }
void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
@ -1564,6 +1588,9 @@ typedef struct rdbSaveInfo {
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */ long long repl_offset; /* Replication offset. */
int fForceSetKey; int fForceSetKey;
/* Used In Save */
long long master_repl_offset;
} rdbSaveInfo; } rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} #define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE}
@ -1821,7 +1848,13 @@ struct redisServer {
/* RDB persistence */ /* RDB persistence */
long long dirty; /* Changes to DB from the last save */ long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
/* State of the background RDB save, which now runs as a thread. */
pid_t rdb_child_pid; /* PID of RDB saving child */ struct _rdbThreadVars
{
bool fRdbThreadActive = false; /* True while a save thread is outstanding; this is what FRdbSaveInProgress() reports */
/* Cleared right before a save thread is launched.
 * NOTE(review): volatile gives no inter-thread ordering guarantees; if the
 * save thread polls this flag, std::atomic<bool> would be the safe type. */
volatile bool fRdbThreadCancel = false;
pthread_t rdb_child_thread; /* Handle of the save thread (from pthread_create), later joined in serverCron */
int tmpfileNum = 0; /* Incremented before each save thread starts; passed to rdbRemoveTempFile() */
} rdbThreadVars;
struct saveparam *saveparams; /* Save points array for RDB */ struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */ int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */ char *rdb_filename; /* Name of RDB file */
@ -2001,7 +2034,7 @@ struct redisServer {
/* System hardware info */ /* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */ size_t system_memory_size; /* Total memory in system as reported by OS */
bool FRdbSaveInProgress() const { return rdb_child_pid != -1; } bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
}; };
typedef struct pubsubPattern { typedef struct pubsubPattern {
@ -2107,6 +2140,7 @@ extern dictType zsetDictType;
extern dictType clusterNodesDictType; extern dictType clusterNodesDictType;
extern dictType clusterNodesBlackListDictType; extern dictType clusterNodesBlackListDictType;
extern dictType dbDictType; extern dictType dbDictType;
extern dictType dbSnapshotDictType;
extern dictType shaScriptObjectDictType; extern dictType shaScriptObjectDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
extern dictType hashDictType; extern dictType hashDictType;