diff --git a/src/db.cpp b/src/db.cpp index 65cec482f..4d14d35a1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -42,6 +42,9 @@ int keyIsExpired(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -565,6 +568,11 @@ void flushdbCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } c->db->removeAllCachedValues(); addReply(c,shared.ok); return; @@ -587,6 +595,11 @@ void flushallCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->removeAllCachedValues(); addReply(c,shared.ok); @@ -2100,9 +2113,24 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database { m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(this, sdsKey, data, cb); + size_t offset = 0; + sds sdsNewKey = sdsdupshared(sdsKey); + auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); + robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); - dictAdd(m_pdict, sdsdupshared(sdsKey), o); + dictAdd(m_pdict, sdsNewKey, o); + + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sdsKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); }); *pde = dictFind(m_pdict, sdsKey); } @@ -2117,7 +2145,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) { - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, szKey, o); m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2162,7 +2190,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() if (de == nullptr) continue; robj *o = (robj*)dictGetVal(de); - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); } } @@ -2273,4 +2301,86 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate) ++m_cnewKeysPending; } } +} + +sds serializeExpire(const expireEntry *pexpire) +{ + sds str = sdsnewlen(nullptr, sizeof(unsigned)); + + if (pexpire == nullptr) + { + unsigned zero = 0; + memcpy(str, &zero, sizeof(unsigned)); + return str; + } + + auto &e = *pexpire; + unsigned celem = (unsigned)e.size(); + memcpy(str, &celem, sizeof(unsigned)); + + for (auto itr = e.begin(); itr != e.end(); ++itr) + { + unsigned subkeylen = itr.subkey() ? (unsigned)sdslen(itr.subkey()) : 0; + size_t strOffset = sdslen(str); + str = sdsgrowzero(str, sdslen(str) + sizeof(unsigned) + subkeylen + sizeof(long long)); + memcpy(str + strOffset, &subkeylen, sizeof(unsigned)); + if (itr.subkey()) + memcpy(str + strOffset + sizeof(unsigned), itr.subkey(), subkeylen); + long long when = itr.when(); + memcpy(str + strOffset + sizeof(unsigned) + subkeylen, &when, sizeof(when)); + } + return str; +} + +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset) +{ + unsigned celem; + if (cch < sizeof(unsigned)) + throw "Corrupt expire entry"; + memcpy(&celem, str, sizeof(unsigned)); + std::unique_ptr spexpire; + + size_t offset = sizeof(unsigned); + for (; celem > 0; --celem) + { + serverAssert(cch > (offset+sizeof(unsigned))); + + unsigned subkeylen; + memcpy(&subkeylen, str + offset, sizeof(unsigned)); + offset += sizeof(unsigned); + + sds subkey = nullptr; + if (subkeylen != 0) + { + serverAssert(cch > (offset + subkeylen)); + subkey = sdsnewlen(nullptr, subkeylen); + memcpy(subkey, str + offset, subkeylen); + offset += subkeylen; + } + + long long when; + serverAssert(cch >= (offset + sizeof(long long))); + memcpy(&when, str + offset, sizeof(long long)); + offset += sizeof(long long); + + if (spexpire == nullptr) + spexpire = std::make_unique(key, subkey, when); + else + spexpire->update(subkey, when); + } + + *poffset = offset; + return spexpire; +} + +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o) +{ + auto itrExpire = db->setexpire()->find(key); + const expireEntry *pexpire = nullptr; + if (itrExpire != db->setexpire()->end()) + pexpire = &(*itrExpire); + + sds str = serializeExpire(pexpire); + str = serializeStoredObject(o, str); + return str; } \ No newline at end of file diff --git a/src/defrag.cpp b/src/defrag.cpp index 242acd184..4b4b38997 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,7 +407,7 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); if (itr != set.end()) { @@ -415,7 +415,10 @@ void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); + serverAssert(set.find(newkey) != set.end()); + return true; } + return false; } long activeDefragQuickListNodes(quicklist *ql) { @@ -777,18 +780,21 @@ long defragKey(redisDb *db, dictEntry *de) { long defragged = 0; sds newsds; + ob = (robj*)dictGetVal(de); + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) + { defragged++, de->key = newsds; - if (!db->setexpire()->empty()) { - replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); + if (!db->setexpire()->empty()) { + bool fReplaced = replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); + serverAssert(fReplaced == ob->FExpires()); + } else { + serverAssert(!ob->FExpires()); + } } - /* Try to defrag robj and / or string value. */ - ob = (robj*)dictGetVal(de); - if (ob == nullptr) - return defragged; if ((newob = activeDefragStringOb(ob, &defragged))) { de->v.val = newob; ob = newob; @@ -841,6 +847,7 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown object type"); } + return defragged; } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 9acc854b3..afa4e4b36 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -293,7 +293,7 @@ extern "C" void fastlock_lock(struct fastlock *lock) #elif defined(__arm__) __asm__ __volatile__ ("yield"); #endif - if ((++cloops % 0x10000) == 0) + if ((++cloops % 0x100000) == 0) { fastlock_sleep(lock, tid, ticketT.u, mask); } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index f7ab6316e..7c9990a6d 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -45,7 +45,7 @@ fastlock_lock: cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause - add ecx, 0x10000 # Have we been waiting a long time? (oflow if we have) + add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) jnc .LLoop # If so, give up our timeslice to someone who's doing real work # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" diff --git a/src/object.cpp b/src/object.cpp index bfbd1524f..450e202c9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1619,20 +1619,30 @@ robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, return o; } -sds serializeStoredObject(robj_roptr o) +sds serializeStoredObject(robj_roptr o, sds sdsPrefix) { switch (o->type) { case OBJ_STRING: { - sds sdsT = sdsnewlen(nullptr, 1); - sdsT[0] = RDB_TYPE_STRING; + sds sdsT = nullptr; + if (sdsPrefix) + sdsT = sdsgrowzero(sdsPrefix, sdslen(sdsPrefix)+1); + else + sdsT = sdsnewlen(nullptr, 1); + sdsT[sdslen(sdsT)-1] = RDB_TYPE_STRING; return serializeStoredStringObject(sdsT, o); } default: rio rdb; createDumpPayload(&rdb,o,nullptr); + if (sdsPrefix) + { + sds rval = sdscatsds(sdsPrefix, (sds)rdb.io.buffer.ptr); + sdsfree((sds)rdb.io.buffer.ptr); + return rval; + } return (sds)rdb.io.buffer.ptr; } serverPanic("Attempting to store unknown object type"); diff --git a/src/replication.cpp b/src/replication.cpp index f4f5b8c0b..44fa91452 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -985,6 +985,19 @@ LError: return; } +void processReplconfLicense(client *c, robj *arg) +{ + if (cserver.license_key != nullptr) + { + if (strcmp(cserver.license_key, szFromObj(arg)) == 0) { + addReplyError(c, "Each replica must have a unique license key"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } + } + addReply(c, shared.ok); +} + /* REPLCONF