diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d4cfc42a6..d908a5fcc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,9 +12,9 @@ jobs: submodules: true - name: make run: | + sudo apt-get update sudo apt-get -y remove libzstd || true sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev - sudo apt-get -y install uuid-dev libcurl4-openssl-dev make BUILD_TLS=yes -j2 - name: gen-cert run: ./utils/gen-test-certs.sh diff --git a/TLS.md b/TLS.md index ae1a066db..1afa2e8fa 100644 --- a/TLS.md +++ b/TLS.md @@ -56,8 +56,6 @@ Note that unlike Redis, KeyDB fully supports multithreading of TLS connections. To-Do List ---------- -- [ ] Add session caching support. Check if/how it's handled by clients to - assess how useful/important it is. - [ ] redis-benchmark support. The current implementation is a mix of using hiredis for parsing and basic networking (establishing connections), but directly manipulating sockets for most actions. This will need to be cleaned diff --git a/keydb.conf b/keydb.conf index bc7f90093..c0d3535ec 100644 --- a/keydb.conf +++ b/keydb.conf @@ -199,6 +199,22 @@ tcp-keepalive 300 # # tls-prefer-server-ciphers yes +# By default, TLS session caching is enabled to allow faster and less expensive +# reconnections by clients that support it. Use the following directive to disable +# caching. +# +# tls-session-caching no + +# Change the default number of TLS sessions cached. A zero value sets the cache +# to unlimited size. The default size is 20480. +# +# tls-session-cache-size 5000 + +# Change the default timeout of cached TLS sessions. The default timeout is 300 +# seconds. +# +# tls-session-cache-timeout 60 + ################################# GENERAL ##################################### # By default KeyDB does not run as a daemon. Use 'yes' if you need it. 
@@ -401,6 +417,20 @@ dir ./ # replica-serve-stale-data yes +# Active Replicas will allow read only data access while loading remote RDBs +# provided they are permitted to serve stale data. As an option you may also +# permit them to accept write commands. This is an EXPERIMENTAL feature and +# may result in commands not being fully synchronized +# +# allow-write-during-load no + +# You can modify the number of masters necessary to form a replica quorum when +# multi-master is enabled and replica-serve-stale-data is "no". By default +# this is set to -1 which implies the number of known masters (e.g. those +# you added with replicaof) +# +# replica-quorum -1 + # You can configure a replica instance to accept writes or not. Writing against # a replica instance may be useful to store some ephemeral data (because data # written on a replica will be easily deleted after resync with the master) but diff --git a/src/Makefile b/src/Makefile index d21a2052f..ce3ab9ab9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,11 +47,6 @@ endif USEASM?=true -ifeq ($(NOMVCC),) - CFLAGS+= -DENABLE_MVCC - CXXFLAGS+= -DENABLE_MVCC -endif - ifneq ($(SANITIZE),) CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE diff --git a/src/aelocker.h b/src/aelocker.h index de99b8991..01954dc12 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -9,11 +9,14 @@ public: { } - void arm(client *c) // if a client is passed, then the client is already locked + void arm(client *c, bool fIfNeeded = false) // if a client is passed, then the client is already locked { if (m_fArmed) return; + if (fIfNeeded && aeThreadOwnsLock()) + return; + serverAssertDebug(!GlobalLocksAcquired()); if (c != nullptr) diff --git a/src/aof.cpp b/src/aof.cpp index edb008399..83eb05506 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1432,8 +1432,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Iterate this DB writing every entry */ bool fComplete = db->iterate([&](const char *keystr, robj 
*o)->bool{ - robj key; - + redisObjectStack key; initStaticStringObject(key,(sds)keystr); /* Save the key and associated value */ diff --git a/src/cluster.cpp b/src/cluster.cpp index a9e1cc9ef..29eae6162 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4920,11 +4920,10 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) { rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key)); -#ifdef ENABLE_MVCC char szT[32]; - snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); + uint64_t mvcc = mvccFromObj(o); + snprintf(szT, 32, "%" PRIu64, mvcc); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); -#endif /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -5066,11 +5065,9 @@ void restoreCommand(client *c) { decrRefCount(auxkey); goto eoferr; } -#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { - obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + setMvccTstamp(obj, strtoull(szFromObj(auxval), nullptr, 10)); } -#endif decrRefCount(auxkey); decrRefCount(auxval); } diff --git a/src/config.cpp b/src/config.cpp index 303f496dd..e8c584344 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2256,7 +2256,7 @@ static int updateTlsCfg(char *val, char *prev, const char **err) { UNUSED(prev); UNUSED(err); if (tlsConfigure(&g_pserver->tls_ctx_config) == C_ERR) { - *err = "Unable to configure tls-cert-file. Check server logs."; + *err = "Unable to update TLS configuration. 
Check server logs."; return 0; } return 1; @@ -2266,6 +2266,12 @@ static int updateTlsCfgBool(int val, int prev, const char **err) { UNUSED(prev); return updateTlsCfg(NULL, NULL, err); } + +static int updateTlsCfgInt(long long val, long long prev, const char **err) { + UNUSED(val); + UNUSED(prev); + return updateTlsCfg(NULL, NULL, err); +} #endif /* USE_OPENSSL */ int fDummy = false; @@ -2306,9 +2312,10 @@ standardConfig configs[] = { createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL), - createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0,NULL, NULL), // Not applicable to KeyDB, just there for compatibility createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL), + createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL), + createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), @@ -2369,6 +2376,7 @@ standardConfig configs[] = { createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL), createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, 
INTEGER_CONFIG, NULL, NULL), + createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), @@ -2406,10 +2414,13 @@ standardConfig configs[] = { #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, IMMUTABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* TCP port. */ + createIntConfig("tls-session-cache-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_size, 20*1024, INTEGER_CONFIG, NULL, updateTlsCfgInt), + createIntConfig("tls-session-cache-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_timeout, 300, INTEGER_CONFIG, NULL, updateTlsCfgInt), createBoolConfig("tls-cluster", NULL, MODIFIABLE_CONFIG, g_pserver->tls_cluster, 0, NULL, NULL), createBoolConfig("tls-replication", NULL, MODIFIABLE_CONFIG, g_pserver->tls_replication, 0, NULL, NULL), createBoolConfig("tls-auth-clients", NULL, MODIFIABLE_CONFIG, g_pserver->tls_auth_clients, 1, NULL, NULL), createBoolConfig("tls-prefer-server-ciphers", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.prefer_server_ciphers, 0, NULL, updateTlsCfgBool), + createBoolConfig("tls-session-caching", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.session_caching, 1, NULL, updateTlsCfgBool), createStringConfig("tls-cert-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.cert_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-key-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.key_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-dh-params-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.dh_params_file, NULL, NULL, updateTlsCfg), diff --git a/src/db.cpp b/src/db.cpp index 1cc7e66f7..add04e44c 100644 
--- a/src/db.cpp +++ b/src/db.cpp @@ -91,9 +91,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { robj *val = itr.val(); lookupKeyUpdateObj(val, flags); if (flags & LOOKUP_UPDATEMVCC) { -#ifdef ENABLE_MVCC - val->mvcc_tstamp = getMvccTstamp(); -#endif + setMvccTstamp(val, getMvccTstamp()); db->trackkey(key, true /* fUpdate */); } return val; @@ -220,10 +218,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { serverAssert(!val->FExpires()); sds copy = sdsdupshared(szFromObj(key)); -#ifdef ENABLE_MVCC - if (g_pserver->fActiveReplica) - val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); -#endif + + uint64_t mvcc = getMvccTstamp(); + setMvccTstamp(key, mvcc); + setMvccTstamp(val, mvcc); bool fInserted = db->insert(copy, val, fAssumeNew); @@ -274,9 +272,7 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); -#ifdef ENABLE_MVCC - val->mvcc_tstamp = getMvccTstamp(); -#endif + setMvccTstamp(val, getMvccTstamp()); } if (g_pserver->lazyfree_lazy_server_del) @@ -309,14 +305,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) if (itr == nullptr) return (dbAddCore(db, key, val) == true); -#ifdef ENABLE_MVCC robj *old = itr.val(); - if (old->mvcc_tstamp <= val->mvcc_tstamp) + if (mvccFromObj(old) <= mvccFromObj(val)) { db->dbOverwriteCore(itr, key, val, false, true); return true; } -#endif return false; } @@ -1635,7 +1629,7 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) /* Update the expire set */ db->setExpire(key, subkey, when); - int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; if (c && writable_slave && !(c->flags 
& CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } @@ -1670,7 +1664,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) kde.val()->SetFExpires(true); - int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } @@ -1724,7 +1718,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) { robj *argv[3]; - robj objT; redisCommand *cmd = nullptr; switch (type) { @@ -2498,9 +2491,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) sdsfree(strT); dictAdd(m_pdict, keyNew, objNew); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); -#ifdef ENABLE_MVCC - serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); -#endif + serverAssert(mvccFromObj(objNew) == mvccFromObj(itr.val())); } } else diff --git a/src/defrag.cpp b/src/defrag.cpp index 65aecfa34..657a82962 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); * returns NULL in case the allocatoin wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. 
*/ -void* activeDefragAlloc(void *ptr) { +template +TPTR* activeDefragAlloc(TPTR *ptr) { size_t size; void *newptr; if(!je_get_defrag_hint(ptr)) { @@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) { newptr = zmalloc_no_tcache(size); memcpy(newptr, ptr, size); zfree_no_tcache(ptr); - return newptr; + return (TPTR*)newptr; +} + +template<> +robj* activeDefragAlloc(robj *o) { + void *pvSrc = allocPtrFromObj(o); + void *pvDst = activeDefragAlloc(pvSrc); + return objFromAllocPtr(pvDst); } /*Defrag helper for sds strings diff --git a/src/expire.cpp b/src/expire.cpp index b442038d4..6316707ae 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -81,7 +81,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { robj *val = db->find(e.key()); int deleted = 0; - robj objKey; + redisObjectStack objKey; initStaticStringObject(objKey, (char*)e.key()); bool fTtlChanged = false; @@ -146,7 +146,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { serverAssert(false); } - robj objSubkey; + redisObjectStack objSubkey; initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); @@ -744,3 +744,103 @@ void touchCommand(client *c) { addReplyLongLong(c,touched); } +expireEntryFat::~expireEntryFat() +{ + if (m_dictIndex != nullptr) + dictRelease(m_dictIndex); +} + +expireEntryFat::expireEntryFat(const expireEntryFat &e) + : m_keyPrimary(e.m_keyPrimary), m_vecexpireEntries(e.m_vecexpireEntries) +{ + /* Note: m_dictIndex is not copied; the copy starts without an index and expireSubKey() calls createIndex() once the entry count reaches INDEX_THRESHOLD */ +} + +expireEntryFat::expireEntryFat(expireEntryFat &&e) + : m_keyPrimary(std::move(e.m_keyPrimary)), m_vecexpireEntries(std::move(e.m_vecexpireEntries)) +{ + m_dictIndex = e.m_dictIndex; + e.m_dictIndex = nullptr; +} + +void expireEntryFat::createIndex() +{ + serverAssert(m_dictIndex == nullptr); + m_dictIndex = dictCreate(&keyptrDictType, nullptr); + + for (auto &entry : m_vecexpireEntries) + { + if (entry.spsubkey != nullptr) 
+ { + dictEntry *de = dictAddRaw(m_dictIndex, (void*)entry.spsubkey.get(), nullptr); + de->v.s64 = entry.when; + } + } +} + +void expireEntryFat::expireSubKey(const char *szSubkey, long long when) +{ + if (m_vecexpireEntries.size() >= INDEX_THRESHOLD && m_dictIndex == nullptr) + createIndex(); + + // First check if the subkey already has an expiration + if (m_dictIndex != nullptr && szSubkey != nullptr) + { + dictEntry *de = dictFind(m_dictIndex, szSubkey); + if (de != nullptr) + { + auto itr = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), de->v.u64); + while (itr != m_vecexpireEntries.end() && itr->when == de->v.s64) + { + bool fFound = false; + if (szSubkey == nullptr && itr->spsubkey == nullptr) { + fFound = true; + } else if (szSubkey != nullptr && itr->spsubkey != nullptr && sdscmp((sds)itr->spsubkey.get(), (sds)szSubkey) == 0) { + fFound = true; + } + if (fFound) { + m_vecexpireEntries.erase(itr); + dictDelete(m_dictIndex, szSubkey); + break; + } + ++itr; + } + } + } + else + { + for (auto &entry : m_vecexpireEntries) + { + if (szSubkey != nullptr) + { + // if this is a subkey expiry then its not a match if the expireEntry is either for the + // primary key or a different subkey + if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0) + continue; + } + else + { + if (entry.spsubkey != nullptr) + continue; + } + m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data())); + break; + } + } + auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); + const char *subkey = (szSubkey) ? 
sdsdup(szSubkey) : nullptr; + auto itr = m_vecexpireEntries.emplace(itrInsert, when, subkey); + if (m_dictIndex && subkey) { + dictEntry *de = dictAddRaw(m_dictIndex, (void*)itr->spsubkey.get(), nullptr); + de->v.s64 = when; + } +} + +void expireEntryFat::popfrontExpireEntry() +{ + if (m_dictIndex != nullptr && m_vecexpireEntries.begin()->spsubkey) { + int res = dictDelete(m_dictIndex, (void*)m_vecexpireEntries.begin()->spsubkey.get()); + serverAssert(res == DICT_OK); + } + m_vecexpireEntries.erase(m_vecexpireEntries.begin()); +} \ No newline at end of file diff --git a/src/expire.h b/src/expire.h new file mode 100644 index 000000000..227d712d3 --- /dev/null +++ b/src/expire.h @@ -0,0 +1,256 @@ +#pragma once + +class expireEntryFat +{ + friend class expireEntry; + static const int INDEX_THRESHOLD = 16; +public: + struct subexpireEntry + { + long long when; + std::unique_ptr spsubkey; + + subexpireEntry(long long when, const char *subkey) + : when(when), spsubkey(subkey, sdsfree) + {} + + subexpireEntry(const subexpireEntry &e) + : when(e.when), spsubkey(nullptr, sdsfree) + { + if (e.spsubkey) + spsubkey = std::unique_ptr((const char*)sdsdup((sds)e.spsubkey.get()), sdsfree); + } + + subexpireEntry(subexpireEntry &&e) = default; + subexpireEntry& operator=(subexpireEntry &&e) = default; + + bool operator<(long long when) const noexcept { return this->when < when; } + bool operator<(const subexpireEntry &se) { return this->when < se.when; } + }; + +private: + sdsimmutablestring m_keyPrimary; + std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key + dict *m_dictIndex = nullptr; + + void createIndex(); +public: + expireEntryFat(const sdsimmutablestring &keyPrimary) + : m_keyPrimary(keyPrimary) + {} + ~expireEntryFat(); + + expireEntryFat(const expireEntryFat &e); + expireEntryFat(expireEntryFat &&e); + + long long when() const noexcept { return m_vecexpireEntries.front().when; } + const char *key() const noexcept { 
return static_cast(m_keyPrimary); } + + bool operator<(long long when) const noexcept { return this->when() < when; } + + void expireSubKey(const char *szSubkey, long long when); + + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } + const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } + void popfrontExpireEntry(); + const subexpireEntry &operator[](size_t idx) const { return m_vecexpireEntries[idx]; } + size_t size() const noexcept { return m_vecexpireEntries.size(); } +}; + +class expireEntry { + struct + { + sdsimmutablestring m_key; + expireEntryFat *m_pfatentry = nullptr; + } u; + long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer + +public: + class iter + { + friend class expireEntry; + const expireEntry *m_pentry = nullptr; + size_t m_idx = 0; + + public: + iter(const expireEntry *pentry, size_t idx) + : m_pentry(pentry), m_idx(idx) + {} + + iter &operator++() { ++m_idx; return *this; } + + const char *subkey() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); + return nullptr; + } + long long when() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].when; + return m_pentry->when(); + } + + bool operator!=(const iter &other) + { + return m_idx != other.m_idx; + } + + const iter &operator*() const { return *this; } + }; + + expireEntry(sds key, const char *subkey, long long when) + { + if (subkey != nullptr) + { + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key))); + u.m_pfatentry->expireSubKey(subkey, when); + } + else + { + u.m_key = sdsimmutablestring(sdsdupshared(key)); + m_when = when; + } + } + + expireEntry(const expireEntry &e) + { + *this = e; + } + expireEntry(expireEntry &&e) + { + u.m_key = std::move(e.u.m_key); + u.m_pfatentry = std::move(e.u.m_pfatentry); + m_when = e.m_when; + e.m_when = 0; + e.u.m_pfatentry = nullptr; + } + + 
expireEntry(expireEntryFat *pfatentry) + { + u.m_pfatentry = pfatentry; + m_when = LLONG_MIN; + } + + ~expireEntry() + { + if (FFat()) + delete u.m_pfatentry; + } + + expireEntry &operator=(const expireEntry &e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + if (e.FFat()) + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); + return *this; + } + + void setKeyUnsafe(sds key) + { + if (FFat()) + u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key)); + else + u.m_key = sdsimmutablestring(sdsdupshared(key)); + } + + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } + expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } + const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; } + + + bool operator==(const sdsview &key) const noexcept + { + return key == this->key(); + } + + bool operator<(const expireEntry &e) const noexcept + { + return when() < e.when(); + } + bool operator<(long long when) const noexcept + { + return this->when() < when; + } + + const char *key() const noexcept + { + if (FFat()) + return u.m_pfatentry->key(); + return static_cast(u.m_key); + } + long long when() const noexcept + { + if (FFat()) + return u.m_pfatentry->when(); + return m_when; + } + + void update(const char *subkey, long long when) + { + if (!FFat()) + { + if (subkey == nullptr) + { + m_when = when; + return; + } + else + { + // we have to upgrade to a fat entry + long long whenT = m_when; + sdsimmutablestring keyPrimary = u.m_key; + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); + u.m_pfatentry->expireSubKey(nullptr, whenT); + // at this point we're fat so fall through + } + } + u.m_pfatentry->expireSubKey(subkey, when); + } + + iter begin() const { return iter(this, 0); } + iter end() const + { + if (FFat()) + return iter(this, u.m_pfatentry->size()); + return iter(this, 1); + } + + void erase(iter &itr) + { + if (!FFat()) + throw -1; // assert + 
pfatentry()->m_vecexpireEntries.erase( + pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); + } + + size_t size() const + { + if (FFat()) + return u.m_pfatentry->size(); + return 1; + } + + bool FGetPrimaryExpire(long long *pwhen) const + { + *pwhen = -1; + for (auto itr : *this) + { + if (itr.subkey() == nullptr) + { + *pwhen = itr.when(); + return true; + } + } + return false; + } + + explicit operator sdsview() const noexcept { return key(); } + explicit operator long long() const noexcept { return when(); } +}; +typedef semiorderedset expireset; +extern fastlock g_expireLock; \ No newline at end of file diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 60eb653bf..4cada72cc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -35,7 +35,11 @@ #include #include #include +#ifdef __FreeBSD__ +#include +#else #include +#endif #include #include #ifdef __linux__ @@ -167,7 +171,12 @@ extern "C" pid_t gettid() #else if (pidCache == -1) { uint64_t tidT; +#ifdef __FreeBSD__ +// Check https://github.com/ClickHouse/ClickHouse/commit/8d51824ddcb604b6f179a0216f0d32ba5612bd2e + tidT = pthread_getthreadid_np(); +#else pthread_threadid_np(nullptr, &tidT); +#endif serverAssert(tidT < UINT_MAX); pidCache = (int)tidT; } @@ -343,7 +352,9 @@ extern "C" void fastlock_lock(struct fastlock *lock) unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); unsigned cloops = 0; ticket ticketT; - unsigned loopLimit = g_fHighCpuPressure ? 0x10000 : 0x100000; + int fHighPressure; + __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED); + unsigned loopLimit = fHighPressure ? 
0x10000 : 0x100000; for (;;) { @@ -478,19 +489,22 @@ void fastlock_auto_adjust_waits() { #ifdef __linux__ struct sysinfo sysinf; - auto fHighPressurePrev = g_fHighCpuPressure; + int fHighPressurePrev, fHighPressureNew; /* pressure flag before and after this sampling pass */ + __atomic_load(&g_fHighCpuPressure, &fHighPressurePrev, __ATOMIC_RELAXED); + fHighPressureNew = fHighPressurePrev; /* stays equal to the previous value unless sysinfo() succeeds below */ memset(&sysinf, 0, sizeof sysinf); if (!sysinfo(&sysinf)) { auto avgCoreLoad = sysinf.loads[0] / get_nprocs(); - g_fHighCpuPressure = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); - if (g_fHighCpuPressure) + fHighPressureNew = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); /* assign the function-scope flag: redeclaring it here shadowed it, so the "CPU pressure reduced" check below compared the previous value with itself and never fired */ + __atomic_store(&g_fHighCpuPressure, &fHighPressureNew, __ATOMIC_RELEASE); + if (fHighPressureNew) serverLog(!fHighPressurePrev ? 3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0); } - if (!g_fHighCpuPressure && fHighPressurePrev) { + if (!fHighPressureNew && fHighPressurePrev) { serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced"); } #else g_fHighCpuPressure = g_fTestMode; #endif -} \ No newline at end of file +} diff --git a/src/networking.cpp b/src/networking.cpp index 328be5c73..206956bcb 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) { serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); switch(o->encoding) { case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o)); - case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); + case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj); default: return 0; /* Just integer encoding for now. 
*/ } } @@ -1266,9 +1266,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) void acceptOnThread(connection *conn, int flags, char *cip) { int ielCur = ielFromEventLoop(serverTL->el); + bool fBootLoad = (g_pserver->loading == LOADING_BOOT); int ielTarget = 0; - if (g_pserver->loading) + if (fBootLoad) { ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active } @@ -1292,10 +1293,10 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] { connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); @@ -1304,7 +1305,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) return; // If res != AE_OK we can still try to accept on the local thread } - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); aeAcquireLock(); @@ -2266,7 +2267,7 @@ int processMultibulkBuffer(client *c) { * 1. The client is reset unless there are reasons to avoid doing it. * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ -void commandProcessed(client *c) { +void commandProcessed(client *c, int flags) { long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. 
*/ @@ -2294,7 +2295,7 @@ void commandProcessed(client *c) { ae.arm(c); long long applied = c->reploff - prev_offset; if (applied) { - if (!g_pserver->fActiveReplica) + if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) { replicationFeedSlavesFromMasterStream(g_pserver->slaves, c->pending_querybuf, applied); @@ -2315,9 +2316,10 @@ void commandProcessed(client *c) { int processCommandAndResetClient(client *c, int flags) { int deadclient = 0; serverTL->current_client = c; - AeLocker locker; - if (processCommand(c, flags, locker) == C_OK) { - commandProcessed(c); + serverAssert(GlobalLocksAcquired()); + + if (processCommand(c, flags) == C_OK) { + commandProcessed(c, flags); } if (serverTL->current_client == NULL) deadclient = 1; serverTL->current_client = NULL; @@ -2468,14 +2470,26 @@ void readQueryFromClient(connection *conn) { return; } - /* There is more data in the client input buffer, continue parsing it - * in case to check if there is a full command to execute. */ - processInputBuffer(c, CMD_CALL_FULL); + serverTL->vecclientsProcess.push_back(c); +} + +void processClients() +{ + serverAssert(GlobalLocksAcquired()); + + for (client *c : serverTL->vecclientsProcess) { + /* There is more data in the client input buffer, continue parsing it + * in case to check if there is a full command to execute. 
*/ + std::unique_lock ul(c->lock); + processInputBuffer(c, CMD_CALL_FULL); + } + if (listLength(serverTL->clients_pending_asyncwrite)) { - aelock.arm(c); ProcessPendingAsyncWrites(); } + + serverTL->vecclientsProcess.clear(); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2851,7 +2865,7 @@ NULL if (c->name) addReplyBulk(c,c->name); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); } else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) { /* CLIENT PAUSE */ long long duration; @@ -3433,7 +3447,7 @@ void processEventsWhileBlocked(int iel) { aeReleaseLock(); - serverAssertDebug(!GlobalLocksAcquired()); + serverAssert(!GlobalLocksAcquired()); try { while (iterations--) { diff --git a/src/object.cpp b/src/object.cpp index fbbbae339..c4201e6c1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -41,14 +41,15 @@ /* ===================== Creation and parsing of objects ==================== */ robj *createObject(int type, void *ptr) { - robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); + size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); + o->type = type; o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); -#ifdef ENABLE_MVCC - o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif + setMvccTstamp(o, OBJ_MVCC_INVALID); /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -98,15 +99,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); - robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); + + size_t mvccExtraBytes = g_pserver->fActiveReplica ? 
sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); -#ifdef ENABLE_MVCC - o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif + setMvccTstamp(o, OBJ_MVCC_INVALID); if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -134,13 +136,8 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ -#ifdef ENABLE_MVCC -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 -#else -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 56 -#endif +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52 -static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); @@ -400,7 +397,11 @@ void decrRefCount(robj_roptr o) { case OBJ_CRON: freeCronObject(o); break; default: serverPanic("Unknown object type"); break; } - zfree(o.unsafe_robjcast()); + if (g_pserver->fActiveReplica) { + zfree(reinterpret_cast(o.unsafe_robjcast())-1); + } else { + zfree(o.unsafe_robjcast()); + } } else { if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); } @@ -1325,13 +1326,12 @@ NULL * in case of the key has not been accessed for a long time, * because we update the access time only * when the key is read or overwritten. 
*/ - addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); -#ifdef ENABLE_MVCC + addReplyLongLong(c,LFUDecrAndReturn(o)); } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == nullptr) return; - addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); -#endif + uint64_t mvcc = mvccFromObj(o); + addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000); } else { addReplySubcommandSyntaxError(c); } @@ -1374,7 +1374,7 @@ NULL auto itr = c->db->find(c->argv[2]); if (itr == nullptr) { - addReplyNull(c, shared.nullbulk); + addReplyNull(c); return; } size_t usage = objectComputeSize(itr.val(),samples); @@ -1515,7 +1515,11 @@ void redisObject::setrefcount(unsigned ref) sds serializeStoredStringObject(sds str, robj_roptr o) { + uint64_t mvcc; + mvcc = mvccFromObj(o); + str = sdscatlen(str, &mvcc, sizeof(mvcc)); str = sdscatlen(str, &(*o), sizeof(robj)); + static_assert((sizeof(robj) + sizeof(mvcc)) == sizeof(redisObjectStack), ""); switch (o->encoding) { case OBJ_ENCODING_RAW: @@ -1539,31 +1543,34 @@ sds serializeStoredStringObject(sds str, robj_roptr o) robj *deserializeStoredStringObject(const char *data, size_t cb) { - const robj *oT = (const robj*)data; + uint64_t mvcc = *reinterpret_cast(data); + const robj *oT = (const robj*)(data+sizeof(uint64_t)); robj *newObject = nullptr; switch (oT->encoding) { case OBJ_ENCODING_INT: - serverAssert(cb == sizeof(robj)); - [[fallthrough]]; + newObject = createObject(OBJ_STRING, nullptr); + newObject->encoding = oT->encoding; + newObject->m_ptr = oT->m_ptr; + return newObject; + case OBJ_ENCODING_EMBSTR: - newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); - memcpy(newObject, data, cb); - newObject->SetFExpires(false); - newObject->setrefcount(1); + newObject = createEmbeddedStringObject(szFromObj(oT), sdslen(szFromObj(oT))); return newObject; case OBJ_ENCODING_RAW: - newObject = 
(robj*)zmalloc(sizeof(robj), MALLOC_SHARED); - memcpy(newObject, data, sizeof(robj)); - newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); - memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); - newObject->SetFExpires(false); - newObject->setrefcount(1); + newObject = createObject(OBJ_STRING, sdsnewlen(SDS_NOINIT,cb-sizeof(robj)-sizeof(uint64_t))); + newObject->lru = oT->lru; + memcpy(newObject->m_ptr, data+sizeof(robj)+sizeof(mvcc), cb-sizeof(robj)-sizeof(mvcc)); return newObject; + + default: + serverPanic("Unknown string object encoding from storage"); } - serverPanic("Unknown string object encoding from storage"); - return nullptr; + setMvccTstamp(newObject, mvcc); + newObject->setrefcount(1); + + return newObject; } robj *deserializeStoredObjectCore(const void *data, size_t cb) @@ -1591,11 +1598,9 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb) decrRefCount(auxkey); goto eoferr; } -#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { - obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + setMvccTstamp(obj, strtoull(szFromObj(auxval), nullptr, 10)); } -#endif decrRefCount(auxkey); decrRefCount(auxval); } @@ -1639,4 +1644,40 @@ sds serializeStoredObject(robj_roptr o, sds sdsPrefix) return (sds)rdb.io.buffer.ptr; } serverPanic("Attempting to store unknown object type"); -} \ No newline at end of file +} + +redisObjectStack::redisObjectStack() +{ + // We need to ensure the Extended Object is first in the class layout + serverAssert(reinterpret_cast(static_cast(this)) != reinterpret_cast(this)); +} + +void *allocPtrFromObj(robj_roptr o) { + if (g_pserver->fActiveReplica) + return reinterpret_cast(o.unsafe_robjcast()) - 1; + return o.unsafe_robjcast(); +} + +robj *objFromAllocPtr(void *pv) { + if (g_pserver->fActiveReplica) { + return reinterpret_cast(reinterpret_cast(pv)+1); + } + return reinterpret_cast(pv); +} + +uint64_t mvccFromObj(robj_roptr o) +{ + if (g_pserver->fActiveReplica) { + 
redisObjectExtended *oe = reinterpret_cast(o.unsafe_robjcast()) - 1; + return oe->mvcc_tstamp; + } + return OBJ_MVCC_INVALID; +} + +void setMvccTstamp(robj *o, uint64_t mvcc) +{ + if (!g_pserver->fActiveReplica) + return; + redisObjectExtended *oe = reinterpret_cast(o) - 1; + oe->mvcc_tstamp = mvcc; +} diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 07e0e85f6..3ccbb6a66 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -89,7 +89,7 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel) { if (channel) addReplyBulk(c,channel); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); } @@ -117,7 +117,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { if (pattern) addReplyBulk(c,pattern); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); } diff --git a/src/rdb.cpp b/src/rdb.cpp index ffbd030bf..004fa010f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1096,12 +1096,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn } char szT[32]; -#ifdef ENABLE_MVCC if (g_pserver->fActiveReplica) { - snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + snprintf(szT, 32, "%" PRIu64, mvccFromObj(val)); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; } -#endif /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1155,7 +1153,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o) { - robj key; + redisObjectStack key; initStaticStringObject(key,(char*)keystr); std::unique_lock ul(g_expireLock, std::defer_lock); @@ -2136,7 +2134,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { exit(1); } RedisModuleIO io; - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); 
moduleInitIOContext(io,mt,rdb,&keyobj); io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; @@ -2185,9 +2183,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } -#ifdef ENABLE_MVCC - o->mvcc_tstamp = mvcc_tstamp; -#endif + setMvccTstamp(o, mvcc_tstamp); serverAssert(!o->FExpires()); return o; } @@ -2196,7 +2192,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { * needed to provide loading stats. */ void startLoading(size_t size, int rdbflags) { /* Load the DB */ - g_pserver->loading = 1; + g_pserver->loading = (rdbflags & RDBFLAGS_REPLICATION) ? LOADING_REPLICATION : LOADING_BOOT; g_pserver->loading_start_time = time(NULL); g_pserver->loading_loaded_bytes = 0; g_pserver->loading_total_bytes = size; @@ -2461,7 +2457,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else { - redisObject keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); decrRefCount(subexpireKey); @@ -2545,18 +2541,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } -#ifdef ENABLE_MVCC - bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; -#else - bool fStaleMvccKey = false; -#endif + bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. 
*/ - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; if (fStaleMvccKey || fExpiredKey) { diff --git a/src/replication.cpp b/src/replication.cpp index a905fe0fe..7568f6b3d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1179,6 +1179,7 @@ void processReplconfLicense(client *c, robj *arg) * full resync. */ void replconfCommand(client *c) { int j; + bool fCapaCommand = false; if ((c->argc % 2) == 0) { /* Number of arguments must be odd to make sure that every @@ -1189,6 +1190,7 @@ void replconfCommand(client *c) { /* Process every option-value pair. */ for (j = 1; j < c->argc; j+=2) { + fCapaCommand = false; if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) { long port; @@ -1213,6 +1215,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + + fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. 
It is an @@ -1258,7 +1262,16 @@ void replconfCommand(client *c) { return; } } - addReply(c,shared.ok); + + if (fCapaCommand) { + sds reply = sdsnew("+OK"); + if (g_pserver->fActiveReplica) + reply = sdscat(reply, " active-replica"); + reply = sdscat(reply, "\r\n"); + addReplySds(c, reply); + } else { + addReply(c,shared.ok); + } } /* This function puts a replica in the online state, and should be called just @@ -2571,6 +2584,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read return PSYNC_NOT_SUPPORTED; } +void parseMasterCapa(redisMaster *mi, sds strcapa) +{ + if (sdslen(strcapa) < 1 || strcapa[0] != '+') + return; + + char *szStart = strcapa + 1; // skip the + + char *pchEnd = szStart; + + mi->isActive = false; + for (;;) + { + if (*pchEnd == ' ' || *pchEnd == '\0') { + // Parse the word + if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { + mi->isActive = true; + } + szStart = pchEnd + 1; + } + if (*pchEnd == '\0') + break; + ++pchEnd; + } +} + /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(connection *conn) { @@ -2799,16 +2836,8 @@ void syncWithMaster(connection *conn) { * * The master will ignore capabilities it does not understand. 
*/ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - if (g_pserver->fActiveReplica) - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); - } - else - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2",NULL); - } + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; @@ -2823,6 +2852,8 @@ void syncWithMaster(connection *conn) { if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); + } else { + parseMasterCapa(mi, err); } sdsfree(err); mi->repl_state = REPL_STATE_SEND_PSYNC; @@ -4056,12 +4087,20 @@ int FBrokenLinkToMaster() listNode *ln; listRewind(g_pserver->masters, &li); + int connected = 0; while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (mi->repl_state != REPL_STATE_CONNECTED) - return true; + if (mi->repl_state == REPL_STATE_CONNECTED) + ++connected; } + + if (g_pserver->repl_quorum < 0) { + return connected < (int)listLength(g_pserver->masters); + } else { + return connected < g_pserver->repl_quorum; + } + return false; } diff --git a/src/server.cpp b/src/server.cpp index 80af48bad..92bdcfa3e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1847,6 +1847,18 @@ void clientsCron(int iel) { freeClientsInAsyncFreeQueue(iel); } +bool expireOwnKeys() +{ + if (iAmMaster()) { + return true; + } else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + if (mi->isActive) + return true; + } + return false; +} + /* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. 
*/ @@ -1854,7 +1866,7 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (g_pserver->active_expire_enabled) { - if (iAmMaster()) { + if (expireOwnKeys()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); @@ -2383,6 +2395,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData } extern int ProcessingEventsWhileBlocked; +void processClients(); /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep @@ -2408,7 +2421,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { aeSetDontWait(eventLoop, tlsHasPendingData()); aeAcquireLock(); - + processClients(); + /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2669,6 +2683,7 @@ void initMasterInfo(redisMaster *master) master->cached_master = NULL; master->master_initial_offset = -1; + master->isActive = false; master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ @@ -3830,7 +3845,7 @@ void call(client *c, int flags) { !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; - if (c->cmd->flags & CMD_SKIP_PROPOGATE) + if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica) propagate_flags &= ~PROPAGATE_REPL; /* Call propagate() only if at least one of AOF / replication @@ -3915,12 +3930,12 @@ void call(client *c, int flags) { * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). 
*/ -int processCommand(client *c, int callFlags, AeLocker &locker) { +int processCommand(client *c, int callFlags) { AssertCorrectThread(c); + serverAssert(GlobalLocksAcquired()); if (moduleHasCommandFilters()) { - locker.arm(c); moduleCallCommandFilters(c); } @@ -3955,9 +3970,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { return C_OK; } - if (!locker.isArmed()) - c->db->prefetchKeysAsync(locker, c); - /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || @@ -3975,9 +3987,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { /* Check if the user can run this command according to the current * ACLs. */ - if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS)) - locker.arm(c); // ACLs require the lock - int acl_keypos; int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { @@ -4005,7 +4014,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { - locker.arm(c); int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, @@ -4021,6 +4029,8 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { } } + incrementMvccTstamp(); + /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering @@ -4028,7 +4038,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { * condition, to avoid mixing the propagation of scripts with the * propagation of DELs due to eviction. */ if (g_pserver->maxmemory && !g_pserver->lua_timedout) { - locker.arm(c); int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR; /* freeMemoryIfNeeded may flush replica output buffers. 
This may result * into a replica, that may be the active client, to be freed. */ @@ -4064,7 +4073,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { * and if this is a master instance. */ if (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand) { - locker.arm(c); int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && listLength(g_pserver->masters) == 0 && @@ -4124,7 +4132,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { if (listLength(g_pserver->masters)) { - locker.arm(c); /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, * when replica-serve-stale-data is no and we are a replica with a broken * link with master. */ @@ -4142,8 +4149,12 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) { - addReply(c, shared.loadingerr); - return C_OK; + /* Active Replicas can execute read only commands, and optionally write commands */ + if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad))) + { + addReply(c, shared.loadingerr); + return C_OK; + } } /* Lua script too slow? Only allow a limited number of commands. 
@@ -4181,8 +4192,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { queueMultiCommand(c); addReply(c,shared.queued); } else { - locker.arm(c); - incrementMvccTstamp(); call(c,callFlags); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) @@ -4809,7 +4818,7 @@ sds genRedisInfoString(const char *section) { "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", - g_pserver->loading.load(std::memory_order_relaxed), + !!g_pserver->loading.load(std::memory_order_relaxed), /* Note: libraries expect 1 or 0 here so coerce our enum */ g_pserver->dirty, g_pserver->FRdbSaveInProgress(), (intmax_t)g_pserver->lastsave, @@ -4958,64 +4967,61 @@ sds genRedisInfoString(const char *section) { listLength(g_pserver->masters) == 0 ? "master" : g_pserver->fActiveReplica ? "active-replica" : "slave"); if (listLength(g_pserver->masters)) { - listIter li; - listNode *ln; - listRewind(g_pserver->masters, &li); - bool fAllUp = true; - while ((ln = listNext(&li))) { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - fAllUp = fAllUp && mi->repl_state == REPL_STATE_CONNECTED; - } - info = sdscatprintf(info, "master_global_link_status:%s\r\n", - fAllUp ? "up" : "down"); + FBrokenLinkToMaster() ? 
"down" : "up"); int cmasters = 0; + listIter li; + listNode *ln; listRewind(g_pserver->masters, &li); while ((ln = listNext(&li))) { long long slave_repl_offset = 1; redisMaster *mi = (redisMaster*)listNodeValue(ln); - info = sdscatprintf(info, "Master %d: \r\n", cmasters); - ++cmasters; if (mi->master) slave_repl_offset = mi->master->reploff; else if (mi->cached_master) slave_repl_offset = mi->cached_master->reploff; + char master_prefix[128] = ""; + if (cmasters != 0) { + snprintf(master_prefix, sizeof(master_prefix), "_%d", cmasters); + } + info = sdscatprintf(info, - "master_host:%s\r\n" - "master_port:%d\r\n" - "master_link_status:%s\r\n" - "master_last_io_seconds_ago:%d\r\n" - "master_sync_in_progress:%d\r\n" + "master%s_host:%s\r\n" + "master%s_port:%d\r\n" + "master%s_link_status:%s\r\n" + "master%s_last_io_seconds_ago:%d\r\n" + "master%s_sync_in_progress:%d\r\n" "slave_repl_offset:%lld\r\n" - ,mi->masterhost, - mi->masterport, - (mi->repl_state == REPL_STATE_CONNECTED) ? + ,master_prefix, mi->masterhost, + master_prefix, mi->masterport, + master_prefix, (mi->repl_state == REPL_STATE_CONNECTED) ? "up" : "down", - mi->master ? + master_prefix, mi->master ? 
((int)(g_pserver->unixtime-mi->master->lastinteraction)) : -1, - mi->repl_state == REPL_STATE_TRANSFER, + master_prefix, mi->repl_state == REPL_STATE_TRANSFER, slave_repl_offset ); if (mi->repl_state == REPL_STATE_TRANSFER) { info = sdscatprintf(info, - "master_sync_left_bytes:%lld\r\n" - "master_sync_last_io_seconds_ago:%d\r\n" - , (long long) + "master%s_sync_left_bytes:%lld\r\n" + "master%s_sync_last_io_seconds_ago:%d\r\n" + , master_prefix, (long long) (mi->repl_transfer_size - mi->repl_transfer_read), - (int)(g_pserver->unixtime-mi->repl_transfer_lastio) + master_prefix, (int)(g_pserver->unixtime-mi->repl_transfer_lastio) ); } if (mi->repl_state != REPL_STATE_CONNECTED) { info = sdscatprintf(info, - "master_link_down_since_seconds:%jd\r\n", - (intmax_t)g_pserver->unixtime-mi->repl_down_since); + "master%s_link_down_since_seconds:%jd\r\n", + master_prefix, (intmax_t)g_pserver->unixtime-mi->repl_down_since); } + ++cmasters; } info = sdscatprintf(info, "slave_priority:%d\r\n" diff --git a/src/server.h b/src/server.h index fb4c6634e..fcac61c1f 100644 --- a/src/server.h +++ b/src/server.h @@ -96,6 +96,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "semiorderedset.h" #include "connection.h" /* Connection abstraction */ #include "serverassert.h" +#include "expire.h" #define REDISMODULE_CORE 1 #include "redismodule.h" /* Redis modules API defines. */ @@ -112,6 +113,9 @@ typedef long long ustime_t; /* microsecond time type. 
*/ #define FImplies(x, y) (!(x) || (y)) +#define LOADING_BOOT 1 +#define LOADING_REPLICATION 2 + extern int g_fTestMode; extern struct redisServer *g_pserver; @@ -868,7 +872,16 @@ typedef struct RedisModuleDigest { #define MVCC_MS_SHIFT 20 -typedef struct redisObject { +// This struct will be allocated ahead of the ROBJ when needed +struct redisObjectExtended { + uint64_t mvcc_tstamp; +}; + +typedef class redisObject { +protected: + redisObject() {} + +public: unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or @@ -877,9 +890,6 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: -#ifdef ENABLE_MVCC - uint64_t mvcc_tstamp; -#endif void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -890,11 +900,18 @@ public: void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; -#ifdef ENABLE_MVCC -static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); -#else static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); -#endif + +class redisObjectStack : public redisObjectExtended, public redisObject +{ +public: + redisObjectStack(); +}; + +uint64_t mvccFromObj(robj_roptr o); +void setMvccTstamp(redisObject *o, uint64_t mvcc); +void *allocPtrFromObj(robj_roptr o); +robj *objFromAllocPtr(void *pv); __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { @@ -920,281 +937,6 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } -class expireEntryFat -{ - friend class expireEntry; -public: - struct subexpireEntry - { - long long when; - std::unique_ptr spsubkey; - - subexpireEntry(long long when, const char *subkey) - : when(when), spsubkey(subkey, sdsfree) - {} - - subexpireEntry(const subexpireEntry 
&e) - : when(e.when), spsubkey(nullptr, sdsfree) - { - if (e.spsubkey) - spsubkey = std::unique_ptr((const char*)sdsdup((sds)e.spsubkey.get()), sdsfree); - } - - subexpireEntry(subexpireEntry &&e) = default; - subexpireEntry& operator=(subexpireEntry &&e) = default; - - bool operator<(long long when) const noexcept { return this->when < when; } - bool operator<(const subexpireEntry &se) { return this->when < se.when; } - }; - -private: - sdsimmutablestring m_keyPrimary; - std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key - -public: - expireEntryFat(const sdsimmutablestring &keyPrimary) - : m_keyPrimary(keyPrimary) - {} - - expireEntryFat(const expireEntryFat &e) = default; - expireEntryFat(expireEntryFat &&e) = default; - - long long when() const noexcept { return m_vecexpireEntries.front().when; } - const char *key() const noexcept { return static_cast(m_keyPrimary); } - - bool operator<(long long when) const noexcept { return this->when() < when; } - - void expireSubKey(const char *szSubkey, long long when) - { - // First check if the subkey already has an expiration - for (auto &entry : m_vecexpireEntries) - { - if (szSubkey != nullptr) - { - // if this is a subkey expiry then its not a match if the expireEntry is either for the - // primary key or a different subkey - if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0) - continue; - } - else - { - if (entry.spsubkey != nullptr) - continue; - } - m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data())); - break; - } - auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); - const char *subkey = (szSubkey) ? 
sdsdup(szSubkey) : nullptr; - m_vecexpireEntries.emplace(itrInsert, when, subkey); - } - - bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } - const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } - void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - const subexpireEntry &operator[](size_t idx) const { return m_vecexpireEntries[idx]; } - size_t size() const noexcept { return m_vecexpireEntries.size(); } -}; - -class expireEntry { - struct - { - sdsimmutablestring m_key; - expireEntryFat *m_pfatentry = nullptr; - } u; - long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer - -public: - class iter - { - friend class expireEntry; - const expireEntry *m_pentry = nullptr; - size_t m_idx = 0; - - public: - iter(const expireEntry *pentry, size_t idx) - : m_pentry(pentry), m_idx(idx) - {} - - iter &operator++() { ++m_idx; return *this; } - - const char *subkey() const - { - if (m_pentry->FFat()) - return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); - return nullptr; - } - long long when() const - { - if (m_pentry->FFat()) - return (*m_pentry->pfatentry())[m_idx].when; - return m_pentry->when(); - } - - bool operator!=(const iter &other) - { - return m_idx != other.m_idx; - } - - const iter &operator*() const { return *this; } - }; - - expireEntry(sds key, const char *subkey, long long when) - { - if (subkey != nullptr) - { - m_when = LLONG_MIN; - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key))); - u.m_pfatentry->expireSubKey(subkey, when); - } - else - { - u.m_key = sdsimmutablestring(sdsdupshared(key)); - m_when = when; - } - } - - expireEntry(expireEntryFat *pfatentry) - { - u.m_pfatentry = pfatentry; - m_when = LLONG_MIN; - } - - expireEntry(const expireEntry &e) - { - *this = e; - } - - expireEntry(expireEntry &&e) - { - u.m_key = std::move(e.u.m_key); - u.m_pfatentry = std::move(e.u.m_pfatentry); - 
m_when = e.m_when; - e.m_when = 0; - e.u.m_pfatentry = nullptr; - } - - ~expireEntry() - { - if (FFat()) - delete u.m_pfatentry; - } - - expireEntry &operator=(const expireEntry &e) - { - u.m_key = e.u.m_key; - m_when = e.m_when; - if (e.FFat()) - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); - return *this; - } - - void setKeyUnsafe(sds key) - { - if (FFat()) - u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key)); - else - u.m_key = sdsimmutablestring(sdsdupshared(key)); - } - - inline bool FFat() const noexcept { return m_when == LLONG_MIN; } - expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } - const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; } - - - bool operator==(const sdsview &key) const noexcept - { - return key == this->key(); - } - - bool operator<(const expireEntry &e) const noexcept - { - return when() < e.when(); - } - bool operator<(long long when) const noexcept - { - return this->when() < when; - } - - const char *key() const noexcept - { - if (FFat()) - return u.m_pfatentry->key(); - return static_cast(u.m_key); - } - long long when() const noexcept - { - if (FFat()) - return u.m_pfatentry->when(); - return m_when; - } - - void update(const char *subkey, long long when) - { - if (!FFat()) - { - if (subkey == nullptr) - { - m_when = when; - return; - } - else - { - // we have to upgrade to a fat entry - long long whenT = m_when; - sdsimmutablestring keyPrimary = u.m_key; - m_when = LLONG_MIN; - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); - u.m_pfatentry->expireSubKey(nullptr, whenT); - // at this point we're fat so fall through - } - } - u.m_pfatentry->expireSubKey(subkey, when); - } - - iter begin() const { return iter(this, 0); } - iter end() const - { - if (FFat()) - return iter(this, u.m_pfatentry->size()); - return iter(this, 1); - } - - void erase(iter &itr) - { - if (!FFat()) - throw -1; // assert - 
pfatentry()->m_vecexpireEntries.erase( - pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); - } - - size_t size() const - { - if (FFat()) - return u.m_pfatentry->size(); - return 1; - } - - bool FGetPrimaryExpire(long long *pwhen) const - { - *pwhen = -1; - for (auto itr : *this) - { - if (itr.subkey() == nullptr) - { - *pwhen = itr.when(); - return true; - } - } - return false; - } - - explicit operator sdsview() const noexcept { return key(); } - explicit operator long long() const noexcept { return when(); } -}; -typedef semiorderedset expireset; -extern fastlock g_expireLock; - /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * and Module types have their registered name returned. */ @@ -1940,6 +1682,9 @@ typedef struct redisTLSContextConfig { char *ciphers; char *ciphersuites; int prefer_server_ciphers; + int session_caching; + int session_cache_size; + int session_cache_timeout; } redisTLSContextConfig; /*----------------------------------------------------------------------------- @@ -2038,6 +1783,7 @@ struct redisServerThreadVars { GarbageCollectorCollection::Epoch gcEpoch; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; bool fRetrySetAofEvent = false; + std::vector vecclientsProcess; int getRdbKeySaveDelay(); private: @@ -2057,6 +1803,7 @@ struct redisMaster { char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ long long master_initial_offset; /* Master PSYNC offset. */ + bool isActive = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ @@ -2360,6 +2107,7 @@ struct redisServer { int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? 
*/ int repl_serve_stale_data; /* Serve stale data when link is down? */ + int repl_quorum; /* For multimaster what do we consider a quorum? -1 means all master must be online */ int repl_slave_ro; /* Slave is read only? */ int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */ int slave_priority; /* Reported in INFO and used by Sentinel. */ @@ -2480,6 +2228,7 @@ struct redisServer { int watchdog_period; /* Software watchdog period in ms. 0 = off */ int fActiveReplica; /* Can this replica also be a master? */ + int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */ // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond @@ -3071,7 +2820,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot); int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot); -int processCommand(client *c, int callFlags, class AeLocker &locker); +int processCommand(client *c, int callFlags); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(const char *s); diff --git a/src/t_list.cpp b/src/t_list.cpp index f95852aac..e9bac8211 100644 --- a/src/t_list.cpp +++ b/src/t_list.cpp @@ -331,7 +331,7 @@ void lindexCommand(client *c) { addReplyBulk(c,value); decrRefCount(value); } else { - addReplyNull(c,shared.nullbulk); + addReplyNull(c); } } else { serverPanic("Unknown list encoding"); diff --git a/src/tls.cpp b/src/tls.cpp index 25ca0bd31..5a128b596 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -150,8 +150,6 @@ void tlsInit(void) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } - /* Server configuration */ - g_pserver->tls_auth_clients = 1; /* Secure by default */ tlsInitThread(); } @@ -193,6 +191,15 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) { 
SSL_CTX_set_options(ctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS); #endif + if (ctx_config->session_caching) { + SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_SERVER); + SSL_CTX_sess_set_cache_size(ctx, ctx_config->session_cache_size); + SSL_CTX_set_timeout(ctx, ctx_config->session_cache_timeout); + SSL_CTX_set_session_id_context(ctx, (const unsigned char*) "KeyDB", 5); + } else { + SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF); + } + protocols = parseProtocolsConfig(ctx_config->protocols); if (protocols == -1) goto error; diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl index 0e753c147..0f0d34b7f 100644 --- a/tests/integration/replication-multimaster.tcl +++ b/tests/integration/replication-multimaster.tcl @@ -96,6 +96,26 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } } + + # Keep this test last since it mucks with the config + if [string equal $topology "mesh"] { + test "$topology_name quorum respected" { + $R(0) config set replica-serve-stale-data no + + # No issues when all nodes are connected with default settings + $R(0) get testkey + + # No issues when quorum is equal to the number of nodes + $R(0) config set replica-quorum 3 + $R(0) get testkey + + $R(0) config set replica-quorum 4 + catch { + $R(0) get testkey + } e + assert_match {*MASTER is down*} $e + } + } } } } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 42cbfd64d..55bac6d9d 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -78,17 +78,8 @@ start_server {tags {"introspection"}} { syslog-facility databases port - io-threads tls-port - tls-prefer-server-ciphers - tls-cert-file - tls-key-file - tls-dh-params-file - tls-ca-cert-file - tls-ca-cert-dir - tls-protocols - tls-ciphers - tls-ciphersuites + io-threads logfile unixsocketperm slaveof @@ -104,6 +95,23 @@ start_server {tags {"introspection"}} { use-fork } + if {!$::tls} { + append
skip_configs { + tls-prefer-server-ciphers + tls-session-cache-timeout + tls-session-cache-size + tls-session-caching + tls-cert-file + tls-key-file + tls-dh-params-file + tls-ca-cert-file + tls-ca-cert-dir + tls-protocols + tls-ciphers + tls-ciphersuites + } + } + set configs {} foreach {k v} [r config get *] { if {[lsearch $skip_configs $k] != -1} { diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 6c991ac97..7bcd7753b 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -109,6 +109,7 @@ start_server {tags {"pubsub"}} { unsubscribe $rd1 # Wait for a response to the unsub __consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3} + after 1 assert_equal 0 [r publish chan1 hello] assert_equal 0 [r publish chan2 hello] assert_equal 0 [r publish chan3 hello]