From 0b607ecfc373123ebebac54e941e4a70f6476c5d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Nov 2021 18:47:36 +0000 Subject: [PATCH] Fix multiple test failures Former-commit-id: ba99418643e7b4a12e79bb08e4a7f152da28b861 --- runtest | 1 - src/SnapshotPayloadParseState.cpp | 139 +++++++++++++++++++++--------- src/SnapshotPayloadParseState.h | 6 +- src/replication.cpp | 65 +++++++++++--- 4 files changed, 154 insertions(+), 57 deletions(-) diff --git a/runtest b/runtest index 6c9ea5ae4..c6349d118 100755 --- a/runtest +++ b/runtest @@ -14,4 +14,3 @@ then exit 1 fi $TCLSH tests/test_helper.tcl "${@}" -$TCLSH tests/test_helper.tcl "${@}--flash" \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 4bf89c718..8ba4b109b 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -84,19 +84,31 @@ dictType metadataLongLongDictType = { nullptr /* async free destructor */ }; +dictType metadataDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictSdsDestructor, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + + SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() { if (stackParse.empty()) return ParseStageName::Global; - switch (stackParse.top().name) + switch (stackParse.back().name) { case ParseStageName::None: return ParseStageName::Global; case ParseStageName::Global: - if (stackParse.top().arraycur == 0) + if (stackParse.back().arraycur == 0) return ParseStageName::MetaData; - else if (stackParse.top().arraycur == 1) + else if (stackParse.back().arraycur == 1) return ParseStageName::Databases; break; @@ -118,6 +130,7 @@ SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStag void SnapshotPayloadParseState::flushQueuedKeys() { if (vecqueuedKeys.empty()) return; + serverAssert(current_database >= 0); // TODO: We can't finish parse until all the work functions finish int idb = current_database; @@ -125,11 +138,20 @@ void SnapshotPayloadParseState::flushQueuedKeys() { auto sizePrev = vecqueuedKeys.size(); ++insertsInFlight; auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous - g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { - g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); - --insertsInFlightTmp; - delete pallocator; - }); + if (current_database < cserver.dbnum) { + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + --insertsInFlightTmp; + delete pallocator; + }); + } else { + // else drop the data + vecqueuedKeys.clear(); + vecqueuedKeysCb.clear(); + vecqueuedVals.clear(); + vecqueuedValsCb.clear(); + // Note: m_spallocator will get free'd when overwritten below + } m_spallocator = std::make_unique(); cbQueued = 0; vecqueuedKeys.reserve(sizePrev); @@ -146,19 +168,21 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() { ParseStage stage; stage.name = ParseStageName::None; stage.arraylen = 1; - stackParse.push(stage); + stackParse.push_back(stage); dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); + dictMetaData = dictCreate(&metadataDictType, nullptr); insertsInFlight = 0; m_spallocator = std::make_unique(); } SnapshotPayloadParseState::~SnapshotPayloadParseState() { dictRelease(dictLongLongMetaData); + dictRelease(dictMetaData); } -const char *SnapshotPayloadParseState::getStateDebugName() { - switch (stackParse.top().name) { +const char *SnapshotPayloadParseState::getStateDebugName(ParseStage stage) { + switch (stage.name) { case ParseStageName::None: return "None"; @@ -185,8 +209,8 @@ const char *SnapshotPayloadParseState::getStateDebugName() { size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); } void SnapshotPayloadParseState::trimState() { - while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen)) - stackParse.pop(); + while (!stackParse.empty() && (stackParse.back().arraycur == stackParse.back().arraylen)) + stackParse.pop_back(); if (stackParse.empty()) { flushQueuedKeys(); @@ -203,13 +227,13 @@ void SnapshotPayloadParseState::pushArray(long long size) { throw "Bad Protocol: unexpected trailing data"; if (size == 0) { - stackParse.top().arraycur++; + stackParse.back().arraycur++; return; } - if (stackParse.top().name == ParseStageName::Databases) { + if (stackParse.back().name == ParseStageName::Databases) { flushQueuedKeys(); - current_database = stackParse.top().arraycur; + current_database = stackParse.back().arraycur; } ParseStage stage; @@ -221,43 +245,65 @@ void SnapshotPayloadParseState::pushArray(long long size) { } // Note: This affects getNextStage so ensure its after - stackParse.top().arraycur++; - stackParse.push(stage); + stackParse.back().arraycur++; + stackParse.push_back(stage); } void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) { if (stackParse.empty()) throw "Bad Protocol: unexpected trailing bulk string"; - if (stackParse.top().arraycur >= static_cast(stackParse.top().arrvalues.size())) + if (stackParse.back().arraycur >= static_cast(stackParse.back().arrvalues.size())) throw "Bad protocol: Unexpected value"; - auto &stage = stackParse.top(); - stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch); - stage.arrvalues[stackParse.top().arraycur].second = cch; - memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch); + auto &stage = stackParse.back(); + stage.arrvalues[stackParse.back().arraycur].first = (char*)m_spallocator->allocate(cch); + stage.arrvalues[stackParse.back().arraycur].second = cch; + memcpy(stage.arrvalues[stackParse.back().arraycur].first, rgch, cch); stage.arraycur++; switch (stage.name) { case ParseStageName::KeyValuePair: - if (stage.arraycur == 2) { - // We loaded both pairs - if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) - throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small - vecqueuedKeys.push_back(stage.arrvalues[0].first); - vecqueuedKeysCb.push_back(stage.arrvalues[0].second); - vecqueuedVals.push_back(stage.arrvalues[1].first); - vecqueuedValsCb.push_back(stage.arrvalues[1].second); - stage.arrvalues[0].first = nullptr; - stage.arrvalues[1].first = nullptr; - cbQueued += vecqueuedKeysCb.back(); - cbQueued += vecqueuedValsCb.back(); - if (cbQueued >= queuedBatchLimit) - flushQueuedKeys(); + if (stackParse.size() < 2) + throw "Bad Protocol: unexpected bulk string"; + if (stackParse[stackParse.size()-2].name == ParseStageName::MetaData) { + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + + if (!strcasecmp(stage.arrvalues[0].first, "lua")) { + /* Load the script back in memory. */ + robj *auxval = createStringObject(stage.arrvalues[1].first, stage.arrvalues[1].second); + if (luaCreateFunction(NULL,g_pserver->lua,auxval) == NULL) { + throw "Can't load Lua script"; + } + } else { + dictAdd(dictMetaData, sdsnewlen(stage.arrvalues[0].first, stage.arrvalues[0].second), sdsnewlen(stage.arrvalues[1].first, stage.arrvalues[1].second)); + } + } + } else if (stackParse[stackParse.size()-2].name == ParseStageName::Dataset) { + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + vecqueuedKeys.push_back(stage.arrvalues[0].first); + vecqueuedKeysCb.push_back(stage.arrvalues[0].second); + vecqueuedVals.push_back(stage.arrvalues[1].first); + vecqueuedValsCb.push_back(stage.arrvalues[1].second); + stage.arrvalues[0].first = nullptr; + stage.arrvalues[1].first = nullptr; + cbQueued += vecqueuedKeysCb.back(); + cbQueued += vecqueuedValsCb.back(); + if (cbQueued >= queuedBatchLimit) + flushQueuedKeys(); + } + } else { + throw "Bad Protocol: unexpected bulk string"; } break; default: - throw "Bad Protocol: unexpected bulk string"; + throw "Bad Protocol: unexpected bulk string out of KV pair"; } } @@ -265,12 +311,12 @@ void SnapshotPayloadParseState::pushValue(long long value) { if (stackParse.empty()) throw "Bad Protocol: unexpected integer value"; - stackParse.top().arraycur++; + stackParse.back().arraycur++; - if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr) + if (stackParse.back().arraycur != 2 || stackParse.back().arrvalues[0].first == nullptr) throw "Bad Protocol: unexpected integer value"; - dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr); + dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.back().arrvalues[0].first, stackParse.back().arrvalues[0].second), nullptr); if (de == nullptr) throw "Bad Protocol: metadata field sent twice"; de->v.s64 = value; @@ -284,4 +330,15 @@ long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) co throw false; } return de->v.s64; +} + +sds SnapshotPayloadParseState::getMetaDataStr(const char *szField) const { + sdsstring str(szField, strlen(szField)); + + dictEntry *de = dictFind(dictMetaData, str.get()); + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return (sds)de->v.val; } \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h index fee42178a..cb1e0420a 100644 --- a/src/SnapshotPayloadParseState.h +++ b/src/SnapshotPayloadParseState.h @@ -27,7 +27,7 @@ class SnapshotPayloadParseState { } }; - std::stack stackParse; + std::vector stackParse; std::vector vecqueuedKeys; @@ -39,6 +39,7 @@ class SnapshotPayloadParseState { std::atomic insertsInFlight; std::unique_ptr m_spallocator; dict *dictLongLongMetaData = nullptr; + dict *dictMetaData = nullptr; size_t cbQueued = 0; static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB int current_database = -1; @@ -51,8 +52,9 @@ public: ~SnapshotPayloadParseState(); long long getMetaDataLongLong(const char *field) const; + sds getMetaDataStr(const char *szField) const; - const char *getStateDebugName(); + static const char *getStateDebugName(ParseStage stage); size_t depth() const; diff --git a/src/replication.cpp b/src/replication.cpp index 6cfe7d3ba..3651b0113 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1079,7 +1079,42 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { } serverAssert(replica != nullptr); - g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{ + spreplBuf->addArrayLen(2); // Two sections: Metadata and databases + + // MetaData + aeAcquireLock(); + spreplBuf->addArrayLen(3 + dictSize(g_pserver->lua_scripts)); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-stream-db", 14); + spreplBuf->addLong(rsi->repl_stream_db); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-id", 7); + spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE+1); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-offset", 11); + spreplBuf->addLong(rsi->master_repl_offset); + + if (dictSize(g_pserver->lua_scripts)) { + dictEntry *de; + auto di = dictGetIterator(g_pserver->lua_scripts); + while((de = dictNext(di)) != NULL) { + robj *body = (robj*)dictGetVal(de); + + spreplBuf->addArrayLen(2); + spreplBuf->addString("lua", 3); + spreplBuf->addString(szFromObj(body), sdslen(szFromObj(body))); + } + dictReleaseIterator(di); + di = NULL; /* So that we don't release it again on error. */ + } + + std::shared_ptr>> spvecspsnapshot = std::make_shared>>(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache()); + } + aeReleaseLock(); + + g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{ int retval = C_OK; auto timeStart = ustime(); auto lastLogTime = timeStart; @@ -1087,18 +1122,10 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t cbLastUpdate = 0; auto &replBuf = *spreplBuf; - replBuf.addArrayLen(2); // Two sections: Metadata and databases - - // MetaData - replBuf.addArrayLen(1); - replBuf.addArrayLen(2); - replBuf.addString("repl-stream-db", 14); - replBuf.addLong(repl_stream_db); - // Databases replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { - std::unique_ptr spsnapshot = g_pserver->db[idb]->CloneStorageCache(); + auto &spsnapshot = (*spvecspsnapshot)[idb]; size_t snapshotDeclaredCount = spsnapshot->count(); replBuf.addArrayLen(snapshotDeclaredCount); size_t count = 0; @@ -1139,7 +1166,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); if (retval == C_OK) { aeAcquireLock(); - replicationScriptCacheFlush(); replBuf.putSlavesOnline(); aeReleaseLock(); } @@ -2302,6 +2328,7 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) { size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { size_t cchNumeral = 0; + if (cch > cchNumeral+1 && rgch[cchNumeral+1] == '-') ++cchNumeral; while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral; if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect @@ -2313,7 +2340,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { throw false; } - if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) { + if (!string2ll(rgch+1, cchNumeral, pvalue)) { serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); throw false; } @@ -2330,6 +2357,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (mi->bulkreadBuffer == nullptr) { mi->bulkreadBuffer = sdsempty(); mi->parseState = new SnapshotPayloadParseState(); + if (g_pserver->aof_state != AOF_OFF) stopAppendOnly(); if (!fUpdate) { int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; @@ -2379,6 +2407,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi // Lets read the array length offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize); + if (arraySize < 0) + throw "Invalid array size"; mi->parseState->pushArray(arraySize); } else if (mi->bulkreadBuffer[offset] == '$') { @@ -2387,6 +2417,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi // Lets read the string length size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize); + if (payloadsize < 0) + throw "Invalid array size"; // OK we know how long the string is, now lets make sure the payload is here. if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) { @@ -2421,7 +2453,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (mi->parseState->depth() != 0) return false; + static_assert(sizeof(long) == sizeof(long long),""); rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); + rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset"); + sds str = mi->parseState->getMetaDataStr("repl-id"); + if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) { + memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1); + } + fFinished = true; break; // We're done!!! } @@ -3347,7 +3386,7 @@ void syncWithMaster(connection *conn) { "capa","psync2", "capa","activeExpire", }; - if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) { + if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { veccapabilities.push_back("capa"); veccapabilities.push_back("rocksdb-snapshot-load"); }