Merge branch 'unstable' into advanced_db
Former-commit-id: 00ad497fb3f409dbfcaa62fa9ea5d793263eb13d
This commit is contained in:
commit
b057c1f8fb
20
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
20
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
---
|
||||
name: Bug report
|
||||
about: Create a report to help us improve
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Describe the bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
||||
** Log Files **
|
||||
These should be KeyDB logs, not syslogs or logs from your container manager. If you are reporting a crash there will be a line in your log stating:
|
||||
"=== KEYDB BUG REPORT START: Cut & paste starting from here ==="
|
||||
|
||||
Please copy everything after this line.
|
||||
|
||||
**To Reproduce**
|
||||
Do you know how to reproduce this? If so please provide repro steps.
|
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
---
|
||||
name: Feature request
|
||||
about: Suggest an idea for this project
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||
|
||||
**Describe the solution you'd like**
|
||||
A clear and concise description of what you want to happen.
|
||||
|
||||
**Describe alternatives you've considered**
|
||||
A clear and concise description of any alternative solutions or features you've considered.
|
||||
|
||||
**Additional context**
|
||||
Add any other context or screenshots about the feature request here.
|
@ -4,7 +4,7 @@ FROM ubuntu:18.04
|
||||
|
||||
RUN apt-get update \
|
||||
&& DEBIAN_FRONTEND=noninteractive apt-get install -qqy \
|
||||
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev \
|
||||
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev libcurl4-openssl-dev \
|
||||
&& apt-get clean
|
||||
|
||||
CMD make
|
||||
|
@ -1447,7 +1447,9 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
|
||||
|
||||
if (g_pserver->aof_state != AOF_OFF)
|
||||
feedAppendOnlyFile(cserver.delCommand,db->id,argv,2);
|
||||
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
|
||||
// Active replicas do their own expiries, do not propogate
|
||||
if (!g_pserver->fActiveReplica)
|
||||
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
|
||||
|
||||
decrRefCount(argv[0]);
|
||||
decrRefCount(argv[1]);
|
||||
@ -1515,7 +1517,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
|
||||
* Still we try to return the right information to the caller,
|
||||
* that is, 0 if we think the key should be still valid, 1 if
|
||||
* we think the key is expired at this time. */
|
||||
if (listLength(g_pserver->masters)) return 1;
|
||||
if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1;
|
||||
|
||||
/* Delete the key */
|
||||
g_pserver->stat_expiredkeys++;
|
||||
|
@ -534,7 +534,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
|
||||
*
|
||||
* Instead we take the other branch of the IF statement setting an expire
|
||||
* (possibly in the past) and wait for an explicit DEL from the master. */
|
||||
if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) {
|
||||
if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) {
|
||||
robj *aux;
|
||||
|
||||
int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
|
||||
|
@ -99,6 +99,7 @@ client *createClient(int fd, int iel) {
|
||||
* in the context of a client. When commands are executed in other
|
||||
* contexts (for instance a Lua script) we need a non connected client. */
|
||||
if (fd != -1) {
|
||||
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
|
||||
anetNonBlock(NULL,fd);
|
||||
anetEnableTcpNoDelay(NULL,fd);
|
||||
if (cserver.tcpkeepalive)
|
||||
|
@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
int getUnsignedLongLongFromObject(robj *o, uint64_t *target) {
|
||||
uint64_t value;
|
||||
|
||||
if (o == NULL) {
|
||||
value = 0;
|
||||
} else {
|
||||
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
|
||||
if (sdsEncodedObject(o)) {
|
||||
char *pchEnd = nullptr;
|
||||
errno = 0;
|
||||
value = strtoull(szFromObj(o), &pchEnd, 10);
|
||||
if (value == 0) {
|
||||
// potential error
|
||||
if (errno != 0)
|
||||
return C_ERR;
|
||||
if (pchEnd == szFromObj(o))
|
||||
return C_ERR;
|
||||
}
|
||||
} else if (o->encoding == OBJ_ENCODING_INT) {
|
||||
value = (long)ptrFromObj(o);
|
||||
} else {
|
||||
serverPanic("Unknown string encoding");
|
||||
}
|
||||
}
|
||||
if (target) *target = value;
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
|
||||
long long value;
|
||||
if (getLongLongFromObject(o, &value) != C_OK) {
|
||||
|
@ -2176,12 +2176,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
||||
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
/* Read value */
|
||||
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
|
||||
bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold;
|
||||
/* Check if the key already expired. This function is used when loading
|
||||
* an RDB file from disk, either at startup, or when an RDB was
|
||||
* received from the master. In the latter case, the master is
|
||||
* responsible for key expiry. If we would expire keys here, the
|
||||
* snapshot taken by the master may not be reflected on the replica. */
|
||||
if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
|
||||
bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now;
|
||||
if (fStaleMvccKey || fExpiredKey) {
|
||||
if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
|
||||
// We have a key that we've already deleted and is not back in our database.
|
||||
// We'll need to inform the sending master of the delete if it is also a replica of us
|
||||
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key);
|
||||
}
|
||||
decrRefCount(key);
|
||||
key = nullptr;
|
||||
decrRefCount(val);
|
||||
|
@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
|
||||
void replicationSendAck(redisMaster *mi);
|
||||
void putSlaveOnline(client *replica);
|
||||
int cancelReplicationHandshake(redisMaster *mi);
|
||||
static void propagateMasterStaleKeys();
|
||||
|
||||
/* --------------------------- Utility functions ---------------------------- */
|
||||
|
||||
@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters()
|
||||
return false;
|
||||
}
|
||||
|
||||
client *replicaFromMaster(redisMaster *mi)
|
||||
{
|
||||
if (mi->master == nullptr)
|
||||
return nullptr;
|
||||
|
||||
listIter liReplica;
|
||||
listNode *lnReplica;
|
||||
listRewind(g_pserver->slaves, &liReplica);
|
||||
while ((lnReplica = listNext(&liReplica)) != nullptr)
|
||||
{
|
||||
client *replica = (client*)listNodeValue(lnReplica);
|
||||
if (FSameHost(mi->master, replica))
|
||||
return replica;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/* ---------------------------------- MASTER -------------------------------- */
|
||||
|
||||
void createReplicationBacklog(void) {
|
||||
@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
char uuid[40] = {'\0'};
|
||||
uuid_unparse(cserver.uuid, uuid);
|
||||
char proto[1024];
|
||||
int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||
int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||
cchProto = std::min((int)sizeof(proto), cchProto);
|
||||
long long master_repl_offset_start = g_pserver->master_repl_offset;
|
||||
|
||||
char szDbNum[128];
|
||||
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid);
|
||||
int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid);
|
||||
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid);
|
||||
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
||||
|
||||
char szMvcc[128];
|
||||
uint64_t mvccTstamp = getMvccTstamp();
|
||||
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
|
||||
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
|
||||
cchMvcc = std::min<int>(cchMvcc, sizeof(szMvcc)); // tricky snprintf
|
||||
|
||||
/* Write the command to the replication backlog if any. */
|
||||
if (g_pserver->repl_backlog)
|
||||
@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
const char *crlf = "\r\n";
|
||||
feedReplicationBacklog(crlf, 2);
|
||||
feedReplicationBacklog(szDbNum, cchDbNum);
|
||||
feedReplicationBacklog(szMvcc, cchMvcc);
|
||||
}
|
||||
}
|
||||
|
||||
@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
{
|
||||
addReplyAsync(replica,shared.crlf);
|
||||
addReplyProtoAsync(replica, szDbNum, cchDbNum);
|
||||
addReplyProtoAsync(replica, szMvcc, cchMvcc);
|
||||
}
|
||||
}
|
||||
|
||||
@ -669,6 +697,7 @@ int masterTryPartialResynchronization(client *c) {
|
||||
c->repl_ack_time = g_pserver->unixtime;
|
||||
c->repl_put_online_on_ack = 0;
|
||||
listAddNodeTail(g_pserver->slaves,c);
|
||||
|
||||
/* We can't use the connection buffers since they are used to accumulate
|
||||
* new commands at this stage. But we are sure the socket send buffer is
|
||||
* empty so this write will never fail actually. */
|
||||
@ -1002,6 +1031,8 @@ void replconfCommand(client *c) {
|
||||
c->slave_capa |= SLAVE_CAPA_EOF;
|
||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2"))
|
||||
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
||||
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
||||
/* REPLCONF ACK is used by replica to inform the master the amount
|
||||
* of replication stream that it processed so far. It is an
|
||||
@ -1071,6 +1102,14 @@ void putSlaveOnline(client *replica) {
|
||||
refreshGoodSlavesCount();
|
||||
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
|
||||
replicationGetSlaveName(replica));
|
||||
|
||||
if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica)
|
||||
{
|
||||
serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations."
|
||||
"\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica));
|
||||
serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions."
|
||||
"\n\tPlease fix your replica topology");
|
||||
}
|
||||
}
|
||||
|
||||
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
@ -1575,6 +1614,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE);
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (g_pserver->fActiveReplica)
|
||||
{
|
||||
rsi.mvccMinThreshold = mi->mvccLastSync;
|
||||
if (mi->staleKeyMap != nullptr)
|
||||
mi->staleKeyMap->clear();
|
||||
else
|
||||
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
|
||||
rsi.mi = mi;
|
||||
}
|
||||
if (rdbLoadFile(rdb_filename, &rsi) != C_OK) {
|
||||
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
||||
cancelReplicationHandshake(mi);
|
||||
@ -2093,8 +2141,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
*
|
||||
* The master will ignore capabilities it does not understand. */
|
||||
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
|
||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||
"capa","eof","capa","psync2",NULL);
|
||||
if (g_pserver->fActiveReplica)
|
||||
{
|
||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||
"capa","eof","capa","psync2","capa","activeExpire",NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||
"capa","eof","capa","psync2",NULL);
|
||||
}
|
||||
if (err) goto write_error;
|
||||
sdsfree(err);
|
||||
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
|
||||
@ -2362,6 +2418,7 @@ void freeMasterInfo(redisMaster *mi)
|
||||
{
|
||||
zfree(mi->masterauth);
|
||||
zfree(mi->masteruser);
|
||||
delete mi->staleKeyMap;
|
||||
zfree(mi);
|
||||
}
|
||||
|
||||
@ -3195,6 +3252,8 @@ void replicationCron(void) {
|
||||
}
|
||||
}
|
||||
|
||||
propagateMasterStaleKeys();
|
||||
|
||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||
refreshGoodSlavesCount();
|
||||
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
||||
@ -3341,6 +3400,17 @@ void replicaReplayCommand(client *c)
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t mvcc = 0;
|
||||
if (c->argc >= 5)
|
||||
{
|
||||
if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK)
|
||||
{
|
||||
addReplyError(c, "Invalid MVCC Timestamp");
|
||||
s_pstate->Cancel();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (FSameUuidNoNil(uuid, cserver.uuid))
|
||||
{
|
||||
addReply(c, shared.ok);
|
||||
@ -3367,6 +3437,11 @@ void replicaReplayCommand(client *c)
|
||||
{
|
||||
addReply(c, shared.ok);
|
||||
selectDb(c, cFake->db->id);
|
||||
redisMaster *mi = MasterInfoFromClient(c);
|
||||
if (mi != nullptr) // this should never be null but I'd prefer not to crash
|
||||
{
|
||||
mi->mvccLastSync = mvcc;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3401,3 +3476,43 @@ void updateMasterAuth()
|
||||
mi->masteruser = zstrdup(cserver.default_masteruser);
|
||||
}
|
||||
}
|
||||
|
||||
static void propagateMasterStaleKeys()
|
||||
{
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->masters, &li);
|
||||
robj *rgobj[2];
|
||||
|
||||
rgobj[0] = createEmbeddedStringObject("DEL", 3);
|
||||
|
||||
while ((ln = listNext(&li)) != nullptr)
|
||||
{
|
||||
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||
if (mi->staleKeyMap != nullptr)
|
||||
{
|
||||
if (mi->master != nullptr)
|
||||
{
|
||||
for (auto &pair : *mi->staleKeyMap)
|
||||
{
|
||||
if (pair.second.empty())
|
||||
continue;
|
||||
|
||||
client *replica = replicaFromMaster(mi);
|
||||
if (replica == nullptr)
|
||||
continue;
|
||||
|
||||
for (auto &spkey : pair.second)
|
||||
{
|
||||
rgobj[1] = spkey.get();
|
||||
replicationFeedSlave(replica, pair.first, rgobj, 2, false);
|
||||
}
|
||||
}
|
||||
delete mi->staleKeyMap;
|
||||
mi->staleKeyMap = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
decrRefCount(rgobj[0]);
|
||||
}
|
@ -1718,9 +1718,9 @@ void clientsCron(int iel) {
|
||||
void databasesCron(void) {
|
||||
/* Expire keys by random sampling. Not required for slaves
|
||||
* as master will synthesize DELs for us. */
|
||||
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) {
|
||||
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) {
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||
} else if (listLength(g_pserver->masters)) {
|
||||
} else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) {
|
||||
expireSlaveKeys();
|
||||
}
|
||||
|
||||
@ -2133,7 +2133,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
|
||||
/* Run a fast expire cycle (the called function will return
|
||||
* ASAP if a fast cycle is not needed). */
|
||||
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0)
|
||||
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica))
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||
|
||||
/* Send all the slaves an ACK request if at least one client blocked
|
||||
@ -2347,6 +2347,7 @@ void initMasterInfo(redisMaster *master)
|
||||
|
||||
master->repl_state = REPL_STATE_NONE;
|
||||
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||
master->mvccLastSync = 0;
|
||||
}
|
||||
|
||||
void initServerConfig(void) {
|
||||
|
92
src/server.h
92
src/server.h
@ -56,6 +56,7 @@
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#include <lua.h>
|
||||
@ -153,6 +154,87 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void decrRefCount(robj_roptr o);
|
||||
void incrRefCount(robj_roptr o);
|
||||
class robj_sharedptr
|
||||
{
|
||||
redisObject *m_ptr;
|
||||
|
||||
public:
|
||||
robj_sharedptr()
|
||||
: m_ptr(nullptr)
|
||||
{}
|
||||
robj_sharedptr(redisObject *ptr)
|
||||
: m_ptr(ptr)
|
||||
{
|
||||
incrRefCount(ptr);
|
||||
}
|
||||
~robj_sharedptr()
|
||||
{
|
||||
if (m_ptr)
|
||||
decrRefCount(m_ptr);
|
||||
}
|
||||
robj_sharedptr(const robj_sharedptr& other)
|
||||
{
|
||||
m_ptr = other.m_ptr;
|
||||
incrRefCount(m_ptr);
|
||||
}
|
||||
|
||||
robj_sharedptr(robj_sharedptr&& other)
|
||||
{
|
||||
m_ptr = other.m_ptr;
|
||||
other.m_ptr = nullptr;
|
||||
}
|
||||
|
||||
robj_sharedptr &operator=(const robj_sharedptr& other)
|
||||
{
|
||||
if (m_ptr)
|
||||
decrRefCount(m_ptr);
|
||||
m_ptr = other.m_ptr;
|
||||
incrRefCount(m_ptr);
|
||||
return *this;
|
||||
}
|
||||
robj_sharedptr &operator=(redisObject *ptr)
|
||||
{
|
||||
if (m_ptr)
|
||||
decrRefCount(m_ptr);
|
||||
m_ptr = ptr;
|
||||
incrRefCount(m_ptr);
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool operator==(const robj_sharedptr &other) const
|
||||
{
|
||||
return m_ptr == other.m_ptr;
|
||||
}
|
||||
|
||||
bool operator!=(const robj_sharedptr &other) const
|
||||
{
|
||||
return m_ptr != other.m_ptr;
|
||||
}
|
||||
|
||||
redisObject* operator->() const
|
||||
{
|
||||
return m_ptr;
|
||||
}
|
||||
|
||||
bool operator!() const
|
||||
{
|
||||
return !m_ptr;
|
||||
}
|
||||
|
||||
operator bool() const{
|
||||
return !!m_ptr;
|
||||
}
|
||||
|
||||
operator redisObject *()
|
||||
{
|
||||
return (redisObject*)m_ptr;
|
||||
}
|
||||
|
||||
redisObject *get() { return m_ptr; }
|
||||
};
|
||||
|
||||
/* Error codes */
|
||||
#define C_OK 0
|
||||
#define C_ERR -1
|
||||
@ -443,6 +525,7 @@ public:
|
||||
#define SLAVE_CAPA_NONE 0
|
||||
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
||||
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
||||
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
|
||||
|
||||
/* Synchronous read timeout - replica side */
|
||||
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
||||
@ -1624,9 +1707,12 @@ typedef struct rdbSaveInfo {
|
||||
|
||||
/* Used In Save */
|
||||
long long master_repl_offset;
|
||||
|
||||
uint64_t mvccMinThreshold;
|
||||
struct redisMaster *mi;
|
||||
} rdbSaveInfo;
|
||||
|
||||
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE}
|
||||
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, 0, nullptr}
|
||||
|
||||
struct malloc_stats {
|
||||
size_t zmalloc_used;
|
||||
@ -1700,6 +1786,9 @@ struct redisMaster {
|
||||
|
||||
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
||||
/* After we've connected with our master use the UUID in g_pserver->master */
|
||||
uint64_t mvccLastSync;
|
||||
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
|
||||
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
|
||||
};
|
||||
|
||||
// Const vars are not changed after worker threads are launched
|
||||
@ -2401,6 +2490,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch
|
||||
int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg);
|
||||
int getDoubleFromObject(const robj *o, double *target);
|
||||
int getLongLongFromObject(robj *o, long long *target);
|
||||
int getUnsignedLongLongFromObject(robj *o, uint64_t *target);
|
||||
int getLongDoubleFromObject(robj *o, long double *target);
|
||||
int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg);
|
||||
const char *strEncoding(int encoding);
|
||||
|
@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set master_pid [s process_id]
|
||||
|
||||
# Use a short replication timeout on the slave, so that if there
|
||||
# are no bugs the timeout is triggered in a reasonable amount
|
||||
@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||
assert_equal {0} [$slave del testkey1]
|
||||
}
|
||||
|
||||
test {Active replica expire propogates when source is down} {
|
||||
$slave flushall
|
||||
$slave set testkey2 foo
|
||||
$slave set testkey1 foo
|
||||
wait_for_condition 50 1000 {
|
||||
[string match *foo* [$master get testkey1]]
|
||||
} else {
|
||||
fail "Replication failed to propogate"
|
||||
}
|
||||
$slave expire testkey1 2
|
||||
assert_equal {1} [$slave wait 1 500] { "value should propogate
|
||||
within 0.5 seconds" }
|
||||
exec kill -SIGSTOP $slave_pid
|
||||
after 3000
|
||||
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
|
||||
# about what is actually in the dict. The only way to know is with a count from info
|
||||
assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"}
|
||||
}
|
||||
exec kill -SIGCONT $slave_pid
|
||||
|
||||
test {Active replica different databases} {
|
||||
$master select 3
|
||||
$master set testkey abcd
|
||||
|
Loading…
x
Reference in New Issue
Block a user