Improve key overwrite performance

Former-commit-id: 56f9d5528385ea78074a308c6d3987b920d6cc35
Author: malavan
Date: 2021-09-14 17:06:04 +00:00
Parent: 8210d67c24
Commit: 86784fe9ba
5 changed files with 28 additions and 14 deletions
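In short, the commit threads the hash-table entry found during a failed insert back to the caller, so overwriting an existing key no longer needs a second lookup of the same key. A minimal sketch of the idea, using std::unordered_map as a stand-in for KeyDB's dict (the names below are illustrative, not from the patch):

    // Sketch (not KeyDB code): insert-or-overwrite with a single hash lookup.
    // try_emplace() reports whether the key was inserted and, if not, hands
    // back an iterator to the existing element, so the overwrite path can
    // reuse it instead of performing a second find().
    #include <string>
    #include <unordered_map>

    struct Value { int payload; };   // stand-in for the robj* values KeyDB stores

    void setKeySketch(std::unordered_map<std::string, Value> &db,
                      const std::string &key, const Value &val)
    {
        auto [it, inserted] = db.try_emplace(key, val);
        if (!inserted)
            it->second = val;        // key already existed: overwrite in place
    }

genericSetKey() in the first file below follows the same shape: dbAddCore() now reports the existing entry through a dict_iter out-parameter, and dbOverwrite() consumes that iterator instead of calling db->find(key) again.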

View File

@@ -242,7 +242,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
     return o;
 }
-bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false) {
+bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr) {
     serverAssert(!val->FExpires());
     sds copy = sdsdupshared(key);
@@ -251,7 +251,7 @@ bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNe
         setMvccTstamp(val, mvcc);
     }
-    bool fInserted = db->insert(copy, val, fAssumeNew);
+    bool fInserted = db->insert(copy, val, fAssumeNew, piterExisting);
     if (fInserted)
     {
@@ -321,8 +321,12 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, sds keySds, robj *val, bool fUp
  * This function does not modify the expire time of the existing key.
  *
  * The program is aborted if the key was not already present. */
-void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire) {
-    auto itr = db->find(key);
+void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire, dict_iter *pitrExisting) {
+    redisDb::iter itr;
+    if (pitrExisting != nullptr)
+        itr = *pitrExisting;
+    else
+        itr = db->find(key);
     serverAssertWithInfo(NULL,key,itr != nullptr);
     lookupKeyUpdateObj(itr.val(), LOOKUP_NONE);
@@ -366,8 +370,9 @@ int dbMerge(redisDb *db, sds key, robj *val, int fReplace)
  * in a context where there is no clear client performing the operation. */
 void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
     db->prepOverwriteForSnapshot(szFromObj(key));
-    if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */)) {
-        dbOverwrite(db, key, val, !keepttl);
+    dict_iter iter;
+    if (!dbAddCore(db, szFromObj(key), val, true /* fUpdateMvcc */, false /*fAssumeNew*/, &iter)) {
+        dbOverwrite(db, key, val, !keepttl, &iter);
     }
     incrRefCount(val);
     if (signal) signalModifiedKey(c,db,key);
@@ -2594,11 +2599,12 @@ void redisDb::storageProviderInitialize()
     }
 }
-bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)
+bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_iter *piterExisting)
 {
     if (!fAssumeNew && (g_pserver->m_pstorageFactory != nullptr || m_pdbSnapshot != nullptr))
         ensure(key);
-    int res = dictAdd(m_pdict, key, o);
+    dictEntry *de;
+    int res = dictAdd(m_pdict, key, o, &de);
     serverAssert(FImplies(fAssumeNew, res == DICT_OK));
     if (res == DICT_OK)
     {
@@ -2610,6 +2616,11 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew
 #endif
         trackkey(key, false /* fUpdate */);
     }
+    else
+    {
+        if (piterExisting)
+            *piterExisting = dict_iter(m_pdict, de);
+    }
     return (res == DICT_OK);
 }

View File

@@ -573,9 +573,9 @@ static void _dictRehashStep(dict *d) {
 }
 /* Add an element to the target hash table */
-int dictAdd(dict *d, void *key, void *val)
+int dictAdd(dict *d, void *key, void *val, dictEntry **existing)
 {
-    dictEntry *entry = dictAddRaw(d,key,NULL);
+    dictEntry *entry = dictAddRaw(d,key,existing);
     if (!entry) return DICT_ERR;
     dictSetVal(d, entry, val);

View File

@@ -205,7 +205,7 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
 dict *dictCreate(dictType *type, void *privDataPtr);
 int dictExpand(dict *d, unsigned long size, bool fShrink = false);
 int dictTryExpand(dict *d, unsigned long size, bool fShrink);
-int dictAdd(dict *d, void *key, void *val);
+int dictAdd(dict *d, void *key, void *val, dictEntry **existing = nullptr);
 dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
 dictEntry *dictAddOrFind(dict *d, void *key);
 int dictReplace(dict *d, void *key, void *val);
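With the extra out-parameter, dictAdd() keeps its old return convention (DICT_ERR on a duplicate key) but can now also report which entry clashed, simply forwarding the pointer to dictAddRaw(). A hypothetical caller might look like this; `d`, `key` and `val` are assumed to already exist, and dictGetVal() is the existing accessor macro:

    // Hypothetical caller of the extended dictAdd(): on a duplicate key the
    // add still fails, but `de` now points at the existing entry, so the
    // caller can inspect or update it without paying for a second dictFind().
    dictEntry *de = nullptr;
    if (dictAdd(d, key, val, &de) == DICT_ERR) {
        robj *existing = (robj*)dictGetVal(de);   // entry located by the failed add
        /* ... overwrite or merge with `existing` ... */
    }

redisDbPersistentData::insert() in the first file is exactly such a caller, wrapping the returned entry in a dict_iter for dbOverwrite() to reuse.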

View File

@@ -2086,7 +2086,7 @@ void databasesCron(bool fMainThread) {
     /* Perform hash tables rehashing if needed, but only if there are no
      * other processes saving the DB on disk. Otherwise rehashing is bad
      * as will cause a lot of copy-on-write of memory pages. */
-    if (!hasActiveChildProcess() || g_pserver->FRdbSaveInProgress()) {
+    if (!(hasActiveChildProcess() || g_pserver->FRdbSaveInProgress())) {
         /* We use global counters so if we stop the computation at a given
          * DB we'll be able to start from the successive in the next
          * cron loop iteration. */
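This hunk is a logic fix rather than part of the iterator plumbing: the old guard `!hasActiveChildProcess() || g_pserver->FRdbSaveInProgress()` evaluates to true whenever an RDB save is in progress, which is exactly the case the comment says to avoid. Negating the whole disjunction restricts rehashing to the case where neither condition holds. A tiny standalone truth-table check of the two forms (the bools are stand-ins, not KeyDB symbols):

    // Compare the pre-fix guard with the corrected one for all flag combinations.
    #include <cstdio>

    int main()
    {
        for (bool childActive : {false, true}) {
            for (bool rdbSave : {false, true}) {
                bool oldGuard = !childActive || rdbSave;    // pre-fix: true during an RDB save
                bool newGuard = !(childActive || rdbSave);  // fixed: true only when both are false
                std::printf("child=%d rdb=%d old=%d new=%d\n",
                            (int)childActive, (int)rdbSave, (int)oldGuard, (int)newGuard);
            }
        }
        return 0;
    }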

View File

@@ -1067,6 +1067,9 @@ class dict_iter : public dict_const_iter
 {
     dict *m_dict = nullptr;
 public:
+    dict_iter()
+        : dict_const_iter(nullptr)
+    {}
     explicit dict_iter(nullptr_t)
         : dict_const_iter(nullptr)
     {}
@@ -1131,7 +1134,7 @@ public:
     void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); }
     void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); }
-    bool insert(char *k, robj *o, bool fAssumeNew = false);
+    bool insert(char *k, robj *o, bool fAssumeNew = false, dict_iter *existing = nullptr);
     void tryResize();
     int incrementallyRehash();
     void updateValue(dict_iter itr, robj *val);
@@ -3325,7 +3328,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
 #define LOOKUP_NONOTIFY (1<<1)
 #define LOOKUP_UPDATEMVCC (1<<2)
 void dbAdd(redisDb *db, robj *key, robj *val);
-void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire = false);
+void dbOverwrite(redisDb *db, robj *key, robj *val, bool fRemoveExpire = false, dict_iter *pitrExisting = nullptr);
 int dbMerge(redisDb *db, sds key, robj *val, int fReplace);
 void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal);
 void setKey(client *c, redisDb *db, robj *key, robj *val);