Don't prefetch when lock contention is low, it increases latency

Former-commit-id: 9b2629f6a20368cec8e55f0d006f3a67c8b770b7
This commit is contained in:
John Sully 2021-04-12 03:23:49 +00:00
parent 2d2ae90f30
commit 5ccaa9265c
5 changed files with 44 additions and 3 deletions

View File

@ -870,3 +870,19 @@ int aeThreadOwnsLock()
{
return g_lock.fOwnLock();
}
int aeLockContested(int threshold)
{
return g_lock.m_ticket.m_active < static_cast<uint16_t>(g_lock.m_ticket.m_avail - threshold);
}
int aeLockContention()
{
ticket ticketT;
__atomic_load(&g_lock.m_ticket.u, &ticketT.u, __ATOMIC_RELAXED);
int32_t avail = ticketT.m_avail;
int32_t active = ticketT.m_active;
if (avail < active)
avail += 0x10000;
return avail - active;
}

View File

@ -169,6 +169,8 @@ void aeAcquireLock();
int aeTryAcquireLock(int fWeak);
void aeReleaseLock();
int aeThreadOwnsLock();
int aeLockContested(int threshold);
int aeLockContention(); // returns the number of instantaneous threads waiting on the lock
#ifdef __cplusplus
}

View File

@ -2371,7 +2371,8 @@ void parseClientCommandBuffer(client *c) {
}
/* Prefetch outside the lock for better perf */
if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() &&
(g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2)) && !GlobalLocksAcquired()) {
auto &query = c->vecqueuedcmd.back();
if (query.argc > 0 && query.argc == query.argcMax) {
if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {

View File

@ -2452,6 +2452,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
// Measure lock contention from a different thread to be more accurate
g_pserver->asyncworkqueue->AddWorkFunction([]{
g_pserver->rglockSamples[g_pserver->ilockRingHead] = (uint16_t)aeLockContention();
++g_pserver->ilockRingHead;
if (g_pserver->ilockRingHead >= redisServer::s_lockContentionSamples)
g_pserver->ilockRingHead = 0;
});
g_pserver->cronloops++;
return 1000/g_pserver->hz;
}
@ -5138,6 +5146,11 @@ sds genRedisInfoString(const char *section) {
/* Stats */
if (allsections || defsections || !strcasecmp(section,"stats")) {
double avgLockContention = 0;
for (unsigned i = 0; i < redisServer::s_lockContentionSamples; ++i)
avgLockContention += g_pserver->rglockSamples[i];
avgLockContention /= redisServer::s_lockContentionSamples;
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Stats\r\n"
@ -5173,7 +5186,9 @@ sds genRedisInfoString(const char *section) {
"tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n"
"total_reads_processed:%lld\r\n"
"total_writes_processed:%lld\r\n",
"total_writes_processed:%lld\r\n"
"instantaneous_lock_contention:%d\r\n"
"avg_lock_contention:%f\r\n",
g_pserver->stat_numconnections,
g_pserver->stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -5206,7 +5221,9 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalPrefixes(),
g_pserver->stat_unexpected_error_replies,
g_pserver->stat_total_reads_processed.load(std::memory_order_relaxed),
g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed));
g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed),
aeLockContention(),
avgLockContention);
}
/* Replication */

View File

@ -2404,6 +2404,11 @@ struct redisServer {
long long repl_batch_offStart = -1;
long long repl_batch_idxStart = -1;
/* Lock Contention Ring Buffer */
static const size_t s_lockContentionSamples = 64;
uint16_t rglockSamples[s_lockContentionSamples];
unsigned ilockRingHead = 0;
bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
};