Make inserts in flight a shared_ptr to avoid double free (#198)
* remove keyproxy test from machamp * Update build.yaml * make insertsinflight shared
This commit is contained in:
parent
2adf93ba0e
commit
9e310b1bea
@ -136,12 +136,12 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
|
|||||||
int idb = current_database;
|
int idb = current_database;
|
||||||
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
|
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
|
||||||
auto sizePrev = vecqueuedKeys.size();
|
auto sizePrev = vecqueuedKeys.size();
|
||||||
++insertsInFlight;
|
(*insertsInFlight)++;
|
||||||
auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
std::weak_ptr<std::atomic<int>> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
||||||
if (current_database < cserver.dbnum) {
|
if (current_database < cserver.dbnum) {
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
||||||
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
||||||
--insertsInFlightTmp;
|
(*(insertsInFlightTmp.lock()))--;
|
||||||
delete pallocator;
|
delete pallocator;
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
@ -172,7 +172,7 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() {
|
|||||||
|
|
||||||
dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr);
|
dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr);
|
||||||
dictMetaData = dictCreate(&metadataDictType, nullptr);
|
dictMetaData = dictCreate(&metadataDictType, nullptr);
|
||||||
insertsInFlight = 0;
|
insertsInFlight = std::make_shared<std::atomic<int>>();
|
||||||
m_spallocator = std::make_unique<SlabAllocator>();
|
m_spallocator = std::make_unique<SlabAllocator>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +214,7 @@ void SnapshotPayloadParseState::trimState() {
|
|||||||
|
|
||||||
if (stackParse.empty()) {
|
if (stackParse.empty()) {
|
||||||
flushQueuedKeys();
|
flushQueuedKeys();
|
||||||
while (insertsInFlight > 0) {
|
while (*insertsInFlight > 0) {
|
||||||
// TODO: ProcessEventsWhileBlocked
|
// TODO: ProcessEventsWhileBlocked
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
|
@ -36,7 +36,7 @@ class SnapshotPayloadParseState {
|
|||||||
std::vector<size_t> vecqueuedValsCb;
|
std::vector<size_t> vecqueuedValsCb;
|
||||||
|
|
||||||
|
|
||||||
std::atomic<int> insertsInFlight;
|
std::shared_ptr<std::atomic<int>> insertsInFlight;
|
||||||
std::unique_ptr<SlabAllocator> m_spallocator;
|
std::unique_ptr<SlabAllocator> m_spallocator;
|
||||||
dict *dictLongLongMetaData = nullptr;
|
dict *dictLongLongMetaData = nullptr;
|
||||||
dict *dictMetaData = nullptr;
|
dict *dictMetaData = nullptr;
|
||||||
@ -62,5 +62,5 @@ public:
|
|||||||
void pushArray(long long size);
|
void pushArray(long long size);
|
||||||
void pushValue(const char *rgch, long long cch);
|
void pushValue(const char *rgch, long long cch);
|
||||||
void pushValue(long long value);
|
void pushValue(long long value);
|
||||||
bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); }
|
bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); }
|
||||||
};
|
};
|
Loading…
x
Reference in New Issue
Block a user