diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 9da163653..ef28e54f1 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -136,12 +136,12 @@ void SnapshotPayloadParseState::flushQueuedKeys() { int idb = current_database; serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); auto sizePrev = vecqueuedKeys.size(); - ++insertsInFlight; - auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + (*insertsInFlight)++; + std::weak_ptr> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous if (current_database < cserver.dbnum) { - g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); - --insertsInFlightTmp; + (*(insertsInFlightTmp.lock()))--; delete pallocator; }); } else { @@ -172,7 +172,7 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() { dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); dictMetaData = dictCreate(&metadataDictType, nullptr); - insertsInFlight = 0; + insertsInFlight = std::make_shared>(); m_spallocator = std::make_unique(); } @@ -214,7 +214,7 @@ void SnapshotPayloadParseState::trimState() { if (stackParse.empty()) { flushQueuedKeys(); - while (insertsInFlight > 0) { + while (*insertsInFlight > 0) { // TODO: ProcessEventsWhileBlocked aeReleaseLock(); aeAcquireLock(); diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h index cb1e0420a..29ac28fb6 100644 --- a/src/SnapshotPayloadParseState.h +++ b/src/SnapshotPayloadParseState.h @@ -36,7 +36,7 @@ class SnapshotPayloadParseState { std::vector vecqueuedValsCb; - std::atomic insertsInFlight; + std::shared_ptr> insertsInFlight; std::unique_ptr m_spallocator; dict *dictLongLongMetaData = nullptr; dict *dictMetaData = nullptr; @@ -62,5 +62,5 @@ public: void pushArray(long long size); void pushValue(const char *rgch, long long cch); void pushValue(long long value); - bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); } + bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); } }; \ No newline at end of file