Merge branch 'unstable' into keydbpro

Former-commit-id: d1986d9fcdfd56f2c30ee95edef6d5260e0aa777
This commit is contained in:
John Sully 2020-09-24 22:02:30 +00:00
commit cb6bc67a87
25 changed files with 758 additions and 479 deletions

View File

@ -12,9 +12,9 @@ jobs:
submodules: true submodules: true
- name: make - name: make
run: | run: |
sudo apt-get update
sudo apt-get -y remove libzstd || true sudo apt-get -y remove libzstd || true
sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make BUILD_TLS=yes -j2 make BUILD_TLS=yes -j2
- name: gen-cert - name: gen-cert
run: ./utils/gen-test-certs.sh run: ./utils/gen-test-certs.sh

2
TLS.md
View File

@ -56,8 +56,6 @@ Note that unlike Redis, KeyDB fully supports multithreading of TLS connections.
To-Do List To-Do List
---------- ----------
- [ ] Add session caching support. Check if/how it's handled by clients to
assess how useful/important it is.
- [ ] redis-benchmark support. The current implementation is a mix of using - [ ] redis-benchmark support. The current implementation is a mix of using
hiredis for parsing and basic networking (establishing connections), but hiredis for parsing and basic networking (establishing connections), but
directly manipulating sockets for most actions. This will need to be cleaned directly manipulating sockets for most actions. This will need to be cleaned

View File

@ -199,6 +199,22 @@ tcp-keepalive 300
# #
# tls-prefer-server-ciphers yes # tls-prefer-server-ciphers yes
# By default, TLS session caching is enabled to allow faster and less expensive
# reconnections by clients that support it. Use the following directive to disable
# caching.
#
# tls-session-caching no
# Change the default number of TLS sessions cached. A zero value sets the cache
# to unlimited size. The default size is 20480.
#
# tls-session-cache-size 5000
# Change the default timeout of cached TLS sessions. The default timeout is 300
# seconds.
#
# tls-session-cache-timeout 60
################################# GENERAL ##################################### ################################# GENERAL #####################################
# By default KeyDB does not run as a daemon. Use 'yes' if you need it. # By default KeyDB does not run as a daemon. Use 'yes' if you need it.
@ -401,6 +417,20 @@ dir ./
# #
replica-serve-stale-data yes replica-serve-stale-data yes
# Active Replicas will allow read only data access while loading remote RDBs
# provided they are permitted to serve stale data. As an option you may also
# permit them to accept write commands. This is an EXPERIMENTAL feature and
# may result in commands not being fully synchronized
#
# allow-write-during-load no
# You can modify the number of masters necessary to form a replica quorum when
# multi-master is enabled and replica-serve-stale-data is "no". By default
# this is set to -1 which implies the number of known masters (e.g. those
# you added with replicaof)
#
# replica-quorum -1
# You can configure a replica instance to accept writes or not. Writing against # You can configure a replica instance to accept writes or not. Writing against
# a replica instance may be useful to store some ephemeral data (because data # a replica instance may be useful to store some ephemeral data (because data
# written on a replica will be easily deleted after resync with the master) but # written on a replica will be easily deleted after resync with the master) but

View File

@ -47,11 +47,6 @@ endif
USEASM?=true USEASM?=true
ifeq ($(NOMVCC),)
CFLAGS+= -DENABLE_MVCC
CXXFLAGS+= -DENABLE_MVCC
endif
ifneq ($(SANITIZE),) ifneq ($(SANITIZE),)
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE

View File

@ -9,11 +9,14 @@ public:
{ {
} }
void arm(client *c) // if a client is passed, then the client is already locked void arm(client *c, bool fIfNeeded = false) // if a client is passed, then the client is already locked
{ {
if (m_fArmed) if (m_fArmed)
return; return;
if (fIfNeeded && aeThreadOwnsLock())
return;
serverAssertDebug(!GlobalLocksAcquired()); serverAssertDebug(!GlobalLocksAcquired());
if (c != nullptr) if (c != nullptr)

View File

@ -1432,8 +1432,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
/* Iterate this DB writing every entry */ /* Iterate this DB writing every entry */
bool fComplete = db->iterate([&](const char *keystr, robj *o)->bool{ bool fComplete = db->iterate([&](const char *keystr, robj *o)->bool{
robj key; redisObjectStack key;
initStaticStringObject(key,(sds)keystr); initStaticStringObject(key,(sds)keystr);
/* Save the key and associated value */ /* Save the key and associated value */

View File

@ -4920,11 +4920,10 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) {
rioInitWithBuffer(payload,sdsempty()); rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key)); serverAssert(rdbSaveObject(payload,o,key));
#ifdef ENABLE_MVCC
char szT[32]; char szT[32];
snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); uint64_t mvcc = mvccFromObj(o);
snprintf(szT, 32, "%" PRIu64, mvcc);
serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1);
#endif
/* Write the footer, this is how it looks like: /* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+ * ----------------+---------------------+---------------+
@ -5066,11 +5065,9 @@ void restoreCommand(client *c) {
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); setMvccTstamp(obj, strtoull(szFromObj(auxval), nullptr, 10));
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }

View File

@ -2256,7 +2256,7 @@ static int updateTlsCfg(char *val, char *prev, const char **err) {
UNUSED(prev); UNUSED(prev);
UNUSED(err); UNUSED(err);
if (tlsConfigure(&g_pserver->tls_ctx_config) == C_ERR) { if (tlsConfigure(&g_pserver->tls_ctx_config) == C_ERR) {
*err = "Unable to configure tls-cert-file. Check server logs."; *err = "Unable to update TLS configuration. Check server logs.";
return 0; return 0;
} }
return 1; return 1;
@ -2266,6 +2266,12 @@ static int updateTlsCfgBool(int val, int prev, const char **err) {
UNUSED(prev); UNUSED(prev);
return updateTlsCfg(NULL, NULL, err); return updateTlsCfg(NULL, NULL, err);
} }
static int updateTlsCfgInt(long long val, long long prev, const char **err) {
UNUSED(val);
UNUSED(prev);
return updateTlsCfg(NULL, NULL, err);
}
#endif /* USE_OPENSSL */ #endif /* USE_OPENSSL */
int fDummy = false; int fDummy = false;
@ -2306,9 +2312,10 @@ standardConfig configs[] = {
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL), createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0,NULL, NULL), // Not applicable to KeyDB, just there for compatibility
createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL),
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL), createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL),
createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
/* String Configs */ /* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),
@ -2369,6 +2376,7 @@ standardConfig configs[] = {
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL), createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL),
createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL), createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */ /* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
@ -2406,10 +2414,13 @@ standardConfig configs[] = {
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
createIntConfig("tls-port", NULL, IMMUTABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* TCP port. */ createIntConfig("tls-port", NULL, IMMUTABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* TCP port. */
createIntConfig("tls-session-cache-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_size, 20*1024, INTEGER_CONFIG, NULL, updateTlsCfgInt),
createIntConfig("tls-session-cache-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_timeout, 300, INTEGER_CONFIG, NULL, updateTlsCfgInt),
createBoolConfig("tls-cluster", NULL, MODIFIABLE_CONFIG, g_pserver->tls_cluster, 0, NULL, NULL), createBoolConfig("tls-cluster", NULL, MODIFIABLE_CONFIG, g_pserver->tls_cluster, 0, NULL, NULL),
createBoolConfig("tls-replication", NULL, MODIFIABLE_CONFIG, g_pserver->tls_replication, 0, NULL, NULL), createBoolConfig("tls-replication", NULL, MODIFIABLE_CONFIG, g_pserver->tls_replication, 0, NULL, NULL),
createBoolConfig("tls-auth-clients", NULL, MODIFIABLE_CONFIG, g_pserver->tls_auth_clients, 1, NULL, NULL), createBoolConfig("tls-auth-clients", NULL, MODIFIABLE_CONFIG, g_pserver->tls_auth_clients, 1, NULL, NULL),
createBoolConfig("tls-prefer-server-ciphers", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.prefer_server_ciphers, 0, NULL, updateTlsCfgBool), createBoolConfig("tls-prefer-server-ciphers", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.prefer_server_ciphers, 0, NULL, updateTlsCfgBool),
createBoolConfig("tls-session-caching", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.session_caching, 1, NULL, updateTlsCfgBool),
createStringConfig("tls-cert-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.cert_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-cert-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.cert_file, NULL, NULL, updateTlsCfg),
createStringConfig("tls-key-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.key_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-key-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.key_file, NULL, NULL, updateTlsCfg),
createStringConfig("tls-dh-params-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.dh_params_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-dh-params-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.dh_params_file, NULL, NULL, updateTlsCfg),

View File

@ -91,9 +91,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
robj *val = itr.val(); robj *val = itr.val();
lookupKeyUpdateObj(val, flags); lookupKeyUpdateObj(val, flags);
if (flags & LOOKUP_UPDATEMVCC) { if (flags & LOOKUP_UPDATEMVCC) {
#ifdef ENABLE_MVCC setMvccTstamp(val, getMvccTstamp());
val->mvcc_tstamp = getMvccTstamp();
#endif
db->trackkey(key, true /* fUpdate */); db->trackkey(key, true /* fUpdate */);
} }
return val; return val;
@ -220,10 +218,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) {
serverAssert(!val->FExpires()); serverAssert(!val->FExpires());
sds copy = sdsdupshared(szFromObj(key)); sds copy = sdsdupshared(szFromObj(key));
#ifdef ENABLE_MVCC
if (g_pserver->fActiveReplica) uint64_t mvcc = getMvccTstamp();
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); setMvccTstamp(key, mvcc);
#endif setMvccTstamp(val, mvcc);
bool fInserted = db->insert(copy, val, fAssumeNew); bool fInserted = db->insert(copy, val, fAssumeNew);
@ -274,9 +272,7 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd
if (fUpdateMvcc) { if (fUpdateMvcc) {
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
val = dupStringObject(val); val = dupStringObject(val);
#ifdef ENABLE_MVCC setMvccTstamp(val, getMvccTstamp());
val->mvcc_tstamp = getMvccTstamp();
#endif
} }
if (g_pserver->lazyfree_lazy_server_del) if (g_pserver->lazyfree_lazy_server_del)
@ -309,14 +305,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
if (itr == nullptr) if (itr == nullptr)
return (dbAddCore(db, key, val) == true); return (dbAddCore(db, key, val) == true);
#ifdef ENABLE_MVCC
robj *old = itr.val(); robj *old = itr.val();
if (old->mvcc_tstamp <= val->mvcc_tstamp) if (mvccFromObj(old) <= mvccFromObj(val))
{ {
db->dbOverwriteCore(itr, key, val, false, true); db->dbOverwriteCore(itr, key, val, false, true);
return true; return true;
} }
#endif
return false; return false;
} }
@ -1635,7 +1629,7 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
/* Update the expire set */ /* Update the expire set */
db->setExpire(key, subkey, when); db->setExpire(key, subkey, when);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica;
if (c && writable_slave && !(c->flags & CLIENT_MASTER)) if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key); rememberSlaveKeyWithExpire(db,key);
} }
@ -1670,7 +1664,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
kde.val()->SetFExpires(true); kde.val()->SetFExpires(true);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica;
if (c && writable_slave && !(c->flags & CLIENT_MASTER)) if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key); rememberSlaveKeyWithExpire(db,key);
} }
@ -1724,7 +1718,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
{ {
robj *argv[3]; robj *argv[3];
robj objT;
redisCommand *cmd = nullptr; redisCommand *cmd = nullptr;
switch (type) switch (type)
{ {
@ -2498,9 +2491,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
sdsfree(strT); sdsfree(strT);
dictAdd(m_pdict, keyNew, objNew); dictAdd(m_pdict, keyNew, objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
#ifdef ENABLE_MVCC serverAssert(mvccFromObj(objNew) == mvccFromObj(itr.val()));
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
#endif
} }
} }
else else

View File

@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
* returns NULL in case the allocatoin wasn't moved. * returns NULL in case the allocatoin wasn't moved.
* when it returns a non-null value, the old pointer was already released * when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */ * and should NOT be accessed. */
void* activeDefragAlloc(void *ptr) { template<typename TPTR>
TPTR* activeDefragAlloc(TPTR *ptr) {
size_t size; size_t size;
void *newptr; void *newptr;
if(!je_get_defrag_hint(ptr)) { if(!je_get_defrag_hint(ptr)) {
@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) {
newptr = zmalloc_no_tcache(size); newptr = zmalloc_no_tcache(size);
memcpy(newptr, ptr, size); memcpy(newptr, ptr, size);
zfree_no_tcache(ptr); zfree_no_tcache(ptr);
return newptr; return (TPTR*)newptr;
}
template<>
robj* activeDefragAlloc(robj *o) {
void *pvSrc = allocPtrFromObj(o);
void *pvDst = activeDefragAlloc(pvSrc);
return objFromAllocPtr(pvDst);
} }
/*Defrag helper for sds strings /*Defrag helper for sds strings

View File

@ -81,7 +81,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
robj *val = db->find(e.key()); robj *val = db->find(e.key());
int deleted = 0; int deleted = 0;
robj objKey; redisObjectStack objKey;
initStaticStringObject(objKey, (char*)e.key()); initStaticStringObject(objKey, (char*)e.key());
bool fTtlChanged = false; bool fTtlChanged = false;
@ -146,7 +146,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
serverAssert(false); serverAssert(false);
} }
robj objSubkey; redisObjectStack objSubkey;
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
@ -744,3 +744,103 @@ void touchCommand(client *c) {
addReplyLongLong(c,touched); addReplyLongLong(c,touched);
} }
// Destructor: releases the subkey index dict. Nothing to do when the index
// was never created (m_dictIndex is still null).
expireEntryFat::~expireEntryFat()
{
    if (m_dictIndex == nullptr)
        return;

    dictRelease(m_dictIndex);
}
// Copy constructor: duplicates the primary key and the list of expirations.
// The subkey index is NOT copied; the new object starts without one. (The
// index is keyed by pointers to the source's subkey strings, while the copied
// entries carry their own duplicated strings.)
expireEntryFat::expireEntryFat(const expireEntryFat &e)
    : m_keyPrimary(e.m_keyPrimary),
      m_vecexpireEntries(e.m_vecexpireEntries),
      m_dictIndex(nullptr)
{
}
// Move constructor: steals the key, the expirations and the subkey index from
// `e`. The source's index pointer is cleared so its destructor does not
// release the dict that now belongs to this object.
expireEntryFat::expireEntryFat(expireEntryFat &&e)
    : m_keyPrimary(std::move(e.m_keyPrimary)),
      m_vecexpireEntries(std::move(e.m_vecexpireEntries)),
      m_dictIndex(e.m_dictIndex)
{
    e.m_dictIndex = nullptr;
}
void expireEntryFat::createIndex()
{
serverAssert(m_dictIndex == nullptr);
m_dictIndex = dictCreate(&keyptrDictType, nullptr);
for (auto &entry : m_vecexpireEntries)
{
if (entry.spsubkey != nullptr)
{
dictEntry *de = dictAddRaw(m_dictIndex, (void*)entry.spsubkey.get(), nullptr);
de->v.s64 = entry.when;
}
}
}
/* Sets (or replaces) the expiration of one subkey of this fat entry.
 *
 *   szSubkey  sds string naming the subkey, or nullptr to target the primary
 *             key itself. The string is duplicated; the caller keeps ownership.
 *   when      expiration value to store. m_vecexpireEntries is kept sorted by
 *             this value.
 *
 * Any expiration already recorded for the same subkey is removed before the
 * new one is inserted. Once the vector holds INDEX_THRESHOLD or more entries a
 * dict index (subkey -> when) is created and used for the lookup. */
void expireEntryFat::expireSubKey(const char *szSubkey, long long when)
{
    if (m_vecexpireEntries.size() >= INDEX_THRESHOLD && m_dictIndex == nullptr)
        createIndex();

    // Duplicate the subkey before anything is freed below, in case the caller
    // handed us the very string owned by the entry that is about to be replaced.
    const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr;

    // First remove any expiration the subkey already has
    if (m_dictIndex != nullptr && szSubkey != nullptr)
    {
        dictEntry *de = dictFind(m_dictIndex, szSubkey);
        if (de != nullptr)
        {
            // Read the stored expiration once; `de` is gone after dictDelete().
            const long long whenOld = de->v.s64;
            auto itr = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), whenOld);
            while (itr != m_vecexpireEntries.end() && itr->when == whenOld)
            {
                // szSubkey is known to be non-null in this branch, so only
                // entries that carry a subkey can match.
                if (itr->spsubkey != nullptr && sdscmp((sds)itr->spsubkey.get(), (sds)szSubkey) == 0)
                {
                    // Drop the index entry BEFORE erasing the vector element:
                    // the index uses the element's own subkey string as its
                    // key, and erasing the element frees that string.
                    // popfrontExpireEntry() uses the same order.
                    dictDelete(m_dictIndex, szSubkey);
                    m_vecexpireEntries.erase(itr);
                    break;
                }
                ++itr;
            }
        }
    }
    else
    {
        // No index, or the primary key is targeted (it is never indexed):
        // fall back to a linear scan.
        for (auto itr = m_vecexpireEntries.begin(); itr != m_vecexpireEntries.end(); ++itr)
        {
            if (szSubkey != nullptr)
            {
                // if this is a subkey expiry then it's not a match if the expireEntry is either for the
                //  primary key or a different subkey
                if (itr->spsubkey == nullptr || sdscmp((sds)itr->spsubkey.get(), (sds)szSubkey) != 0)
                    continue;
            }
            else
            {
                if (itr->spsubkey != nullptr)
                    continue;
            }
            m_vecexpireEntries.erase(itr);
            break;
        }
    }

    // Insert the new expiration at its sorted position and index it if needed
    auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
    auto itrNew = m_vecexpireEntries.emplace(itrInsert, when, subkey);
    if (m_dictIndex && subkey) {
        dictEntry *de = dictAddRaw(m_dictIndex, (void*)itrNew->spsubkey.get(), nullptr);
        de->v.s64 = when;
    }
}
// Removes the first (lowest `when`) expiration from the entry. If an index
// exists and the front expiration belongs to a subkey, the matching index
// entry is removed first, while the subkey string is still alive.
void expireEntryFat::popfrontExpireEntry()
{
    auto itrFront = m_vecexpireEntries.begin();

    if (m_dictIndex != nullptr && itrFront->spsubkey) {
        int res = dictDelete(m_dictIndex, (void*)itrFront->spsubkey.get());
        serverAssert(res == DICT_OK);
    }

    m_vecexpireEntries.erase(itrFront);
}

256
src/expire.h Normal file
View File

@ -0,0 +1,256 @@
#pragma once
/* A "fat" expire entry: one primary key together with a list of expirations,
 * each belonging either to the primary key itself or to one of its subkeys.
 * The list is kept ordered by expiration value (see expireSubKey), and an
 * optional dict index from subkey to expiration is built once the list
 * reaches INDEX_THRESHOLD entries. */
class expireEntryFat
{
    friend class expireEntry;
    // Number of expirations at which expireSubKey() creates the subkey index
    static const int INDEX_THRESHOLD = 16;

public:
    // One expiration. A null spsubkey means it applies to the primary key.
    struct subexpireEntry
    {
        long long when;
        std::unique_ptr<const char, void(*)(const char*)> spsubkey;    // owned sds string, released with sdsfree

        // Takes ownership of `subkey` (may be nullptr)
        subexpireEntry(long long when, const char *subkey)
            : when(when), spsubkey(subkey, sdsfree)
        {}

        // Deep copy: the subkey string, if any, is duplicated
        subexpireEntry(const subexpireEntry &e)
            : when(e.when), spsubkey(nullptr, sdsfree)
        {
            if (e.spsubkey)
                spsubkey = std::unique_ptr<const char, void(*)(const char*)>((const char*)sdsdup((sds)e.spsubkey.get()), sdsfree);
        }

        subexpireEntry(subexpireEntry &&e) = default;
        subexpireEntry& operator=(subexpireEntry &&e) = default;

        // Ordering is by expiration value only; the subkey does not take part
        bool operator<(long long when) const noexcept { return this->when < when; }
        bool operator<(const subexpireEntry &se) const noexcept { return this->when < se.when; }
    };

private:
    sdsimmutablestring m_keyPrimary;
    std::vector<subexpireEntry> m_vecexpireEntries;  // Note a NULL for the sds portion means the expire is for the primary key
    dict *m_dictIndex = nullptr;    // optional subkey -> when index; null until createIndex() runs

    void createIndex();

public:
    expireEntryFat(const sdsimmutablestring &keyPrimary)
        : m_keyPrimary(keyPrimary)
    {}
    ~expireEntryFat();

    expireEntryFat(const expireEntryFat &e);
    expireEntryFat(expireEntryFat &&e);

    // Expiration of the first stored entry. Reads front(), so the entry must not be empty.
    long long when() const noexcept { return m_vecexpireEntries.front().when; }
    const char *key() const noexcept { return static_cast<const char*>(m_keyPrimary); }

    bool operator<(long long when) const noexcept { return this->when() < when; }

    // Set or replace the expiration of `szSubkey` (nullptr targets the primary key)
    void expireSubKey(const char *szSubkey, long long when);

    bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
    // First stored expiration. Reads front(), so the entry must not be empty.
    const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
    void popfrontExpireEntry();
    const subexpireEntry &operator[](size_t idx) const { return m_vecexpireEntries[idx]; }
    size_t size() const noexcept { return m_vecexpireEntries.size(); }
};
class expireEntry {
struct
{
sdsimmutablestring m_key;
expireEntryFat *m_pfatentry = nullptr;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
class iter
{
friend class expireEntry;
const expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(const expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
iter &operator++() { ++m_idx; return *this; }
const char *subkey() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].spsubkey.get();
return nullptr;
}
long long when() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].when;
return m_pentry->when();
}
bool operator!=(const iter &other)
{
return m_idx != other.m_idx;
}
const iter &operator*() const { return *this; }
};
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key)));
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = sdsimmutablestring(sdsdupshared(key));
m_when = when;
}
}
expireEntry(const expireEntry &e)
{
*this = e;
}
expireEntry(expireEntry &&e)
{
u.m_key = std::move(e.u.m_key);
u.m_pfatentry = std::move(e.u.m_pfatentry);
m_when = e.m_when;
e.m_when = 0;
e.u.m_pfatentry = nullptr;
}
expireEntry(expireEntryFat *pfatentry)
{
u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
}
~expireEntry()
{
if (FFat())
delete u.m_pfatentry;
}
expireEntry &operator=(const expireEntry &e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
if (e.FFat())
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry);
return *this;
}
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key));
else
u.m_key = sdsimmutablestring(sdsdupshared(key));
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; }
bool operator==(const sdsview &key) const noexcept
{
return key == this->key();
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return static_cast<const char*>(u.m_key);
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sdsimmutablestring keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() const { return iter(this, 0); }
iter end() const
{
if (FFat())
return iter(this, u.m_pfatentry->size());
return iter(this, 1);
}
void erase(iter &itr)
{
if (!FFat())
throw -1; // assert
pfatentry()->m_vecexpireEntries.erase(
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
size_t size() const
{
if (FFat())
return u.m_pfatentry->size();
return 1;
}
bool FGetPrimaryExpire(long long *pwhen) const
{
*pwhen = -1;
for (auto itr : *this)
{
if (itr.subkey() == nullptr)
{
*pwhen = itr.when();
return true;
}
}
return false;
}
explicit operator sdsview() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
};
typedef semiorderedset<expireEntry, sdsview, true /*expireEntry can be memmoved*/> expireset;
extern fastlock g_expireLock;

View File

@ -35,7 +35,11 @@
#include <sched.h> #include <sched.h>
#include <atomic> #include <atomic>
#include <assert.h> #include <assert.h>
#ifdef __FreeBSD__
#include <pthread_np.h>
#else
#include <pthread.h> #include <pthread.h>
#endif
#include <limits.h> #include <limits.h>
#include <map> #include <map>
#ifdef __linux__ #ifdef __linux__
@ -167,7 +171,12 @@ extern "C" pid_t gettid()
#else #else
if (pidCache == -1) { if (pidCache == -1) {
uint64_t tidT; uint64_t tidT;
#ifdef __FreeBSD__
// Check https://github.com/ClickHouse/ClickHouse/commit/8d51824ddcb604b6f179a0216f0d32ba5612bd2e
tidT = pthread_getthreadid_np();
#else
pthread_threadid_np(nullptr, &tidT); pthread_threadid_np(nullptr, &tidT);
#endif
serverAssert(tidT < UINT_MAX); serverAssert(tidT < UINT_MAX);
pidCache = (int)tidT; pidCache = (int)tidT;
} }
@ -343,7 +352,9 @@ extern "C" void fastlock_lock(struct fastlock *lock)
unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
unsigned cloops = 0; unsigned cloops = 0;
ticket ticketT; ticket ticketT;
unsigned loopLimit = g_fHighCpuPressure ? 0x10000 : 0x100000; int fHighPressure;
__atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;
for (;;) for (;;)
{ {
@ -478,19 +489,22 @@ void fastlock_auto_adjust_waits()
{ {
#ifdef __linux__ #ifdef __linux__
struct sysinfo sysinf; struct sysinfo sysinf;
auto fHighPressurePrev = g_fHighCpuPressure; int fHighPressurePrev, fHighPressureNew;
__atomic_load(&g_fHighCpuPressure, &fHighPressurePrev, __ATOMIC_RELAXED);
fHighPressureNew = fHighPressurePrev;
memset(&sysinf, 0, sizeof sysinf); memset(&sysinf, 0, sizeof sysinf);
if (!sysinfo(&sysinf)) { if (!sysinfo(&sysinf)) {
auto avgCoreLoad = sysinf.loads[0] / get_nprocs(); auto avgCoreLoad = sysinf.loads[0] / get_nprocs();
g_fHighCpuPressure = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); int fHighPressureNew = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9));
if (g_fHighCpuPressure) __atomic_store(&g_fHighCpuPressure, &fHighPressureNew, __ATOMIC_RELEASE);
if (fHighPressureNew)
serverLog(!fHighPressurePrev ? 3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0); serverLog(!fHighPressurePrev ? 3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0);
} }
if (!g_fHighCpuPressure && fHighPressurePrev) { if (!fHighPressureNew && fHighPressurePrev) {
serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced"); serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced");
} }
#else #else
g_fHighCpuPressure = g_fTestMode; g_fHighCpuPressure = g_fTestMode;
#endif #endif
} }

View File

@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
switch(o->encoding) { switch(o->encoding) {
case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o)); case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o));
case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj);
default: return 0; /* Just integer encoding for now. */ default: return 0; /* Just integer encoding for now. */
} }
} }
@ -1266,9 +1266,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
void acceptOnThread(connection *conn, int flags, char *cip) void acceptOnThread(connection *conn, int flags, char *cip)
{ {
int ielCur = ielFromEventLoop(serverTL->el); int ielCur = ielFromEventLoop(serverTL->el);
bool fBootLoad = (g_pserver->loading == LOADING_BOOT);
int ielTarget = 0; int ielTarget = 0;
if (g_pserver->loading) if (fBootLoad)
{ {
ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active
} }
@ -1292,10 +1293,10 @@ void acceptOnThread(connection *conn, int flags, char *cip)
szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN); memcpy(szT, cip, NET_IP_STR_LEN);
} }
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] {
connMarshalThread(conn); connMarshalThread(conn);
acceptCommonHandler(conn,flags,szT,ielTarget); acceptCommonHandler(conn,flags,szT,ielTarget);
if (!g_fTestMode && !g_pserver->loading) if (!g_fTestMode && !fBootLoad)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT); zfree(szT);
}); });
@ -1304,7 +1305,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
return; return;
// If res != AE_OK we can still try to accept on the local thread // If res != AE_OK we can still try to accept on the local thread
} }
if (!g_fTestMode && !g_pserver->loading) if (!g_fTestMode && !fBootLoad)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
aeAcquireLock(); aeAcquireLock();
@ -2266,7 +2267,7 @@ int processMultibulkBuffer(client *c) {
* 1. The client is reset unless there are reasons to avoid doing it. * 1. The client is reset unless there are reasons to avoid doing it.
* 2. In the case of master clients, the replication offset is updated. * 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */ * 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) { void commandProcessed(client *c, int flags) {
long long prev_offset = c->reploff; long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */ /* Update the applied replication offset of our master. */
@ -2294,7 +2295,7 @@ void commandProcessed(client *c) {
ae.arm(c); ae.arm(c);
long long applied = c->reploff - prev_offset; long long applied = c->reploff - prev_offset;
if (applied) { if (applied) {
if (!g_pserver->fActiveReplica) if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE))
{ {
replicationFeedSlavesFromMasterStream(g_pserver->slaves, replicationFeedSlavesFromMasterStream(g_pserver->slaves,
c->pending_querybuf, applied); c->pending_querybuf, applied);
@ -2315,9 +2316,10 @@ void commandProcessed(client *c) {
int processCommandAndResetClient(client *c, int flags) { int processCommandAndResetClient(client *c, int flags) {
int deadclient = 0; int deadclient = 0;
serverTL->current_client = c; serverTL->current_client = c;
AeLocker locker; serverAssert(GlobalLocksAcquired());
if (processCommand(c, flags, locker) == C_OK) {
commandProcessed(c); if (processCommand(c, flags) == C_OK) {
commandProcessed(c, flags);
} }
if (serverTL->current_client == NULL) deadclient = 1; if (serverTL->current_client == NULL) deadclient = 1;
serverTL->current_client = NULL; serverTL->current_client = NULL;
@ -2468,14 +2470,26 @@ void readQueryFromClient(connection *conn) {
return; return;
} }
/* There is more data in the client input buffer, continue parsing it serverTL->vecclientsProcess.push_back(c);
* in case to check if there is a full command to execute. */ }
processInputBuffer(c, CMD_CALL_FULL);
void processClients()
{
serverAssert(GlobalLocksAcquired());
for (client *c : serverTL->vecclientsProcess) {
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
std::unique_lock<fastlock> ul(c->lock);
processInputBuffer(c, CMD_CALL_FULL);
}
if (listLength(serverTL->clients_pending_asyncwrite)) if (listLength(serverTL->clients_pending_asyncwrite))
{ {
aelock.arm(c);
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
} }
serverTL->vecclientsProcess.clear();
} }
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,
@ -2851,7 +2865,7 @@ NULL
if (c->name) if (c->name)
addReplyBulk(c,c->name); addReplyBulk(c,c->name);
else else
addReplyNull(c, shared.nullbulk); addReplyNull(c);
} else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) {
/* CLIENT PAUSE */ /* CLIENT PAUSE */
long long duration; long long duration;
@ -3433,7 +3447,7 @@ void processEventsWhileBlocked(int iel) {
aeReleaseLock(); aeReleaseLock();
serverAssertDebug(!GlobalLocksAcquired()); serverAssert(!GlobalLocksAcquired());
try try
{ {
while (iterations--) { while (iterations--) {

View File

@ -41,14 +41,15 @@
/* ===================== Creation and parsing of objects ==================== */ /* ===================== Creation and parsing of objects ==================== */
robj *createObject(int type, void *ptr) { robj *createObject(int type, void *ptr) {
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED);
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
o->type = type; o->type = type;
o->encoding = OBJ_ENCODING_RAW; o->encoding = OBJ_ENCODING_RAW;
o->m_ptr = ptr; o->m_ptr = ptr;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC setMvccTstamp(o, OBJ_MVCC_INVALID);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
/* Set the LRU to the current lruclock (minutes resolution), or /* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */ * alternatively the LFU counter. */
@ -98,15 +99,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
size_t allocsize = sizeof(struct sdshdr8)+len+1; size_t allocsize = sizeof(struct sdshdr8)+len+1;
if (allocsize < sizeof(void*)) if (allocsize < sizeof(void*))
allocsize = sizeof(void*); allocsize = sizeof(void*);
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED);
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
o->type = OBJ_STRING; o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR; o->encoding = OBJ_ENCODING_EMBSTR;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC setMvccTstamp(o, OBJ_MVCC_INVALID);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
@ -134,13 +136,8 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
* *
* The current limit of 52 is chosen so that the biggest string object * The current limit of 52 is chosen so that the biggest string object
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
#ifdef ENABLE_MVCC #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
#else
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 56
#endif
static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
robj *createStringObject(const char *ptr, size_t len) { robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len); return createEmbeddedStringObject(ptr,len);
@ -400,7 +397,11 @@ void decrRefCount(robj_roptr o) {
case OBJ_CRON: freeCronObject(o); break; case OBJ_CRON: freeCronObject(o); break;
default: serverPanic("Unknown object type"); break; default: serverPanic("Unknown object type"); break;
} }
zfree(o.unsafe_robjcast()); if (g_pserver->fActiveReplica) {
zfree(reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast())-1);
} else {
zfree(o.unsafe_robjcast());
}
} else { } else {
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
} }
@ -1325,13 +1326,12 @@ NULL
* in case of the key has not been accessed for a long time, * in case of the key has not been accessed for a long time,
* because we update the access time only * because we update the access time only
* when the key is read or overwritten. */ * when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); addReplyLongLong(c,LFUDecrAndReturn(o));
#ifdef ENABLE_MVCC
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== nullptr) return; == nullptr) return;
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); uint64_t mvcc = mvccFromObj(o);
#endif addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000);
} else { } else {
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
} }
@ -1374,7 +1374,7 @@ NULL
auto itr = c->db->find(c->argv[2]); auto itr = c->db->find(c->argv[2]);
if (itr == nullptr) { if (itr == nullptr) {
addReplyNull(c, shared.nullbulk); addReplyNull(c);
return; return;
} }
size_t usage = objectComputeSize(itr.val(),samples); size_t usage = objectComputeSize(itr.val(),samples);
@ -1515,7 +1515,11 @@ void redisObject::setrefcount(unsigned ref)
sds serializeStoredStringObject(sds str, robj_roptr o) sds serializeStoredStringObject(sds str, robj_roptr o)
{ {
uint64_t mvcc;
mvcc = mvccFromObj(o);
str = sdscatlen(str, &mvcc, sizeof(mvcc));
str = sdscatlen(str, &(*o), sizeof(robj)); str = sdscatlen(str, &(*o), sizeof(robj));
static_assert((sizeof(robj) + sizeof(mvcc)) == sizeof(redisObjectStack), "");
switch (o->encoding) switch (o->encoding)
{ {
case OBJ_ENCODING_RAW: case OBJ_ENCODING_RAW:
@ -1539,31 +1543,34 @@ sds serializeStoredStringObject(sds str, robj_roptr o)
robj *deserializeStoredStringObject(const char *data, size_t cb) robj *deserializeStoredStringObject(const char *data, size_t cb)
{ {
const robj *oT = (const robj*)data; uint64_t mvcc = *reinterpret_cast<const uint64_t*>(data);
const robj *oT = (const robj*)(data+sizeof(uint64_t));
robj *newObject = nullptr; robj *newObject = nullptr;
switch (oT->encoding) switch (oT->encoding)
{ {
case OBJ_ENCODING_INT: case OBJ_ENCODING_INT:
serverAssert(cb == sizeof(robj)); newObject = createObject(OBJ_STRING, nullptr);
[[fallthrough]]; newObject->encoding = oT->encoding;
newObject->m_ptr = oT->m_ptr;
return newObject;
case OBJ_ENCODING_EMBSTR: case OBJ_ENCODING_EMBSTR:
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); newObject = createEmbeddedStringObject(szFromObj(oT), sdslen(szFromObj(oT)));
memcpy(newObject, data, cb);
newObject->SetFExpires(false);
newObject->setrefcount(1);
return newObject; return newObject;
case OBJ_ENCODING_RAW: case OBJ_ENCODING_RAW:
newObject = (robj*)zmalloc(sizeof(robj), MALLOC_SHARED); newObject = createObject(OBJ_STRING, sdsnewlen(SDS_NOINIT,cb-sizeof(robj)-sizeof(uint64_t)));
memcpy(newObject, data, sizeof(robj)); newObject->lru = oT->lru;
newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); memcpy(newObject->m_ptr, data+sizeof(robj)+sizeof(mvcc), cb-sizeof(robj)-sizeof(mvcc));
memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj));
newObject->SetFExpires(false);
newObject->setrefcount(1);
return newObject; return newObject;
default:
serverPanic("Unknown string object encoding from storage");
} }
serverPanic("Unknown string object encoding from storage"); setMvccTstamp(newObject, mvcc);
return nullptr; newObject->setrefcount(1);
return newObject;
} }
robj *deserializeStoredObjectCore(const void *data, size_t cb) robj *deserializeStoredObjectCore(const void *data, size_t cb)
@ -1591,11 +1598,9 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb)
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); setMvccTstamp(obj, strtoull(szFromObj(auxval), nullptr, 10));
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }
@ -1639,4 +1644,40 @@ sds serializeStoredObject(robj_roptr o, sds sdsPrefix)
return (sds)rdb.io.buffer.ptr; return (sds)rdb.io.buffer.ptr;
} }
serverPanic("Attempting to store unknown object type"); serverPanic("Attempting to store unknown object type");
} }
/* Constructor for a redisObject variant meant to be declared directly (e.g. as
 * a local) rather than obtained from createObject().
 *
 * It performs a layout sanity check only: it asserts that the redisObject
 * base subobject does NOT start at the same address as the redisObjectStack
 * itself, i.e. that something else occupies the bytes in front of it. */
redisObjectStack::redisObjectStack()
{
// We need to ensure the Extended Object is first in the class layout, because
// helpers such as mvccFromObj()/setMvccTstamp() reach the extended data by
// stepping one redisObjectExtended *backwards* from the redisObject pointer.
serverAssert(reinterpret_cast<ptrdiff_t>(static_cast<redisObject*>(this)) != reinterpret_cast<ptrdiff_t>(this));
}
/* Map an object pointer back to the address that should be handed to the
 * allocator (zfree / zmalloc_size).
 *
 * When g_pserver->fActiveReplica is set, a redisObjectExtended header sits
 * immediately in front of the object, so the allocation begins one
 * redisObjectExtended earlier; otherwise the object pointer is the allocation.
 *
 * NOTE(review): the answer depends on the *current* value of fActiveReplica;
 * it is only correct if that matches the value in effect when o was allocated. */
void *allocPtrFromObj(robj_roptr o) {
    auto *pobj = o.unsafe_robjcast();
    if (!g_pserver->fActiveReplica)
        return pobj;
    // Step back over the extended header that precedes the object.
    return reinterpret_cast<redisObjectExtended*>(pobj) - 1;
}
/* Inverse of allocPtrFromObj(): given the start of an allocation, return the
 * robj stored inside it.
 *
 * With g_pserver->fActiveReplica set the robj lives one redisObjectExtended
 * past the start of the allocation; otherwise the allocation *is* the robj. */
robj *objFromAllocPtr(void *pv) {
    if (!g_pserver->fActiveReplica)
        return reinterpret_cast<robj*>(pv);
    // Skip over the extended header placed at the front of the allocation.
    redisObjectExtended *pext = reinterpret_cast<redisObjectExtended*>(pv);
    return reinterpret_cast<robj*>(pext + 1);
}
/* Return the MVCC timestamp associated with an object.
 *
 * The timestamp is kept in the redisObjectExtended header located directly in
 * front of the object, which is only consulted when g_pserver->fActiveReplica
 * is set. In every other case OBJ_MVCC_INVALID is returned. */
uint64_t mvccFromObj(robj_roptr o)
{
    if (!g_pserver->fActiveReplica)
        return OBJ_MVCC_INVALID;

    // The extended header is the redisObjectExtended just before the object.
    redisObjectExtended *pext = reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast()) - 1;
    return pext->mvcc_tstamp;
}
/* Store an MVCC timestamp for an object.
 *
 * The value is written into the redisObjectExtended header located directly in
 * front of o. When g_pserver->fActiveReplica is not set the call is a no-op
 * and nothing is written. */
void setMvccTstamp(robj *o, uint64_t mvcc)
{
    if (g_pserver->fActiveReplica) {
        // The extended header is the redisObjectExtended just before the object.
        redisObjectExtended *pext = reinterpret_cast<redisObjectExtended*>(o) - 1;
        pext->mvcc_tstamp = mvcc;
    }
}

View File

@ -89,7 +89,7 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel) {
if (channel) if (channel)
addReplyBulk(c,channel); addReplyBulk(c,channel);
else else
addReplyNull(c, shared.nullbulk); addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c)); addReplyLongLong(c,clientSubscriptionsCount(c));
} }
@ -117,7 +117,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
if (pattern) if (pattern)
addReplyBulk(c,pattern); addReplyBulk(c,pattern);
else else
addReplyNull(c, shared.nullbulk); addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c)); addReplyLongLong(c,clientSubscriptionsCount(c));
} }

View File

@ -1096,12 +1096,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn
} }
char szT[32]; char szT[32];
#ifdef ENABLE_MVCC
if (g_pserver->fActiveReplica) { if (g_pserver->fActiveReplica) {
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, mvccFromObj(val));
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
} }
#endif
/* Save type, key, value */ /* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveObjectType(rdb,val) == -1) return -1;
@ -1155,7 +1153,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o) int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o)
{ {
robj key; redisObjectStack key;
initStaticStringObject(key,(char*)keystr); initStaticStringObject(key,(char*)keystr);
std::unique_lock<fastlock> ul(g_expireLock, std::defer_lock); std::unique_lock<fastlock> ul(g_expireLock, std::defer_lock);
@ -2136,7 +2134,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
exit(1); exit(1);
} }
RedisModuleIO io; RedisModuleIO io;
robj keyobj; redisObjectStack keyobj;
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
moduleInitIOContext(io,mt,rdb,&keyobj); moduleInitIOContext(io,mt,rdb,&keyobj);
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
@ -2185,9 +2183,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
return NULL; return NULL;
} }
#ifdef ENABLE_MVCC setMvccTstamp(o, mvcc_tstamp);
o->mvcc_tstamp = mvcc_tstamp;
#endif
serverAssert(!o->FExpires()); serverAssert(!o->FExpires());
return o; return o;
} }
@ -2196,7 +2192,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
* needed to provide loading stats. */ * needed to provide loading stats. */
void startLoading(size_t size, int rdbflags) { void startLoading(size_t size, int rdbflags) {
/* Load the DB */ /* Load the DB */
g_pserver->loading = 1; g_pserver->loading = (rdbflags & RDBFLAGS_REPLICATION) ? LOADING_REPLICATION : LOADING_BOOT;
g_pserver->loading_start_time = time(NULL); g_pserver->loading_start_time = time(NULL);
g_pserver->loading_loaded_bytes = 0; g_pserver->loading_loaded_bytes = 0;
g_pserver->loading_total_bytes = size; g_pserver->loading_total_bytes = size;
@ -2461,7 +2457,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} }
} }
else { else {
redisObject keyobj; redisObjectStack keyobj;
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
decrRefCount(subexpireKey); decrRefCount(subexpireKey);
@ -2545,18 +2541,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr; key = nullptr;
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false;
#else
bool fStaleMvccKey = false;
#endif
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is * received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the * responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */ * snapshot taken by the master may not be reflected on the replica. */
robj keyobj; redisObjectStack keyobj;
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) { if (fStaleMvccKey || fExpiredKey) {

View File

@ -1179,6 +1179,7 @@ void processReplconfLicense(client *c, robj *arg)
* full resync. */ * full resync. */
void replconfCommand(client *c) { void replconfCommand(client *c) {
int j; int j;
bool fCapaCommand = false;
if ((c->argc % 2) == 0) { if ((c->argc % 2) == 0) {
/* Number of arguments must be odd to make sure that every /* Number of arguments must be odd to make sure that every
@ -1189,6 +1190,7 @@ void replconfCommand(client *c) {
/* Process every option-value pair. */ /* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) { for (j = 1; j < c->argc; j+=2) {
fCapaCommand = false;
if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) { if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) {
long port; long port;
@ -1213,6 +1215,8 @@ void replconfCommand(client *c) {
c->slave_capa |= SLAVE_CAPA_PSYNC2; c->slave_capa |= SLAVE_CAPA_PSYNC2;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
fCapaCommand = true;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
/* REPLCONF ACK is used by replica to inform the master the amount /* REPLCONF ACK is used by replica to inform the master the amount
* of replication stream that it processed so far. It is an * of replication stream that it processed so far. It is an
@ -1258,7 +1262,16 @@ void replconfCommand(client *c) {
return; return;
} }
} }
addReply(c,shared.ok);
if (fCapaCommand) {
sds reply = sdsnew("+OK");
if (g_pserver->fActiveReplica)
reply = sdscat(reply, " active-replica");
reply = sdscat(reply, "\r\n");
addReplySds(c, reply);
} else {
addReply(c,shared.ok);
}
} }
/* This function puts a replica in the online state, and should be called just /* This function puts a replica in the online state, and should be called just
@ -2571,6 +2584,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
return PSYNC_NOT_SUPPORTED; return PSYNC_NOT_SUPPORTED;
} }
/* Parse the master's status reply to "REPLCONF capa ..." and record the
 * capabilities it advertises on the given master link.
 *
 * The reply is expected to look like "+OK" optionally followed by
 * space-separated capability words, e.g. "+OK active-replica".
 *
 *   mi      - master link whose isActive flag is updated.
 *   strcapa - the reply line; ignored entirely (mi untouched) if it is empty
 *             or does not start with '+'.
 *
 * mi->isActive is reset to false and set to true only if one of the words is
 * exactly "active-replica". Words are compared in full: empty words (produced
 * by a bare "+", a trailing space or consecutive spaces) and mere prefixes of
 * the keyword such as "act" do NOT match. A plain strncmp() limited to the
 * word length would accept all of those, since comparing zero characters
 * always yields equality. */
void parseMasterCapa(redisMaster *mi, sds strcapa)
{
    if (sdslen(strcapa) < 1 || strcapa[0] != '+')
        return;

    static const char szActiveReplica[] = "active-replica";
    const size_t cchActiveReplica = sizeof(szActiveReplica) - 1;

    mi->isActive = false;

    const char *szWord = strcapa + 1;   // skip the +
    for (const char *pch = szWord; ; ++pch)
    {
        if (*pch != ' ' && *pch != '\0')
            continue;   // still inside the current word

        // [szWord, pch) is one complete word; require an exact-length match.
        size_t cchWord = static_cast<size_t>(pch - szWord);
        if (cchWord == cchActiveReplica && strncmp(szWord, szActiveReplica, cchWord) == 0)
            mi->isActive = true;

        if (*pch == '\0')
            break;
        szWord = pch + 1;   // next word starts after the separator
    }
}
/* This handler fires when the non blocking connect was able to /* This handler fires when the non blocking connect was able to
* establish a connection with the master. */ * establish a connection with the master. */
void syncWithMaster(connection *conn) { void syncWithMaster(connection *conn) {
@ -2799,16 +2836,8 @@ void syncWithMaster(connection *conn) {
* *
* The master will ignore capabilities it does not understand. */ * The master will ignore capabilities it does not understand. */
if (mi->repl_state == REPL_STATE_SEND_CAPA) { if (mi->repl_state == REPL_STATE_SEND_CAPA) {
if (g_pserver->fActiveReplica) err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
{ "capa","eof","capa","psync2","capa","activeExpire",NULL);
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2","capa","activeExpire",NULL);
}
else
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
}
if (err) goto write_error; if (err) goto write_error;
sdsfree(err); sdsfree(err);
mi->repl_state = REPL_STATE_RECEIVE_CAPA; mi->repl_state = REPL_STATE_RECEIVE_CAPA;
@ -2823,6 +2852,8 @@ void syncWithMaster(connection *conn) {
if (err[0] == '-') { if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand " serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err); "REPLCONF capa: %s", err);
} else {
parseMasterCapa(mi, err);
} }
sdsfree(err); sdsfree(err);
mi->repl_state = REPL_STATE_SEND_PSYNC; mi->repl_state = REPL_STATE_SEND_PSYNC;
@ -4056,12 +4087,20 @@ int FBrokenLinkToMaster()
listNode *ln; listNode *ln;
listRewind(g_pserver->masters, &li); listRewind(g_pserver->masters, &li);
int connected = 0;
while ((ln = listNext(&li))) while ((ln = listNext(&li)))
{ {
redisMaster *mi = (redisMaster*)listNodeValue(ln); redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (mi->repl_state != REPL_STATE_CONNECTED) if (mi->repl_state == REPL_STATE_CONNECTED)
return true; ++connected;
} }
if (g_pserver->repl_quorum < 0) {
return connected < (int)listLength(g_pserver->masters);
} else {
return connected < g_pserver->repl_quorum;
}
return false; return false;
} }

View File

@ -1847,6 +1847,18 @@ void clientsCron(int iel) {
freeClientsInAsyncFreeQueue(iel); freeClientsInAsyncFreeQueue(iel);
} }
bool expireOwnKeys()
{
if (iAmMaster()) {
return true;
} else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) {
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
if (mi->isActive)
return true;
}
return false;
}
/* This function handles 'background' operations we are required to do /* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing, * incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */ * rehashing. */
@ -1854,7 +1866,7 @@ void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves /* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */ * as master will synthesize DELs for us. */
if (g_pserver->active_expire_enabled) { if (g_pserver->active_expire_enabled) {
if (iAmMaster()) { if (expireOwnKeys()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else { } else {
expireSlaveKeys(); expireSlaveKeys();
@ -2383,6 +2395,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
} }
extern int ProcessingEventsWhileBlocked; extern int ProcessingEventsWhileBlocked;
void processClients();
/* This function gets called every time Redis is entering the /* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep * main loop of the event driven library, that is, before to sleep
@ -2408,7 +2421,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
aeSetDontWait(eventLoop, tlsHasPendingData()); aeSetDontWait(eventLoop, tlsHasPendingData());
aeAcquireLock(); aeAcquireLock();
processClients();
/* Handle precise timeouts of blocked clients. */ /* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout(); handleBlockedClientsTimeout();
@ -2669,6 +2683,7 @@ void initMasterInfo(redisMaster *master)
master->cached_master = NULL; master->cached_master = NULL;
master->master_initial_offset = -1; master->master_initial_offset = -1;
master->isActive = false;
master->repl_state = REPL_STATE_NONE; master->repl_state = REPL_STATE_NONE;
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
@ -3830,7 +3845,7 @@ void call(client *c, int flags) {
!(flags & CMD_CALL_PROPAGATE_AOF)) !(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF; propagate_flags &= ~PROPAGATE_AOF;
if (c->cmd->flags & CMD_SKIP_PROPOGATE) if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica)
propagate_flags &= ~PROPAGATE_REPL; propagate_flags &= ~PROPAGATE_REPL;
/* Call propagate() only if at least one of AOF / replication /* Call propagate() only if at least one of AOF / replication
@ -3915,12 +3930,12 @@ void call(client *c, int flags) {
* If C_OK is returned the client is still alive and valid and * If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise * other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */ * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c, int callFlags, AeLocker &locker) { int processCommand(client *c, int callFlags) {
AssertCorrectThread(c); AssertCorrectThread(c);
serverAssert(GlobalLocksAcquired());
if (moduleHasCommandFilters()) if (moduleHasCommandFilters())
{ {
locker.arm(c);
moduleCallCommandFilters(c); moduleCallCommandFilters(c);
} }
@ -3955,9 +3970,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
return C_OK; return C_OK;
} }
if (!locker.isArmed())
c->db->prefetchKeysAsync(locker, c);
/* Check if the user is authenticated. This check is skipped in case /* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */ * the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
@ -3975,9 +3987,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
/* Check if the user can run this command according to the current /* Check if the user can run this command according to the current
* ACLs. */ * ACLs. */
if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS))
locker.arm(c); // ACLs require the lock
int acl_keypos; int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) { if (acl_retval != ACL_OK) {
@ -4005,7 +4014,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand)) c->cmd->proc != execCommand))
{ {
locker.arm(c);
int hashslot; int hashslot;
int error_code; int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
@ -4021,6 +4029,8 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
} }
} }
incrementMvccTstamp();
/* Handle the maxmemory directive. /* Handle the maxmemory directive.
* *
* Note that we do not want to reclaim memory if we are here re-entering * Note that we do not want to reclaim memory if we are here re-entering
@ -4028,7 +4038,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
* condition, to avoid mixing the propagation of scripts with the * condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */ * propagation of DELs due to eviction. */
if (g_pserver->maxmemory && !g_pserver->lua_timedout) { if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
locker.arm(c);
int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR; int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR;
/* freeMemoryIfNeeded may flush replica output buffers. This may result /* freeMemoryIfNeeded may flush replica output buffers. This may result
* into a replica, that may be the active client, to be freed. */ * into a replica, that may be the active client, to be freed. */
@ -4064,7 +4073,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
* and if this is a master instance. */ * and if this is a master instance. */
if (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand) if (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)
{ {
locker.arm(c);
int deny_write_type = writeCommandsDeniedByDiskError(); int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE && if (deny_write_type != DISK_ERROR_TYPE_NONE &&
listLength(g_pserver->masters) == 0 && listLength(g_pserver->masters) == 0 &&
@ -4124,7 +4132,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
if (listLength(g_pserver->masters)) if (listLength(g_pserver->masters))
{ {
locker.arm(c);
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when replica-serve-stale-data is no and we are a replica with a broken * when replica-serve-stale-data is no and we are a replica with a broken
* link with master. */ * link with master. */
@ -4142,8 +4149,12 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
/* Loading DB? Return an error if the command has not the /* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */ * CMD_LOADING flag. */
if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) { if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr); /* Active Replicas can execute read only commands, and optionally write commands */
return C_OK; if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad)))
{
addReply(c, shared.loadingerr);
return C_OK;
}
} }
/* Lua script too slow? Only allow a limited number of commands. /* Lua script too slow? Only allow a limited number of commands.
@ -4181,8 +4192,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
queueMultiCommand(c); queueMultiCommand(c);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
locker.arm(c);
incrementMvccTstamp();
call(c,callFlags); call(c,callFlags);
c->woff = g_pserver->master_repl_offset; c->woff = g_pserver->master_repl_offset;
if (listLength(g_pserver->ready_keys)) if (listLength(g_pserver->ready_keys))
@ -4809,7 +4818,7 @@ sds genRedisInfoString(const char *section) {
"aof_last_cow_size:%zu\r\n" "aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n" "module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n", "module_fork_last_cow_size:%zu\r\n",
g_pserver->loading.load(std::memory_order_relaxed), !!g_pserver->loading.load(std::memory_order_relaxed), /* Note: libraries expect 1 or 0 here so coerce our enum */
g_pserver->dirty, g_pserver->dirty,
g_pserver->FRdbSaveInProgress(), g_pserver->FRdbSaveInProgress(),
(intmax_t)g_pserver->lastsave, (intmax_t)g_pserver->lastsave,
@ -4958,64 +4967,61 @@ sds genRedisInfoString(const char *section) {
listLength(g_pserver->masters) == 0 ? "master" listLength(g_pserver->masters) == 0 ? "master"
: g_pserver->fActiveReplica ? "active-replica" : "slave"); : g_pserver->fActiveReplica ? "active-replica" : "slave");
if (listLength(g_pserver->masters)) { if (listLength(g_pserver->masters)) {
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
bool fAllUp = true;
while ((ln = listNext(&li))) {
redisMaster *mi = (redisMaster*)listNodeValue(ln);
fAllUp = fAllUp && mi->repl_state == REPL_STATE_CONNECTED;
}
info = sdscatprintf(info, "master_global_link_status:%s\r\n", info = sdscatprintf(info, "master_global_link_status:%s\r\n",
fAllUp ? "up" : "down"); FBrokenLinkToMaster() ? "down" : "up");
int cmasters = 0; int cmasters = 0;
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li); listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li))) while ((ln = listNext(&li)))
{ {
long long slave_repl_offset = 1; long long slave_repl_offset = 1;
redisMaster *mi = (redisMaster*)listNodeValue(ln); redisMaster *mi = (redisMaster*)listNodeValue(ln);
info = sdscatprintf(info, "Master %d: \r\n", cmasters);
++cmasters;
if (mi->master) if (mi->master)
slave_repl_offset = mi->master->reploff; slave_repl_offset = mi->master->reploff;
else if (mi->cached_master) else if (mi->cached_master)
slave_repl_offset = mi->cached_master->reploff; slave_repl_offset = mi->cached_master->reploff;
char master_prefix[128] = "";
if (cmasters != 0) {
snprintf(master_prefix, sizeof(master_prefix), "_%d", cmasters);
}
info = sdscatprintf(info, info = sdscatprintf(info,
"master_host:%s\r\n" "master%s_host:%s\r\n"
"master_port:%d\r\n" "master%s_port:%d\r\n"
"master_link_status:%s\r\n" "master%s_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n" "master%s_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n" "master%s_sync_in_progress:%d\r\n"
"slave_repl_offset:%lld\r\n" "slave_repl_offset:%lld\r\n"
,mi->masterhost, ,master_prefix, mi->masterhost,
mi->masterport, master_prefix, mi->masterport,
(mi->repl_state == REPL_STATE_CONNECTED) ? master_prefix, (mi->repl_state == REPL_STATE_CONNECTED) ?
"up" : "down", "up" : "down",
mi->master ? master_prefix, mi->master ?
((int)(g_pserver->unixtime-mi->master->lastinteraction)) : -1, ((int)(g_pserver->unixtime-mi->master->lastinteraction)) : -1,
mi->repl_state == REPL_STATE_TRANSFER, master_prefix, mi->repl_state == REPL_STATE_TRANSFER,
slave_repl_offset slave_repl_offset
); );
if (mi->repl_state == REPL_STATE_TRANSFER) { if (mi->repl_state == REPL_STATE_TRANSFER) {
info = sdscatprintf(info, info = sdscatprintf(info,
"master_sync_left_bytes:%lld\r\n" "master%s_sync_left_bytes:%lld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n" "master%s_sync_last_io_seconds_ago:%d\r\n"
, (long long) , master_prefix, (long long)
(mi->repl_transfer_size - mi->repl_transfer_read), (mi->repl_transfer_size - mi->repl_transfer_read),
(int)(g_pserver->unixtime-mi->repl_transfer_lastio) master_prefix, (int)(g_pserver->unixtime-mi->repl_transfer_lastio)
); );
} }
if (mi->repl_state != REPL_STATE_CONNECTED) { if (mi->repl_state != REPL_STATE_CONNECTED) {
info = sdscatprintf(info, info = sdscatprintf(info,
"master_link_down_since_seconds:%jd\r\n", "master%s_link_down_since_seconds:%jd\r\n",
(intmax_t)g_pserver->unixtime-mi->repl_down_since); master_prefix, (intmax_t)g_pserver->unixtime-mi->repl_down_since);
} }
++cmasters;
} }
info = sdscatprintf(info, info = sdscatprintf(info,
"slave_priority:%d\r\n" "slave_priority:%d\r\n"

View File

@ -96,6 +96,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "semiorderedset.h" #include "semiorderedset.h"
#include "connection.h" /* Connection abstraction */ #include "connection.h" /* Connection abstraction */
#include "serverassert.h" #include "serverassert.h"
#include "expire.h"
#define REDISMODULE_CORE 1 #define REDISMODULE_CORE 1
#include "redismodule.h" /* Redis modules API defines. */ #include "redismodule.h" /* Redis modules API defines. */
@ -112,6 +113,9 @@ typedef long long ustime_t; /* microsecond time type. */
#define FImplies(x, y) (!(x) || (y)) #define FImplies(x, y) (!(x) || (y))
#define LOADING_BOOT 1
#define LOADING_REPLICATION 2
extern int g_fTestMode; extern int g_fTestMode;
extern struct redisServer *g_pserver; extern struct redisServer *g_pserver;
@ -868,7 +872,16 @@ typedef struct RedisModuleDigest {
#define MVCC_MS_SHIFT 20 #define MVCC_MS_SHIFT 20
typedef struct redisObject { // This struct will be allocated ahead of the ROBJ when needed
// Extra per-object data that is held in its own struct rather than as a
// member of redisObject.
struct redisObjectExtended {
// MVCC timestamp of the object.
// NOTE(review): presumably read/written through mvccFromObj()/setMvccTstamp() - confirm.
uint64_t mvcc_tstamp;
};
typedef class redisObject {
protected:
redisObject() {}
public:
unsigned type:4; unsigned type:4;
unsigned encoding:4; unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
@ -877,9 +890,6 @@ typedef struct redisObject {
private: private:
mutable std::atomic<unsigned> refcount {0}; mutable std::atomic<unsigned> refcount {0};
public: public:
#ifdef ENABLE_MVCC
uint64_t mvcc_tstamp;
#endif
void *m_ptr; void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
@ -890,11 +900,18 @@ public:
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
} robj; } robj;
#ifdef ENABLE_MVCC
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
#else
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
#endif
// A redisObject combined with its redisObjectExtended data in one object.
// redisObjectExtended is listed first among the base classes, redisObject second.
// NOTE(review): the name suggests this is intended for objects that live on the
// stack - confirm against the constructor's definition.
class redisObjectStack : public redisObjectExtended, public redisObject
{
public:
// Declaration only; the definition is not part of this section.
redisObjectStack();
};
uint64_t mvccFromObj(robj_roptr o);
void setMvccTstamp(redisObject *o, uint64_t mvcc);
void *allocPtrFromObj(robj_roptr o);
robj *objFromAllocPtr(void *pv);
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
{ {
@ -920,281 +937,6 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o); return (char*)ptrFromObj(o);
} }
/* Collection of every expiration attached to a single primary key: the key's
 * own expiration (stored with a null subkey) plus any per-subkey expirations.
 * expireSubKey() inserts in ascending order of expiration time, so the front
 * of the vector is always the entry that is due next. */
class expireEntryFat
{
    friend class expireEntry;

public:
    /* One expiration time together with the sds subkey it applies to.
     * The subkey string is owned by the entry and released with sdsfree. */
    struct subexpireEntry
    {
        long long when;
        std::unique_ptr<const char, void(*)(const char*)> spsubkey;

        subexpireEntry(long long when, const char *subkey)
            : when(when), spsubkey(subkey, sdsfree)
        {}

        // Deep copy: a non-null subkey is duplicated, never shared
        subexpireEntry(const subexpireEntry &e)
            : when(e.when),
              spsubkey(e.spsubkey ? (const char*)sdsdup((sds)e.spsubkey.get()) : nullptr, sdsfree)
        {}

        subexpireEntry(subexpireEntry &&e) = default;
        subexpireEntry& operator=(subexpireEntry &&e) = default;

        // Ordering is by expiration time only
        bool operator<(long long when) const noexcept { return this->when < when; }
        bool operator<(const subexpireEntry &se) { return when < se.when; }
    };

private:
    sdsimmutablestring m_keyPrimary;
    // Note a NULL for the sds portion means the expire is for the primary key
    std::vector<subexpireEntry> m_vecexpireEntries;

public:
    expireEntryFat(const sdsimmutablestring &keyPrimary)
        : m_keyPrimary(keyPrimary)
    {}
    expireEntryFat(const expireEntryFat &e) = default;
    expireEntryFat(expireEntryFat &&e) = default;

    // Time of the earliest expiration held (the vector must not be empty)
    long long when() const noexcept { return m_vecexpireEntries.front().when; }
    const char *key() const noexcept { return static_cast<const char*>(m_keyPrimary); }

    bool operator<(long long when) const noexcept { return this->when() < when; }

    /* Set the expiration of szSubkey (or of the primary key when szSubkey is
     * null) to `when`, replacing any expiration already stored for that same
     * target. The subkey string is duplicated; the caller keeps its copy. */
    void expireSubKey(const char *szSubkey, long long when)
    {
        // Locate an existing expiration for exactly this target, if there is one
        auto itrExisting = std::find_if(m_vecexpireEntries.begin(), m_vecexpireEntries.end(),
            [szSubkey](const subexpireEntry &candidate) -> bool {
                if (szSubkey == nullptr)
                    return candidate.spsubkey == nullptr;   // primary key matches only a null subkey
                if (candidate.spsubkey == nullptr)
                    return false;                           // candidate is the primary key's expire
                return sdscmp((sds)candidate.spsubkey.get(), (sds)szSubkey) == 0;
            });
        if (itrExisting != m_vecexpireEntries.end())
            m_vecexpireEntries.erase(itrExisting);

        // Insert at the position that keeps the vector ordered by time
        auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
        const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr;
        m_vecexpireEntries.emplace(itrInsert, when, subkey);
    }

    bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }

    // Access to / removal of the earliest entry
    const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
    void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); }

    const subexpireEntry &operator[](size_t idx) const { return m_vecexpireEntries[idx]; }
    size_t size() const noexcept { return m_vecexpireEntries.size(); }
};
class expireEntry {
struct
{
sdsimmutablestring m_key;
expireEntryFat *m_pfatentry = nullptr;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
class iter
{
friend class expireEntry;
const expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(const expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
iter &operator++() { ++m_idx; return *this; }
const char *subkey() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].spsubkey.get();
return nullptr;
}
long long when() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].when;
return m_pentry->when();
}
bool operator!=(const iter &other)
{
return m_idx != other.m_idx;
}
const iter &operator*() const { return *this; }
};
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key)));
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = sdsimmutablestring(sdsdupshared(key));
m_when = when;
}
}
expireEntry(expireEntryFat *pfatentry)
{
u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
}
expireEntry(const expireEntry &e)
{
*this = e;
}
expireEntry(expireEntry &&e)
{
u.m_key = std::move(e.u.m_key);
u.m_pfatentry = std::move(e.u.m_pfatentry);
m_when = e.m_when;
e.m_when = 0;
e.u.m_pfatentry = nullptr;
}
~expireEntry()
{
if (FFat())
delete u.m_pfatentry;
}
expireEntry &operator=(const expireEntry &e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
if (e.FFat())
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry);
return *this;
}
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key));
else
u.m_key = sdsimmutablestring(sdsdupshared(key));
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; }
bool operator==(const sdsview &key) const noexcept
{
return key == this->key();
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return static_cast<const char*>(u.m_key);
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sdsimmutablestring keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() const { return iter(this, 0); }
iter end() const
{
if (FFat())
return iter(this, u.m_pfatentry->size());
return iter(this, 1);
}
void erase(iter &itr)
{
if (!FFat())
throw -1; // assert
pfatentry()->m_vecexpireEntries.erase(
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
size_t size() const
{
if (FFat())
return u.m_pfatentry->size();
return 1;
}
bool FGetPrimaryExpire(long long *pwhen) const
{
*pwhen = -1;
for (auto itr : *this)
{
if (itr.subkey() == nullptr)
{
*pwhen = itr.when();
return true;
}
}
return false;
}
explicit operator sdsview() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
};
typedef semiorderedset<expireEntry, sdsview, true /*expireEntry can be memmoved*/> expireset;
extern fastlock g_expireLock;
/* The string name for an object's type as listed above /* The string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
* and Module types have their registered name returned. */ * and Module types have their registered name returned. */
@ -1940,6 +1682,9 @@ typedef struct redisTLSContextConfig {
char *ciphers; char *ciphers;
char *ciphersuites; char *ciphersuites;
int prefer_server_ciphers; int prefer_server_ciphers;
int session_caching;
int session_cache_size;
int session_cache_timeout;
} redisTLSContextConfig; } redisTLSContextConfig;
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
@ -2038,6 +1783,7 @@ struct redisServerThreadVars {
GarbageCollectorCollection::Epoch gcEpoch; GarbageCollectorCollection::Epoch gcEpoch;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
bool fRetrySetAofEvent = false; bool fRetrySetAofEvent = false;
std::vector<client*> vecclientsProcess;
int getRdbKeySaveDelay(); int getRdbKeySaveDelay();
private: private:
@ -2057,6 +1803,7 @@ struct redisMaster {
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */ long long master_initial_offset; /* Master PSYNC offset. */
bool isActive = false;
int repl_state; /* Replication status if the instance is a replica */ int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
@ -2360,6 +2107,7 @@ struct redisServer {
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int repl_serve_stale_data; /* Serve stale data when link is down? */ int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_quorum; /* For multimaster what do we consider a quorum? -1 means all master must be online */
int repl_slave_ro; /* Slave is read only? */ int repl_slave_ro; /* Slave is read only? */
int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */ int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */
int slave_priority; /* Reported in INFO and used by Sentinel. */ int slave_priority; /* Reported in INFO and used by Sentinel. */
@ -2480,6 +2228,7 @@ struct redisServer {
int watchdog_period; /* Software watchdog period in ms. 0 = off */ int watchdog_period; /* Software watchdog period in ms. 0 = off */
int fActiveReplica; /* Can this replica also be a master? */ int fActiveReplica; /* Can this replica also be a master? */
int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */
// Format: // Format:
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Lower 20 bits: a counter incrementing for each command executed in the same millisecond
@ -3071,7 +2820,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
size_t freeMemoryGetNotCountedMemory(); size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot); int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot);
int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot); int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot);
int processCommand(client *c, int callFlags, class AeLocker &locker); int processCommand(client *c, int callFlags);
void setupSignalHandlers(void); void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(const char *s); struct redisCommand *lookupCommandByCString(const char *s);

View File

@ -331,7 +331,7 @@ void lindexCommand(client *c) {
addReplyBulk(c,value); addReplyBulk(c,value);
decrRefCount(value); decrRefCount(value);
} else { } else {
addReplyNull(c,shared.nullbulk); addReplyNull(c);
} }
} else { } else {
serverPanic("Unknown list encoding"); serverPanic("Unknown list encoding");

View File

@ -150,8 +150,6 @@ void tlsInit(void) {
serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator.");
} }
/* Server configuration */
g_pserver->tls_auth_clients = 1; /* Secure by default */
tlsInitThread(); tlsInitThread();
} }
@ -193,6 +191,15 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) {
SSL_CTX_set_options(ctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS); SSL_CTX_set_options(ctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS);
#endif #endif
if (ctx_config->session_caching) {
SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_SERVER);
SSL_CTX_sess_set_cache_size(ctx, ctx_config->session_cache_size);
SSL_CTX_set_timeout(ctx, ctx_config->session_cache_timeout);
SSL_CTX_set_session_id_context(ctx, (const unsigned char*) "KeyDB", 5);
} else {
SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF);
}
protocols = parseProtocolsConfig(ctx_config->protocols); protocols = parseProtocolsConfig(ctx_config->protocols);
if (protocols == -1) goto error; if (protocols == -1) goto error;

View File

@ -96,6 +96,26 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
} }
} }
} }
# Keep this test last since it mucks with the config
# Only run for the mesh topology.
if [string equal $topology "mesh"] {
test "$topology_name quorum respected" {
# With stale reads disabled, reads are expected to fail once the quorum is not met
$R(0) config set replica-serve-stale-data no
# No issues when all nodes are connected with default settings
$R(0) get testkey
# No issues when quorum is equal to the number of nodes
$R(0) config set replica-quorum 3
$R(0) get testkey
# A quorum above the number of nodes can never be met, so the read must be rejected
$R(0) config set replica-quorum 4
catch {
$R(0) get testkey
} e
assert_match {*MASTER is down*} $e
}
}
} }
} }
} }

View File

@ -78,17 +78,8 @@ start_server {tags {"introspection"}} {
syslog-facility syslog-facility
databases databases
port port
io-threads
tls-port tls-port
tls-prefer-server-ciphers io-threads
tls-cert-file
tls-key-file
tls-dh-params-file
tls-ca-cert-file
tls-ca-cert-dir
tls-protocols
tls-ciphers
tls-ciphersuites
logfile logfile
unixsocketperm unixsocketperm
slaveof slaveof
@ -104,6 +95,23 @@ start_server {tags {"introspection"}} {
use-fork use-fork
} }
# When $::tls is false, add the TLS-specific options to the list of
# config names that the loop below skips.
if {!$::tls} {
append skip_configs {
tls-prefer-server-ciphers
tls-session-cache-timeout
tls-session-cache-size
tls-session-caching
tls-cert-file
tls-key-file
tls-dh-params-file
tls-ca-cert-file
tls-ca-cert-dir
tls-protocols
tls-ciphers
tls-ciphersuites
}
}
set configs {} set configs {}
foreach {k v} [r config get *] { foreach {k v} [r config get *] {
if {[lsearch $skip_configs $k] != -1} { if {[lsearch $skip_configs $k] != -1} {

View File

@ -109,6 +109,7 @@ start_server {tags {"pubsub"}} {
unsubscribe $rd1 unsubscribe $rd1
# Wait for a response to the unsub # Wait for a response to the unsub
__consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3} __consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3}
after 1
assert_equal 0 [r publish chan1 hello] assert_equal 0 [r publish chan1 hello]
assert_equal 0 [r publish chan2 hello] assert_equal 0 [r publish chan2 hello]
assert_equal 0 [r publish chan3 hello] assert_equal 0 [r publish chan3 hello]