Merge branch 'unstable' into RELEASE_5

Former-commit-id: 2f65aa2337e93320204bb97122aac3db15827772
This commit is contained in:
John Sully 2019-10-16 12:47:06 -04:00
commit 960d98b092
28 changed files with 562 additions and 405 deletions

View File

@ -4,7 +4,7 @@ FROM ubuntu:18.04
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -qqy \
build-essential nasm autotools-dev autoconf libjemalloc-dev tcl tcl-dev uuid-dev \
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev \
&& apt-get clean
CMD make

View File

@ -18,7 +18,7 @@ On the same hardware KeyDB can perform twice as many queries per second as Redis
Why fork Redis?
---------------
KeyDB has a different philosophy on how the codebase should evolve. We feel that ease of use, high performance, and a "batteries included" approach is the best way to create a good user experience. While we have great respect for the Redis maintainers it is our opinion that the Redis approach focusses too much on simplicity of the code base at the expense of complexity for the user. This results in the need for external components and workarounds to solve common problems - resulting in more complexity overall.
KeyDB has a different philosophy on how the codebase should evolve. We feel that ease of use, high performance, and a "batteries included" approach are the best way to create a good user experience. While we have great respect for the Redis maintainers, it is our opinion that the Redis approach focuses too much on simplicity of the code base at the expense of complexity for the user. This results in the need for external components and workarounds to solve common problems - resulting in more complexity overall.
Because of this difference of opinion features which are right for KeyDB may not be appropriate for Redis. A fork allows us to explore this new development path and implement features which may never be a part of Redis. KeyDB keeps in sync with upstream Redis changes, and where applicable we upstream bug fixes and changes. It is our hope that the two projects can continue to grow and learn from each other.

View File

@ -476,7 +476,7 @@ static int anetV6Only(char *err, int s) {
return ANET_OK;
}
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int fReusePort)
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int fReusePort, int fFirstListen)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
@ -498,8 +498,12 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (fReusePort && anetSetReusePort(err,s) == ANET_ERR) goto error;
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
if (fReusePort && !fFirstListen && anetSetReusePort(err,s) == ANET_ERR) goto error;
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) {
s = ANET_ERR;
goto end;
}
if (fReusePort && fFirstListen && anetSetReusePort(err,s) == ANET_ERR) goto error;
goto end;
}
if (p == NULL) {
@ -515,14 +519,14 @@ end:
return s;
}
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort)
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort, int fFirstListen)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, fReusePort);
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, fReusePort, fFirstListen);
}
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort)
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort, int fFirstListen)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, fReusePort);
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, fReusePort, fFirstListen);
}
int anetUnixServer(char *err, char *path, mode_t perm, int backlog)

View File

@ -62,8 +62,8 @@ int anetUnixNonBlockConnect(char *err, char *path);
int anetRead(int fd, char *buf, int count);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort, int fFirstListen);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort, int fFirstListen);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);

View File

@ -666,7 +666,7 @@ client *createFakeClient(void) {
c->flags = 0;
c->fPendingAsyncWrite = FALSE;
c->btype = BLOCKED_NONE;
/* We set the fake client as a slave waiting for the synchronization
/* We set the fake client as a replica waiting for the synchronization
* so that Redis will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->reply = listCreate();

View File

@ -221,8 +221,8 @@ void replyToBlockedClientTimedOut(client *c) {
/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
* is called when a master turns into a slave.
* in an instance which turns from master to replica is unsafe, so this function
* is called when a master turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */

View File

@ -499,7 +499,7 @@ void clusterInit(void) {
}
if (listenToPort(g_pserver->port+CLUSTER_PORT_INCR,
g_pserver->cfd,&g_pserver->cfd_count, 0 /*fReusePort*/) == C_ERR)
g_pserver->cfd,&g_pserver->cfd_count, 0 /*fReusePort*/, 0 /*fFirstListen*/) == C_ERR)
{
exit(1);
} else {

View File

@ -96,7 +96,7 @@ configEnum aof_fsync_enum[] = {
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
{1024*1024*256, 1024*1024*64, 60}, /* slave */
{1024*1024*256, 1024*1024*64, 60}, /* replica */
{1024*1024*32, 1024*1024*8, 60} /* pubsub */
};

View File

@ -115,7 +115,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
* LOOKUP_NOTOUCH: don't alter the last access time of the key.
*
* Note: this function also returns NULL if the key is logically expired
* but still existing, in case this is a slave, since this API is called only
* but still existing, in case this is a replica, since this API is called only
* for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */
@ -133,7 +133,7 @@ robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
return NULL;
}
/* However if we are in the context of a slave, expireIfNeeded() will
/* However if we are in the context of a replica, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
@ -344,7 +344,7 @@ robj *dbRandomKey(redisDb *db) {
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because
* expired in the replica, so the function cannot stop because
* expireIfNeeded() is false, nor it can stop because
* dictGetRandomKey() returns NULL (there are keys to return).
* To prevent the infinite loop we do some tries, but if there
@ -1227,6 +1227,39 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
return 1;
}
/* Remove the expiration attached to one subkey of 'key'.
 *
 * 'key' must already exist in db->pdict (this is asserted). Returns 1 when
 * an expire for 'subkey' was found and erased, and 0 otherwise: the value
 * has no expires at all, the expire entry is not "fat", or no sub-entry
 * matches 'subkey'.
 *
 * NOTE(review): a "fat" entry presumably is one that stores per-subkey
 * expirations -- confirm against the expireEntry definition. */
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,de != NULL);
robj *val = (robj*)dictGetVal(de);
/* Nothing to remove when the value carries no expire information. */
if (!val->FExpires())
return 0;
/* The expire set is looked up by the dictionary's own key string; the
 * entry found must exist and report that very same key (compared with ==). */
auto itr = db->setexpire->find((sds)dictGetKey(de));
serverAssert(itr != db->setexpire->end());
serverAssert(itr->key() == (sds)dictGetKey(de));
/* Only fat entries are scanned for subkeys. */
if (!itr->FFat())
return 0;
int found = 0;
for (auto subitr : *itr)
{
/* Sub-entries without a subkey are skipped (presumably the whole-key
 * expire -- TODO confirm). */
if (subitr.subkey() == nullptr)
continue;
if (sdscmp((sds)subitr.subkey(), szFromObj(subkey)) == 0)
{
/* Erase the first match and stop iterating right away. */
itr->erase(subitr);
found = 1;
break;
}
}
/* If the fat entry is now empty, drop the expire for the key entirely.
 * This check runs whether or not a subkey was erased above. */
if (itr->pfatentry()->size() == 0)
removeExpireCore(db, key, de);
return found;
}
/* Set an expire to the specified key. If the expire is set in the context
* of a user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
@ -1335,7 +1368,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) {
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* AOF and the master->replica link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
@ -1393,10 +1426,10 @@ int keyIsExpired(redisDb *db, robj *key) {
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because slave instances do not expire keys, they wait
* instance, because replica instances do not expire keys, they wait
* for DELs from the master for consistency matters. However even
* slaves will try to have a coherent return value for the function,
* so that read commands executed in the slave side will be able to
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
*
@ -1409,9 +1442,9 @@ int keyIsExpired(redisDb *db, robj *key) {
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave, instead of
/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,

View File

@ -374,8 +374,8 @@ size_t freeMemoryGetNotCountedMemory(void) {
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln);
overhead += getClientOutputBufferMemoryUsage(slave);
client *replica = (client*)listNodeValue(ln);
overhead += getClientOutputBufferMemoryUsage(replica);
}
}
if (g_pserver->aof_state != AOF_OFF) {

View File

@ -130,28 +130,44 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
}
}
void expireMemberCommand(client *c)
int parseUnitString(const char *sz)
{
long long when;
if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK)
return;
if (strcasecmp(sz, "s") == 0)
return UNIT_SECONDS;
if (strcasecmp(sz, "ms") == 0)
return UNIT_MILLISECONDS;
return -1;
}
void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, long long when, int unit)
{
switch (unit)
{
case UNIT_SECONDS:
when *= 1000;
when += mstime();
case UNIT_MILLISECONDS:
break;
/* No key, return zero. */
dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1]));
if (de == NULL) {
addReply(c,shared.czero);
default:
addReplyError(c, "Invalid unit arg");
return;
}
robj *val = (robj*)dictGetVal(de);
when += basetime;
/* No key, return zero. */
robj *val = lookupKeyWriteOrReply(c, key, shared.czero);
if (val == NULL) {
return;
}
switch (val->type)
{
case OBJ_SET:
// these types are safe
if (!setTypeIsMember(val, szFromObj(subkey))) {
addReply(c,shared.czero);
return;
}
break;
default:
@ -159,11 +175,40 @@ void expireMemberCommand(client *c)
return;
}
setExpire(c, c->db, c->argv[1], c->argv[2], when);
setExpire(c, c->db, key, subkey, when);
addReply(c, shared.ok);
addReply(c, shared.cone);
}
/* EXPIREMEMBER key subkey delay [unit]
 *
 * Parses the delay from argv[3] and the optional unit from argv[4], then
 * delegates to expireMemberCore() with the current mstime() as base time.
 * The unit defaults to UNIT_SECONDS when no fifth argument is supplied.
 * Whatever parseUnitString() returns is forwarded unchanged, so rejecting an
 * unrecognised unit is left to expireMemberCore(). */
void expireMemberCommand(client *c)
{
    long long delay;

    /* Bail out if the delay is not a valid integer (the helper is expected
     * to have replied to the client already -- see its "OrReply" contract). */
    if (getLongLongFromObjectOrReply(c, c->argv[3], &delay, NULL) != C_OK)
        return;

    /* At most one optional argument (the unit) may follow the delay. */
    if (c->argc > 5) {
        addReplyError(c, "Invalid number of arguments");
        return;
    }

    int unit = (c->argc == 5) ? parseUnitString(szFromObj(c->argv[4]))
                              : UNIT_SECONDS;

    expireMemberCore(c, c->argv[1], c->argv[2], mstime(), delay, unit);
}
/* EXPIREMEMBERAT key subkey timestamp
 *
 * Parses the timestamp from argv[3] and hands it to expireMemberCore() with
 * a base time of 0 and UNIT_SECONDS, i.e. the value is treated as an
 * absolute time expressed in seconds rather than a delay from now. */
void expireMemberAtCommand(client *c)
{
    long long timestamp;

    /* Only proceed when argv[3] parsed cleanly as an integer. */
    if (getLongLongFromObjectOrReply(c, c->argv[3], &timestamp, NULL) == C_OK)
        expireMemberCore(c, c->argv[1], c->argv[2], 0, timestamp, UNIT_SECONDS);
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
@ -307,17 +352,17 @@ void activeExpireCycle(int type) {
*
* Normally slaves do not process expires: they wait for the masters to synthesize
* DEL operations in order to retain consistency. However writable slaves are
* an exception: if a key is created in the slave and an expire is assigned
* an exception: if a key is created in the replica and an expire is assigned
* to it, we need a way to expire such a key, since the master does not know
* anything about such a key.
*
* In order to do so, we track keys created in the slave side with an expire
* In order to do so, we track keys created in the replica side with an expire
* set, and call the expireSlaveKeys() function from time to time in order to
* reclaim the keys if they already expired.
*
* Note that the use case we are trying to cover here, is a popular one where
* slaves are put in writable mode in order to compute slow operations in
* the slave side that are mostly useful to actually read data in a more
* the replica side that are mostly useful to actually read data in a more
* processed way. Think of set intersections in a tmp key, with an expire so
* that it is also used as a cache to avoid intersecting every time.
*
@ -326,7 +371,7 @@ void activeExpireCycle(int type) {
*----------------------------------------------------------------------------*/
/* The dictionary where we remember key names and database ID of keys we may
* want to expire from the slave. Since this function is not often used we
* want to expire from the replica. Since this function is not often used we
* don't even care to initialize the database at startup. We'll do it once
* the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
* is called.
@ -389,7 +434,7 @@ void expireSlaveKeys(void) {
}
/* Set the new bitmap as value of the key, in the dictionary
* of keys with an expire set directly in the writable slave. Otherwise
* of keys with an expire set directly in the writable replica. Otherwise
* if the bitmap is zero, we no longer need to keep track of it. */
if (new_dbids)
dictSetUnsignedIntegerVal(de,new_dbids);
@ -406,7 +451,7 @@ void expireSlaveKeys(void) {
}
/* Track keys that received an EXPIRE or similar command in the context
* of a writable slave. */
* of a writable replica. */
void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
if (slaveKeysWithExpire == NULL) {
static dictType dt = {
@ -448,7 +493,7 @@ size_t getSlaveKeyWithExpireCount(void) {
*
* Note: technically we should handle the case of a single DB being flushed
* but it is not worth it since anyway race conditions using the same set
* of key names in a wriatable slave and in its master will lead to
* of key names in a writable replica and in its master will lead to
* inconsistencies. This is just a best-effort thing we do. */
void flushSlaveKeysWithExpireList(void) {
if (slaveKeysWithExpire) {
@ -486,7 +531,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context
* of a slave instance.
* of a replica instance.
*
* Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */
@ -544,11 +589,32 @@ void ttlGenericCommand(client *c, int output_ms) {
addReplyLongLong(c,-2);
return;
}
/* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */
expireEntry *pexpire = getExpire(c->db,c->argv[1]);
if (c->argc == 2) {
// primary expire
if (pexpire != nullptr)
pexpire->FGetPrimaryExpire(&expire);
} else if (c->argc == 3) {
// We want a subkey expire
if (pexpire && pexpire->FFat()) {
for (auto itr : *pexpire) {
if (itr.subkey() == nullptr)
continue;
if (sdscmp((sds)itr.subkey(), szFromObj(c->argv[2])) == 0) {
expire = itr.when();
break;
}
}
}
} else {
addReplyError(c, "Invalid arguments");
return;
}
if (expire != -1) {
ttl = expire-mstime();
@ -574,12 +640,23 @@ void pttlCommand(client *c) {
/* PERSIST key */
void persistCommand(client *c) {
if (lookupKeyWrite(c->db,c->argv[1])) {
if (c->argc == 2) {
if (removeExpire(c->db,c->argv[1])) {
addReply(c,shared.cone);
g_pserver->dirty++;
} else {
addReply(c,shared.czero);
}
} else if (c->argc == 3) {
if (removeSubkeyExpire(c->db, c->argv[1], c->argv[2])) {
addReply(c,shared.cone);
g_pserver->dirty++;
} else {
addReply(c,shared.czero);
}
} else {
addReplyError(c, "Invalid arguments");
}
} else {
addReply(c,shared.czero);
}

View File

@ -344,8 +344,11 @@ struct commandHelp {
0,
"1.2.0" },
{ "EXPIREMEMBER",
"key subkey seconds",
"set a subkey's time to live in seconds"},
"key subkey delay [Unit: s,ms]",
"set a subkey's time to live in seconds (or milliseconds)"},
{ "EXPIREMEMBERAT",
"key subkey timestamp",
"Set the expiration for a subkey as a UNIX timestamp"},
{ "FLUSHALL",
"[ASYNC]",
"Remove all keys from all databases",
@ -632,8 +635,8 @@ struct commandHelp {
0,
"2.2.3" },
{ "PERSIST",
"key",
"Remove the expiration from a key",
"key [subkey]",
"Remove the expiration from a key or subkey",
0,
"2.2.0" },
{ "PEXPIRE",
@ -677,8 +680,8 @@ struct commandHelp {
6,
"2.0.0" },
{ "PTTL",
"key",
"Get the time to live for a key in milliseconds",
"key [subkey]",
"Get the time to live for a key or subkey in milliseconds",
0,
"2.6.0" },
{ "PUBLISH",
@ -952,8 +955,8 @@ struct commandHelp {
0,
"3.2.1" },
{ "TTL",
"key",
"Get the time to live for a key",
"key [subkey]",
"Get the time to live for a key or subkey",
0,
"1.0.0" },
{ "TYPE",

View File

@ -1439,7 +1439,7 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
*
* * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
*
* * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
* * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a replica
*
* * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
*
@ -4289,7 +4289,7 @@ size_t RM_GetClusterSize(void) {
*
* The arguments ip, master_id, port and flags can be NULL in case we don't
* need to populate back certain info. If an ip and master_id (only populated
* if the instance is a slave) are specified, they point to buffers holding
* if the instance is a replica) are specified, they point to buffers holding
* at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
* and master_id are not null terminated.
*
@ -4300,7 +4300,7 @@ size_t RM_GetClusterSize(void) {
* * REDISMODULE_NODE_SLAVE The node is a replica
* * REDISMODULE_NODE_PFAIL We see the node as failing
* * REDISMODULE_NODE_FAIL The cluster agrees the node is failing
* * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover
* * REDISMODULE_NODE_NOFAILOVER The replica is configured to never failover
*/
clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
@ -5181,7 +5181,7 @@ void moduleInitModulesSystem(void) {
* The function aborts the server on errors, since to start with missing
* modules is not considered sane: clients may rely on the existence of
* given commands, loading AOF also may need some modules to exist, and
* if this instance is a slave, it must understand commands from master. */
* if this instance is a replica, it must understand commands from master. */
void moduleLoadFromQueue(void) {
listIter li;
listNode *ln;

View File

@ -142,7 +142,7 @@ void execCommand(client *c) {
}
/* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction
* only replica, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
@ -195,7 +195,7 @@ void execCommand(client *c) {
int is_master = listLength(g_pserver->masters) == 0;
g_pserver->dirty++;
/* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the
* switched from master to replica (using the SLAVEOF command), the
* initial MULTI was propagated into the replication backlog, but the
* rest was not. We need to make sure to at least terminate the
* backlog with the final EXEC. */

View File

@ -195,7 +195,7 @@ client *createClient(int fd, int iel) {
* buffers can hold, then we'll really install the handler. */
void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* if not already done and, for slaves, if the replica can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
@ -239,7 +239,7 @@ void clientInstallAsyncWriteHandler(client *c) {
*
* 1) The event handler should already be installed since the output buffer
* already contains something.
* 2) The client is a slave but not yet online, so we want to just accumulate
* 2) The client is a replica but not yet online, so we want to just accumulate
* writes in the buffer but not actually sending them yet.
*
* Typically gets called every time a reply is built, before adding more
@ -442,7 +442,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
addReplyProtoCore(c,s,len,fAsync);
addReplyProtoCore(c,"\r\n",2,fAsync);
/* Sometimes it could be normal that a slave replies to a master with
/* Sometimes it could be normal that a replica replies to a master with
* an error and this function gets called. Actually the error will never
* be sent because addReply*() against master clients has no effect...
* A notable example is:
@ -1214,7 +1214,7 @@ void unlinkClient(client *c) {
/* In the case of diskless replication the fork is writing to the
* sockets and just closing the fd isn't enough, if we don't also
* shutdown the socket the fork will continue to write to the slave
* shutdown the socket the fork will continue to write to the replica
* and the replica will only find out that it was disconnected when
* it will finish reading the rdb. */
if ((c->flags & CLIENT_SLAVE) &&
@ -1299,7 +1299,7 @@ bool freeClient(client *c) {
}
}
/* Log link disconnection with slave */
/* Log link disconnection with replica */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c));
@ -1333,8 +1333,8 @@ bool freeClient(client *c) {
* places where active clients may be referenced. */
unlinkClient(c);
/* Master/slave cleanup Case 1:
* we lost the connection with a slave. */
/* Master/replica cleanup Case 1:
* we lost the connection with a replica. */
if (c->flags & CLIENT_SLAVE) {
if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
@ -1352,7 +1352,7 @@ bool freeClient(client *c) {
refreshGoodSlavesCount();
}
/* Master/slave cleanup Case 2:
/* Master/replica cleanup Case 2:
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(MasterInfoFromClient(c));
@ -1494,7 +1494,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a slave (otherwise, on high-speed traffic, the replication
* a replica (otherwise, on high-speed traffic, the replication
* buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
@ -1805,7 +1805,7 @@ int processInlineBuffer(client *c) {
}
/* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* This is useful for a replica to ping back while loading a big
* RDB file. */
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = g_pserver->unixtime;
@ -2038,8 +2038,8 @@ int processCommandAndResetClient(client *c, int flags) {
}
if (serverTL->current_client == NULL) deadclient = 1;
serverTL->current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
/* freeMemoryIfNeeded may flush replica output buffers. This may
* result into a replica, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}
@ -2060,7 +2060,7 @@ void processInputBuffer(client *c, int callFlags) {
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* condition on the replica. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break;
@ -2910,10 +2910,10 @@ void flushSlavesOutputBuffers(void) {
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln);
client *replica = (client*)listNodeValue(ln);
int events;
if (!FCorrectThread(slave))
if (!FCorrectThread(replica))
continue; // we cannot synchronously flush other thread's clients
/* Note that the following will not flush output buffers of slaves
@ -2922,12 +2922,12 @@ void flushSlavesOutputBuffers(void) {
* of put_online_on_ack is to postpone the moment it is installed.
* This is what we want since slaves in this state should not receive
* writes before the first ACK. */
events = aeGetFileEvents(g_pserver->rgthreadvar[slave->iel].el,slave->fd);
events = aeGetFileEvents(g_pserver->rgthreadvar[replica->iel].el,replica->fd);
if (events & AE_WRITABLE &&
slave->replstate == SLAVE_STATE_ONLINE &&
clientHasPendingReplies(slave))
replica->replstate == SLAVE_STATE_ONLINE &&
clientHasPendingReplies(replica))
{
writeToClient(slave->fd,slave,0);
writeToClient(replica->fd,replica,0);
}
}
}

View File

@ -1302,6 +1302,10 @@ NULL
* because we update the access time only
* when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o));
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000);
} else {
addReplySubcommandSyntaxError(c);
}

View File

@ -2119,7 +2119,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
* snapshot taken by the master may not be reflected on the replica. */
if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
decrRefCount(key);
key = nullptr;
@ -2273,7 +2273,7 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
g_pserver->rdb_child_type = RDB_CHILD_TYPE_NONE;
g_pserver->rdb_save_time_start = -1;
/* If the child returns an OK exit code, read the set of slave client
/* If the child returns an OK exit code, read the set of replica client
* IDs and the associated status code. We'll terminate all the slaves
* in error state.
*
@ -2312,35 +2312,35 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
uint64_t j;
int errorcode = 0;
/* Search for the slave ID in the reply. In order for a slave to
/* Search for the replica ID in the reply. In order for a replica to
* continue the replication process, we need to find it in the list,
* and it must have an error code set to 0 (which means success). */
for (j = 0; j < ok_slaves[0]; j++) {
if (slave->id == ok_slaves[2*j+1]) {
if (replica->id == ok_slaves[2*j+1]) {
errorcode = ok_slaves[2*j+2];
break; /* Found in slaves list. */
}
}
if (j == ok_slaves[0] || errorcode != 0) {
serverLog(LL_WARNING,
"Closing slave %s: child->slave RDB transfer failed: %s",
replicationGetSlaveName(slave),
"Closing replica %s: child->replica RDB transfer failed: %s",
replicationGetSlaveName(replica),
(errorcode == 0) ? "RDB transfer child aborted"
: strerror(errorcode));
freeClient(slave);
freeClient(replica);
} else {
serverLog(LL_WARNING,
"Slave %s correctly received the streamed RDB file.",
replicationGetSlaveName(slave));
replicationGetSlaveName(replica));
/* Restore the socket as non-blocking. */
anetNonBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,0);
anetNonBlock(NULL,replica->fd);
anetSendTimeout(NULL,replica->fd,0);
}
}
}
@ -2407,17 +2407,17 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
client *replica = (client*)ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numfds] = slave->id;
fds[numfds++] = slave->fd;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numfds] = replica->id;
fds[numfds++] = replica->fd;
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
anetBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,g_pserver->repl_timeout*1000);
anetBlock(NULL,replica->fd);
anetSendTimeout(NULL,replica->fd,g_pserver->repl_timeout*1000);
}
}
@ -2451,19 +2451,19 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
/* If we are returning OK, at least one slave was served
/* If we are returning OK, at least one replica was served
* with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is:
*
* <len> <slave[0].id> <slave[0].error> ...
* <len> <replica[0].id> <replica[0].error> ...
*
* len, slave IDs, and slave errors, are all uint64_t integers,
* len, replica IDs, and replica errors, are all uint64_t integers,
* so basically the reply is composed of 64 bits for the len field
* plus 2 additional 64 bit integers for each entry, for a total
* of 'len' entries.
*
* The 'id' represents the slave's client ID, so that the master
* can match the report with a specific slave, and 'error' is
* The 'id' represents the replica's client ID, so that the master
* can match the report with a specific replica, and 'error' is
* set to 0 if the replication process terminated with a success
* or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds), MALLOC_LOCAL);
@ -2504,12 +2504,12 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)ln->value;
client *replica = (client*)ln->value;
int j;
for (j = 0; j < numfds; j++) {
if (slave->id == clientids[j]) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (replica->id == clientids[j]) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break;
}
}
@ -2603,17 +2603,17 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
/* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this
* scenario the replication info is useless, because when a slave
* scenario the replication info is useless, because when a replica
* connects to us, the NULL repl_backlog will trigger a full
* synchronization, at the same time we will use a new replid and clear
* replid2. */
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
/* Note that when g_pserver->slaveseldb is -1, it means that this master
/* Note that when g_pserver->replicaseldb is -1, it means that this master
* didn't apply any write commands after a full synchronization.
* So we can let repl_stream_db be 0, this allows a restarted slave
* So we can let repl_stream_db be 0, this allows a restarted replica
* to reload replication ID/offset, it's safe because the next write
* command must generate a SELECT statement. */
rsi->repl_stream_db = g_pserver->slaveseldb == -1 ? 0 : g_pserver->slaveseldb;
rsi->repl_stream_db = g_pserver->replicaseldb == -1 ? 0 : g_pserver->replicaseldb;
return rsi;
}
@ -2624,7 +2624,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
/* If the instance is a slave we need a connected master
/* If the instance is a replica we need a connected master
* in order to fetch the currently selected DB. */
if (miFirst && miFirst->master) {
rsi->repl_stream_db = miFirst->master->db->id;
@ -2632,7 +2632,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
}
/* If we have a cached master we can use it in order to populate the
* replication selected DB info inside the RDB file: the slave can
* replication selected DB info inside the RDB file: the replica can
* increment the master_repl_offset only from data arriving from the
* master, so if we are disconnected the offset in the cached master
* is valid. */

View File

@ -152,7 +152,7 @@ typedef struct clusterNode {
int port;
sds name;
int flags;
sds replicate; /* Master ID if node is a slave */
sds replicate; /* Master ID if node is a replica */
int *slots;
int slots_count;
int current_slot_index;

View File

@ -124,8 +124,8 @@ extern "C" void freeClusterManager(void) {
*
* The score is calculated as follows:
*
* SAME_AS_MASTER = 10000 * each slave in the same IP of its master.
* SAME_AS_SLAVE = 1 * each slave having the same IP as another slave
* SAME_AS_MASTER = 10000 * each replica in the same IP of its master.
* SAME_AS_SLAVE = 1 * each replica having the same IP as another replica
of the same master.
* FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
*

View File

@ -187,7 +187,7 @@ typedef struct clusterManagerNode {
time_t ping_recv;
int flags;
list *flags_str; /* Flags string representations */
sds replicate; /* Master ID if node is a slave */
sds replicate; /* Master ID if node is a replica */
int dirty; /* Node has changes that can be flushed */
uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count;

View File

@ -71,9 +71,9 @@ extern "C" {
#define REDISMODULE_CTX_FLAGS_MULTI (1<<1)
/* The instance is a master */
#define REDISMODULE_CTX_FLAGS_MASTER (1<<2)
/* The instance is a slave */
/* The instance is a replica */
#define REDISMODULE_CTX_FLAGS_SLAVE (1<<3)
/* The instance is read-only (usually meaning it's a slave as well) */
/* The instance is read-only (usually meaning it's a replica as well) */
#define REDISMODULE_CTX_FLAGS_READONLY (1<<4)
/* The instance is running in cluster mode */
#define REDISMODULE_CTX_FLAGS_CLUSTER (1<<5)

File diff suppressed because it is too large Load Diff

View File

@ -1479,7 +1479,7 @@ void evalGenericCommand(client *c, int evalsha) {
* To do so we use a cache of SHA1s of scripts that we already propagated
* as full EVAL, that's called the Replication Script Cache.
*
* For repliation, everytime a new slave attaches to the master, we need to
* For replication, every time a new replica attaches to the master, we need to
* flush our cache of scripts that can be replicated as EVALSHA, while
* for AOF we need to do so every time we rewrite the AOF file. */
if (evalsha && !g_pserver->lua_replicate_commands) {

View File

@ -154,7 +154,7 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */
*
* ok-loading: Allow the command while loading the database.
*
* ok-stale: Allow the command while a slave has stale data but is not
* ok-stale: Allow the command while a replica has stale data but is not
* allowed to serve this data. Normally no command is accepted
* in this condition but just a few.
*
@ -623,7 +623,11 @@ struct redisCommand redisCommandTable[] = {
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"expiremember", expireMemberCommand, 4,
{"expiremember", expireMemberCommand, -4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"expirememberat", expireMemberAtCommand, 4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
@ -730,7 +734,7 @@ struct redisCommand redisCommandTable[] = {
"admin no-script",
0,NULL,0,0,0,0,0,0},
{"ttl",ttlCommand,2,
{"ttl",ttlCommand,-2,
"read-only fast random @keyspace",
0,NULL,1,1,1,0,0,0},
@ -738,11 +742,11 @@ struct redisCommand redisCommandTable[] = {
"read-only fast @keyspace",
0,NULL,1,-1,1,0,0,0},
{"pttl",pttlCommand,2,
{"pttl",pttlCommand,-2,
"read-only fast random @keyspace",
0,NULL,1,1,1,0,0,0},
{"persist",persistCommand,2,
{"persist",persistCommand,-2,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
@ -2106,7 +2110,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */
replicationFeedSlaves(g_pserver->slaves, g_pserver->slaveseldb, argv, 3);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, argv, 3);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
@ -2511,7 +2515,7 @@ void initServerConfig(void) {
/* By default we want scripts to be always replicated by effects
* (single commands executed by the script), and not by sending the
* script to the slave / AOF. This is the new way starting from
* script to the replica / AOF. This is the new way starting from
* Redis 5. However it is possible to revert it via redis.conf. */
g_pserver->lua_always_replicate_commands = 1;
@ -2704,7 +2708,7 @@ void checkTcpBacklogSettings(void) {
* impossible to bind, or no bind addresses were specified in the server
* configuration but the function is not able to bind * for at least
* one of the IPv4 or IPv6 protocols. */
int listenToPort(int port, int *fds, int *count, int fReusePort) {
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen) {
int j;
/* Force binding of 0.0.0.0 if no bind address is specified, always
@ -2716,7 +2720,7 @@ int listenToPort(int port, int *fds, int *count, int fReusePort) {
/* Bind * for both IPv6 and IPv4, we enter here only if
* g_pserver->bindaddr_count == 0. */
fds[*count] = anetTcp6Server(serverTL->neterr,port,NULL,
g_pserver->tcp_backlog, fReusePort);
g_pserver->tcp_backlog, fReusePort, fFirstListen);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
@ -2728,7 +2732,7 @@ int listenToPort(int port, int *fds, int *count, int fReusePort) {
if (*count == 1 || unsupported) {
/* Bind the IPv4 address as well. */
fds[*count] = anetTcpServer(serverTL->neterr,port,NULL,
g_pserver->tcp_backlog, fReusePort);
g_pserver->tcp_backlog, fReusePort, fFirstListen);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
@ -2744,11 +2748,11 @@ int listenToPort(int port, int *fds, int *count, int fReusePort) {
} else if (strchr(g_pserver->bindaddr[j],':')) {
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(serverTL->neterr,port,g_pserver->bindaddr[j],
g_pserver->tcp_backlog, fReusePort);
g_pserver->tcp_backlog, fReusePort, fFirstListen);
} else {
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(serverTL->neterr,port,g_pserver->bindaddr[j],
g_pserver->tcp_backlog, fReusePort);
g_pserver->tcp_backlog, fReusePort, fFirstListen);
}
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
@ -2810,7 +2814,7 @@ static void initNetworkingThread(int iel, int fReusePort)
if (fReusePort || (iel == IDX_EVENT_LOOP_MAIN))
{
if (g_pserver->port != 0 &&
listenToPort(g_pserver->port,g_pserver->rgthreadvar[iel].ipfd,&g_pserver->rgthreadvar[iel].ipfd_count, fReusePort) == C_ERR)
listenToPort(g_pserver->port,g_pserver->rgthreadvar[iel].ipfd,&g_pserver->rgthreadvar[iel].ipfd_count, fReusePort, (iel == IDX_EVENT_LOOP_MAIN)) == C_ERR)
exit(1);
}
else
@ -2961,7 +2965,7 @@ void initServer(void) {
cserver.pid = getpid();
g_pserver->clients_index = raxNew();
g_pserver->clients_to_close = listCreate();
g_pserver->slaveseldb = -1; /* Force to emit the first SELECT command. */
g_pserver->replicaseldb = -1; /* Force to emit the first SELECT command. */
g_pserver->ready_keys = listCreate();
g_pserver->clients_waiting_acks = listCreate();
g_pserver->get_ack_from_slaves = 0;
@ -3588,8 +3592,8 @@ int processCommand(client *c, int callFlags) {
* propagation of DELs due to eviction. */
if (g_pserver->maxmemory && !g_pserver->lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
/* freeMemoryIfNeeded may flush replica output buffers. This may result
* in a replica, which may be the active client, being freed. */
if (serverTL->current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client
@ -3636,7 +3640,7 @@ int processCommand(client *c, int callFlags) {
return C_OK;
}
/* Don't accept write commands if this is a read only slave. But
/* Don't accept write commands if this is a read only replica. But
* accept write commands if this is our master. */
if (listLength(g_pserver->masters) && g_pserver->repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
@ -3659,7 +3663,7 @@ int processCommand(client *c, int callFlags) {
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* when replica-serve-stale-data is no and we are a replica with a broken
* link with master. */
if (FBrokenLinkToMaster() &&
g_pserver->repl_serve_stale_data == 0 &&
@ -3792,7 +3796,7 @@ int prepareForShutdown(int flags) {
unlink(cserver.pidfile);
}
/* Best effort flush of slave output buffers, so that we hopefully
/* Best effort flush of replica output buffers, so that we hopefully
* send them pending writes. */
flushSlavesOutputBuffers();
@ -4470,18 +4474,18 @@ sds genRedisInfoString(const char *section) {
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *slave = (client*)listNodeValue(ln);
client *replica = (client*)listNodeValue(ln);
const char *state = NULL;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
char ip[NET_IP_STR_LEN], *slaveip = replica->slave_ip;
int port;
long lag = 0;
if (slaveip[0] == '\0') {
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
if (anetPeerToString(replica->fd,ip,sizeof(ip),&port) == -1)
continue;
slaveip = ip;
}
switch(slave->replstate) {
switch(replica->replstate) {
case SLAVE_STATE_WAIT_BGSAVE_START:
case SLAVE_STATE_WAIT_BGSAVE_END:
state = "wait_bgsave";
@ -4494,14 +4498,14 @@ sds genRedisInfoString(const char *section) {
break;
}
if (state == NULL) continue;
if (slave->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - slave->repl_ack_time;
if (replica->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - replica->repl_ack_time;
info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld\r\n",
slaveid,slaveip,slave->slave_listening_port,state,
(slave->repl_ack_off + slave->reploff_skipped), lag);
slaveid,slaveip,replica->slave_listening_port,state,
(replica->repl_ack_off + replica->reploff_skipped), lag);
slaveid++;
}
}
@ -4609,7 +4613,7 @@ void infoCommand(client *c) {
}
void monitorCommand(client *c) {
/* ignore MONITOR if already slave or in monitor mode */
/* ignore MONITOR if already replica or in monitor mode */
serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_SLAVE) return;
@ -4836,7 +4840,7 @@ void loadDataFromDisk(void) {
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
/* If we are a slave, create a cached master from this
/* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself(mi);
@ -4984,7 +4988,7 @@ void incrementMvccTstamp()
{
uint64_t msPrev;
__atomic_load(&g_pserver->mvcc_tstamp, &msPrev, __ATOMIC_ACQUIRE);
msPrev >>= 20; // convert to milliseconds
msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds
long long mst;
__atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED);
@ -4994,7 +4998,7 @@ void incrementMvccTstamp()
}
else
{
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << 20);
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT);
}
}

View File

@ -331,7 +331,7 @@ public:
/* Client flags */
#define CLIENT_SLAVE (1<<0) /* This client is a replica */
#define CLIENT_MASTER (1<<1) /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MONITOR (1<<2) /* This client is a replica monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
@ -395,7 +395,7 @@ public:
#define CLIENT_TYPE_MASTER 3 /* Master. */
#define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output
buffer configuration. Just the first
three: normal, slave, pubsub. */
three: normal, replica, pubsub. */
/* Slave replication state. Used in g_pserver->repl_state for slaves to remember
* what to do next. */
@ -421,12 +421,12 @@ public:
#define REPL_STATE_CONNECTED 17 /* Connected to master */
/* State of slaves from the POV of the master. Used in client->replstate.
* In SEND_BULK and ONLINE state the slave receives new updates
* In SEND_BULK and ONLINE state the replica receives new updates
* in its output queue. In the WAIT_BGSAVE states instead the server is waiting
* to start the next background saving in order to send updates to it. */
#define SLAVE_STATE_WAIT_BGSAVE_START 6 /* We need to produce a new RDB file. */
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to slave. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
/* Slave capabilities. */
@ -434,7 +434,7 @@ public:
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Synchronous read timeout - slave side */
/* Synchronous read timeout - replica side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
/* List related stuff */
@ -543,7 +543,7 @@ public:
/* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
#define RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to slave socket. */
#define RDB_CHILD_TYPE_SOCKET 2 /* RDB is written to replica socket. */
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes. */
@ -722,6 +722,8 @@ typedef struct RedisModuleDigest {
#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF)
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
#define MVCC_MS_SHIFT 20
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
@ -800,6 +802,24 @@ public:
void expireSubKey(const char *szSubkey, long long when)
{
// First check if the subkey already has an expiration
for (auto &entry : m_vecexpireEntries)
{
if (szSubkey != nullptr)
{
// if this is a subkey expiry then its not a match if the expireEntry is either for the
// primary key or a different subkey
if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0)
continue;
}
else
{
if (entry.spsubkey != nullptr)
continue;
}
m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data()));
break;
}
auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr;
m_vecexpireEntries.emplace(itrInsert, when, subkey);
@ -823,6 +843,7 @@ class expireEntry {
public:
class iter
{
friend class expireEntry;
expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
@ -959,6 +980,14 @@ public:
return iter(this, 1);
}
/* Remove the sub-expire entry that 'itr' refers to.
 *
 * Only valid when this expireEntry is in its "fat" representation, i.e.
 * FFat() is true and pfatentry()->m_vecexpireEntries is the backing vector.
 * itr.m_idx is used as an index into that vector.
 *
 * Throws -1 (this file's stand-in for an assert) when the entry is not fat,
 * or when itr.m_idx does not address an existing element: handing
 * std::vector::erase a past-the-end position is undefined behavior, so an
 * end() or stale iterator is rejected up front instead.
 *
 * The iterator is not updated by this call. */
void erase(iter &itr)
{
    if (!FFat())
        throw -1;   // assert
    auto &vecEntries = pfatentry()->m_vecexpireEntries;
    if (itr.m_idx >= vecEntries.size())
        throw -1;   // assert: iterator does not refer to a live element
    vecEntries.erase(vecEntries.begin() + itr.m_idx);
}
bool FGetPrimaryExpire(long long *pwhen)
{
*pwhen = -1;
@ -1173,8 +1202,8 @@ typedef struct client {
int casyncOpsPending;
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int replstate; /* Replication state if this is a replica. */
int repl_put_online_on_ack; /* Install replica write handler on ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
@ -1182,10 +1211,10 @@ typedef struct client {
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long reploff_skipped; /* Repl backlog we did not send to this client */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_ack_off; /* Replication ack offset, if this is a replica. */
long long repl_ack_time;/* Replication ack time, if this is a replica. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
copying this replica output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
@ -1426,7 +1455,7 @@ struct redisMaster {
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
int repl_state; /* Replication status if the instance is a slave */
int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
@ -1636,7 +1665,7 @@ struct redisServer {
int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
int rdb_pipe_read_result_from_child; /* of each replica in diskless SYNC. */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@ -1656,8 +1685,8 @@ struct redisServer {
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
long long master_repl_offset; /* My current replication offset */
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
int replicaseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the replica every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
@ -1674,7 +1703,7 @@ struct redisServer {
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
/* Replication (replica) */
list *masters;
int enable_multimaster;
int repl_timeout; /* Timeout after N seconds of master idle */
@ -1745,7 +1774,7 @@ struct redisServer {
int cluster_slave_validity_factor; /* Slave max data age for failover. */
int cluster_require_full_coverage; /* If true, put the cluster down if
there is at least an uncovered slot.*/
int cluster_slave_no_failover; /* Prevent slave from starting a failover
int cluster_slave_no_failover; /* Prevent replica from starting a failover
if the master is in failure state. */
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
int cluster_announce_port; /* base port to announce on cluster bus. */
@ -2014,7 +2043,7 @@ const char *getClientTypeName(int cclass);
void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
void disconnectSlavesExcept(unsigned char *uuid);
int listenToPort(int port, int *fds, int *count, int fReusePort);
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(int iel);
@ -2168,7 +2197,7 @@ void replicationSendNewlineToMaster(struct redisMaster *mi);
long long replicationGetSlaveOffset(struct redisMaster *mi);
char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
int replicationSetupSlaveForFullResync(client *replica, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void mergeReplicationId(const char *);
@ -2408,6 +2437,7 @@ int rewriteConfig(char *path);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey);
void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
expireEntry *getExpire(redisDb *db, robj_roptr key);
@ -2608,6 +2638,7 @@ void monitorCommand(client *c);
void expireCommand(client *c);
void expireatCommand(client *c);
void expireMemberCommand(client *c);
void expireMemberAtCommand(client *c);
void pexpireCommand(client *c);
void pexpireatCommand(client *c);
void getsetCommand(client *c);

View File

@ -33,7 +33,7 @@
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
/* Redis performs most of the I/O in a nonblocking way, with the exception
* of the SYNC command where the slave does it in a blocking way, and
* of the SYNC command where the replica does it in a blocking way, and
* the MIGRATE command that must be blocking in order to be atomic from the
* point of view of the two instances (one migrating the key and one receiving
* the key). This is why need the following blocking I/O functions.

View File

@ -803,7 +803,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
*
* Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */
* useless work in the replica side, trying to fetch the stream item. */
robj *argv[14];
argv[0] = createStringObject("XCLAIM",6);
argv[1] = key;

View File

@ -202,7 +202,7 @@ REDIS_CHKCONFIG_INFO=\
# chkconfig: - 58 74\n
# description: redis_${REDIS_PORT} is the redis daemon.\n
### BEGIN INIT INFO\n
# Provides: redis_6379\n
# Provides: redis_${REDIS_PORT}\n
# Required-Start: \$network \$local_fs \$remote_fs\n
# Required-Stop: \$network \$local_fs \$remote_fs\n
# Default-Start: 2 3 4 5\n