Don't prefetch when lock contention is low; it increases latency
Former-commit-id: 9b2629f6a20368cec8e55f0d006f3a67c8b770b7
parent 5b53c8e88e
commit c070f6ece2
src/ae.cpp | 16 ++++++++++++++++
@@ -870,3 +870,19 @@ int aeThreadOwnsLock()
 {
     return g_lock.fOwnLock();
 }
+
+int aeLockContested(int threshold)
+{
+    return g_lock.m_ticket.m_active < static_cast<uint16_t>(g_lock.m_ticket.m_avail - threshold);
+}
+
+int aeLockContention()
+{
+    ticket ticketT;
+    __atomic_load(&g_lock.m_ticket.u, &ticketT.u, __ATOMIC_RELAXED);
+    int32_t avail = ticketT.m_avail;
+    int32_t active = ticketT.m_active;
+    if (avail < active)
+        avail += 0x10000;
+    return avail - active;
+}
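Both helpers read the ticket lock's state without acquiring it. A ticket lock hands each arriving thread a 16-bit ticket (m_avail) and serves tickets in order (m_active), so the distance between the counters is the number of threads currently holding or queued on the lock. The counters wrap modulo 2^16, hence the 0x10000 correction when m_avail has wrapped past m_active; aeLockContested() folds the same wraparound into a single uint16_t comparison and is true when more than `threshold` threads are in line. Below is a minimal standalone sketch of the arithmetic, assuming the packed 16-bit pair layout the diff implies for g_lock.m_ticket; it is illustration, not the KeyDB source:

    // Standalone sketch: a ticket lock's state as two 16-bit counters packed
    // into one 32-bit word. Type-punning through the union mirrors the
    // C-style original (well-defined in C, accepted by major C++ compilers).
    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    union ticket {
        struct {
            uint16_t m_active; // ticket currently being served
            uint16_t m_avail;  // next ticket to be handed out
        };
        uint32_t u;            // whole pair, readable in a single load
    };

    // Holders + waiters right now: the distance from m_active to m_avail,
    // corrected for 16-bit wraparound exactly as aeLockContention() does.
    int lockContention(const std::atomic<uint32_t> &word)
    {
        ticket t;
        t.u = word.load(std::memory_order_relaxed);
        int32_t avail = t.m_avail;
        int32_t active = t.m_active;
        if (avail < active)
            avail += 0x10000;  // m_avail wrapped past m_active
        return avail - active;
    }

    int main()
    {
        ticket t;
        t.m_active = 0xFFFE;   // serving ticket 65534...
        t.m_avail  = 0x0003;   // ...next ticket is 3: wrapped, five in line
        std::atomic<uint32_t> word(t.u);
        std::printf("contention = %d\n", lockContention(word)); // prints 5
    }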
src/ae.h | 2 ++
@@ -169,6 +169,8 @@ void aeAcquireLock();
 int aeTryAcquireLock(int fWeak);
 void aeReleaseLock();
 int aeThreadOwnsLock();
+int aeLockContested(int threshold);
+int aeLockContention();    // returns the instantaneous number of threads waiting on the lock

 #ifdef __cplusplus
 }
@@ -2371,7 +2371,8 @@ void parseClientCommandBuffer(client *c) {
     }

     /* Prefetch outside the lock for better perf */
-    if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) {
+    if (g_pserver->prefetch_enabled && cqueriesStart < c->vecqueuedcmd.size() &&
+        (g_pserver->m_pstorageFactory || aeLockContested(cserver.cthreads/2)) && !GlobalLocksAcquired()) {
         auto &query = c->vecqueuedcmd.back();
         if (query.argc > 0 && query.argc == query.argcMax) {
             if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) {
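This condition is the heart of the commit: prefetching is now attempted only where it can pay for itself, namely when a storage provider is attached (g_pserver->m_pstorageFactory, so a key may need a disk fetch) or when more than half of the worker threads are queued behind the global lock. On an uncontested lock the prefetch machinery is pure overhead, so the command proceeds directly. A sketch of the same decision, with hypothetical stand-in parameters replacing the globals:

    // Sketch only: storageProvider, serverThreads and waiters stand in for
    // g_pserver->m_pstorageFactory, cserver.cthreads and the ticket queue depth.
    bool shouldPrefetch(bool storageProvider, int serverThreads, int waiters)
    {
        // aeLockContested(cserver.cthreads/2) is true when more than half the
        // worker threads are stacked up behind the global lock.
        bool contested = waiters > serverThreads / 2;

        // Disk-backed keys always benefit; in-memory keys only do when we
        // would otherwise sit in the lock queue anyway.
        return storageProvider || contested;
    }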
@@ -2452,6 +2452,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         }
     }

+    // Measure lock contention from a different thread to be more accurate
+    g_pserver->asyncworkqueue->AddWorkFunction([]{
+        g_pserver->rglockSamples[g_pserver->ilockRingHead] = (uint16_t)aeLockContention();
+        ++g_pserver->ilockRingHead;
+        if (g_pserver->ilockRingHead >= redisServer::s_lockContentionSamples)
+            g_pserver->ilockRingHead = 0;
+    });
+
     g_pserver->cronloops++;
     return 1000/g_pserver->hz;
 }
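The sample is taken on the async work queue rather than in serverCron itself: the cron thread holds the global lock while it runs, so measuring in place would count its own ticket and under-report the queue as other threads see it. Each reading lands in a fixed 64-slot ring; at the default hz of 10 a cron tick fires every 100 ms, so the window covers roughly the last 6.4 seconds. A standalone sketch of the ring buffer and the average the INFO path later computes from it, assuming nothing beyond what the diff shows:

    // Sketch of the sampling ring buffer the diff adds to redisServer.
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    struct LockSampleRing {
        static const size_t s_lockContentionSamples = 64;  // same size as the diff
        uint16_t rglockSamples[s_lockContentionSamples] = {};
        unsigned ilockRingHead = 0;

        // Called once per cron tick with the current aeLockContention() value.
        void record(uint16_t contention) {
            rglockSamples[ilockRingHead] = contention;
            if (++ilockRingHead >= s_lockContentionSamples)
                ilockRingHead = 0;   // wrap: the oldest sample is overwritten
        }

        // Average over the whole window, as genRedisInfoString() computes it.
        double average() const {
            double sum = 0;
            for (unsigned i = 0; i < s_lockContentionSamples; ++i)
                sum += rglockSamples[i];
            return sum / s_lockContentionSamples;
        }
    };

    int main()
    {
        LockSampleRing ring;
        ring.record(3);                    // one cron tick's sample
        std::printf("avg = %f\n", ring.average());
    }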
@@ -5138,6 +5146,11 @@ sds genRedisInfoString(const char *section) {

     /* Stats */
     if (allsections || defsections || !strcasecmp(section,"stats")) {
+        double avgLockContention = 0;
+        for (unsigned i = 0; i < redisServer::s_lockContentionSamples; ++i)
+            avgLockContention += g_pserver->rglockSamples[i];
+        avgLockContention /= redisServer::s_lockContentionSamples;
+
         if (sections++) info = sdscat(info,"\r\n");
         info = sdscatprintf(info,
             "# Stats\r\n"
@@ -5173,7 +5186,9 @@ sds genRedisInfoString(const char *section) {
             "tracking_total_prefixes:%lld\r\n"
             "unexpected_error_replies:%lld\r\n"
             "total_reads_processed:%lld\r\n"
-            "total_writes_processed:%lld\r\n",
+            "total_writes_processed:%lld\r\n"
+            "instantaneous_lock_contention:%d\r\n"
+            "avg_lock_contention:%f\r\n",
             g_pserver->stat_numconnections,
             g_pserver->stat_numcommands,
             getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -5206,7 +5221,9 @@ sds genRedisInfoString(const char *section) {
             (unsigned long long) trackingGetTotalPrefixes(),
             g_pserver->stat_unexpected_error_replies,
             g_pserver->stat_total_reads_processed.load(std::memory_order_relaxed),
-            g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed));
+            g_pserver->stat_total_writes_processed.load(std::memory_order_relaxed),
+            aeLockContention(),
+            avgLockContention);
         }

         /* Replication */
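These two hunks extend the format string and its argument list in lockstep, so the stats section of INFO gains instantaneous_lock_contention (a fresh aeLockContention() reading) and avg_lock_contention (the mean over the 64-sample ring). Illustrative output, with made-up values:

    $ keydb-cli info stats | grep lock_contention
    instantaneous_lock_contention:3
    avg_lock_contention:1.421875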
@@ -2404,6 +2404,11 @@ struct redisServer {
     long long repl_batch_offStart = -1;
     long long repl_batch_idxStart = -1;

+    /* Lock Contention Ring Buffer */
+    static const size_t s_lockContentionSamples = 64;
+    uint16_t rglockSamples[s_lockContentionSamples];
+    unsigned ilockRingHead = 0;
+
     bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
 };