Merge branch 'unstable' into RELEASE_5
Former-commit-id: 53b95d830ae7d62ec0a9083a59c8eca1ce2bff69
This commit is contained in:
commit
db872b2895
20
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
20
.github/ISSUE_TEMPLATE/bug_report.md
vendored
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
---
|
||||||
|
name: Bug report
|
||||||
|
about: Create a report to help us improve
|
||||||
|
title: ''
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Describe the bug**
|
||||||
|
A clear and concise description of what the bug is.
|
||||||
|
|
||||||
|
**Log Files**
|
||||||
|
These should be KeyDB logs, not syslogs or logs from your container manager. If you are reporting a crash there will be a line in your log stating:
|
||||||
|
"=== KEYDB BUG REPORT START: Cut & paste starting from here ==="
|
||||||
|
|
||||||
|
Please copy everything after this line.
|
||||||
|
|
||||||
|
**To Reproduce**
|
||||||
|
Do you know how to reproduce this? If so please provide repro steps.
|
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
20
.github/ISSUE_TEMPLATE/feature_request.md
vendored
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
---
|
||||||
|
name: Feature request
|
||||||
|
about: Suggest an idea for this project
|
||||||
|
title: ''
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Is your feature request related to a problem? Please describe.**
|
||||||
|
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||||
|
|
||||||
|
**Describe the solution you'd like**
|
||||||
|
A clear and concise description of what you want to happen.
|
||||||
|
|
||||||
|
**Describe alternatives you've considered**
|
||||||
|
A clear and concise description of any alternative solutions or features you've considered.
|
||||||
|
|
||||||
|
**Additional context**
|
||||||
|
Add any other context or screenshots about the feature request here.
|
@ -4,7 +4,7 @@ FROM ubuntu:18.04
|
|||||||
|
|
||||||
RUN apt-get update \
|
RUN apt-get update \
|
||||||
&& DEBIAN_FRONTEND=noninteractive apt-get install -qqy \
|
&& DEBIAN_FRONTEND=noninteractive apt-get install -qqy \
|
||||||
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev \
|
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev libcurl4-openssl-dev \
|
||||||
&& apt-get clean
|
&& apt-get clean
|
||||||
|
|
||||||
CMD make
|
CMD make
|
||||||
|
@ -84,7 +84,7 @@ fastlock g_lock("AE (global)");
|
|||||||
#endif
|
#endif
|
||||||
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
thread_local aeEventLoop *g_eventLoopThisThread = NULL;
|
||||||
|
|
||||||
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0)
|
#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSERT FAILURE %s: %d\n", __FILE__, __LINE__); *((volatile int*)0) = 1; } while(0)
|
||||||
|
|
||||||
/* Include the best multiplexing layer supported by this system.
|
/* Include the best multiplexing layer supported by this system.
|
||||||
* The following should be ordered by performances, descending. */
|
* The following should be ordered by performances, descending. */
|
||||||
|
@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) {
|
|||||||
* This function will just make sure that the original node associated
|
* This function will just make sure that the original node associated
|
||||||
* with this link will have the 'link' field set to NULL. */
|
* with this link will have the 'link' field set to NULL. */
|
||||||
void freeClusterLink(clusterLink *link) {
|
void freeClusterLink(clusterLink *link) {
|
||||||
|
if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN)
|
||||||
|
{
|
||||||
|
// we can't perform this operation on this thread, queue it on the main thread
|
||||||
|
if (link->node)
|
||||||
|
link->node->link = NULL;
|
||||||
|
link->node = nullptr;
|
||||||
|
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
||||||
|
freeClusterLink(link);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (link->fd != -1) {
|
if (link->fd != -1) {
|
||||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
||||||
}
|
}
|
||||||
@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) {
|
|||||||
* consumed by write(). We don't try to optimize this for speed too much
|
* consumed by write(). We don't try to optimize this for speed too much
|
||||||
* as this is a very low traffic channel. */
|
* as this is a very low traffic channel. */
|
||||||
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
|
serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN);
|
||||||
clusterLink *link = (clusterLink*) privdata;
|
clusterLink *link = (clusterLink*) privdata;
|
||||||
ssize_t nwritten;
|
ssize_t nwritten;
|
||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(mask);
|
UNUSED(mask);
|
||||||
|
|
||||||
nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
|
// We're about to release the lock, so the link's sndbuf needs to be owned fully by us
|
||||||
|
// allocate a new one in case anyone tries to write while we're waiting
|
||||||
|
sds sndbuf = link->sndbuf;
|
||||||
|
link->sndbuf = sdsempty();
|
||||||
|
|
||||||
|
aeReleaseLock();
|
||||||
|
nwritten = write(fd, sndbuf, sdslen(sndbuf));
|
||||||
|
aeAcquireLock();
|
||||||
|
|
||||||
if (nwritten <= 0) {
|
if (nwritten <= 0) {
|
||||||
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
|
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
|
||||||
(nwritten == -1) ? strerror(errno) : "short write");
|
(nwritten == -1) ? strerror(errno) : "short write");
|
||||||
|
sdsfree(sndbuf);
|
||||||
handleLinkIOError(link);
|
handleLinkIOError(link);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sdsrange(link->sndbuf,nwritten,-1);
|
sdsrange(sndbuf,nwritten,-1);
|
||||||
|
// Restore our send buffer, ensuring any unsent data is first
|
||||||
|
sndbuf = sdscat(sndbuf, link->sndbuf);
|
||||||
|
sdsfree(link->sndbuf);
|
||||||
|
link->sndbuf = sndbuf;
|
||||||
if (sdslen(link->sndbuf) == 0)
|
if (sdslen(link->sndbuf) == 0)
|
||||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE);
|
aeDeleteFileEvent(el, link->fd, AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Read data. Try to read the first field of the header first to check the
|
/* Read data. Try to read the first field of the header first to check the
|
||||||
@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* the link to be invalidated, so it is safe to call this function
|
* the link to be invalidated, so it is safe to call this function
|
||||||
* from event handlers that will do stuff with the same link later. */
|
* from event handlers that will do stuff with the same link later. */
|
||||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||||
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
||||||
clusterWriteHandler,link);
|
clusterWriteHandler,link,false);
|
||||||
|
|
||||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||||
|
|
||||||
@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
|
|||||||
void resetManualFailover(void) {
|
void resetManualFailover(void) {
|
||||||
if (g_pserver->cluster->mf_end && clientsArePaused()) {
|
if (g_pserver->cluster->mf_end && clientsArePaused()) {
|
||||||
g_pserver->clients_pause_end_time = 0;
|
g_pserver->clients_pause_end_time = 0;
|
||||||
clientsArePaused(); /* Just use the side effect of the function. */
|
unpauseClientsIfNecessary();
|
||||||
}
|
}
|
||||||
g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
|
g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
|
||||||
g_pserver->cluster->mf_can_start = 0;
|
g_pserver->cluster->mf_can_start = 0;
|
||||||
@ -4154,16 +4180,32 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
dictIterator *di = dictGetSafeIterator(g_pserver->cluster->nodes);
|
dictIterator *di = dictGetSafeIterator(g_pserver->cluster->nodes);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
clusterNode *node = (clusterNode*)dictGetVal(de);
|
clusterNode *node = (clusterNode*)dictGetVal(de);
|
||||||
int j = 0, start = -1;
|
int start = -1;
|
||||||
|
|
||||||
/* Skip slaves (that are iterated when producing the output of their
|
/* Skip slaves (that are iterated when producing the output of their
|
||||||
* master) and masters not serving any slot. */
|
* master) and masters not serving any slot. */
|
||||||
if (!nodeIsMaster(node) || node->numslots == 0) continue;
|
if (!nodeIsMaster(node) || node->numslots == 0) continue;
|
||||||
|
|
||||||
for (j = 0; j < CLUSTER_SLOTS; j++) {
|
static_assert((CLUSTER_SLOTS % (sizeof(uint32_t)*8)) == 0, "code below assumes the bitfield is a multiple of sizeof(unsinged)");
|
||||||
int bit, i;
|
|
||||||
|
|
||||||
if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
|
for (unsigned iw = 0; iw < (CLUSTER_SLOTS/sizeof(uint32_t)/8); ++iw)
|
||||||
|
{
|
||||||
|
uint32_t wordCur = reinterpret_cast<uint32_t*>(node->slots)[iw];
|
||||||
|
if (iw != ((CLUSTER_SLOTS/sizeof(uint32_t)/8)-1))
|
||||||
|
{
|
||||||
|
if (start == -1 && wordCur == 0)
|
||||||
|
continue;
|
||||||
|
if (start != -1 && (wordCur+1)==0)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
unsigned ibitStartLoop = iw*sizeof(uint32_t)*8;
|
||||||
|
|
||||||
|
for (unsigned j = ibitStartLoop; j < (iw+1)*sizeof(uint32_t)*8; j++) {
|
||||||
|
int i;
|
||||||
|
int bit = (int)(wordCur & 1);
|
||||||
|
wordCur >>= 1;
|
||||||
|
if (bit != 0) {
|
||||||
if (start == -1) start = j;
|
if (start == -1) start = j;
|
||||||
}
|
}
|
||||||
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
|
||||||
@ -4205,6 +4247,9 @@ void clusterReplyMultiBulkSlots(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
serverAssert(start == -1);
|
||||||
|
}
|
||||||
|
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
setDeferredArrayLen(c, slot_replylen, num_masters);
|
setDeferredArrayLen(c, slot_replylen, num_masters);
|
||||||
}
|
}
|
||||||
|
@ -1374,6 +1374,8 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
|
|||||||
|
|
||||||
if (g_pserver->aof_state != AOF_OFF)
|
if (g_pserver->aof_state != AOF_OFF)
|
||||||
feedAppendOnlyFile(cserver.delCommand,db->id,argv,2);
|
feedAppendOnlyFile(cserver.delCommand,db->id,argv,2);
|
||||||
|
// Active replicas do their own expiries, do not propagate
|
||||||
|
if (!g_pserver->fActiveReplica)
|
||||||
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
|
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
|
||||||
|
|
||||||
decrRefCount(argv[0]);
|
decrRefCount(argv[0]);
|
||||||
@ -1442,7 +1444,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
|
|||||||
* Still we try to return the right information to the caller,
|
* Still we try to return the right information to the caller,
|
||||||
* that is, 0 if we think the key should be still valid, 1 if
|
* that is, 0 if we think the key should be still valid, 1 if
|
||||||
* we think the key is expired at this time. */
|
* we think the key is expired at this time. */
|
||||||
if (listLength(g_pserver->masters)) return 1;
|
if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1;
|
||||||
|
|
||||||
/* Delete the key */
|
/* Delete the key */
|
||||||
g_pserver->stat_expiredkeys++;
|
g_pserver->stat_expiredkeys++;
|
||||||
|
@ -535,7 +535,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
|
|||||||
*
|
*
|
||||||
* Instead we take the other branch of the IF statement setting an expire
|
* Instead we take the other branch of the IF statement setting an expire
|
||||||
* (possibly in the past) and wait for an explicit DEL from the master. */
|
* (possibly in the past) and wait for an explicit DEL from the master. */
|
||||||
if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) {
|
if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) {
|
||||||
robj *aux;
|
robj *aux;
|
||||||
|
|
||||||
int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
|
int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
|
||||||
|
@ -258,7 +258,10 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name)
|
|||||||
lock->m_depth = 0;
|
lock->m_depth = 0;
|
||||||
lock->m_pidOwner = -1;
|
lock->m_pidOwner = -1;
|
||||||
lock->futex = 0;
|
lock->futex = 0;
|
||||||
lock->szName = name;
|
int cch = strlen(name);
|
||||||
|
cch = std::min<int>(cch, sizeof(lock->szName)-1);
|
||||||
|
memcpy(lock->szName, name, cch);
|
||||||
|
lock->szName[cch] = '\0';
|
||||||
ANNOTATE_RWLOCK_CREATE(lock);
|
ANNOTATE_RWLOCK_CREATE(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
@ -40,12 +41,13 @@ struct ticket
|
|||||||
|
|
||||||
struct fastlock
|
struct fastlock
|
||||||
{
|
{
|
||||||
volatile struct ticket m_ticket;
|
|
||||||
|
|
||||||
volatile int m_pidOwner;
|
volatile int m_pidOwner;
|
||||||
volatile int m_depth;
|
volatile int m_depth;
|
||||||
|
char szName[56];
|
||||||
|
/* Volatile data on separate cache line */
|
||||||
|
volatile struct ticket m_ticket;
|
||||||
unsigned futex;
|
unsigned futex;
|
||||||
const char *szName;
|
char padding[56]; // ensure ticket and futex are on their own independent cache line
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
fastlock(const char *name)
|
fastlock(const char *name)
|
||||||
@ -81,3 +83,5 @@ struct fastlock
|
|||||||
bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only
|
bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static_assert(offsetof(struct fastlock, m_ticket) == 64, "ensure padding is correct");
|
@ -16,10 +16,11 @@ fastlock_lock:
|
|||||||
.cfi_startproc
|
.cfi_startproc
|
||||||
.cfi_def_cfa rsp, 8
|
.cfi_def_cfa rsp, 8
|
||||||
# RDI points to the struct:
|
# RDI points to the struct:
|
||||||
# uint16_t active
|
|
||||||
# uint16_t avail
|
|
||||||
# int32_t m_pidOwner
|
# int32_t m_pidOwner
|
||||||
# int32_t m_depth
|
# int32_t m_depth
|
||||||
|
# [rdi+64] ...
|
||||||
|
# uint16_t active
|
||||||
|
# uint16_t avail
|
||||||
|
|
||||||
# First get our TID and put it in ecx
|
# First get our TID and put it in ecx
|
||||||
push rdi # we need our struct pointer (also balance the stack for the call)
|
push rdi # we need our struct pointer (also balance the stack for the call)
|
||||||
@ -29,18 +30,18 @@ fastlock_lock:
|
|||||||
pop rdi # get our pointer back
|
pop rdi # get our pointer back
|
||||||
.cfi_adjust_cfa_offset -8
|
.cfi_adjust_cfa_offset -8
|
||||||
|
|
||||||
cmp [rdi+4], esi # Is the TID we got back the owner of the lock?
|
cmp [rdi], esi # Is the TID we got back the owner of the lock?
|
||||||
je .LLocked # Don't spin in that case
|
je .LLocked # Don't spin in that case
|
||||||
|
|
||||||
xor eax, eax # eliminate partial register dependency
|
xor eax, eax # eliminate partial register dependency
|
||||||
inc eax # we want to add one
|
inc eax # we want to add one
|
||||||
lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition
|
lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition
|
||||||
# ax now contains the ticket
|
# ax now contains the ticket
|
||||||
# OK Start the wait loop
|
# OK Start the wait loop
|
||||||
xor ecx, ecx
|
xor ecx, ecx
|
||||||
.ALIGN 16
|
.ALIGN 16
|
||||||
.LLoop:
|
.LLoop:
|
||||||
mov edx, [rdi]
|
mov edx, [rdi+64]
|
||||||
cmp dx, ax # is our ticket up?
|
cmp dx, ax # is our ticket up?
|
||||||
je .LLocked # leave the loop
|
je .LLocked # leave the loop
|
||||||
pause
|
pause
|
||||||
@ -72,8 +73,8 @@ fastlock_lock:
|
|||||||
jmp .LLoop # Get back in the game
|
jmp .LLoop # Get back in the game
|
||||||
.ALIGN 16
|
.ALIGN 16
|
||||||
.LLocked:
|
.LLocked:
|
||||||
mov [rdi+4], esi # lock->m_pidOwner = gettid()
|
mov [rdi], esi # lock->m_pidOwner = gettid()
|
||||||
inc dword ptr [rdi+8] # lock->m_depth++
|
inc dword ptr [rdi+4] # lock->m_depth++
|
||||||
ret
|
ret
|
||||||
.cfi_endproc
|
.cfi_endproc
|
||||||
|
|
||||||
@ -82,10 +83,11 @@ fastlock_lock:
|
|||||||
.type fastlock_trylock,@function
|
.type fastlock_trylock,@function
|
||||||
fastlock_trylock:
|
fastlock_trylock:
|
||||||
# RDI points to the struct:
|
# RDI points to the struct:
|
||||||
# uint16_t active
|
|
||||||
# uint16_t avail
|
|
||||||
# int32_t m_pidOwner
|
# int32_t m_pidOwner
|
||||||
# int32_t m_depth
|
# int32_t m_depth
|
||||||
|
# [rdi+64] ...
|
||||||
|
# uint16_t active
|
||||||
|
# uint16_t avail
|
||||||
|
|
||||||
# First get our TID and put it in ecx
|
# First get our TID and put it in ecx
|
||||||
push rdi # we need our struct pointer (also balance the stack for the call)
|
push rdi # we need our struct pointer (also balance the stack for the call)
|
||||||
@ -93,10 +95,10 @@ fastlock_trylock:
|
|||||||
mov esi, eax # back it up in esi
|
mov esi, eax # back it up in esi
|
||||||
pop rdi # get our pointer back
|
pop rdi # get our pointer back
|
||||||
|
|
||||||
cmp [rdi+4], esi # Is the TID we got back the owner of the lock?
|
cmp [rdi], esi # Is the TID we got back the owner of the lock?
|
||||||
je .LRecursive # Don't spin in that case
|
je .LRecursive # Don't spin in that case
|
||||||
|
|
||||||
mov eax, [rdi] # get both active and avail counters
|
mov eax, [rdi+64] # get both active and avail counters
|
||||||
mov ecx, eax # duplicate in ecx
|
mov ecx, eax # duplicate in ecx
|
||||||
ror ecx, 16 # swap upper and lower 16-bits
|
ror ecx, 16 # swap upper and lower 16-bits
|
||||||
cmp eax, ecx # are the upper and lower 16-bits the same?
|
cmp eax, ecx # are the upper and lower 16-bits the same?
|
||||||
@ -104,18 +106,18 @@ fastlock_trylock:
|
|||||||
|
|
||||||
# at this point we know eax+ecx have [avail][active] and they are both the same
|
# at this point we know eax+ecx have [avail][active] and they are both the same
|
||||||
add ecx, 0x10000 # increment avail, ecx is now our wanted value
|
add ecx, 0x10000 # increment avail, ecx is now our wanted value
|
||||||
lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail)
|
lock cmpxchg [rdi+64], ecx # If rdi still contains the value in eax, put in ecx (inc avail)
|
||||||
jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing
|
jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing
|
||||||
xor eax, eax
|
xor eax, eax
|
||||||
inc eax # return SUCCESS! (eax=1)
|
inc eax # return SUCCESS! (eax=1)
|
||||||
mov [rdi+4], esi # lock->m_pidOwner = gettid()
|
mov [rdi], esi # lock->m_pidOwner = gettid()
|
||||||
mov dword ptr [rdi+8], eax # lock->m_depth = 1
|
mov dword ptr [rdi+4], eax # lock->m_depth = 1
|
||||||
ret
|
ret
|
||||||
.ALIGN 16
|
.ALIGN 16
|
||||||
.LRecursive:
|
.LRecursive:
|
||||||
xor eax, eax
|
xor eax, eax
|
||||||
inc eax # return SUCCESS! (eax=1)
|
inc eax # return SUCCESS! (eax=1)
|
||||||
inc dword ptr [rdi+8] # lock->m_depth++
|
inc dword ptr [rdi+4] # lock->m_depth++
|
||||||
ret
|
ret
|
||||||
.ALIGN 16
|
.ALIGN 16
|
||||||
.LAlreadyLocked:
|
.LAlreadyLocked:
|
||||||
@ -126,23 +128,25 @@ fastlock_trylock:
|
|||||||
.global fastlock_unlock
|
.global fastlock_unlock
|
||||||
fastlock_unlock:
|
fastlock_unlock:
|
||||||
# RDI points to the struct:
|
# RDI points to the struct:
|
||||||
# uint16_t active
|
|
||||||
# uint16_t avail
|
|
||||||
# int32_t m_pidOwner
|
# int32_t m_pidOwner
|
||||||
# int32_t m_depth
|
# int32_t m_depth
|
||||||
|
# [rdi+64] ...
|
||||||
|
# uint16_t active
|
||||||
|
# uint16_t avail
|
||||||
push r11
|
push r11
|
||||||
sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
|
sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state
|
||||||
jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it
|
jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it
|
||||||
mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore)
|
mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore)
|
||||||
mov ecx, [rdi] # get current active (this one)
|
mov ecx, [rdi+64] # get current active (this one)
|
||||||
inc ecx # bump it to the next thread
|
inc ecx # bump it to the next thread
|
||||||
mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
|
mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
|
||||||
# At this point the lock is removed, however we must wake up any pending futexs
|
# At this point the lock is removed, however we must wake up any pending futexs
|
||||||
mov r9d, 1 # eax is the bitmask for 2 threads
|
mov r9d, 1 # eax is the bitmask for 2 threads
|
||||||
rol r9d, cl # place the mask in the right spot for the next 2 threads
|
rol r9d, cl # place the mask in the right spot for the next 2 threads
|
||||||
|
add rdi, 64 # rdi now points to the token
|
||||||
.ALIGN 16
|
.ALIGN 16
|
||||||
.LRetryWake:
|
.LRetryWake:
|
||||||
mov r11d, [rdi+12] # load the futex mask
|
mov r11d, [rdi+4] # load the futex mask
|
||||||
and r11d, r9d # are any threads waiting on a futex?
|
and r11d, r9d # are any threads waiting on a futex?
|
||||||
jz .LDone # if not we're done.
|
jz .LDone # if not we're done.
|
||||||
# we have to wake the futexs
|
# we have to wake the futexs
|
||||||
|
@ -99,6 +99,7 @@ client *createClient(int fd, int iel) {
|
|||||||
* in the context of a client. When commands are executed in other
|
* in the context of a client. When commands are executed in other
|
||||||
* contexts (for instance a Lua script) we need a non connected client. */
|
* contexts (for instance a Lua script) we need a non connected client. */
|
||||||
if (fd != -1) {
|
if (fd != -1) {
|
||||||
|
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
|
||||||
anetNonBlock(NULL,fd);
|
anetNonBlock(NULL,fd);
|
||||||
anetEnableTcpNoDelay(NULL,fd);
|
anetEnableTcpNoDelay(NULL,fd);
|
||||||
if (cserver.tcpkeepalive)
|
if (cserver.tcpkeepalive)
|
||||||
@ -2521,9 +2522,17 @@ NULL
|
|||||||
close_this_client = 1;
|
close_this_client = 1;
|
||||||
} else {
|
} else {
|
||||||
if (FCorrectThread(client))
|
if (FCorrectThread(client))
|
||||||
|
{
|
||||||
freeClient(client);
|
freeClient(client);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
int iel = client->iel;
|
||||||
freeClientAsync(client);
|
freeClientAsync(client);
|
||||||
|
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
|
||||||
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
killed++;
|
killed++;
|
||||||
}
|
}
|
||||||
@ -2952,38 +2961,48 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
* than the time left for the previous pause, no change is made to the
|
* than the time left for the previous pause, no change is made to the
|
||||||
* left duration. */
|
* left duration. */
|
||||||
void pauseClients(mstime_t end) {
|
void pauseClients(mstime_t end) {
|
||||||
if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time)
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time)
|
||||||
g_pserver->clients_pause_end_time = end;
|
g_pserver->clients_pause_end_time = end;
|
||||||
g_pserver->clients_paused = 1;
|
|
||||||
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
|
{
|
||||||
|
g_pserver->rgthreadvar[iel].clients_paused = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return non-zero if clients are currently paused. As a side effect the
|
/* Return non-zero if clients are currently paused. As a side effect the
|
||||||
* function checks if the pause time was reached and clear it. */
|
* function checks if the pause time was reached and clear it. */
|
||||||
int clientsArePaused(void) {
|
int clientsArePaused(void) {
|
||||||
if (g_pserver->clients_paused &&
|
return serverTL->clients_paused;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unpauseClientsIfNecessary()
|
||||||
|
{
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
if (serverTL->clients_paused &&
|
||||||
g_pserver->clients_pause_end_time < g_pserver->mstime)
|
g_pserver->clients_pause_end_time < g_pserver->mstime)
|
||||||
{
|
{
|
||||||
aeAcquireLock();
|
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
client *c;
|
client *c;
|
||||||
|
|
||||||
g_pserver->clients_paused = 0;
|
serverTL->clients_paused = 0;
|
||||||
|
|
||||||
/* Put all the clients in the unblocked clients queue in order to
|
/* Put all the clients in the unblocked clients queue in order to
|
||||||
* force the re-processing of the input buffer if any. */
|
* force the re-processing of the input buffer if any. */
|
||||||
listRewind(g_pserver->clients,&li);
|
listRewind(g_pserver->clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
c = (client*)listNodeValue(ln);
|
c = (client*)listNodeValue(ln);
|
||||||
|
if (!FCorrectThread(c))
|
||||||
|
continue;
|
||||||
|
|
||||||
/* Don't touch slaves and blocked clients.
|
/* Don't touch slaves and blocked clients.
|
||||||
* The latter pending requests will be processed when unblocked. */
|
* The latter pending requests will be processed when unblocked. */
|
||||||
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
|
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
|
||||||
queueClientForReprocessing(c);
|
queueClientForReprocessing(c);
|
||||||
}
|
}
|
||||||
aeReleaseLock();
|
|
||||||
}
|
}
|
||||||
return g_pserver->clients_paused;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called by Redis in order to process a few events from
|
/* This function is called by Redis in order to process a few events from
|
||||||
|
@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int getUnsignedLongLongFromObject(robj *o, uint64_t *target) {
|
||||||
|
uint64_t value;
|
||||||
|
|
||||||
|
if (o == NULL) {
|
||||||
|
value = 0;
|
||||||
|
} else {
|
||||||
|
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
|
||||||
|
if (sdsEncodedObject(o)) {
|
||||||
|
char *pchEnd = nullptr;
|
||||||
|
errno = 0;
|
||||||
|
value = strtoull(szFromObj(o), &pchEnd, 10);
|
||||||
|
if (value == 0) {
|
||||||
|
// potential error
|
||||||
|
if (errno != 0)
|
||||||
|
return C_ERR;
|
||||||
|
if (pchEnd == szFromObj(o))
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
} else if (o->encoding == OBJ_ENCODING_INT) {
|
||||||
|
value = (long)ptrFromObj(o);
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown string encoding");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (target) *target = value;
|
||||||
|
return C_OK;
|
||||||
|
}
|
||||||
|
|
||||||
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
|
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
|
||||||
long long value;
|
long long value;
|
||||||
if (getLongLongFromObject(o, &value) != C_OK) {
|
if (getLongLongFromObject(o, &value) != C_OK) {
|
||||||
|
@ -2115,12 +2115,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||||
/* Read value */
|
/* Read value */
|
||||||
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
|
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
|
||||||
|
bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold;
|
||||||
/* Check if the key already expired. This function is used when loading
|
/* Check if the key already expired. This function is used when loading
|
||||||
* an RDB file from disk, either at startup, or when an RDB was
|
* an RDB file from disk, either at startup, or when an RDB was
|
||||||
* received from the master. In the latter case, the master is
|
* received from the master. In the latter case, the master is
|
||||||
* responsible for key expiry. If we would expire keys here, the
|
* responsible for key expiry. If we would expire keys here, the
|
||||||
* snapshot taken by the master may not be reflected on the replica. */
|
* snapshot taken by the master may not be reflected on the replica. */
|
||||||
if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
|
bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now;
|
||||||
|
if (fStaleMvccKey || fExpiredKey) {
|
||||||
|
if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
|
||||||
|
// We have a key that we've already deleted and is not back in our database.
|
||||||
|
// We'll need to inform the sending master of the delete if it is also a replica of us
|
||||||
|
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key);
|
||||||
|
}
|
||||||
decrRefCount(key);
|
decrRefCount(key);
|
||||||
key = nullptr;
|
key = nullptr;
|
||||||
decrRefCount(val);
|
decrRefCount(val);
|
||||||
|
@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
|
|||||||
void replicationSendAck(redisMaster *mi);
|
void replicationSendAck(redisMaster *mi);
|
||||||
void putSlaveOnline(client *replica);
|
void putSlaveOnline(client *replica);
|
||||||
int cancelReplicationHandshake(redisMaster *mi);
|
int cancelReplicationHandshake(redisMaster *mi);
|
||||||
|
static void propagateMasterStaleKeys();
|
||||||
|
|
||||||
/* --------------------------- Utility functions ---------------------------- */
|
/* --------------------------- Utility functions ---------------------------- */
|
||||||
|
|
||||||
@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client *replicaFromMaster(redisMaster *mi)
|
||||||
|
{
|
||||||
|
if (mi->master == nullptr)
|
||||||
|
return nullptr;
|
||||||
|
|
||||||
|
listIter liReplica;
|
||||||
|
listNode *lnReplica;
|
||||||
|
listRewind(g_pserver->slaves, &liReplica);
|
||||||
|
while ((lnReplica = listNext(&liReplica)) != nullptr)
|
||||||
|
{
|
||||||
|
client *replica = (client*)listNodeValue(lnReplica);
|
||||||
|
if (FSameHost(mi->master, replica))
|
||||||
|
return replica;
|
||||||
|
}
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
/* ---------------------------------- MASTER -------------------------------- */
|
/* ---------------------------------- MASTER -------------------------------- */
|
||||||
|
|
||||||
void createReplicationBacklog(void) {
|
void createReplicationBacklog(void) {
|
||||||
@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
char uuid[40] = {'\0'};
|
char uuid[40] = {'\0'};
|
||||||
uuid_unparse(cserver.uuid, uuid);
|
uuid_unparse(cserver.uuid, uuid);
|
||||||
char proto[1024];
|
char proto[1024];
|
||||||
int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||||
cchProto = std::min((int)sizeof(proto), cchProto);
|
cchProto = std::min((int)sizeof(proto), cchProto);
|
||||||
long long master_repl_offset_start = g_pserver->master_repl_offset;
|
long long master_repl_offset_start = g_pserver->master_repl_offset;
|
||||||
|
|
||||||
char szDbNum[128];
|
char szDbNum[128];
|
||||||
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid);
|
int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid);
|
||||||
|
int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid);
|
||||||
|
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
||||||
|
|
||||||
|
char szMvcc[128];
|
||||||
|
uint64_t mvccTstamp = getMvccTstamp();
|
||||||
|
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
|
||||||
|
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
|
||||||
|
cchMvcc = std::min<int>(cchMvcc, sizeof(szMvcc)); // tricky snprintf
|
||||||
|
|
||||||
/* Write the command to the replication backlog if any. */
|
/* Write the command to the replication backlog if any. */
|
||||||
if (g_pserver->repl_backlog)
|
if (g_pserver->repl_backlog)
|
||||||
@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
const char *crlf = "\r\n";
|
const char *crlf = "\r\n";
|
||||||
feedReplicationBacklog(crlf, 2);
|
feedReplicationBacklog(crlf, 2);
|
||||||
feedReplicationBacklog(szDbNum, cchDbNum);
|
feedReplicationBacklog(szDbNum, cchDbNum);
|
||||||
|
feedReplicationBacklog(szMvcc, cchMvcc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
{
|
{
|
||||||
addReplyAsync(replica,shared.crlf);
|
addReplyAsync(replica,shared.crlf);
|
||||||
addReplyProtoAsync(replica, szDbNum, cchDbNum);
|
addReplyProtoAsync(replica, szDbNum, cchDbNum);
|
||||||
|
addReplyProtoAsync(replica, szMvcc, cchMvcc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -669,6 +697,7 @@ int masterTryPartialResynchronization(client *c) {
|
|||||||
c->repl_ack_time = g_pserver->unixtime;
|
c->repl_ack_time = g_pserver->unixtime;
|
||||||
c->repl_put_online_on_ack = 0;
|
c->repl_put_online_on_ack = 0;
|
||||||
listAddNodeTail(g_pserver->slaves,c);
|
listAddNodeTail(g_pserver->slaves,c);
|
||||||
|
|
||||||
/* We can't use the connection buffers since they are used to accumulate
|
/* We can't use the connection buffers since they are used to accumulate
|
||||||
* new commands at this stage. But we are sure the socket send buffer is
|
* new commands at this stage. But we are sure the socket send buffer is
|
||||||
* empty so this write will never fail actually. */
|
* empty so this write will never fail actually. */
|
||||||
@ -1002,6 +1031,8 @@ void replconfCommand(client *c) {
|
|||||||
c->slave_capa |= SLAVE_CAPA_EOF;
|
c->slave_capa |= SLAVE_CAPA_EOF;
|
||||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2"))
|
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2"))
|
||||||
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
||||||
|
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
||||||
|
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
||||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
||||||
/* REPLCONF ACK is used by replica to inform the master the amount
|
/* REPLCONF ACK is used by replica to inform the master the amount
|
||||||
* of replication stream that it processed so far. It is an
|
* of replication stream that it processed so far. It is an
|
||||||
@ -1071,6 +1102,14 @@ void putSlaveOnline(client *replica) {
|
|||||||
refreshGoodSlavesCount();
|
refreshGoodSlavesCount();
|
||||||
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
|
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
|
||||||
replicationGetSlaveName(replica));
|
replicationGetSlaveName(replica));
|
||||||
|
|
||||||
|
if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica)
|
||||||
|
{
|
||||||
|
serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations."
|
||||||
|
"\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica));
|
||||||
|
serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions."
|
||||||
|
"\n\tPlease fix your replica topology");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
@ -1576,6 +1615,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE);
|
aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE);
|
||||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||||
|
if (g_pserver->fActiveReplica)
|
||||||
|
{
|
||||||
|
rsi.mvccMinThreshold = mi->mvccLastSync;
|
||||||
|
if (mi->staleKeyMap != nullptr)
|
||||||
|
mi->staleKeyMap->clear();
|
||||||
|
else
|
||||||
|
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
|
||||||
|
rsi.mi = mi;
|
||||||
|
}
|
||||||
if (rdbLoadFile(rdb_filename, &rsi) != C_OK) {
|
if (rdbLoadFile(rdb_filename, &rsi) != C_OK) {
|
||||||
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
||||||
cancelReplicationHandshake(mi);
|
cancelReplicationHandshake(mi);
|
||||||
@ -2094,8 +2142,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
*
|
*
|
||||||
* The master will ignore capabilities it does not understand. */
|
* The master will ignore capabilities it does not understand. */
|
||||||
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
|
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
|
||||||
|
if (g_pserver->fActiveReplica)
|
||||||
|
{
|
||||||
|
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||||
|
"capa","eof","capa","psync2","capa","activeExpire",NULL);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||||
"capa","eof","capa","psync2",NULL);
|
"capa","eof","capa","psync2",NULL);
|
||||||
|
}
|
||||||
if (err) goto write_error;
|
if (err) goto write_error;
|
||||||
sdsfree(err);
|
sdsfree(err);
|
||||||
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
|
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
|
||||||
@ -2363,6 +2419,7 @@ void freeMasterInfo(redisMaster *mi)
|
|||||||
{
|
{
|
||||||
zfree(mi->masterauth);
|
zfree(mi->masterauth);
|
||||||
zfree(mi->masteruser);
|
zfree(mi->masteruser);
|
||||||
|
delete mi->staleKeyMap;
|
||||||
zfree(mi);
|
zfree(mi);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3196,6 +3253,8 @@ void replicationCron(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
propagateMasterStaleKeys();
|
||||||
|
|
||||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||||
refreshGoodSlavesCount();
|
refreshGoodSlavesCount();
|
||||||
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
||||||
@ -3342,6 +3401,17 @@ void replicaReplayCommand(client *c)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t mvcc = 0;
|
||||||
|
if (c->argc >= 5)
|
||||||
|
{
|
||||||
|
if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK)
|
||||||
|
{
|
||||||
|
addReplyError(c, "Invalid MVCC Timestamp");
|
||||||
|
s_pstate->Cancel();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (FSameUuidNoNil(uuid, cserver.uuid))
|
if (FSameUuidNoNil(uuid, cserver.uuid))
|
||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
@ -3368,6 +3438,11 @@ void replicaReplayCommand(client *c)
|
|||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
selectDb(c, cFake->db->id);
|
selectDb(c, cFake->db->id);
|
||||||
|
redisMaster *mi = MasterInfoFromClient(c);
|
||||||
|
if (mi != nullptr) // this should never be null but I'd prefer not to crash
|
||||||
|
{
|
||||||
|
mi->mvccLastSync = mvcc;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -3402,3 +3477,43 @@ void updateMasterAuth()
|
|||||||
mi->masteruser = zstrdup(cserver.default_masteruser);
|
mi->masteruser = zstrdup(cserver.default_masteruser);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void propagateMasterStaleKeys()
|
||||||
|
{
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(g_pserver->masters, &li);
|
||||||
|
robj *rgobj[2];
|
||||||
|
|
||||||
|
rgobj[0] = createEmbeddedStringObject("DEL", 3);
|
||||||
|
|
||||||
|
while ((ln = listNext(&li)) != nullptr)
|
||||||
|
{
|
||||||
|
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||||
|
if (mi->staleKeyMap != nullptr)
|
||||||
|
{
|
||||||
|
if (mi->master != nullptr)
|
||||||
|
{
|
||||||
|
for (auto &pair : *mi->staleKeyMap)
|
||||||
|
{
|
||||||
|
if (pair.second.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
client *replica = replicaFromMaster(mi);
|
||||||
|
if (replica == nullptr)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
for (auto &spkey : pair.second)
|
||||||
|
{
|
||||||
|
rgobj[1] = spkey.get();
|
||||||
|
replicationFeedSlave(replica, pair.first, rgobj, 2, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete mi->staleKeyMap;
|
||||||
|
mi->staleKeyMap = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
decrRefCount(rgobj[0]);
|
||||||
|
}
|
@ -1692,6 +1692,9 @@ void clientsCron(int iel) {
|
|||||||
fastlock_unlock(&c->lock);
|
fastlock_unlock(&c->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Free any pending clients */
|
||||||
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function handles 'background' operations we are required to do
|
/* This function handles 'background' operations we are required to do
|
||||||
@ -1700,9 +1703,9 @@ void clientsCron(int iel) {
|
|||||||
void databasesCron(void) {
|
void databasesCron(void) {
|
||||||
/* Expire keys by random sampling. Not required for slaves
|
/* Expire keys by random sampling. Not required for slaves
|
||||||
* as master will synthesize DELs for us. */
|
* as master will synthesize DELs for us. */
|
||||||
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) {
|
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) {
|
||||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||||
} else if (listLength(g_pserver->masters)) {
|
} else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) {
|
||||||
expireSlaveKeys();
|
expireSlaveKeys();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1812,6 +1815,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
/* Update the time cache. */
|
/* Update the time cache. */
|
||||||
updateCachedTime();
|
updateCachedTime();
|
||||||
|
|
||||||
|
/* Unpause clients if enough time has elapsed */
|
||||||
|
unpauseClientsIfNecessary();
|
||||||
|
|
||||||
g_pserver->hz = g_pserver->config_hz;
|
g_pserver->hz = g_pserver->config_hz;
|
||||||
/* Adapt the g_pserver->hz value to the number of configured clients. If we have
|
/* Adapt the g_pserver->hz value to the number of configured clients. If we have
|
||||||
* many clients, we want to call serverCron() with an higher frequency. */
|
* many clients, we want to call serverCron() with an higher frequency. */
|
||||||
@ -1819,7 +1825,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
while (listLength(g_pserver->clients) / g_pserver->hz >
|
while (listLength(g_pserver->clients) / g_pserver->hz >
|
||||||
MAX_CLIENTS_PER_CLOCK_TICK)
|
MAX_CLIENTS_PER_CLOCK_TICK)
|
||||||
{
|
{
|
||||||
g_pserver->hz *= 2;
|
g_pserver->hz += g_pserver->hz; // *= 2
|
||||||
if (g_pserver->hz > CONFIG_MAX_HZ) {
|
if (g_pserver->hz > CONFIG_MAX_HZ) {
|
||||||
g_pserver->hz = CONFIG_MAX_HZ;
|
g_pserver->hz = CONFIG_MAX_HZ;
|
||||||
break;
|
break;
|
||||||
@ -2019,9 +2025,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Clear the paused clients flag if needed. */
|
|
||||||
clientsArePaused(); /* Don't check return value, just use the side effect.*/
|
|
||||||
|
|
||||||
/* Replication cron function -- used to reconnect to master,
|
/* Replication cron function -- used to reconnect to master,
|
||||||
* detect transfer failures, start background RDB transfers and so forth. */
|
* detect transfer failures, start background RDB transfers and so forth. */
|
||||||
run_with_period(1000) replicationCron();
|
run_with_period(1000) replicationCron();
|
||||||
@ -2079,6 +2082,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
|||||||
processUnblockedClients(iel);
|
processUnblockedClients(iel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Unpause clients if enough time has elapsed */
|
||||||
|
unpauseClientsIfNecessary();
|
||||||
|
|
||||||
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
||||||
clientsCron(iel);
|
clientsCron(iel);
|
||||||
|
|
||||||
@ -2099,7 +2105,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
|
|
||||||
/* Run a fast expire cycle (the called function will return
|
/* Run a fast expire cycle (the called function will return
|
||||||
* ASAP if a fast cycle is not needed). */
|
* ASAP if a fast cycle is not needed). */
|
||||||
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0)
|
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica))
|
||||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||||
|
|
||||||
/* Send all the slaves an ACK request if at least one client blocked
|
/* Send all the slaves an ACK request if at least one client blocked
|
||||||
@ -2313,6 +2319,7 @@ void initMasterInfo(redisMaster *master)
|
|||||||
|
|
||||||
master->repl_state = REPL_STATE_NONE;
|
master->repl_state = REPL_STATE_NONE;
|
||||||
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||||
|
master->mvccLastSync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initServerConfig(void) {
|
void initServerConfig(void) {
|
||||||
@ -2871,6 +2878,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
|||||||
pvar->cclients = 0;
|
pvar->cclients = 0;
|
||||||
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
||||||
pvar->current_client = nullptr;
|
pvar->current_client = nullptr;
|
||||||
|
pvar->clients_paused = 0;
|
||||||
if (pvar->el == NULL) {
|
if (pvar->el == NULL) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Failed creating the event loop. Error message: '%s'",
|
"Failed creating the event loop. Error message: '%s'",
|
||||||
@ -2967,7 +2975,6 @@ void initServer(void) {
|
|||||||
g_pserver->ready_keys = listCreate();
|
g_pserver->ready_keys = listCreate();
|
||||||
g_pserver->clients_waiting_acks = listCreate();
|
g_pserver->clients_waiting_acks = listCreate();
|
||||||
g_pserver->get_ack_from_slaves = 0;
|
g_pserver->get_ack_from_slaves = 0;
|
||||||
g_pserver->clients_paused = 0;
|
|
||||||
cserver.system_memory_size = zmalloc_get_memory_size();
|
cserver.system_memory_size = zmalloc_get_memory_size();
|
||||||
|
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
@ -4088,7 +4095,7 @@ sds genRedisInfoString(const char *section) {
|
|||||||
g_pserver->port,
|
g_pserver->port,
|
||||||
(intmax_t)uptime,
|
(intmax_t)uptime,
|
||||||
(intmax_t)(uptime/(3600*24)),
|
(intmax_t)(uptime/(3600*24)),
|
||||||
g_pserver->hz,
|
g_pserver->hz.load(),
|
||||||
g_pserver->config_hz,
|
g_pserver->config_hz,
|
||||||
(unsigned long) lruclock,
|
(unsigned long) lruclock,
|
||||||
cserver.executable ? cserver.executable : "",
|
cserver.executable ? cserver.executable : "",
|
||||||
|
96
src/server.h
96
src/server.h
@ -54,6 +54,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <map>
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include <lua.h>
|
#include <lua.h>
|
||||||
@ -144,6 +145,87 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void decrRefCount(robj_roptr o);
|
||||||
|
void incrRefCount(robj_roptr o);
|
||||||
|
class robj_sharedptr
|
||||||
|
{
|
||||||
|
redisObject *m_ptr;
|
||||||
|
|
||||||
|
public:
|
||||||
|
robj_sharedptr()
|
||||||
|
: m_ptr(nullptr)
|
||||||
|
{}
|
||||||
|
robj_sharedptr(redisObject *ptr)
|
||||||
|
: m_ptr(ptr)
|
||||||
|
{
|
||||||
|
incrRefCount(ptr);
|
||||||
|
}
|
||||||
|
~robj_sharedptr()
|
||||||
|
{
|
||||||
|
if (m_ptr)
|
||||||
|
decrRefCount(m_ptr);
|
||||||
|
}
|
||||||
|
robj_sharedptr(const robj_sharedptr& other)
|
||||||
|
{
|
||||||
|
m_ptr = other.m_ptr;
|
||||||
|
incrRefCount(m_ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
robj_sharedptr(robj_sharedptr&& other)
|
||||||
|
{
|
||||||
|
m_ptr = other.m_ptr;
|
||||||
|
other.m_ptr = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
robj_sharedptr &operator=(const robj_sharedptr& other)
|
||||||
|
{
|
||||||
|
if (m_ptr)
|
||||||
|
decrRefCount(m_ptr);
|
||||||
|
m_ptr = other.m_ptr;
|
||||||
|
incrRefCount(m_ptr);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
robj_sharedptr &operator=(redisObject *ptr)
|
||||||
|
{
|
||||||
|
if (m_ptr)
|
||||||
|
decrRefCount(m_ptr);
|
||||||
|
m_ptr = ptr;
|
||||||
|
incrRefCount(m_ptr);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator==(const robj_sharedptr &other) const
|
||||||
|
{
|
||||||
|
return m_ptr == other.m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator!=(const robj_sharedptr &other) const
|
||||||
|
{
|
||||||
|
return m_ptr != other.m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
redisObject* operator->() const
|
||||||
|
{
|
||||||
|
return m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator!() const
|
||||||
|
{
|
||||||
|
return !m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
operator bool() const{
|
||||||
|
return !!m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
operator redisObject *()
|
||||||
|
{
|
||||||
|
return (redisObject*)m_ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
redisObject *get() { return m_ptr; }
|
||||||
|
};
|
||||||
|
|
||||||
/* Error codes */
|
/* Error codes */
|
||||||
#define C_OK 0
|
#define C_OK 0
|
||||||
#define C_ERR -1
|
#define C_ERR -1
|
||||||
@ -433,6 +515,7 @@ public:
|
|||||||
#define SLAVE_CAPA_NONE 0
|
#define SLAVE_CAPA_NONE 0
|
||||||
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
||||||
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
||||||
|
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
|
||||||
|
|
||||||
/* Synchronous read timeout - replica side */
|
/* Synchronous read timeout - replica side */
|
||||||
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
||||||
@ -1390,9 +1473,11 @@ typedef struct rdbSaveInfo {
|
|||||||
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
|
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
|
||||||
long long repl_offset; /* Replication offset. */
|
long long repl_offset; /* Replication offset. */
|
||||||
int fForceSetKey;
|
int fForceSetKey;
|
||||||
|
uint64_t mvccMinThreshold;
|
||||||
|
struct redisMaster *mi;
|
||||||
} rdbSaveInfo;
|
} rdbSaveInfo;
|
||||||
|
|
||||||
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE}
|
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, nullptr}
|
||||||
|
|
||||||
struct malloc_stats {
|
struct malloc_stats {
|
||||||
size_t zmalloc_used;
|
size_t zmalloc_used;
|
||||||
@ -1426,6 +1511,7 @@ struct redisServerThreadVars {
|
|||||||
aeEventLoop *el;
|
aeEventLoop *el;
|
||||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||||
int ipfd_count; /* Used slots in ipfd[] */
|
int ipfd_count; /* Used slots in ipfd[] */
|
||||||
|
int clients_paused; /* True if clients are currently paused */
|
||||||
std::vector<client*> clients_pending_write; /* There is to write or install handler. */
|
std::vector<client*> clients_pending_write; /* There is to write or install handler. */
|
||||||
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
||||||
list *clients_pending_asyncwrite;
|
list *clients_pending_asyncwrite;
|
||||||
@ -1465,6 +1551,9 @@ struct redisMaster {
|
|||||||
|
|
||||||
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
||||||
/* After we've connected with our master use the UUID in g_pserver->master */
|
/* After we've connected with our master use the UUID in g_pserver->master */
|
||||||
|
uint64_t mvccLastSync;
|
||||||
|
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
|
||||||
|
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Const vars are not changed after worker threads are launched
|
// Const vars are not changed after worker threads are launched
|
||||||
@ -1518,7 +1607,7 @@ struct redisServer {
|
|||||||
int config_hz; /* Configured HZ value. May be different than
|
int config_hz; /* Configured HZ value. May be different than
|
||||||
the actual 'hz' field value if dynamic-hz
|
the actual 'hz' field value if dynamic-hz
|
||||||
is enabled. */
|
is enabled. */
|
||||||
int hz; /* serverCron() calls frequency in hertz */
|
std::atomic<int> hz; /* serverCron() calls frequency in hertz */
|
||||||
redisDb *db;
|
redisDb *db;
|
||||||
dict *commands; /* Command table */
|
dict *commands; /* Command table */
|
||||||
dict *orig_commands; /* Command table before command renaming. */
|
dict *orig_commands; /* Command table before command renaming. */
|
||||||
@ -1553,7 +1642,6 @@ struct redisServer {
|
|||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||||
int clients_paused; /* True if clients are currently paused */
|
|
||||||
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
||||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||||
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
|
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
|
||||||
@ -2042,6 +2130,7 @@ void disconnectSlavesExcept(unsigned char *uuid);
|
|||||||
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
|
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
|
||||||
void pauseClients(mstime_t duration);
|
void pauseClients(mstime_t duration);
|
||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
|
void unpauseClientsIfNecessary();
|
||||||
int processEventsWhileBlocked(int iel);
|
int processEventsWhileBlocked(int iel);
|
||||||
int handleClientsWithPendingWrites(int iel);
|
int handleClientsWithPendingWrites(int iel);
|
||||||
int clientHasPendingReplies(client *c);
|
int clientHasPendingReplies(client *c);
|
||||||
@ -2154,6 +2243,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch
|
|||||||
int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg);
|
int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg);
|
||||||
int getDoubleFromObject(const robj *o, double *target);
|
int getDoubleFromObject(const robj *o, double *target);
|
||||||
int getLongLongFromObject(robj *o, long long *target);
|
int getLongLongFromObject(robj *o, long long *target);
|
||||||
|
int getUnsignedLongLongFromObject(robj *o, uint64_t *target);
|
||||||
int getLongDoubleFromObject(robj *o, long double *target);
|
int getLongDoubleFromObject(robj *o, long double *target);
|
||||||
int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg);
|
int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg);
|
||||||
const char *strEncoding(int encoding);
|
const char *strEncoding(int encoding);
|
||||||
|
@ -14,6 +14,8 @@ proc main {} {
|
|||||||
spawn_instance redis $::redis_base_port $::instances_count {
|
spawn_instance redis $::redis_base_port $::instances_count {
|
||||||
"cluster-enabled yes"
|
"cluster-enabled yes"
|
||||||
"appendonly yes"
|
"appendonly yes"
|
||||||
|
"testmode yes"
|
||||||
|
"server-threads 3"
|
||||||
}
|
}
|
||||||
run_tests
|
run_tests
|
||||||
cleanup
|
cleanup
|
||||||
|
@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
|||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
set master_port [srv 0 port]
|
set master_port [srv 0 port]
|
||||||
|
set master_pid [s process_id]
|
||||||
|
|
||||||
# Use a short replication timeout on the slave, so that if there
|
# Use a short replication timeout on the slave, so that if there
|
||||||
# are no bugs the timeout is triggered in a reasonable amount
|
# are no bugs the timeout is triggered in a reasonable amount
|
||||||
@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
|||||||
assert_equal {0} [$slave del testkey1]
|
assert_equal {0} [$slave del testkey1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {Active replica expire propogates when source is down} {
|
||||||
|
$slave flushall
|
||||||
|
$slave set testkey2 foo
|
||||||
|
$slave set testkey1 foo
|
||||||
|
wait_for_condition 50 1000 {
|
||||||
|
[string match *foo* [$master get testkey1]]
|
||||||
|
} else {
|
||||||
|
fail "Replication failed to propogate"
|
||||||
|
}
|
||||||
|
$slave expire testkey1 2
|
||||||
|
assert_equal {1} [$slave wait 1 500] { "value should propogate
|
||||||
|
within 0.5 seconds" }
|
||||||
|
exec kill -SIGSTOP $slave_pid
|
||||||
|
after 3000
|
||||||
|
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
|
||||||
|
# about what is actually in the dict. The only way to know is with a count from info
|
||||||
|
assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"}
|
||||||
|
}
|
||||||
|
exec kill -SIGCONT $slave_pid
|
||||||
|
|
||||||
test {Active replica different databases} {
|
test {Active replica different databases} {
|
||||||
$master select 3
|
$master select 3
|
||||||
$master set testkey abcd
|
$master set testkey abcd
|
||||||
|
@ -238,7 +238,7 @@ start_server {tags {"bitops"}} {
|
|||||||
r set a "abcdefg"
|
r set a "abcdefg"
|
||||||
r bitop lshift x a 8
|
r bitop lshift x a 8
|
||||||
r get x
|
r get x
|
||||||
} "\x00abcdefg"
|
} "\000abcdefg"
|
||||||
|
|
||||||
test {BITOP lshift char} {
|
test {BITOP lshift char} {
|
||||||
r set a "\xAA"
|
r set a "\xAA"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user