Rework the tuning to be better for mixed read/write workloads
Former-commit-id: a4fdd3e3cb41160b20e92e1b1f4f4ebd2ee86a4a
This commit is contained in:
parent
3294e4f788
commit
154c9d1f79
@ -220,12 +220,16 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
||||
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
||||
AeLocker locker;
|
||||
locker.arm(serverTL->current_client);
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr)
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
|
||||
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||
} else {
|
||||
serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
|
||||
}
|
||||
if (serverTL->rgdbSnapshot[idb] == nullptr) {
|
||||
// We still need to service the read
|
||||
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||
serverTL->disable_async_commands = true; // don't try this again
|
||||
}
|
||||
}
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
|
@ -2712,8 +2712,17 @@ void readQueryFromClient(connection *conn) {
|
||||
|
||||
if (cserver.cthreads > 1) {
|
||||
parseClientCommandBuffer(c);
|
||||
if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode))
|
||||
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
|
||||
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
|
||||
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
|
||||
// so we exclude them unless the snapshot we need already exists
|
||||
bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
|
||||
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2);
|
||||
|
||||
// The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
|
||||
if (!fWriteTooRecent || fSnapshotExists) {
|
||||
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
|
||||
}
|
||||
}
|
||||
if (!c->vecqueuedcmd.empty())
|
||||
serverTL->vecclientsProcess.push_back(c);
|
||||
} else {
|
||||
|
@ -2071,14 +2071,6 @@ int hash_spin_worker() {
|
||||
void databasesCron(bool fMainThread) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
|
||||
/* end any snapshots created by fast async commands */
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (fMainThread) {
|
||||
/* Expire keys by random sampling. Not required for slaves
|
||||
* as master will synthesize DELs for us. */
|
||||
@ -2792,6 +2784,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
|
||||
locker.arm();
|
||||
|
||||
/* end any snapshots created by fast async commands */
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
size_t zmalloc_used = zmalloc_used_memory();
|
||||
if (zmalloc_used > g_pserver->stat_peak_memory)
|
||||
g_pserver->stat_peak_memory = zmalloc_used;
|
||||
@ -2993,6 +2993,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
||||
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
g_pserver->db[idb]->trackChanges(false);
|
||||
|
||||
serverTL->disable_async_commands = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1333,7 +1333,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
||||
using redisDbPersistentData::commitChanges;
|
||||
using redisDbPersistentData::setexpireUnsafe;
|
||||
using redisDbPersistentData::setexpire;
|
||||
using redisDbPersistentData::createSnapshot;
|
||||
using redisDbPersistentData::endSnapshot;
|
||||
using redisDbPersistentData::restoreSnapshot;
|
||||
using redisDbPersistentData::removeAllCachedValues;
|
||||
@ -1344,6 +1343,13 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
||||
using redisDbPersistentData::FRehashing;
|
||||
|
||||
public:
|
||||
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
|
||||
auto psnapshot = redisDbPersistentData::createSnapshot(mvccCheckpoint, fOptional);
|
||||
if (psnapshot != nullptr)
|
||||
mvccLastSnapshot = psnapshot->mvccCheckpoint();
|
||||
return psnapshot;
|
||||
}
|
||||
|
||||
expireset::setiter expireitr;
|
||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
||||
dict *ready_keys; /* Blocked keys that received a PUSH */
|
||||
@ -2026,6 +2032,7 @@ struct redisServerThreadVars {
|
||||
bool fRetrySetAofEvent = false;
|
||||
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
||||
the thread went to sleep? */
|
||||
bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */
|
||||
std::vector<client*> vecclientsProcess;
|
||||
dictAsyncRehashCtl *rehashCtl = nullptr;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user