Don't prefetch when lock contention is low; it increases latency
Former-commit-id: 0d21614e0e5aba28acd364231823d51a3073081f
parent e6782d8f1b
commit 1b121723e2
src/ae.cpp | 16 ++++++++++++++++

@@ -870,3 +870,19 @@ int aeThreadOwnsLock()
 {
     return g_lock.fOwnLock();
 }
+
+int aeLockContested(int threshold)
+{
+    return g_lock.m_ticket.m_active < static_cast<uint16_t>(g_lock.m_ticket.m_avail - threshold);
+}
+
+int aeLockContention()
+{
+    ticket ticketT;
+    __atomic_load(&g_lock.m_ticket.u, &ticketT.u, __ATOMIC_RELAXED);
+    int32_t avail = ticketT.m_avail;
+    int32_t active = ticketT.m_active;
+    if (avail < active)
+        avail += 0x10000;
+    return avail - active;
+}
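
Note on aeLockContention(): the ticket pair is read with a single relaxed
atomic load of the combined ticket word, so m_active and m_avail are mutually
consistent, and the `avail += 0x10000` branch compensates for the 16-bit
counters wrapping at 0xFFFF. A minimal standalone sketch of the same
arithmetic, using a hypothetical TicketModel struct rather than KeyDB's real
ticket layout:

#include <cstdint>
#include <cstdio>

// Illustrative stand-in for the ticket counters used above.
struct TicketModel {
    uint16_t m_active;  // ticket number currently holding the lock
    uint16_t m_avail;   // next ticket number to be handed out
};

// Outstanding tickets = avail - active, corrected for 16-bit wraparound.
int contention(TicketModel t) {
    int32_t avail = t.m_avail;
    int32_t active = t.m_active;
    if (avail < active)       // m_avail wrapped past 0xFFFF before m_active
        avail += 0x10000;
    return avail - active;
}

int main() {
    printf("%d\n", contention({0xFFFE, 0x0001})); // 3, measured across the wrap
    printf("%d\n", contention({5, 5}));           // 0, lock is uncontended
    return 0;
}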
|
src/ae.h | 2 ++

@@ -169,6 +169,8 @@ void aeAcquireLock();
 int aeTryAcquireLock(int fWeak);
 void aeReleaseLock();
 int aeThreadOwnsLock();
+int aeLockContested(int threshold);
+int aeLockContention(); // returns the instantaneous number of threads waiting on the lock
 
 #ifdef __cplusplus
 }
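
aeLockContested() answers a cheaper question: are strictly more than
`threshold` tickets outstanding right now? The static_cast<uint16_t> keeps the
subtraction wrap-safe in the common case, though the test can briefly
under-report in the narrow window where m_avail - threshold wraps past
m_active; for a heuristic that only gates prefetching, that appears
acceptable. A sketch of the comparison, with illustrative values:

#include <cassert>
#include <cstdint>

// Illustrative model of the threshold test in aeLockContested():
// true when strictly more than `threshold` tickets are outstanding.
bool contested(uint16_t active, uint16_t avail, int threshold) {
    return active < static_cast<uint16_t>(avail - threshold);
}

int main() {
    assert(contested(100, 110, 4));    // 10 outstanding > 4
    assert(!contested(100, 103, 4));   // only 3 outstanding
    assert(!contested(0xFFFE, 2, 4));  // exactly 4 outstanding across the wrap
    return 0;
}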
@@ -2369,7 +2369,8 @@ void parseClientCommandBuffer(client *c) {
     }
 
     /* Prefetch outside the lock for better perf */
-    if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
+    if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() &&
+        (g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2)) && !GlobalLocksAcquired()) {
         auto &query = c->vecqueuedcmd.back();
         if (query.argc > 0 && query.argc == query.argcMax) {
             c->db->prefetchKeysAsync(c, query);
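
The reworked condition makes prefetching conditional on it actually paying for
itself: always prefetch when a storage provider is configured
(m_pstorageFactory, where prefetch presumably hides storage latency), but
otherwise only when more than half the server threads are waiting on or
holding the global lock. An uncontended thread would acquire the lock
immediately, so speculative prefetch would only add latency, which is what the
commit message calls out. The gate reduces to a predicate along these lines
(shouldPrefetch and the stubbed waiter count are illustrative, not KeyDB API):

#include <atomic>

// Illustrative stub for aeLockContested(): more than `threshold` waiters?
static std::atomic<int> g_waiters{0};
static bool lockContested(int threshold) { return g_waiters.load() > threshold; }

// Prefetch only if keys may live on slow storage, or if this thread would
// block on the global lock anyway and can overlap useful work while waiting.
static bool shouldPrefetch(bool haveStorageProvider, int cthreads) {
    return haveStorageProvider || lockContested(cthreads / 2);
}

int main() {
    g_waiters = 7;                            // e.g. 7 threads queued on the lock
    return shouldPrefetch(false, 8) ? 0 : 1;  // 7 > 8/2, so prefetch
}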
@@ -2442,6 +2442,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     /* CRON functions may trigger async writes, so do this last */
     ProcessPendingAsyncWrites();
 
+    // Measure lock contention from a different thread to be more accurate
+    g_pserver->asyncworkqueue->AddWorkFunction([]{
+        g_pserver->rglockSamples[g_pserver->ilockRingHead] = (uint16_t)aeLockContention();
+        ++g_pserver->ilockRingHead;
+        if (g_pserver->ilockRingHead >= redisServer::s_lockContentionSamples)
+            g_pserver->ilockRingHead = 0;
+    });
+
     g_pserver->cronloops++;
     return 1000/g_pserver->hz;
 }
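
The sample is taken on the async work queue rather than inline because
serverCron itself runs with the global lock held; sampling from a different
thread sees the queue as other threads do, per the comment above. One sample
lands per cron tick in a fixed 64-slot ring that simply overwrites the oldest
entry. A self-contained single-writer ring shaped like
rglockSamples/ilockRingHead (the names and the averaging caveat here are mine,
not the patch's):

#include <cstddef>
#include <cstdint>

// Single-writer sampling ring mirroring rglockSamples/ilockRingHead above.
template <size_t N>
struct SampleRing {
    uint16_t samples[N] = {};
    unsigned head = 0;

    void push(uint16_t v) {
        samples[head] = v;
        if (++head >= N)   // wrap the head index instead of using modulo
            head = 0;
    }

    // Matches the averaging in genRedisInfoString(); note that until the
    // ring fills once, the zero-initialized slots bias the average low.
    double average() const {
        double sum = 0;
        for (size_t i = 0; i < N; ++i)
            sum += samples[i];
        return sum / N;
    }
};

// Usage, mirroring the patch (aeLockContention() supplies the sample there):
int main() {
    static SampleRing<64> ring;      // one per server, like rglockSamples
    ring.push(3);                    // e.g. one sample per serverCron tick
    return ring.average() >= 0 ? 0 : 1;
}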
@@ -5134,6 +5142,11 @@ sds genRedisInfoString(const char *section) {
 
     /* Stats */
     if (allsections || defsections || !strcasecmp(section,"stats")) {
+        double avgLockContention = 0;
+        for (unsigned i = 0; i < redisServer::s_lockContentionSamples; ++i)
+            avgLockContention += g_pserver->rglockSamples[i];
+        avgLockContention /= redisServer::s_lockContentionSamples;
+
         if (sections++) info = sdscat(info,"\r\n");
         info = sdscatprintf(info,
             "# Stats\r\n"
@@ -5169,7 +5182,9 @@ sds genRedisInfoString(const char *section) {
             "tracking_total_prefixes:%lld\r\n"
             "unexpected_error_replies:%lld\r\n"
             "total_reads_processed:%lld\r\n"
-            "total_writes_processed:%lld\r\n",
+            "total_writes_processed:%lld\r\n"
+            "instantaneous_lock_contention:%d\r\n"
+            "avg_lock_contention:%f\r\n",
             g_pserver->stat_numconnections,
             g_pserver->stat_numcommands,
             getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -5202,7 +5217,9 @@ sds genRedisInfoString(const char *section) {
             (unsigned long long) trackingGetTotalPrefixes(),
             g_pserver->stat_unexpected_error_replies,
             g_pserver->stat_total_reads_processed.load(std::memory_order_relaxed),
-            g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed));
+            g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed),
+            aeLockContention(),
+            avgLockContention);
     }
 
     /* Replication */
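
With these hunks applied, the stats section of INFO reports the point-in-time
queue depth next to the 64-sample rolling average; the values below are
hypothetical output from a lightly loaded server, not captured output:

instantaneous_lock_contention:0
avg_lock_contention:0.359375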
@@ -2396,6 +2396,11 @@ struct redisServer {
     long long repl_batch_offStart = -1;
     long long repl_batch_idxStart = -1;
 
+    /* Lock Contention Ring Buffer */
+    static const size_t s_lockContentionSamples = 64;
+    uint16_t rglockSamples[s_lockContentionSamples];
+    unsigned ilockRingHead = 0;
+
     bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
 };
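
One design note on the ring: it is written from the async work queue and read
without synchronization by genRedisInfoString(), which is formally a data race
but practically benign here, since the metric is advisory and a stale or torn
slot only skews one of the 64 averaged samples.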