From 773ecdcdd42b09da2c1181cab92704dfdb99fc80 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 16:24:23 -0500 Subject: [PATCH 1/6] Update issue templates Former-commit-id: becd3fc5a34d9c7d665531290c017f13d1b03c16 --- .github/ISSUE_TEMPLATE/bug_report.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 000000000..91b34ebe0 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,20 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: '' +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +** Log Files ** +These should be KeyDB logs, not syslogs or logs from your container manager. If you are reporting a crash there will be a line in your log stating: +"=== KEYDB BUG REPORT START: Cut & paste starting from here ===" + +Please copy everything after this line. + +**To Reproduce** +Do you know how to reproduce this? If so please provide repro steps. From 75fe5ea8e830fae4fdc63bd5e14b92099e17bae9 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 16:25:33 -0500 Subject: [PATCH 2/6] Update issue templates Former-commit-id: 3ea56ffb38efd4fbfbb096481668be69cc61b15f --- .github/ISSUE_TEMPLATE/feature_request.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 000000000..bbcbbe7d6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: '' +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. From b3dd59db5cdd913e4689d3c1adf51b71ddae5da8 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 17:00:40 -0500 Subject: [PATCH 3/6] Additional asserts ensuring the client is creating on the correct thread Former-commit-id: 937702ea1dd0a4331dd7c66ee9f5c2c3f2354d10 --- src/networking.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.cpp b/src/networking.cpp index ef3ee2683..97744c410 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -99,6 +99,7 @@ client *createClient(int fd, int iel) { * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { + serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (cserver.tcpkeepalive) From 051bde5d3da3852beb478fcf43807286661ba45d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 19:44:31 -0500 Subject: [PATCH 4/6] Fix issue #107, active replicas do their own expires Former-commit-id: 8e4f323439df29a5e8c0de9db7a848291721fd07 --- src/db.cpp | 6 ++++-- src/expire.cpp | 2 +- src/replication.cpp | 23 +++++++++++++++++++++-- src/server.cpp | 6 +++--- src/server.h | 1 + tests/integration/replication-active.tcl | 21 +++++++++++++++++++++ 6 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index d8aa0dc47..605baaf40 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1374,7 +1374,9 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { if (g_pserver->aof_state != AOF_OFF) feedAppendOnlyFile(cserver.delCommand,db->id,argv,2); - replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); decrRefCount(argv[0]); decrRefCount(argv[1]); @@ -1442,7 +1444,7 @@ int expireIfNeeded(redisDb *db, robj *key) { * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ - if (listLength(g_pserver->masters)) return 1; + if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1; /* Delete the key */ g_pserver->stat_expiredkeys++; diff --git a/src/expire.cpp b/src/expire.cpp index 62b5e7a6e..fdad83638 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -535,7 +535,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { * * Instead we take the other branch of the IF statement setting an expire * (possibly in the past) and wait for an explicit DEL from the master. */ - if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) { + if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) { robj *aux; int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : diff --git a/src/replication.cpp b/src/replication.cpp index d2f948567..89f73eb06 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -669,6 +669,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ @@ -1002,6 +1003,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_EOF; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2")) c->slave_capa |= SLAVE_CAPA_PSYNC2; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) + c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. It is an @@ -1071,6 +1074,14 @@ void putSlaveOnline(client *replica) { refreshGoodSlavesCount(); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(replica)); + + if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica) + { + serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations." + "\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica)); + serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions." + "\n\tPlease fix your replica topology"); + } } void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -2094,8 +2105,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * * The master will ignore capabilities it does not understand. */ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", - "capa","eof","capa","psync2",NULL); + if (g_pserver->fActiveReplica) + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); + } + else + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2",NULL); + } if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; diff --git a/src/server.cpp b/src/server.cpp index f01c8bb00..6fc44f387 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1703,9 +1703,9 @@ void clientsCron(int iel) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) { + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (listLength(g_pserver->masters)) { + } else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { expireSlaveKeys(); } @@ -2105,7 +2105,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Send all the slaves an ACK request if at least one client blocked diff --git a/src/server.h b/src/server.h index 29c2db301..81a5da8c3 100644 --- a/src/server.h +++ b/src/server.h @@ -433,6 +433,7 @@ public: #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ +#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */ /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 2ba761766..4f37f2adf 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] + set master_pid [s process_id] # Use a short replication timeout on the slave, so that if there # are no bugs the timeout is triggered in a reasonable amount @@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid + after 3000 + # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info + assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} + } + exec kill -SIGCONT $slave_pid + test {Active replica different databases} { $master select 3 $master set testkey abcd From 04f3a6003256bcad9f0d669d534ca5fc0d570178 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 20 Nov 2019 20:35:14 -0500 Subject: [PATCH 5/6] Add missing dependency to dockerfile Former-commit-id: 39c6fbddb1dc6738c3ea7cbc2bd5d5bc7fb46ccf --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 931f30d59..d3cea108e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ FROM ubuntu:18.04 RUN apt-get update \ && DEBIAN_FRONTEND=noninteractive apt-get install -qqy \ - build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev \ + build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev libcurl4-openssl-dev \ && apt-get clean CMD make From 27c23b0c9b429b2f033ce412786b4b38a37d1407 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Nov 2019 20:05:16 -0500 Subject: [PATCH 6/6] Fix issue #83 Former-commit-id: 3028a890ef11cd99b2c7538de0f480d2466eb150 --- src/object.cpp | 28 +++++++++++++ src/rdb.cpp | 9 +++- src/replication.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++- src/server.cpp | 1 + src/server.h | 90 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 224 insertions(+), 4 deletions(-) diff --git a/src/object.cpp b/src/object.cpp index e7f053a24..2a9c3f215 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) { return C_OK; } +int getUnsignedLongLongFromObject(robj *o, uint64_t *target) { + uint64_t value; + + if (o == NULL) { + value = 0; + } else { + serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); + if (sdsEncodedObject(o)) { + char *pchEnd = nullptr; + errno = 0; + value = strtoull(szFromObj(o), &pchEnd, 10); + if (value == 0) { + // potential error + if (errno != 0) + return C_ERR; + if (pchEnd == szFromObj(o)) + return C_ERR; + } + } else if (o->encoding == OBJ_ENCODING_INT) { + value = (long)ptrFromObj(o); + } else { + serverPanic("Unknown string encoding"); + } + } + if (target) *target = value; + return C_OK; +} + int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) { long long value; if (getLongLongFromObject(o, &value) != C_OK) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 476b2e2e4..1c5b25d16 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2115,12 +2115,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; + bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) { + bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now; + if (fStaleMvccKey || fExpiredKey) { + if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) { + // We have a key that we've already deleted and is not back in our database. + // We'll need to inform the sending master of the delete if it is also a replica of us + rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key); + } decrRefCount(key); key = nullptr; decrRefCount(val); diff --git a/src/replication.cpp b/src/replication.cpp index 89f73eb06..c912abba8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd); void replicationSendAck(redisMaster *mi); void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); +static void propagateMasterStaleKeys(); /* --------------------------- Utility functions ---------------------------- */ @@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters() return false; } +client *replicaFromMaster(redisMaster *mi) +{ + if (mi->master == nullptr) + return nullptr; + + listIter liReplica; + listNode *lnReplica; + listRewind(g_pserver->slaves, &liReplica); + while ((lnReplica = listNext(&liReplica)) != nullptr) + { + client *replica = (client*)listNodeValue(lnReplica); + if (FSameHost(mi->master, replica)) + return replica; + } + return nullptr; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char uuid[40] = {'\0'}; uuid_unparse(cserver.uuid, uuid); char proto[1024]; - int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid); + int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); + int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); + cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that + + char szMvcc[128]; + uint64_t mvccTstamp = getMvccTstamp(); + int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); + int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); + cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); + feedReplicationBacklog(szMvcc, cchMvcc); } } @@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { { addReplyAsync(replica,shared.crlf); addReplyProtoAsync(replica, szDbNum, cchDbNum); + addReplyProtoAsync(replica, szMvcc, cchMvcc); } } @@ -1587,6 +1615,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (g_pserver->fActiveReplica) + { + rsi.mvccMinThreshold = mi->mvccLastSync; + if (mi->staleKeyMap != nullptr) + mi->staleKeyMap->clear(); + else + mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); + rsi.mi = mi; + } if (rdbLoadFile(rdb_filename, &rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(mi); @@ -2382,6 +2419,7 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + delete mi->staleKeyMap; zfree(mi); } @@ -3215,6 +3253,8 @@ void replicationCron(void) { } } + propagateMasterStaleKeys(); + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ @@ -3361,6 +3401,17 @@ void replicaReplayCommand(client *c) } } + uint64_t mvcc = 0; + if (c->argc >= 5) + { + if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK) + { + addReplyError(c, "Invalid MVCC Timestamp"); + s_pstate->Cancel(); + return; + } + } + if (FSameUuidNoNil(uuid, cserver.uuid)) { addReply(c, shared.ok); @@ -3387,6 +3438,11 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); + redisMaster *mi = MasterInfoFromClient(c); + if (mi != nullptr) // this should never be null but I'd prefer not to crash + { + mi->mvccLastSync = mvcc; + } } else { @@ -3421,3 +3477,43 @@ void updateMasterAuth() mi->masteruser = zstrdup(cserver.default_masteruser); } } + +static void propagateMasterStaleKeys() +{ + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + robj *rgobj[2]; + + rgobj[0] = createEmbeddedStringObject("DEL", 3); + + while ((ln = listNext(&li)) != nullptr) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->staleKeyMap != nullptr) + { + if (mi->master != nullptr) + { + for (auto &pair : *mi->staleKeyMap) + { + if (pair.second.empty()) + continue; + + client *replica = replicaFromMaster(mi); + if (replica == nullptr) + continue; + + for (auto &spkey : pair.second) + { + rgobj[1] = spkey.get(); + replicationFeedSlave(replica, pair.first, rgobj, 2, false); + } + } + delete mi->staleKeyMap; + mi->staleKeyMap = nullptr; + } + } + } + + decrRefCount(rgobj[0]); +} \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 6fc44f387..3c8c588cd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2319,6 +2319,7 @@ void initMasterInfo(redisMaster *master) master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ + master->mvccLastSync = 0; } void initServerConfig(void) { diff --git a/src/server.h b/src/server.h index 81a5da8c3..ad7f2826c 100644 --- a/src/server.h +++ b/src/server.h @@ -54,6 +54,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -144,6 +145,87 @@ public: } }; +void decrRefCount(robj_roptr o); +void incrRefCount(robj_roptr o); +class robj_sharedptr +{ + redisObject *m_ptr; + +public: + robj_sharedptr() + : m_ptr(nullptr) + {} + robj_sharedptr(redisObject *ptr) + : m_ptr(ptr) + { + incrRefCount(ptr); + } + ~robj_sharedptr() + { + if (m_ptr) + decrRefCount(m_ptr); + } + robj_sharedptr(const robj_sharedptr& other) + { + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + } + + robj_sharedptr(robj_sharedptr&& other) + { + m_ptr = other.m_ptr; + other.m_ptr = nullptr; + } + + robj_sharedptr &operator=(const robj_sharedptr& other) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + return *this; + } + robj_sharedptr &operator=(redisObject *ptr) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = ptr; + incrRefCount(m_ptr); + return *this; + } + + bool operator==(const robj_sharedptr &other) const + { + return m_ptr == other.m_ptr; + } + + bool operator!=(const robj_sharedptr &other) const + { + return m_ptr != other.m_ptr; + } + + redisObject* operator->() const + { + return m_ptr; + } + + bool operator!() const + { + return !m_ptr; + } + + operator bool() const{ + return !!m_ptr; + } + + operator redisObject *() + { + return (redisObject*)m_ptr; + } + + redisObject *get() { return m_ptr; } +}; + /* Error codes */ #define C_OK 0 #define C_ERR -1 @@ -1391,9 +1473,11 @@ typedef struct rdbSaveInfo { char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ long long repl_offset; /* Replication offset. */ int fForceSetKey; + uint64_t mvccMinThreshold; + struct redisMaster *mi; } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} +#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, nullptr} struct malloc_stats { size_t zmalloc_used; @@ -1467,6 +1551,9 @@ struct redisMaster { unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ + uint64_t mvccLastSync; + /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ + std::map> *staleKeyMap; }; // Const vars are not changed after worker threads are launched @@ -2156,6 +2243,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg); int getDoubleFromObject(const robj *o, double *target); int getLongLongFromObject(robj *o, long long *target); +int getUnsignedLongLongFromObject(robj *o, uint64_t *target); int getLongDoubleFromObject(robj *o, long double *target); int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg); const char *strEncoding(int encoding);