Merge branch 'unstable' into redis_6_merge
Former-commit-id: 18a5f46b6138e8a975dda0ed4897d19eed756d24
This commit is contained in:
commit
fbaa46505c
@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
|
|||||||
cmd.proc = proc;
|
cmd.proc = proc;
|
||||||
cmd.clientData = arg;
|
cmd.clientData = arg;
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
AE_ASSERT(size == sizeof(cmd));
|
if (size != sizeof(cmd))
|
||||||
|
return AE_ERR;
|
||||||
return AE_OK;
|
return AE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
|
if (size != sizeof(cmd))
|
||||||
|
return AE_ERR;
|
||||||
AE_ASSERT(size == sizeof(cmd));
|
AE_ASSERT(size == sizeof(cmd));
|
||||||
int ret = AE_OK;
|
int ret = AE_OK;
|
||||||
if (fSynchronous)
|
if (fSynchronous)
|
||||||
|
33
src/aof.cpp
33
src/aof.cpp
@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void installAofRewriteEvent()
|
||||||
|
{
|
||||||
|
serverTL->fRetrySetAofEvent = false;
|
||||||
|
if (!g_pserver->aof_rewrite_pending) {
|
||||||
|
g_pserver->aof_rewrite_pending = true;
|
||||||
|
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
|
||||||
|
g_pserver->aof_rewrite_pending = false;
|
||||||
|
if (g_pserver->aof_pipe_write_data_to_child >= 0)
|
||||||
|
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
|
||||||
|
});
|
||||||
|
if (res != AE_OK)
|
||||||
|
serverTL->fRetrySetAofEvent = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
|
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
|
||||||
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
||||||
listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks);
|
listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks);
|
||||||
@ -165,14 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
|||||||
|
|
||||||
/* Install a file event to send data to the rewrite child if there is
|
/* Install a file event to send data to the rewrite child if there is
|
||||||
* not one already. */
|
* not one already. */
|
||||||
if (!g_pserver->aof_rewrite_pending) {
|
installAofRewriteEvent();
|
||||||
g_pserver->aof_rewrite_pending = true;
|
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
|
|
||||||
g_pserver->aof_rewrite_pending = false;
|
|
||||||
if (g_pserver->aof_pipe_write_data_to_child >= 0)
|
|
||||||
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Write the buffer (possibly composed of multiple blocks) into the specified
|
/* Write the buffer (possibly composed of multiple blocks) into the specified
|
||||||
@ -346,6 +354,9 @@ void flushAppendOnlyFile(int force) {
|
|||||||
int sync_in_progress = 0;
|
int sync_in_progress = 0;
|
||||||
mstime_t latency;
|
mstime_t latency;
|
||||||
|
|
||||||
|
if (serverTL->fRetrySetAofEvent)
|
||||||
|
installAofRewriteEvent();
|
||||||
|
|
||||||
if (sdslen(g_pserver->aof_buf) == 0) {
|
if (sdslen(g_pserver->aof_buf) == 0) {
|
||||||
/* Check if we need to do fsync even the aof buffer is empty,
|
/* Check if we need to do fsync even the aof buffer is empty,
|
||||||
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
|
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
|
||||||
@ -1584,16 +1595,18 @@ error:
|
|||||||
|
|
||||||
void aofClosePipes(void) {
|
void aofClosePipes(void) {
|
||||||
int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
|
int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
|
||||||
aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
|
int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
|
||||||
aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
|
aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
|
||||||
close (fdAofAckPipe);
|
close (fdAofAckPipe);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
|
|
||||||
int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
|
int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
|
res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
|
||||||
aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
|
aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
|
||||||
close(fdAofWritePipe);
|
close(fdAofWritePipe);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
g_pserver->aof_pipe_write_data_to_child = -1;
|
g_pserver->aof_pipe_write_data_to_child = -1;
|
||||||
|
|
||||||
close(g_pserver->aof_pipe_read_data_from_parent);
|
close(g_pserver->aof_pipe_read_data_from_parent);
|
||||||
|
@ -296,6 +296,15 @@ int clusterLoadConfig(char *filename) {
|
|||||||
if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
|
if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
|
||||||
g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
|
g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100)
|
||||||
|
{
|
||||||
|
// Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server)
|
||||||
|
// we can increase the grouping of clients on a single thread within reason
|
||||||
|
cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes);
|
||||||
|
cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200);
|
||||||
|
serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold);
|
||||||
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
fmterr:
|
fmterr:
|
||||||
@ -624,9 +633,10 @@ void freeClusterLink(clusterLink *link) {
|
|||||||
if (link->node)
|
if (link->node)
|
||||||
link->node->link = NULL;
|
link->node->link = NULL;
|
||||||
link->node = nullptr;
|
link->node = nullptr;
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
||||||
freeClusterLink(link);
|
freeClusterLink(link);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (link->conn) {
|
if (link->conn) {
|
||||||
|
@ -540,6 +540,11 @@ void loadServerConfigFromString(char *config) {
|
|||||||
} else if (!strcasecmp(argv[0],"enable-pro")) {
|
} else if (!strcasecmp(argv[0],"enable-pro")) {
|
||||||
cserver.fUsePro = true;
|
cserver.fUsePro = true;
|
||||||
break;
|
break;
|
||||||
|
} else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) {
|
||||||
|
cserver.thread_min_client_threshold = atoi(argv[1]);
|
||||||
|
if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) {
|
||||||
|
err = "min-thread-client must be between 0 and 400"; goto loaderr;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
bool FInReplicaReplay();
|
||||||
|
|
||||||
/* ================================ MULTI/EXEC ============================== */
|
/* ================================ MULTI/EXEC ============================== */
|
||||||
|
|
||||||
@ -174,7 +175,7 @@ void execCommand(client *c) {
|
|||||||
* This way we'll deliver the MULTI/..../EXEC block as a whole and
|
* This way we'll deliver the MULTI/..../EXEC block as a whole and
|
||||||
* both the AOF and the replication link will have the same consistency
|
* both the AOF and the replication link will have the same consistency
|
||||||
* and atomicity guarantees. */
|
* and atomicity guarantees. */
|
||||||
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
|
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) {
|
||||||
execCommandPropagateMulti(c);
|
execCommandPropagateMulti(c);
|
||||||
must_propagate = 1;
|
must_propagate = 1;
|
||||||
}
|
}
|
||||||
@ -190,7 +191,10 @@ void execCommand(client *c) {
|
|||||||
"no permission to execute the command or subcommand" :
|
"no permission to execute the command or subcommand" :
|
||||||
"no permission to touch the specified keys");
|
"no permission to touch the specified keys");
|
||||||
} else {
|
} else {
|
||||||
call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL);
|
int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL;
|
||||||
|
if (FInReplicaReplay())
|
||||||
|
flags &= ~CMD_CALL_PROPAGATE;
|
||||||
|
call(c,flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Commands may alter argc/argv, restore mstate. */
|
/* Commands may alter argc/argv, restore mstate. */
|
||||||
|
@ -1035,6 +1035,33 @@ int clientHasPendingReplies(client *c) {
|
|||||||
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
|
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int chooseBestThreadForAccept()
|
||||||
|
{
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
int rgcclients[MAX_EVENT_LOOPS] = {0};
|
||||||
|
|
||||||
|
listRewind(g_pserver->clients, &li);
|
||||||
|
while ((ln = listNext(&li)) != nullptr)
|
||||||
|
{
|
||||||
|
client *c = (client*)listNodeValue(ln);
|
||||||
|
if (c->iel < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
rgcclients[c->iel]++;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ielMinLoad = 0;
|
||||||
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
|
{
|
||||||
|
if (rgcclients[iel] < cserver.thread_min_client_threshold)
|
||||||
|
return iel;
|
||||||
|
if (rgcclients[iel] < rgcclients[ielMinLoad])
|
||||||
|
ielMinLoad = iel;
|
||||||
|
}
|
||||||
|
return ielMinLoad;
|
||||||
|
}
|
||||||
|
|
||||||
void clientAcceptHandler(connection *conn) {
|
void clientAcceptHandler(connection *conn) {
|
||||||
client *c = (client*)connGetPrivateData(conn);
|
client *c = (client*)connGetPrivateData(conn);
|
||||||
|
|
||||||
@ -1176,7 +1203,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
|
|
||||||
if (!g_fTestMode)
|
if (!g_fTestMode)
|
||||||
{
|
{
|
||||||
// We always accept on the same thread
|
{
|
||||||
|
int ielTarget = chooseBestThreadForAccept();
|
||||||
|
if (ielTarget != ielCur)
|
||||||
|
{
|
||||||
|
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||||
|
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||||
|
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
|
||||||
|
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
|
||||||
|
zfree(szT);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (res == AE_OK)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LLocalThread:
|
LLocalThread:
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur);
|
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur);
|
||||||
@ -1193,10 +1235,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
goto LLocalThread;
|
goto LLocalThread;
|
||||||
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||||
memcpy(szT, cip, NET_IP_STR_LEN);
|
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||||
aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
|
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
|
||||||
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel);
|
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel);
|
||||||
zfree(szT);
|
zfree(szT);
|
||||||
});
|
});
|
||||||
|
if (res != AE_OK)
|
||||||
|
{
|
||||||
|
zfree(szT);
|
||||||
|
goto LLocalThread;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1240,8 +1287,25 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
|
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
|
||||||
|
|
||||||
|
aeAcquireLock();
|
||||||
|
int ielTarget = rand() % cserver.cthreads;
|
||||||
|
if (ielTarget == iel)
|
||||||
|
{
|
||||||
|
LLocalThread:
|
||||||
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
|
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
|
||||||
|
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget);
|
||||||
|
});
|
||||||
|
if (res != AE_OK)
|
||||||
|
goto LLocalThread;
|
||||||
|
}
|
||||||
|
aeReleaseLock();
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeClientArgv(client *c) {
|
static void freeClientArgv(client *c) {
|
||||||
@ -2616,7 +2680,7 @@ NULL
|
|||||||
{
|
{
|
||||||
int iel = client->iel;
|
int iel = client->iel;
|
||||||
freeClientAsync(client);
|
freeClientAsync(client);
|
||||||
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
|
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK
|
||||||
freeClientsInAsyncFreeQueue(iel);
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,8 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <uuid/uuid.h>
|
#include <uuid/uuid.h>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||||
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
||||||
@ -354,6 +356,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
||||||
|
|
||||||
char szMvcc[128];
|
char szMvcc[128];
|
||||||
|
incrementMvccTstamp();
|
||||||
uint64_t mvccTstamp = getMvccTstamp();
|
uint64_t mvccTstamp = getMvccTstamp();
|
||||||
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
|
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
|
||||||
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
|
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
|
||||||
@ -437,6 +440,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
|
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
|
||||||
addReplyProtoAsync(replica, reply->buf(), reply->used);
|
addReplyProtoAsync(replica, reply->buf(), reply->used);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fSendRaw)
|
if (!fSendRaw)
|
||||||
{
|
{
|
||||||
addReplyAsync(replica,shared.crlf);
|
addReplyAsync(replica,shared.crlf);
|
||||||
@ -2808,6 +2812,8 @@ void freeMasterInfo(redisMaster *mi)
|
|||||||
zfree(mi->masteruser);
|
zfree(mi->masteruser);
|
||||||
if (mi->repl_transfer_tmpfile)
|
if (mi->repl_transfer_tmpfile)
|
||||||
zfree(mi->repl_transfer_tmpfile);
|
zfree(mi->repl_transfer_tmpfile);
|
||||||
|
if (mi->clientFake)
|
||||||
|
freeClient(mi->clientFake);
|
||||||
delete mi->staleKeyMap;
|
delete mi->staleKeyMap;
|
||||||
if (mi->cached_master != nullptr)
|
if (mi->cached_master != nullptr)
|
||||||
freeClientAsync(mi->cached_master);
|
freeClientAsync(mi->cached_master);
|
||||||
@ -2886,6 +2892,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
|
|||||||
mi->master = NULL;
|
mi->master = NULL;
|
||||||
mi->repl_state = REPL_STATE_CONNECT;
|
mi->repl_state = REPL_STATE_CONNECT;
|
||||||
mi->repl_down_since = g_pserver->unixtime;
|
mi->repl_down_since = g_pserver->unixtime;
|
||||||
|
if (mi->clientFake) {
|
||||||
|
freeClient(mi->clientFake);
|
||||||
|
mi->clientFake = nullptr;
|
||||||
|
|
||||||
|
}
|
||||||
/* We lost connection with our master, don't disconnect slaves yet,
|
/* We lost connection with our master, don't disconnect slaves yet,
|
||||||
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
||||||
* the slaves only if we'll have to do a full resync with our master. */
|
* the slaves only if we'll have to do a full resync with our master. */
|
||||||
@ -3760,14 +3771,33 @@ public:
|
|||||||
return m_cnesting == 1;
|
return m_cnesting == 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
redisMaster *getMi(client *c)
|
||||||
|
{
|
||||||
|
if (m_mi == nullptr)
|
||||||
|
m_mi = MasterInfoFromClient(c);
|
||||||
|
return m_mi;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nesting() const { return m_cnesting; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int m_cnesting = 0;
|
int m_cnesting = 0;
|
||||||
bool m_fCancelled = false;
|
bool m_fCancelled = false;
|
||||||
|
redisMaster *m_mi = nullptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static thread_local ReplicaNestState *s_pstate = nullptr;
|
||||||
|
|
||||||
|
bool FInReplicaReplay()
|
||||||
|
{
|
||||||
|
return s_pstate != nullptr && s_pstate->nesting() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static std::unordered_map<std::string, uint64_t> g_mapmvcc;
|
||||||
|
|
||||||
void replicaReplayCommand(client *c)
|
void replicaReplayCommand(client *c)
|
||||||
{
|
{
|
||||||
static thread_local ReplicaNestState *s_pstate = nullptr;
|
|
||||||
if (s_pstate == nullptr)
|
if (s_pstate == nullptr)
|
||||||
s_pstate = new (MALLOC_LOCAL) ReplicaNestState;
|
s_pstate = new (MALLOC_LOCAL) ReplicaNestState;
|
||||||
|
|
||||||
@ -3791,9 +3821,10 @@ void replicaReplayCommand(client *c)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned char uuid[UUID_BINARY_LEN];
|
std::string uuid;
|
||||||
|
uuid.resize(UUID_BINARY_LEN);
|
||||||
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|
||||||
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
|
|| uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0)
|
||||||
{
|
{
|
||||||
addReplyError(c, "Expected UUID arg1");
|
addReplyError(c, "Expected UUID arg1");
|
||||||
s_pstate->Cancel();
|
s_pstate->Cancel();
|
||||||
@ -3829,7 +3860,7 @@ void replicaReplayCommand(client *c)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FSameUuidNoNil(uuid, cserver.uuid))
|
if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid))
|
||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
s_pstate->Cancel();
|
s_pstate->Cancel();
|
||||||
@ -3839,33 +3870,57 @@ void replicaReplayCommand(client *c)
|
|||||||
if (!s_pstate->FPush())
|
if (!s_pstate->FPush())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
redisMaster *mi = s_pstate->getMi(c);
|
||||||
|
client *cFake = mi->clientFake;
|
||||||
|
if (mi->clientFakeNesting != s_pstate->nesting())
|
||||||
|
cFake = nullptr;
|
||||||
|
serverAssert(mi != nullptr);
|
||||||
|
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
|
||||||
|
{
|
||||||
|
s_pstate->Cancel();
|
||||||
|
s_pstate->Pop();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// OK We've recieved a command lets execute
|
// OK We've recieved a command lets execute
|
||||||
client *current_clientSave = serverTL->current_client;
|
client *current_clientSave = serverTL->current_client;
|
||||||
client *cFake = createClient(nullptr, c->iel);
|
if (cFake == nullptr)
|
||||||
|
cFake = createClient(nullptr, c->iel);
|
||||||
cFake->lock.lock();
|
cFake->lock.lock();
|
||||||
cFake->authenticated = c->authenticated;
|
cFake->authenticated = c->authenticated;
|
||||||
cFake->puser = c->puser;
|
cFake->puser = c->puser;
|
||||||
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
|
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
|
||||||
selectDb(cFake, c->db->id);
|
selectDb(cFake, c->db->id);
|
||||||
auto ccmdPrev = serverTL->commandsExecuted;
|
auto ccmdPrev = serverTL->commandsExecuted;
|
||||||
|
cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
|
||||||
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
|
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
|
||||||
|
cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
|
||||||
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
||||||
cFake->lock.unlock();
|
cFake->lock.unlock();
|
||||||
if (fExec)
|
if (fExec || cFake->flags & CLIENT_MULTI)
|
||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
selectDb(c, cFake->db->id);
|
selectDb(c, cFake->db->id);
|
||||||
redisMaster *mi = MasterInfoFromClient(c);
|
if (mvcc > g_mapmvcc[uuid])
|
||||||
if (mi != nullptr) // this should never be null but I'd prefer not to crash
|
g_mapmvcc[uuid] = mvcc;
|
||||||
{
|
|
||||||
mi->mvccLastSync = mvcc;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf);
|
||||||
addReplyError(c, "command did not execute");
|
addReplyError(c, "command did not execute");
|
||||||
}
|
}
|
||||||
|
serverAssert(sdslen(cFake->querybuf) == 0);
|
||||||
|
if (cFake->flags & CLIENT_MULTI)
|
||||||
|
{
|
||||||
|
mi->clientFake = cFake;
|
||||||
|
mi->clientFakeNesting = s_pstate->nesting();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (mi->clientFake == cFake)
|
||||||
|
mi->clientFake = nullptr;
|
||||||
freeClient(cFake);
|
freeClient(cFake);
|
||||||
|
}
|
||||||
serverTL->current_client = current_clientSave;
|
serverTL->current_client = current_clientSave;
|
||||||
|
|
||||||
// call() will not propogate this for us, so we do so here
|
// call() will not propogate this for us, so we do so here
|
||||||
|
@ -1698,7 +1698,7 @@ void clientsCron(int iel) {
|
|||||||
/* The following functions do different service checks on the client.
|
/* The following functions do different service checks on the client.
|
||||||
* The protocol is that they return non-zero if the client was
|
* The protocol is that they return non-zero if the client was
|
||||||
* terminated. */
|
* terminated. */
|
||||||
if (clientsCronHandleTimeout(c,now)) goto LContinue;
|
if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock
|
||||||
if (clientsCronResizeQueryBuffer(c)) goto LContinue;
|
if (clientsCronResizeQueryBuffer(c)) goto LContinue;
|
||||||
if (clientsCronTrackExpansiveClients(c)) goto LContinue;
|
if (clientsCronTrackExpansiveClients(c)) goto LContinue;
|
||||||
LContinue:
|
LContinue:
|
||||||
@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
|||||||
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
||||||
pvar->current_client = nullptr;
|
pvar->current_client = nullptr;
|
||||||
pvar->clients_paused = 0;
|
pvar->clients_paused = 0;
|
||||||
|
pvar->fRetrySetAofEvent = false;
|
||||||
if (pvar->el == NULL) {
|
if (pvar->el == NULL) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Failed creating the event loop. Error message: '%s'",
|
"Failed creating the event loop. Error message: '%s'",
|
||||||
|
@ -1519,6 +1519,7 @@ struct redisServerThreadVars {
|
|||||||
struct fastlock lockPendingWrite { "thread pending write" };
|
struct fastlock lockPendingWrite { "thread pending write" };
|
||||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||||
long unsigned commandsExecuted = 0;
|
long unsigned commandsExecuted = 0;
|
||||||
|
bool fRetrySetAofEvent = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisMaster {
|
struct redisMaster {
|
||||||
@ -1528,6 +1529,8 @@ struct redisMaster {
|
|||||||
int masterport; /* Port of master */
|
int masterport; /* Port of master */
|
||||||
client *cached_master; /* Cached master to be reused for PSYNC. */
|
client *cached_master; /* Cached master to be reused for PSYNC. */
|
||||||
client *master;
|
client *master;
|
||||||
|
client *clientFake;
|
||||||
|
int clientFakeNesting;
|
||||||
/* The following two fields is where we store master PSYNC replid/offset
|
/* The following two fields is where we store master PSYNC replid/offset
|
||||||
* while the PSYNC is in progress. At the end we'll copy the fields into
|
* while the PSYNC is in progress. At the end we'll copy the fields into
|
||||||
* the server->master client structure. */
|
* the server->master client structure. */
|
||||||
@ -1598,6 +1601,7 @@ struct redisServerConst {
|
|||||||
|
|
||||||
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
||||||
bool fUsePro = false;
|
bool fUsePro = false;
|
||||||
|
int thread_min_client_threshold = 50;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
|
@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {Active replicas propogate transaction} {
|
||||||
|
$master set testkey 0
|
||||||
|
$master multi
|
||||||
|
$master incr testkey
|
||||||
|
$master incr testkey
|
||||||
|
after 5000
|
||||||
|
$master get testkey
|
||||||
|
$master exec
|
||||||
|
assert_equal 2 [$master get testkey]
|
||||||
|
after 500
|
||||||
|
wait_for_condition 50 500 {
|
||||||
|
[string match "2" [$slave get testkey]]
|
||||||
|
} else {
|
||||||
|
fail "Transaction failed to replicate"
|
||||||
|
}
|
||||||
|
$master flushall
|
||||||
|
}
|
||||||
|
|
||||||
test {Active replicas WAIT} {
|
test {Active replicas WAIT} {
|
||||||
# Test that wait succeeds since replicas should be syncronized
|
# Test that wait succeeds since replicas should be syncronized
|
||||||
$master set testkey foo
|
$master set testkey foo
|
||||||
|
74
tests/integration/replication-multimaster.tcl
Normal file
74
tests/integration/replication-multimaster.tcl
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
foreach topology {mesh ring} {
|
||||||
|
start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
|
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
|
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
|
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
|
|
||||||
|
for {set j 0} {$j < 4} {incr j} {
|
||||||
|
set R($j) [srv [expr 0-$j] client]
|
||||||
|
set R_host($j) [srv [expr 0-$j] host]
|
||||||
|
set R_port($j) [srv [expr 0-$j] port]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Initialize as mesh
|
||||||
|
if [string equal $topology "mesh"] {
|
||||||
|
for {set j 0} {$j < 4} {incr j} {
|
||||||
|
for {set k 0} {$k < 4} {incr k} {
|
||||||
|
if $j!=$k {
|
||||||
|
$R($j) replicaof $R_host($k) $R_port($k)
|
||||||
|
after 100
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
#Else Ring
|
||||||
|
if [string equal $topology "ring"] {
|
||||||
|
$R(0) replicaof $R_host(3) $R_port(3)
|
||||||
|
after 100
|
||||||
|
$R(1) replicaof $R_host(0) $R_port(0)
|
||||||
|
after 100
|
||||||
|
$R(2) replicaof $R_host(1) $R_port(1)
|
||||||
|
after 100
|
||||||
|
$R(3) replicaof $R_host(2) $R_port(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
after 2000
|
||||||
|
|
||||||
|
test "$topology replicates to all nodes" {
|
||||||
|
$R(0) set testkey foo
|
||||||
|
after 500
|
||||||
|
assert_equal foo [$R(1) get testkey] "replicates to 1"
|
||||||
|
assert_equal foo [$R(2) get testkey] "replicates to 2"
|
||||||
|
}
|
||||||
|
|
||||||
|
test "$topology replicates only once" {
|
||||||
|
$R(0) set testkey 1
|
||||||
|
after 500
|
||||||
|
$R(1) incr testkey
|
||||||
|
after 500
|
||||||
|
$R(2) incr testkey
|
||||||
|
after 500
|
||||||
|
assert_equal 3 [$R(0) get testkey]
|
||||||
|
assert_equal 3 [$R(1) get testkey]
|
||||||
|
assert_equal 3 [$R(2) get testkey]
|
||||||
|
assert_equal 3 [$R(3) get testkey]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "$topology transaction replicates only once" {
|
||||||
|
for {set j 0} {$j < 1000} {incr j} {
|
||||||
|
$R(0) set testkey 1
|
||||||
|
$R(0) multi
|
||||||
|
$R(0) incr testkey
|
||||||
|
$R(0) incr testkey
|
||||||
|
$R(0) exec
|
||||||
|
after 1
|
||||||
|
assert_equal 3 [$R(0) get testkey] "node 0"
|
||||||
|
assert_equal 3 [$R(1) get testkey] "node 1"
|
||||||
|
assert_equal 3 [$R(2) get testkey] "node 2"
|
||||||
|
assert_equal 3 [$R(3) get testkey] "node 3"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -45,6 +45,7 @@ set ::all_tests {
|
|||||||
integration/replication-4
|
integration/replication-4
|
||||||
integration/replication-psync
|
integration/replication-psync
|
||||||
integration/replication-active
|
integration/replication-active
|
||||||
|
integration/replication-multimaster
|
||||||
integration/aof
|
integration/aof
|
||||||
integration/rdb
|
integration/rdb
|
||||||
integration/convert-zipmap-hash-on-load
|
integration/convert-zipmap-hash-on-load
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
||||||
|
|
||||||
test {RREPLAY use current db} {
|
test {RREPLAY use current db} {
|
||||||
r debug force-master flagonly
|
r debug force-master yes
|
||||||
r select 4
|
r select 4
|
||||||
r set dbnum invalid
|
r set dbnum invalid
|
||||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
|
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
|
||||||
@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
|||||||
reconnect
|
reconnect
|
||||||
|
|
||||||
test {RREPLAY db different} {
|
test {RREPLAY db different} {
|
||||||
r debug force-master flagonly
|
r debug force-master yes
|
||||||
r select 4
|
r select 4
|
||||||
r set testkey four
|
r set testkey four
|
||||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
|
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
|
||||||
|
Loading…
x
Reference in New Issue
Block a user