Fix remaining test failures

Former-commit-id: 37e607f9b13b6601ff52e74e613fb369cab22b56
This commit is contained in:
John Sully 2021-05-03 02:37:02 +00:00
parent 4677c62428
commit 5583fbb838
4 changed files with 105 additions and 85 deletions

View File

@ -867,7 +867,7 @@ void aeReleaseLock()
g_lock.unlock();
}
void aeSetThreadOwnsLockOverride(bool fOverride)
void aeSetThreadOwnsLockOverride(int fOverride)
{
fOwnLockOverride = fOverride;
}

View File

@ -169,7 +169,7 @@ void aeAcquireLock();
int aeTryAcquireLock(int fWeak);
void aeReleaseLock();
int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(bool fOverride);
void aeSetThreadOwnsLockOverride(int fOverride);
int aeLockContested(int threshold);
int aeLockContention(); // returns the number of instantaneous threads waiting on the lock

View File

@ -519,7 +519,12 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
if (len == RDB_LENERR) return NULL;
if (plain || sds) {
void *buf = plain ? zmalloc(len, MALLOC_SHARED) : sdsnewlen(SDS_NOINIT,len);
ssize_t lenSigned = (ssize_t)len;
if (flags & RDB_LOAD_SDS_SHARED)
lenSigned = -lenSigned;
void *buf = plain ? zmalloc(len, MALLOC_SHARED) : sdsnewlen(SDS_NOINIT, lenSigned);
if (buf == nullptr)
return nullptr;
if (lenptr) *lenptr = len;
if (len && rioRead(rdb,buf,len) == 0) {
if (plain)
@ -2355,30 +2360,29 @@ class rdbAsyncWorkThread
std::vector<std::function<void()>> queuefn; // for custom jobs
std::mutex mutex;
std::condition_variable cv;
bool fLaunched = false;
bool fExit = false;
std::atomic<size_t> ckeysLoaded;
std::thread m_thread;
list *clients_pending_async_write = nullptr;
long long now;
public:
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags)
: rsi(rsi), rdbflags(rdbflags)
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now)
: rsi(rsi), rdbflags(rdbflags), now(now)
{
ckeysLoaded = 0;
}
~rdbAsyncWorkThread() {
if (!fExit && m_thread.joinable())
endWork();
if (clients_pending_async_write)
listRelease(clients_pending_async_write);
if (m_thread.joinable())
endWork();
}
void start() {
if (clients_pending_async_write == nullptr)
clients_pending_async_write = listCreate();
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this, clients_pending_async_write);
serverAssert(!fLaunched);
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this);
fLaunched = true;
}
void enqueue(rdbInsertJob &job) {
@ -2401,25 +2405,28 @@ public:
size_t endWork() {
std::unique_lock<std::mutex> l(mutex);
serverAssert(fLaunched);
fExit = true;
cv.notify_one();
l.unlock();
m_thread.join();
listJoin(serverTL->clients_pending_asyncwrite, clients_pending_async_write);
ProcessPendingAsyncWrites();
fLaunched = false;
fExit = false;
serverAssert(queuejobs.empty());
serverAssert(queuefn.empty());
return ckeysLoaded;
}
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue, list *clients_pending_asyncwrite) {
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) {
rdbAsyncWorkThread &queue = *pqueue;
redisServerThreadVars vars;
vars.clients_pending_asyncwrite = clients_pending_asyncwrite;
redisServerThreadVars vars = {};
vars.clients_pending_asyncwrite = listCreate();
serverTL = &vars;
aeSetThreadOwnsLockOverride(true);
for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex);
if (queue.queuejobs.empty() && queue.queuefn.empty()) {
if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit)
if (queue.fExit)
break;
queue.cv.wait(lock);
if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit)
@ -2430,47 +2437,70 @@ public:
queue.queuejobs.reserve(1024);
auto queuefn = std::move(queue.queuefn);
lock.unlock();
for (auto &fn : queuefn) {
fn();
}
bool f1024thKey = false;
for (auto &job : queuejobs) {
redisObjectStack keyobj;
initStaticStringObject(keyobj,job.key);
/* Add the new object in the hash table */
int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
bool fStaleMvccKey = (pqueue->rsi) ? mvccFromObj(job.val) < pqueue->rsi->mvccMinThreshold : false;
if (fInserted)
{
auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = f1024thKey || (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
{
setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(pqueue->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != -1 && job.expiretime < pqueue->now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && pqueue->rsi != nullptr && pqueue->rsi->mi != nullptr && pqueue->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
pqueue->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
}
else
{
sdsfree(job.key);
job.key = nullptr;
decrRefCount(job.val);
job.val = nullptr;
} else {
/* Add the new object in the hash table */
int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
if (fInserted)
{
auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = f1024thKey || (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
{
setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
}
else
{
decrRefCount(job.val);
}
}
if (job.key != nullptr)
{
sdsfree(job.key);
}
}
for (auto &fn : queuefn) {
fn();
}
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
@ -2494,6 +2524,11 @@ public:
}
}
}
std::unique_lock<std::mutex> lock(queue.mutex);
serverAssert(queue.queuefn.empty());
serverAssert(queue.queuejobs.empty());
ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite);
aeSetThreadOwnsLockOverride(false);
}
};
@ -2547,7 +2582,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
rdbAsyncWorkThread wqueue(rsi, rdbflags);
now = mstime();
rdbAsyncWorkThread wqueue(rsi, rdbflags, now);
robj *subexpireKey = nullptr;
sds key = nullptr;
bool fLastKeyExpired = false;
@ -2574,7 +2610,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
return C_ERR;
}
now = mstime();
lru_clock = LRU_CLOCK();
wqueue.start();
@ -2711,12 +2746,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
else {
long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
wqueue.enqueue([dbCur, subexpireKey, key, expireT]{
sds keyT = sdsdupshared(key);
wqueue.enqueue([dbCur, subexpireKey, keyT, expireT]{
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
initStaticStringObject(keyobj,keyT);
setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey);
sdsfree(keyT);
});
subexpireKey = nullptr;
}
@ -2790,7 +2827,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
}
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS_SHARED,NULL)) == NULL)
goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key,mvcc_tstamp)) == NULL) {
@ -2798,45 +2835,27 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr;
goto eoferr;
}
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
rdbInsertJob job;
job.db = dbCur;
job.key = sdsdupshared(key);
job.val = val;
job.lru_clock = lru_clock;
job.expiretime = expiretime;
job.lru_idle = lru_idle;
job.lfu_freq = lfu_freq;
wqueue.enqueue(job);
val = nullptr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) {
#if 0 // TODO!
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(key, sdslen(key)));
rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup);
}
fLastKeyExpired = true;
sdsfree(key);
key = nullptr;
decrRefCount(val);
val = nullptr;
#endif
} else {
fLastKeyExpired = false;
rdbInsertJob job;
job.db = dbCur;
job.key = key;
job.val = val;
job.lru_clock = lru_clock;
job.expiretime = expiretime;
job.lru_idle = lru_idle;
job.lfu_freq = lfu_freq;
wqueue.enqueue(job);
key = nullptr;
val = nullptr;
}
fLastKeyExpired = fStaleMvccKey || fExpiredKey;
if (g_pserver->key_load_delay)
usleep(g_pserver->key_load_delay);

View File

@ -119,10 +119,11 @@
#define RDB_MODULE_OPCODE_STRING 5 /* String. */
/* rdbLoad...() functions flags. */
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_NONE 0
#define RDB_LOAD_ENC (1<<0)
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_LOAD_SDS_SHARED ((1 << 3) | RDB_LOAD_SDS)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0 /* No special RDB loading. */