Latency normalization with SCAN
Former-commit-id: 237f25d854e70d4d7a3095fdf56aaa80770e492e
This commit is contained in:
parent
6ec308c30f
commit
2c9c4a98ea
16
src/db.cpp
16
src/db.cpp
@ -896,7 +896,10 @@ void keysCommand(client *c) {
|
|||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
|
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
db->endSnapshot(snapshot);
|
|
||||||
|
locker.disarm();
|
||||||
|
lock.unlock();
|
||||||
|
db->endSnapshotAsync(snapshot);
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -1044,7 +1047,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
// Do an async version
|
// Do an async version
|
||||||
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
const redisDbPersistentDataSnapshot *snapshot = nullptr;
|
||||||
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
||||||
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
|
snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */);
|
||||||
if (snapshot != nullptr)
|
if (snapshot != nullptr)
|
||||||
{
|
{
|
||||||
aeEventLoop *el = serverTL->el;
|
aeEventLoop *el = serverTL->el;
|
||||||
@ -1082,9 +1085,16 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
|
|||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
|
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
|
mstime_t timeScanFilter;
|
||||||
|
latencyStartMonitor(timeScanFilter);
|
||||||
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
|
scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult);
|
||||||
|
latencyEndMonitor(timeScanFilter);
|
||||||
|
latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter);
|
||||||
|
|
||||||
db->endSnapshot(snapshot);
|
locker.disarm();
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
db->endSnapshotAsync(snapshot);
|
||||||
listSetFreeMethod(keys,decrRefCountVoid);
|
listSetFreeMethod(keys,decrRefCountVoid);
|
||||||
listRelease(keys);
|
listRelease(keys);
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
|
@ -182,7 +182,8 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
|
|||||||
// have some internal heuristics to do a synchronous endSnapshot if it makes sense
|
// have some internal heuristics to do a synchronous endSnapshot if it makes sense
|
||||||
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
|
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
|
||||||
{
|
{
|
||||||
aeAcquireLock();
|
mstime_t latency;
|
||||||
|
aeAcquireLock(); latencyStartMonitor(latency);
|
||||||
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
|
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
|
||||||
{
|
{
|
||||||
// Free a stale async snapshot so consolidate_children can clean it up later
|
// Free a stale async snapshot so consolidate_children can clean it up later
|
||||||
@ -198,6 +199,8 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
|||||||
{
|
{
|
||||||
// For small snapshots it makes more sense just to merge it directly
|
// For small snapshots it makes more sense just to merge it directly
|
||||||
endSnapshot(psnapshot);
|
endSnapshot(psnapshot);
|
||||||
|
latencyEndMonitor(latency);
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-synchronous-path", latency);
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -206,19 +209,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
|||||||
auto psnapshotT = createSnapshot(LLONG_MAX, false);
|
auto psnapshotT = createSnapshot(LLONG_MAX, false);
|
||||||
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
|
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
|
||||||
psnapshot = nullptr;
|
psnapshot = nullptr;
|
||||||
aeReleaseLock();
|
aeReleaseLock(); latencyEndMonitor(latency);
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency);
|
||||||
|
|
||||||
// do the expensive work of merging snapshots outside the ref
|
// do the expensive work of merging snapshots outside the ref
|
||||||
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it
|
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it
|
||||||
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
|
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
|
||||||
|
|
||||||
// Final Cleanup
|
// Final Cleanup
|
||||||
aeAcquireLock();
|
aeAcquireLock(); latencyStartMonitor(latency);
|
||||||
if (m_pdbSnapshotASYNC == nullptr)
|
if (m_pdbSnapshotASYNC == nullptr)
|
||||||
m_pdbSnapshotASYNC = psnapshotT;
|
m_pdbSnapshotASYNC = psnapshotT;
|
||||||
else
|
else
|
||||||
endSnapshot(psnapshotT); // finally clean up our temp snapshot
|
endSnapshot(psnapshotT); // finally clean up our temp snapshot
|
||||||
aeReleaseLock();
|
aeReleaseLock(); latencyEndMonitor(latency);
|
||||||
|
|
||||||
|
latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
||||||
@ -262,6 +268,9 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mstime_t latency_endsnapshot;
|
||||||
|
latencyStartMonitor(latency_endsnapshot);
|
||||||
|
|
||||||
// Alright we're ready to be free'd, but first dump all the refs on our child snapshots
|
// Alright we're ready to be free'd, but first dump all the refs on our child snapshots
|
||||||
if (m_spdbSnapshotHOLDER->m_refCount == 1)
|
if (m_spdbSnapshotHOLDER->m_refCount == 1)
|
||||||
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
|
recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get());
|
||||||
@ -288,9 +297,6 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mstime_t latency_endsnapshot;
|
|
||||||
latencyStartMonitor(latency_endsnapshot);
|
|
||||||
|
|
||||||
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
|
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
|
||||||
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user