Fix multiple test failures

Former-commit-id: ba99418643e7b4a12e79bb08e4a7f152da28b861
This commit is contained in:
John Sully 2021-11-10 18:47:36 +00:00
parent 3ab09b9781
commit 0b607ecfc3
4 changed files with 154 additions and 57 deletions

View File

@ -14,4 +14,3 @@ then
exit 1
fi
$TCLSH tests/test_helper.tcl "${@}"
$TCLSH tests/test_helper.tcl "${@}" --flash

View File

@ -84,19 +84,31 @@ dictType metadataLongLongDictType = {
nullptr /* async free destructor */
};
/* Dict type for the string -> string snapshot metadata table
 * (sds keys and sds values, both owned and freed by the dict). */
dictType metadataDictType = {
    dictSdsHash,                /* hash function */
    nullptr,                    /* key dup */
    nullptr,                    /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictSdsDestructor,          /* val destructor */
    nullptr,                    /* allow to expand */
    nullptr                     /* async free destructor */
};
SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() {
if (stackParse.empty())
return ParseStageName::Global;
switch (stackParse.top().name)
switch (stackParse.back().name)
{
case ParseStageName::None:
return ParseStageName::Global;
case ParseStageName::Global:
if (stackParse.top().arraycur == 0)
if (stackParse.back().arraycur == 0)
return ParseStageName::MetaData;
else if (stackParse.top().arraycur == 1)
else if (stackParse.back().arraycur == 1)
return ParseStageName::Databases;
break;
@ -118,6 +130,7 @@ SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStag
void SnapshotPayloadParseState::flushQueuedKeys() {
if (vecqueuedKeys.empty())
return;
serverAssert(current_database >= 0);
// TODO: We can't finish parse until all the work functions finish
int idb = current_database;
@ -125,11 +138,20 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
auto sizePrev = vecqueuedKeys.size();
++insertsInFlight;
auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capture "this" because that's dangerous
if (current_database < cserver.dbnum) {
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
--insertsInFlightTmp;
delete pallocator;
});
} else {
// else drop the data
vecqueuedKeys.clear();
vecqueuedKeysCb.clear();
vecqueuedVals.clear();
vecqueuedValsCb.clear();
// Note: m_spallocator will get free'd when overwritten below
}
m_spallocator = std::make_unique<SlabAllocator>();
cbQueued = 0;
vecqueuedKeys.reserve(sizePrev);
@ -146,19 +168,21 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() {
ParseStage stage;
stage.name = ParseStageName::None;
stage.arraylen = 1;
stackParse.push(stage);
stackParse.push_back(stage);
dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr);
dictMetaData = dictCreate(&metadataDictType, nullptr);
insertsInFlight = 0;
m_spallocator = std::make_unique<SlabAllocator>();
}
// Release the metadata dictionaries owned by this parse state. The sds
// keys/values are reclaimed by the destructors registered in each dictType.
SnapshotPayloadParseState::~SnapshotPayloadParseState() {
    dictRelease(dictMetaData);
    dictRelease(dictLongLongMetaData);
}
const char *SnapshotPayloadParseState::getStateDebugName() {
switch (stackParse.top().name) {
const char *SnapshotPayloadParseState::getStateDebugName(ParseStage stage) {
switch (stage.name) {
case ParseStageName::None:
return "None";
@ -185,8 +209,8 @@ const char *SnapshotPayloadParseState::getStateDebugName() {
// Current nesting depth of the parse stack; 0 means the payload parse is complete.
size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); }
void SnapshotPayloadParseState::trimState() {
while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen))
stackParse.pop();
while (!stackParse.empty() && (stackParse.back().arraycur == stackParse.back().arraylen))
stackParse.pop_back();
if (stackParse.empty()) {
flushQueuedKeys();
@ -203,13 +227,13 @@ void SnapshotPayloadParseState::pushArray(long long size) {
throw "Bad Protocol: unexpected trailing data";
if (size == 0) {
stackParse.top().arraycur++;
stackParse.back().arraycur++;
return;
}
if (stackParse.top().name == ParseStageName::Databases) {
if (stackParse.back().name == ParseStageName::Databases) {
flushQueuedKeys();
current_database = stackParse.top().arraycur;
current_database = stackParse.back().arraycur;
}
ParseStage stage;
@ -221,24 +245,43 @@ void SnapshotPayloadParseState::pushArray(long long size) {
}
// Note: This affects getNextStage so ensure its after
stackParse.top().arraycur++;
stackParse.push(stage);
stackParse.back().arraycur++;
stackParse.push_back(stage);
}
void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) {
if (stackParse.empty())
throw "Bad Protocol: unexpected trailing bulk string";
if (stackParse.top().arraycur >= static_cast<int>(stackParse.top().arrvalues.size()))
if (stackParse.back().arraycur >= static_cast<int>(stackParse.back().arrvalues.size()))
throw "Bad protocol: Unexpected value";
auto &stage = stackParse.top();
stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch);
stage.arrvalues[stackParse.top().arraycur].second = cch;
memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch);
auto &stage = stackParse.back();
stage.arrvalues[stackParse.back().arraycur].first = (char*)m_spallocator->allocate(cch);
stage.arrvalues[stackParse.back().arraycur].second = cch;
memcpy(stage.arrvalues[stackParse.back().arraycur].first, rgch, cch);
stage.arraycur++;
switch (stage.name) {
case ParseStageName::KeyValuePair:
if (stackParse.size() < 2)
throw "Bad Protocol: unexpected bulk string";
if (stackParse[stackParse.size()-2].name == ParseStageName::MetaData) {
if (stage.arraycur == 2) {
// We loaded both pairs
if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
throw "Bad Protocol: Got array when expecing a string"; // An attacker could make us dereference the vector when it's too small
if (!strcasecmp(stage.arrvalues[0].first, "lua")) {
/* Load the script back in memory. */
robj *auxval = createStringObject(stage.arrvalues[1].first, stage.arrvalues[1].second);
if (luaCreateFunction(NULL,g_pserver->lua,auxval) == NULL) {
throw "Can't load Lua script";
}
} else {
dictAdd(dictMetaData, sdsnewlen(stage.arrvalues[0].first, stage.arrvalues[0].second), sdsnewlen(stage.arrvalues[1].first, stage.arrvalues[1].second));
}
}
} else if (stackParse[stackParse.size()-2].name == ParseStageName::Dataset) {
if (stage.arraycur == 2) {
// We loaded both pairs
if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
@ -254,10 +297,13 @@ void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) {
if (cbQueued >= queuedBatchLimit)
flushQueuedKeys();
}
} else {
throw "Bad Protocol: unexpected bulk string";
}
break;
default:
throw "Bad Protocol: unexpected bulk string";
throw "Bad Protocol: unexpected bulk string out of KV pair";
}
}
@ -265,12 +311,12 @@ void SnapshotPayloadParseState::pushValue(long long value) {
if (stackParse.empty())
throw "Bad Protocol: unexpected integer value";
stackParse.top().arraycur++;
stackParse.back().arraycur++;
if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr)
if (stackParse.back().arraycur != 2 || stackParse.back().arrvalues[0].first == nullptr)
throw "Bad Protocol: unexpected integer value";
dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr);
dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.back().arrvalues[0].first, stackParse.back().arrvalues[0].second), nullptr);
if (de == nullptr)
throw "Bad Protocol: metadata field sent twice";
de->v.s64 = value;
@ -285,3 +331,14 @@ long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) co
}
return de->v.s64;
}
/* Look up a string metadata field sent by the master during a snapshot load.
 * Logs a warning and throws if the field was never received.
 * The returned sds remains owned by dictMetaData — do not free it. */
sds SnapshotPayloadParseState::getMetaDataStr(const char *szField) const {
    sdsstring key(szField, strlen(szField));
    dictEntry *entry = dictFind(dictMetaData, key.get());
    if (entry != nullptr)
        return (sds)entry->v.val;
    serverLog(LL_WARNING, "Master did not send field: %s", szField);
    throw false;
}

View File

@ -27,7 +27,7 @@ class SnapshotPayloadParseState {
}
};
std::stack<ParseStage> stackParse;
std::vector<ParseStage> stackParse;
std::vector<char*> vecqueuedKeys;
@ -39,6 +39,7 @@ class SnapshotPayloadParseState {
std::atomic<int> insertsInFlight;
std::unique_ptr<SlabAllocator> m_spallocator;
dict *dictLongLongMetaData = nullptr;
dict *dictMetaData = nullptr;
size_t cbQueued = 0;
static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB
int current_database = -1;
@ -51,8 +52,9 @@ public:
~SnapshotPayloadParseState();
long long getMetaDataLongLong(const char *field) const;
sds getMetaDataStr(const char *szField) const;
const char *getStateDebugName();
static const char *getStateDebugName(ParseStage stage);
size_t depth() const;

View File

@ -1079,7 +1079,42 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
}
serverAssert(replica != nullptr);
g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{
spreplBuf->addArrayLen(2); // Two sections: Metadata and databases
// MetaData
aeAcquireLock();
spreplBuf->addArrayLen(3 + dictSize(g_pserver->lua_scripts));
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-stream-db", 14);
spreplBuf->addLong(rsi->repl_stream_db);
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-id", 7);
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE+1);
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-offset", 11);
spreplBuf->addLong(rsi->master_repl_offset);
if (dictSize(g_pserver->lua_scripts)) {
dictEntry *de;
auto di = dictGetIterator(g_pserver->lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = (robj*)dictGetVal(de);
spreplBuf->addArrayLen(2);
spreplBuf->addString("lua", 3);
spreplBuf->addString(szFromObj(body), sdslen(szFromObj(body)));
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
std::shared_ptr<std::vector<std::unique_ptr<const StorageCache>>> spvecspsnapshot = std::make_shared<std::vector<std::unique_ptr<const StorageCache>>>();
for (int idb = 0; idb < cserver.dbnum; ++idb) {
spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache());
}
aeReleaseLock();
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{
int retval = C_OK;
auto timeStart = ustime();
auto lastLogTime = timeStart;
@ -1087,18 +1122,10 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf;
replBuf.addArrayLen(2); // Two sections: Metadata and databases
// MetaData
replBuf.addArrayLen(1);
replBuf.addArrayLen(2);
replBuf.addString("repl-stream-db", 14);
replBuf.addLong(repl_stream_db);
// Databases
replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
std::unique_ptr<const StorageCache> spsnapshot = g_pserver->db[idb]->CloneStorageCache();
auto &spsnapshot = (*spvecspsnapshot)[idb];
size_t snapshotDeclaredCount = spsnapshot->count();
replBuf.addArrayLen(snapshotDeclaredCount);
size_t count = 0;
@ -1139,7 +1166,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0);
if (retval == C_OK) {
aeAcquireLock();
replicationScriptCacheFlush();
replBuf.putSlavesOnline();
aeReleaseLock();
}
@ -2302,6 +2328,7 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) {
size_t parseCount(const char *rgch, size_t cch, long long *pvalue) {
size_t cchNumeral = 0;
if (cch > cchNumeral+1 && rgch[cchNumeral+1] == '-') ++cchNumeral;
while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral;
if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect
@ -2313,7 +2340,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) {
throw false;
}
if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) {
if (!string2ll(rgch+1, cchNumeral, pvalue)) {
serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch);
throw false;
}
@ -2330,6 +2357,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
if (mi->bulkreadBuffer == nullptr) {
mi->bulkreadBuffer = sdsempty();
mi->parseState = new SnapshotPayloadParseState();
if (g_pserver->aof_state != AOF_OFF) stopAppendOnly();
if (!fUpdate) {
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
@ -2379,6 +2407,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
// Lets read the array length
offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize);
if (arraySize < 0)
throw "Invalid array size";
mi->parseState->pushArray(arraySize);
} else if (mi->bulkreadBuffer[offset] == '$') {
@ -2387,6 +2417,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
// Lets read the string length
size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize);
if (payloadsize < 0)
throw "Invalid array size";
// OK we know how long the string is, now lets make sure the payload is here.
if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) {
@ -2421,7 +2453,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
if (mi->parseState->depth() != 0)
return false;
static_assert(sizeof(long) == sizeof(long long),"");
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset");
sds str = mi->parseState->getMetaDataStr("repl-id");
if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) {
memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1);
}
fFinished = true;
break; // We're done!!!
}
@ -3347,7 +3386,7 @@ void syncWithMaster(connection *conn) {
"capa","psync2",
"capa","activeExpire",
};
if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) {
if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
veccapabilities.push_back("capa");
veccapabilities.push_back("rocksdb-snapshot-load");
}