Former-commit-id: 3028a890ef11cd99b2c7538de0f480d2466eb150
This commit is contained in:
John Sully 2019-11-21 20:05:16 -05:00
parent 88dcedc6e9
commit 3668715ba3
5 changed files with 224 additions and 4 deletions

View File

@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) {
return C_OK;
}
int getUnsignedLongLongFromObject(robj *o, uint64_t *target) {
uint64_t value;
if (o == NULL) {
value = 0;
} else {
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
if (sdsEncodedObject(o)) {
char *pchEnd = nullptr;
errno = 0;
value = strtoull(szFromObj(o), &pchEnd, 10);
if (value == 0) {
// potential error
if (errno != 0)
return C_ERR;
if (pchEnd == szFromObj(o))
return C_ERR;
}
} else if (o->encoding == OBJ_ENCODING_INT) {
value = (long)ptrFromObj(o);
} else {
serverPanic("Unknown string encoding");
}
}
if (target) *target = value;
return C_OK;
}
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
long long value;
if (getLongLongFromObject(o, &value) != C_OK) {

View File

@ -2115,12 +2115,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the replica. */
if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key);
}
decrRefCount(key);
key = nullptr;
decrRefCount(val);

View File

@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
void replicationSendAck(redisMaster *mi);
void putSlaveOnline(client *replica);
int cancelReplicationHandshake(redisMaster *mi);
static void propagateMasterStaleKeys();
/* --------------------------- Utility functions ---------------------------- */
@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters()
return false;
}
client *replicaFromMaster(redisMaster *mi)
{
if (mi->master == nullptr)
return nullptr;
listIter liReplica;
listNode *lnReplica;
listRewind(g_pserver->slaves, &liReplica);
while ((lnReplica = listNext(&liReplica)) != nullptr)
{
client *replica = (client*)listNodeValue(lnReplica);
if (FSameHost(mi->master, replica))
return replica;
}
return nullptr;
}
/* ---------------------------------- MASTER -------------------------------- */
void createReplicationBacklog(void) {
@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
char uuid[40] = {'\0'};
uuid_unparse(cserver.uuid, uuid);
char proto[1024];
int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
cchProto = std::min((int)sizeof(proto), cchProto);
long long master_repl_offset_start = g_pserver->master_repl_offset;
char szDbNum[128];
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid);
int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid);
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid);
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
char szMvcc[128];
uint64_t mvccTstamp = getMvccTstamp();
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
cchMvcc = std::min<int>(cchMvcc, sizeof(szMvcc)); // tricky snprintf
/* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog)
@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
const char *crlf = "\r\n";
feedReplicationBacklog(crlf, 2);
feedReplicationBacklog(szDbNum, cchDbNum);
feedReplicationBacklog(szMvcc, cchMvcc);
}
}
@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
{
addReplyAsync(replica,shared.crlf);
addReplyProtoAsync(replica, szDbNum, cchDbNum);
addReplyProtoAsync(replica, szMvcc, cchMvcc);
}
}
@ -1587,6 +1615,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (g_pserver->fActiveReplica)
{
rsi.mvccMinThreshold = mi->mvccLastSync;
if (mi->staleKeyMap != nullptr)
mi->staleKeyMap->clear();
else
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
rsi.mi = mi;
}
if (rdbLoadFile(rdb_filename, &rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake(mi);
@ -2382,6 +2419,7 @@ void freeMasterInfo(redisMaster *mi)
{
zfree(mi->masterauth);
zfree(mi->masteruser);
delete mi->staleKeyMap;
zfree(mi);
}
@ -3215,6 +3253,8 @@ void replicationCron(void) {
}
}
propagateMasterStaleKeys();
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
@ -3361,6 +3401,17 @@ void replicaReplayCommand(client *c)
}
}
uint64_t mvcc = 0;
if (c->argc >= 5)
{
if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK)
{
addReplyError(c, "Invalid MVCC Timestamp");
s_pstate->Cancel();
return;
}
}
if (FSameUuidNoNil(uuid, cserver.uuid))
{
addReply(c, shared.ok);
@ -3387,6 +3438,11 @@ void replicaReplayCommand(client *c)
{
addReply(c, shared.ok);
selectDb(c, cFake->db->id);
redisMaster *mi = MasterInfoFromClient(c);
if (mi != nullptr) // this should never be null but I'd prefer not to crash
{
mi->mvccLastSync = mvcc;
}
}
else
{
@ -3421,3 +3477,43 @@ void updateMasterAuth()
mi->masteruser = zstrdup(cserver.default_masteruser);
}
}
static void propagateMasterStaleKeys()
{
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
robj *rgobj[2];
rgobj[0] = createEmbeddedStringObject("DEL", 3);
while ((ln = listNext(&li)) != nullptr)
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (mi->staleKeyMap != nullptr)
{
if (mi->master != nullptr)
{
for (auto &pair : *mi->staleKeyMap)
{
if (pair.second.empty())
continue;
client *replica = replicaFromMaster(mi);
if (replica == nullptr)
continue;
for (auto &spkey : pair.second)
{
rgobj[1] = spkey.get();
replicationFeedSlave(replica, pair.first, rgobj, 2, false);
}
}
delete mi->staleKeyMap;
mi->staleKeyMap = nullptr;
}
}
}
decrRefCount(rgobj[0]);
}

View File

@ -2319,6 +2319,7 @@ void initMasterInfo(redisMaster *master)
master->repl_state = REPL_STATE_NONE;
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
master->mvccLastSync = 0;
}
void initServerConfig(void) {

View File

@ -54,6 +54,7 @@
#include <vector>
#include <algorithm>
#include <memory>
#include <map>
#ifdef __cplusplus
extern "C" {
#include <lua.h>
@ -144,6 +145,87 @@ public:
}
};
void decrRefCount(robj_roptr o);
void incrRefCount(robj_roptr o);
class robj_sharedptr
{
redisObject *m_ptr;
public:
robj_sharedptr()
: m_ptr(nullptr)
{}
robj_sharedptr(redisObject *ptr)
: m_ptr(ptr)
{
incrRefCount(ptr);
}
~robj_sharedptr()
{
if (m_ptr)
decrRefCount(m_ptr);
}
robj_sharedptr(const robj_sharedptr& other)
{
m_ptr = other.m_ptr;
incrRefCount(m_ptr);
}
robj_sharedptr(robj_sharedptr&& other)
{
m_ptr = other.m_ptr;
other.m_ptr = nullptr;
}
robj_sharedptr &operator=(const robj_sharedptr& other)
{
if (m_ptr)
decrRefCount(m_ptr);
m_ptr = other.m_ptr;
incrRefCount(m_ptr);
return *this;
}
robj_sharedptr &operator=(redisObject *ptr)
{
if (m_ptr)
decrRefCount(m_ptr);
m_ptr = ptr;
incrRefCount(m_ptr);
return *this;
}
bool operator==(const robj_sharedptr &other) const
{
return m_ptr == other.m_ptr;
}
bool operator!=(const robj_sharedptr &other) const
{
return m_ptr != other.m_ptr;
}
redisObject* operator->() const
{
return m_ptr;
}
bool operator!() const
{
return !m_ptr;
}
operator bool() const{
return !!m_ptr;
}
operator redisObject *()
{
return (redisObject*)m_ptr;
}
redisObject *get() { return m_ptr; }
};
/* Error codes */
#define C_OK 0
#define C_ERR -1
@ -1391,9 +1473,11 @@ typedef struct rdbSaveInfo {
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */
int fForceSetKey;
uint64_t mvccMinThreshold;
struct redisMaster *mi;
} rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE}
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, nullptr}
struct malloc_stats {
size_t zmalloc_used;
@ -1467,6 +1551,9 @@ struct redisMaster {
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in g_pserver->master */
uint64_t mvccLastSync;
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
};
// Const vars are not changed after worker threads are launched
@ -2156,6 +2243,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch
int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg);
int getDoubleFromObject(const robj *o, double *target);
int getLongLongFromObject(robj *o, long long *target);
int getUnsignedLongLongFromObject(robj *o, uint64_t *target);
int getLongDoubleFromObject(robj *o, long double *target);
int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg);
const char *strEncoding(int encoding);