Merge branch 'async_commands' of https://gitlab.eqalpha.com/keydb-dev/KeyDB-Pro into async_commands
Former-commit-id: a19112ec839da4684aeb1a9d1b41906f4c698944
This commit is contained in:
commit
d502437cf3
@ -219,12 +219,16 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
|||||||
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
locker.arm(serverTL->current_client);
|
locker.arm(serverTL->current_client);
|
||||||
if (serverTL->rgdbSnapshot[idb] != nullptr)
|
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||||
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||||
|
serverTL->rgdbSnapshot[idb] = nullptr;
|
||||||
|
} else {
|
||||||
serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
|
serverTL->rgdbSnapshot[idb] = db->createSnapshot(mvccCheckpoint, true);
|
||||||
|
}
|
||||||
if (serverTL->rgdbSnapshot[idb] == nullptr) {
|
if (serverTL->rgdbSnapshot[idb] == nullptr) {
|
||||||
// We still need to service the read
|
// We still need to service the read
|
||||||
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||||
|
serverTL->disable_async_commands = true; // don't try this again
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||||
@ -3159,6 +3163,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
|
|||||||
if (command.argc >= 2) {
|
if (command.argc >= 2) {
|
||||||
const char *cmd = szFromObj(command.argv[0]);
|
const char *cmd = szFromObj(command.argv[0]);
|
||||||
if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) {
|
if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) {
|
||||||
|
if (c->db->m_spdbSnapshotHOLDER != nullptr)
|
||||||
|
return; // this is dangerous enough without a snapshot around
|
||||||
auto h = dictSdsHash(szFromObj(command.argv[1]));
|
auto h = dictSdsHash(szFromObj(command.argv[1]));
|
||||||
for (int iht = 0; iht < 2; ++iht) {
|
for (int iht = 0; iht < 2; ++iht) {
|
||||||
auto hT = h & c->db->m_pdict->ht[iht].sizemask;
|
auto hT = h & c->db->m_pdict->ht[iht].sizemask;
|
||||||
|
@ -200,7 +200,7 @@ client *createClient(connection *conn, int iel) {
|
|||||||
c->paused_list_node = NULL;
|
c->paused_list_node = NULL;
|
||||||
c->client_tracking_redirection = 0;
|
c->client_tracking_redirection = 0;
|
||||||
c->casyncOpsPending = 0;
|
c->casyncOpsPending = 0;
|
||||||
c->mvccCheckpoint = 0;
|
c->mvccCheckpoint = getMvccTstamp();
|
||||||
c->master_error = 0;
|
c->master_error = 0;
|
||||||
memset(c->uuid, 0, UUID_BINARY_LEN);
|
memset(c->uuid, 0, UUID_BINARY_LEN);
|
||||||
|
|
||||||
@ -2712,8 +2712,17 @@ void readQueryFromClient(connection *conn) {
|
|||||||
|
|
||||||
if (cserver.cthreads > 1) {
|
if (cserver.cthreads > 1) {
|
||||||
parseClientCommandBuffer(c);
|
parseClientCommandBuffer(c);
|
||||||
if (g_pserver->enable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || g_fTestMode))
|
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
|
||||||
|
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
|
||||||
|
// so we exclude them unless the snapshot we need already exists
|
||||||
|
bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
|
||||||
|
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < redisDbPersistentDataSnapshot::msStaleThreshold/2);
|
||||||
|
|
||||||
|
// The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
|
||||||
|
if (!fWriteTooRecent || fSnapshotExists) {
|
||||||
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
|
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!c->vecqueuedcmd.empty())
|
if (!c->vecqueuedcmd.empty())
|
||||||
serverTL->vecclientsProcess.push_back(c);
|
serverTL->vecclientsProcess.push_back(c);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2070,6 +2070,7 @@ int hash_spin_worker() {
|
|||||||
* rehashing. */
|
* rehashing. */
|
||||||
void databasesCron(bool fMainThread) {
|
void databasesCron(bool fMainThread) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
if (fMainThread) {
|
if (fMainThread) {
|
||||||
/* Expire keys by random sampling. Not required for slaves
|
/* Expire keys by random sampling. Not required for slaves
|
||||||
* as master will synthesize DELs for us. */
|
* as master will synthesize DELs for us. */
|
||||||
@ -2783,6 +2784,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
|
|
||||||
locker.arm();
|
locker.arm();
|
||||||
|
|
||||||
|
/* end any snapshots created by fast async commands */
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||||
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
g_pserver->db[idb]->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||||
@ -2991,6 +2993,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
|||||||
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
g_pserver->db[idb]->trackChanges(false);
|
g_pserver->db[idb]->trackChanges(false);
|
||||||
|
|
||||||
|
serverTL->disable_async_commands = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4402,6 +4406,9 @@ void call(client *c, int flags) {
|
|||||||
serverTL->commandsExecuted++;
|
serverTL->commandsExecuted++;
|
||||||
const long duration = elapsedUs(call_timer);
|
const long duration = elapsedUs(call_timer);
|
||||||
c->duration = duration;
|
c->duration = duration;
|
||||||
|
if (flags & CMD_CALL_ASYNC)
|
||||||
|
dirty = 0; // dirty is bogus in this case as there's no synchronization
|
||||||
|
else
|
||||||
dirty = g_pserver->dirty-dirty;
|
dirty = g_pserver->dirty-dirty;
|
||||||
if (dirty < 0) dirty = 0;
|
if (dirty < 0) dirty = 0;
|
||||||
|
|
||||||
|
12
src/server.h
12
src/server.h
@ -1263,6 +1263,8 @@ public:
|
|||||||
// These need to be fixed
|
// These need to be fixed
|
||||||
using redisDbPersistentData::size;
|
using redisDbPersistentData::size;
|
||||||
using redisDbPersistentData::expireSize;
|
using redisDbPersistentData::expireSize;
|
||||||
|
|
||||||
|
static const uint64_t msStaleThreshold = 500;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Redis database representation. There are multiple databases identified
|
/* Redis database representation. There are multiple databases identified
|
||||||
@ -1331,7 +1333,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::commitChanges;
|
using redisDbPersistentData::commitChanges;
|
||||||
using redisDbPersistentData::setexpireUnsafe;
|
using redisDbPersistentData::setexpireUnsafe;
|
||||||
using redisDbPersistentData::setexpire;
|
using redisDbPersistentData::setexpire;
|
||||||
using redisDbPersistentData::createSnapshot;
|
|
||||||
using redisDbPersistentData::endSnapshot;
|
using redisDbPersistentData::endSnapshot;
|
||||||
using redisDbPersistentData::restoreSnapshot;
|
using redisDbPersistentData::restoreSnapshot;
|
||||||
using redisDbPersistentData::removeAllCachedValues;
|
using redisDbPersistentData::removeAllCachedValues;
|
||||||
@ -1342,6 +1343,13 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::FRehashing;
|
using redisDbPersistentData::FRehashing;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
|
||||||
|
auto psnapshot = redisDbPersistentData::createSnapshot(mvccCheckpoint, fOptional);
|
||||||
|
if (psnapshot != nullptr)
|
||||||
|
mvccLastSnapshot = psnapshot->mvccCheckpoint();
|
||||||
|
return psnapshot;
|
||||||
|
}
|
||||||
|
|
||||||
expireset::setiter expireitr;
|
expireset::setiter expireitr;
|
||||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
||||||
dict *ready_keys; /* Blocked keys that received a PUSH */
|
dict *ready_keys; /* Blocked keys that received a PUSH */
|
||||||
@ -1350,6 +1358,7 @@ public:
|
|||||||
long long last_expire_set; /* when the last expire was set */
|
long long last_expire_set; /* when the last expire was set */
|
||||||
double avg_ttl; /* Average TTL, just for stats */
|
double avg_ttl; /* Average TTL, just for stats */
|
||||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||||
|
uint64_t mvccLastSnapshot = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Declare database backup that include redis main DBs and slots to keys map.
|
/* Declare database backup that include redis main DBs and slots to keys map.
|
||||||
@ -2024,6 +2033,7 @@ struct redisServerThreadVars {
|
|||||||
bool fRetrySetAofEvent = false;
|
bool fRetrySetAofEvent = false;
|
||||||
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
||||||
the thread went to sleep? */
|
the thread went to sleep? */
|
||||||
|
bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */
|
||||||
std::vector<client*> vecclientsProcess;
|
std::vector<client*> vecclientsProcess;
|
||||||
dictAsyncRehashCtl *rehashCtl = nullptr;
|
dictAsyncRehashCtl *rehashCtl = nullptr;
|
||||||
|
|
||||||
|
@ -654,9 +654,7 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const
|
|||||||
|
|
||||||
bool redisDbPersistentDataSnapshot::FStale() const
|
bool redisDbPersistentDataSnapshot::FStale() const
|
||||||
{
|
{
|
||||||
// 0.5 seconds considered stale;
|
return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= redisDbPersistentDataSnapshot::msStaleThreshold;
|
||||||
static const uint64_t msStale = 500;
|
|
||||||
return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dictGCAsyncFree(dictAsyncRehashCtl *async) {
|
void dictGCAsyncFree(dictAsyncRehashCtl *async) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user