Merge branch 'keydbpro' into keydbpro_collab

Former-commit-id: ecc69952dfd1f145e1aff12bca56a4b4e102d669
Commit: d55bcf23bd

src/Makefile (12 changes)
@@ -15,7 +15,7 @@
 release_hdr := $(shell sh -c './mkreleasehdr.sh')
 uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
 uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
-OPTIMIZATION?=-O2
+OPTIMIZATION?=-O2 -flto
 DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram rocksdb
 NODEPS:=clean distclean
 
@@ -349,9 +349,9 @@ endif
 
 REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX)
 REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX)
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
 REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX)
-REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
+REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
 REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX)
 REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
 REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX)
@@ -435,6 +435,12 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ
 # Because the jemalloc.h header is generated as a part of the jemalloc build,
 # building it should complete before building any other object. Instead of
 # depending on a single artifact, build all dependencies first.
+motd_client.o: motd.cpp .make-prerequisites
+	$(REDIS_CXX) -MMD -o motd_client.o -c $< -DCLIENT -fno-lto
+
+motd_server.o: motd.cpp .make-prerequisites
+	$(REDIS_CXX) -MMD -o motd_server.o -c $< -DSERVER
+
 %.o: %.c .make-prerequisites
 	$(REDIS_CC) -MMD -o $@ -c $<
 
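motd.cpp is now compiled twice from the same source: once with -DCLIENT for keydb-cli and once with -DSERVER for the server binaries. The commit does not say why only the client object also gets -fno-lto while the server build just gained -flto globally; a plausible reading, given the motd.cpp hunk further down, is that the client object defines weak fallbacks for hiredis' hi_sds* functions, and link-time optimization is easy to trip up with weak-symbol overrides. A self-contained sketch of that weak-symbol pattern (hi_greet is a made-up stand-in, not a KeyDB or hiredis symbol):

    // weak_demo.cpp -- illustrative only.
    #include <cstdio>

    extern "C" __attribute__((weak)) const char *hi_greet() {
        // Used only when no strong definition of hi_greet is linked in.
        return "weak fallback";
    }

    int main() {
        // g++ weak_demo.cpp && ./a.out   -> prints "weak fallback".
        // Linking another object that defines a strong hi_greet()
        // silently overrides this definition.
        std::printf("%s\n", hi_greet());
    }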
@@ -2726,6 +2726,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
     serverAssert(sdsKey != nullptr);
     serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr));    // early versions set a NULL object, this is no longer valid
     serverAssert(m_refCount == 0);
+    if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr)
+        return;
     std::unique_lock<fastlock> ul(g_expireLock);
 
     // First see if the key can be obtained from a snapshot
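(The hunk above is from redisDbPersistentData::ensure, which lives in src/db.cpp in the KeyDB tree; most per-file headers were lost when this page was captured, so the likely file is noted at each group of hunks from here on. The change itself is an early return when there is neither a snapshot nor a storage provider to page the key in from.)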
@@ -355,6 +355,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) {
     return counter;
 }
 
+unsigned long getClientReplicationBacklogSharedUsage(client *c);
+
 /* We don't want to count AOF buffers and slaves output buffers as
  * used memory: the eviction should use mostly data size. This function
  * returns the sum of AOF and slaves buffer. */
@@ -371,9 +373,15 @@ size_t freeMemoryGetNotCountedMemory(void) {
         while((ln = listNext(&li))) {
             client *replica = (client*)listNodeValue(ln);
             std::unique_lock<fastlock>(replica->lock);
-            overhead += getClientOutputBufferMemoryUsage(replica);
+            /* we don't wish to multiple count the replication backlog shared usage */
+            overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica));
         }
     }
 
+    /* also don't count the replication backlog memory
+     * that's where the replication clients get their memory from */
+    overhead += g_pserver->repl_backlog_size;
+
     if (g_pserver->aof_state != AOF_OFF) {
         overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize();
     }
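The two hunks above are from src/evict.cpp (LFUDecrAndReturn and freeMemoryGetNotCountedMemory identify the file). The intent: replicas now serve data straight out of the shared replication backlog, so the whole backlog is excluded from evictable memory once, and each replica's share of it is subtracted from that replica's output-buffer usage so the same bytes are not excluded twice. A toy model of the bookkeeping, with made-up numbers:

    // evict_overhead_demo.cpp -- hypothetical values; the real code reads
    // them from client state and server globals.
    #include <cstdio>

    int main() {
        long long repl_backlog_size = 1 << 20;    // counted once, below
        long long replica_obuf      = 96 * 1024;  // getClientOutputBufferMemoryUsage()
        long long backlog_shared    = 64 * 1024;  // getClientReplicationBacklogSharedUsage()

        long long overhead = 0;
        overhead += replica_obuf - backlog_shared; // replica's private buffer only
        overhead += repl_backlog_size;             // shared backlog, once for all replicas
        std::printf("overhead = %lld\n", overhead);
        return 0;
    }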
@@ -1,7 +1,11 @@
+#ifdef CLIENT
 extern "C" {
 #include <sdscompat.h>
 #include <sds.h>
 }
+#else
+#include "sds.h"
+#endif
 #include <cstring>
 #include <unistd.h>
 #include <sys/types.h>
@@ -15,6 +19,7 @@ extern "C" {
 #ifdef MOTD
 #include <curl/curl.h>
 
+#ifdef CLIENT
 extern "C" {
 __attribute__ ((weak)) hisds hi_sdscatlen(hisds s, const void *t, size_t len) {
     return sdscatlen(s, t, len);
@@ -23,6 +28,7 @@ __attribute__ ((weak)) hisds hi_sdscat(hisds s, const char *t) {
     return sdscat(s, t);
 }
 }
+#endif
 
 static const char *szMotdCachePath()
 {
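These motd.cpp hunks are the other half of the client/server split introduced in the Makefile: compiled with -DCLIENT the file pulls sds through hiredis' sdscompat.h and supplies the weak hi_sds* fallbacks, while with -DSERVER it uses KeyDB's own sds.h and the weak wrappers are compiled out.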
@@ -136,6 +136,7 @@ client *createClient(connection *conn, int iel) {
     client_id = g_pserver->next_client_id.fetch_add(1);
     c->iel = iel;
     c->id = client_id;
+    sprintf(c->lock.szName, "client %lu", client_id);
     c->resp = 2;
     c->conn = conn;
     c->name = NULL;
@@ -157,6 +158,7 @@ client *createClient(connection *conn, int iel) {
     c->flags = 0;
     c->fPendingAsyncWrite = FALSE;
     c->fPendingAsyncWriteHandler = FALSE;
+    c->fPendingReplicaWrite = FALSE;
     c->ctime = c->lastinteraction = g_pserver->unixtime;
     /* If the default user does not require authentication, the user is
      * directly authenticated. */
@@ -234,6 +236,7 @@ void clientInstallWriteHandler(client *c) {
     /* Schedule the client to write the output buffers to the socket only
      * if not already done and, for slaves, if the replica can actually receive
      * writes at this stage. */
+
     if (!(c->flags & CLIENT_PENDING_WRITE) &&
         (c->replstate == REPL_STATE_NONE ||
          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@@ -315,7 +318,7 @@ int prepareClientToWrite(client *c) {
 
     /* Schedule the client to write the output buffers to the socket, unless
      * it should already be setup to do so (it has already pending data). */
-    if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c);
+    if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c);
     if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
 
     /* Authorize the caller to queue in the output buffer of this client. */
@@ -1764,6 +1767,8 @@ client *lookupClientByID(uint64_t id) {
     return (c == raxNotFound) ? NULL : c;
 }
 
+long long getReplIndexFromOffset(long long offset);
+
 /* Write data in output buffers to client. Return C_OK if the client
  * is still valid after the call, C_ERR if it was freed because of some
  * error. If handler_installed is set, it will attempt to clear the
@@ -1781,8 +1786,9 @@ int writeToClient(client *c, int handler_installed) {
     serverAssertDebug(FCorrectThread(c));
 
     std::unique_lock<decltype(c->lock)> lock(c->lock);
 
     while(clientHasPendingReplies(c)) {
+        serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
         if (c->bufpos > 0) {
             nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
             if (nwritten <= 0) break;
@@ -1790,7 +1796,7 @@ int writeToClient(client *c, int handler_installed) {
             totwritten += nwritten;
 
             /* If the buffer was sent, set bufpos to zero to continue with
              * the remainder of the reply. */
             if ((int)c->sentlen == c->bufpos) {
                 c->bufpos = 0;
                 c->sentlen = 0;
@@ -1836,7 +1842,49 @@ int writeToClient(client *c, int handler_installed) {
              zmalloc_used_memory() < g_pserver->maxmemory) &&
             !(c->flags & CLIENT_SLAVE)) break;
     }
 
+    /* We can only directly read from the replication backlog if the client
+       is a replica, so only attempt to do so if that's the case. */
+    if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
+
+        std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
+        long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
+        serverAssert(c->repl_curr_off != -1);
+
+        if (c->repl_curr_off != c->repl_end_off){
+            long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
+            long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
+                                             * in the event of a wrap around write */
+            /* normal case with no wrap around */
+            if (repl_end_idx >= repl_curr_idx){
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
+            /* wrap around case */
+            } else {
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
+                /* only attempt wrapping if we write the correct number of bytes */
+                if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
+                    nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
+                    if (nwritten2ndStage != -1)
+                        nwritten += nwritten2ndStage;
+                }
+            }
+
+            /* only increment bytes if an error didn't occur */
+            if (nwritten > 0){
+                totwritten += nwritten;
+                c->repl_curr_off += nwritten;
+                serverAssert(c->repl_curr_off <= c->repl_end_off);
+                /* If the client's current offset matches the last offset it can read from, there is no pending write */
+                if (c->repl_curr_off == c->repl_end_off){
+                    c->fPendingReplicaWrite = false;
+                }
+            }
+
+            /* If the second part of a write didn't go through, we still need to register that */
+            if (nwritten2ndStage == -1) nwritten = -1;
+        }
+    }
 
     g_pserver->stat_net_output_bytes += totwritten;
     if (nwritten == -1) {
         if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
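The added block above is the core mechanism of this commit: a replica's pending data is written directly out of the circular replication backlog, from repl_curr_off up to repl_end_off, with a two-stage write when the range wraps past the end of the buffer. A simplified standalone restatement, with plain POSIX write() standing in for connWrite() and the index arguments assumed to come from getReplIndexFromOffset:

    // circular_write.cpp -- simplified sketch of the wrap-around write above.
    #include <sys/types.h>
    #include <unistd.h>

    ssize_t writeCircular(int fd, const char *backlog, long long backlog_size,
                          long long start_idx, long long end_idx) {
        if (end_idx >= start_idx)                      // contiguous range
            return write(fd, backlog + start_idx, end_idx - start_idx);

        // Wrapped range: write the tail of the buffer, then continue from index 0.
        ssize_t n = write(fd, backlog + start_idx, backlog_size - start_idx);
        if (n != backlog_size - start_idx) return n;   // short or failed first stage
        ssize_t n2 = write(fd, backlog, end_idx);      // second stage
        return (n2 == -1) ? -1 : n + n2;               // surface second-stage errors
    }

The real code additionally records a successful partial write by advancing repl_curr_off, and only clears fPendingReplicaWrite once the replica has caught up to repl_end_off.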
@@ -1854,7 +1902,7 @@ int writeToClient(client *c, int handler_installed) {
      * We just rely on data / pings received for timeout detection. */
     if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
     }
-    if (!clientHasPendingReplies(c)) {
+    if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
         c->sentlen = 0;
         if (handler_installed) connSetWriteHandler(c->conn, NULL);
 
@@ -1898,27 +1946,37 @@ void ProcessPendingAsyncWrites()
         serverAssert(c->fPendingAsyncWrite);
         if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
         {
-            zfree(c->replyAsync);
-            c->replyAsync = nullptr;
+            if (c->replyAsync != nullptr){
+                zfree(c->replyAsync);
+                c->replyAsync = nullptr;
+            }
             c->fPendingAsyncWrite = FALSE;
             continue;
         }
 
-        int size = c->replyAsync->used;
+        /* since writes from master to replica can come directly from the replication backlog,
+         * writes may have been signalled without having been copied to the replyAsync buffer,
+         * thus causing the buffer to be NULL */
+        if (c->replyAsync != nullptr){
+            int size = c->replyAsync->used;
+
             if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) {
                 memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size);
                 c->bufpos += size;
             } else {
                 c->reply_bytes += c->replyAsync->size;
                 listAddNodeTail(c->reply, c->replyAsync);
+                c->replyAsync = nullptr;
+            }
+
+            zfree(c->replyAsync);
             c->replyAsync = nullptr;
+        } else {
+            /* Only replicas should have empty async reply buffers */
+            serverAssert(c->flags & CLIENT_SLAVE);
         }
 
-        zfree(c->replyAsync);
-        c->replyAsync = nullptr;
         c->fPendingAsyncWrite = FALSE;
 
         // Now install the write event handler
         int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
         /* For the fsync=always policy, we want that a given FD is never
@@ -1931,17 +1989,17 @@ void ProcessPendingAsyncWrites()
         {
             ae_flags |= AE_BARRIER;
         }
 
         if (!((c->replstate == REPL_STATE_NONE ||
             (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
             continue;
 
         asyncCloseClientOnOutputBufferLimitReached(c);
         if (c->flags & CLIENT_CLOSE_ASAP)
             continue; // we will never write this so don't post an op
 
         std::atomic_thread_fence(std::memory_order_seq_cst);
 
         if (FCorrectThread(c))
         {
             prepareClientToWrite(c); // queue an event
@@ -2024,9 +2082,10 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
 
         /* If after the synchronous writes above we still have data to
          * output to the client, we need to install the writable handler. */
-        if (clientHasPendingReplies(c)) {
-            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR)
+        if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) {
+            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
                 freeClientAsync(c);
+            }
         }
     }
 
@@ -3647,6 +3706,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
     }
 }
 
+/* In the case of a replica client, writes to said replica are using data from the replication backlog
+ * as opposed to it's own internal buffer, this number should keep track of that */
+unsigned long getClientReplicationBacklogSharedUsage(client *c) {
+    return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
+}
+
 /* This function returns the number of bytes that Redis is
  * using to store the reply still not read by the client.
  *
@@ -3655,9 +3720,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
  * enforcing the client output length limits. */
 unsigned long getClientOutputBufferMemoryUsage(client *c) {
     unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
-    return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0);
+    return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c);
 }
 
+
+
 /* Get the class of a client, used in order to enforce limits to different
  * classes of clients.
  *
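Read together with the evict.cpp hunks earlier: getClientReplicationBacklogSharedUsage reports how much of the shared backlog a replica still has to consume (master_repl_offset - repl_curr_off while a write is pending, else 0). Adding it into getClientOutputBufferMemoryUsage keeps the client output-buffer limits meaningful for replicas that no longer buffer privately, while freeMemoryGetNotCountedMemory subtracts the same quantity so eviction does not count those bytes twice.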
@@ -189,6 +189,7 @@ void createReplicationBacklog(void) {
     g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL);
     g_pserver->repl_backlog_histlen = 0;
     g_pserver->repl_backlog_idx = 0;
+    g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
 
     /* We don't have any data inside our buffer, but virtually the first
      * byte we have is the next byte that will be generated for the
@@ -200,6 +201,15 @@ void createReplicationBacklog(void) {
     g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
 }
 
+/* Compute the corresponding index from a replication backlog offset
+ * Since this computation needs the size of the replication backlog,
+ * you need to have the repl_backlog_lock in order to call it */
+long long getReplIndexFromOffset(long long offset){
+    serverAssert(g_pserver->repl_backlog_lock.fOwnLock());
+    long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
+    return index;
+}
+
 /* This function is called when the user modifies the replication backlog
  * size at runtime. It is up to the function to both update the
 * g_pserver->repl_backlog_size and to resize the buffer and setup it so that
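getReplIndexFromOffset is the usual ring-buffer mapping (these replication hunks are from src/replication.cpp): repl_backlog_start records the master offset at which the current buffer began, and an offset's slot is its distance from that start modulo the buffer size. A worked example with hypothetical numbers:

    // index_demo.cpp -- worked example of the offset -> index arithmetic.
    #include <cstdio>

    int main() {
        long long repl_backlog_size  = 1 << 20;   // 1 MiB backlog (hypothetical)
        long long repl_backlog_start = 5000;      // master offset when buffer began

        // An offset that has wrapped once past the end of the buffer:
        long long offset = repl_backlog_start + repl_backlog_size + 42;
        long long index  = (offset - repl_backlog_start) % repl_backlog_size;
        std::printf("index = %lld\n", index);     // prints 42
        return 0;
    }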
@@ -211,6 +221,8 @@ void resizeReplicationBacklog(long long newsize) {
         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
     if (g_pserver->repl_backlog_size == newsize) return;
 
+    std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
+
     if (g_pserver->repl_backlog != NULL) {
         /* What we actually do is to flush the old buffer and realloc a new
          * empty one. It will refill with new data incrementally.
@@ -218,19 +230,23 @@ void resizeReplicationBacklog(long long newsize) {
          * worse often we need to alloc additional space before freeing the
          * old buffer. */
 
-        if (g_pserver->repl_batch_idxStart >= 0) {
-            // We need to keep critical data so we can't shrink less than the hot data in the buffer
-            newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
-            char *backlog = (char*)zmalloc(newsize);
-            g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
+        /* get the critical client size, i.e. the size of the data unflushed to clients */
+        long long earliest_off = g_pserver->repl_lowest_off.load();
 
-            if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
-                auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
-                memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
+        if (earliest_off != -1) {
+            // We need to keep critical data so we can't shrink less than the hot data in the buffer
+            newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
+            char *backlog = (char*)zmalloc(newsize);
+            g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
+            long long earliest_idx = getReplIndexFromOffset(earliest_off);
+
+            if (g_pserver->repl_backlog_idx >= earliest_idx) {
+                auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
+                memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
                 serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
             } else {
-                auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
-                memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
+                auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
+                memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1);
                 memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
                 auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
                 serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
@@ -238,7 +254,10 @@ void resizeReplicationBacklog(long long newsize) {
             zfree(g_pserver->repl_backlog);
             g_pserver->repl_backlog = backlog;
             g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
-            g_pserver->repl_batch_idxStart = 0;
+            g_pserver->repl_batch_idxStart -= earliest_idx;
+            if (g_pserver->repl_batch_idxStart < 0)
+                g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
+            g_pserver->repl_backlog_start = earliest_off;
         } else {
             zfree(g_pserver->repl_backlog);
             g_pserver->repl_backlog = (char*)zmalloc(newsize);
@@ -246,11 +265,13 @@ void resizeReplicationBacklog(long long newsize) {
             g_pserver->repl_backlog_idx = 0;
             /* Next byte we have is... the next since the buffer is empty. */
             g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
+            g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
         }
     }
     g_pserver->repl_backlog_size = newsize;
 }
 
+
 void freeReplicationBacklog(void) {
     serverAssert(GlobalLocksAcquired());
     listIter li;
@@ -273,12 +294,20 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
     serverAssert(GlobalLocksAcquired());
     const unsigned char *p = (const unsigned char*)ptr;
 
 
     if (g_pserver->repl_batch_idxStart >= 0) {
-        long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
+        /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */
+        long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
+        if (lower_bound == -1)
+            lower_bound = g_pserver->repl_batch_offStart;
+        long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
         if (minimumsize > g_pserver->repl_backlog_size) {
             flushReplBacklogToClients();
-            serverAssert(g_pserver->master_repl_offset == g_pserver->repl_batch_offStart);
-            minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
+            lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
+            if (lower_bound == -1)
+                lower_bound = g_pserver->repl_batch_offStart;
+
+            minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
 
             if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) {
                 // This is an emergency overflow, we better resize to fit
@@ -293,6 +322,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
 
     /* This is a circular buffer, so write as much data we can at every
      * iteration and rewind the "idx" index if we reach the limit. */
+
     while(len) {
         size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx;
         if (thislen > len) thislen = len;
@@ -479,7 +509,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
     if (fSendRaw)
     {
         char aux[LONG_STR_SIZE+3];
-
         /* Add the multi bulk reply length. */
         aux[0] = '*';
         int multilen = ll2string(aux+1,sizeof(aux)-1,argc);
@@ -653,15 +682,19 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
     decrRefCount(cmdobj);
 }
 
+int prepareClientToWrite(client *c);
+
 /* Feed the replica 'c' with the replication backlog starting from the
  * specified 'offset' up to the end of the backlog. */
 long long addReplyReplicationBacklog(client *c, long long offset) {
-    long long j, skip, len;
+    long long skip, len;
 
     serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
 
     if (g_pserver->repl_backlog_histlen == 0) {
         serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
+        c->repl_curr_off = g_pserver->master_repl_offset;
+        c->repl_end_off = g_pserver->master_repl_offset;
         return 0;
     }
 
@@ -678,31 +711,20 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
     skip = offset - g_pserver->repl_backlog_off;
     serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
 
-    /* Point j to the oldest byte, that is actually our
-     * g_pserver->repl_backlog_off byte. */
-    j = (g_pserver->repl_backlog_idx +
-        (g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) %
-        g_pserver->repl_backlog_size;
-    serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
-
-    /* Discard the amount of data to seek to the specified 'offset'. */
-    j = (j + skip) % g_pserver->repl_backlog_size;
-
-    /* Feed replica with data. Since it is a circular buffer we have to
-     * split the reply in two parts if we are cross-boundary. */
     len = g_pserver->repl_backlog_histlen - skip;
     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
-    while(len) {
-        long long thislen =
-            ((g_pserver->repl_backlog_size - j) < len) ?
-            (g_pserver->repl_backlog_size - j) : len;
-
-        serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
-        addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen));
-        len -= thislen;
-        j = 0;
-    }
-    return g_pserver->repl_backlog_histlen - skip;
+
+    /* Set the start and end offsets for the replica so that a future
+     * writeToClient will send the backlog from the given offset to
+     * the current end of the backlog to said replica */
+    c->repl_curr_off = offset - 1;
+    c->repl_end_off = g_pserver->master_repl_offset;
+
+    /* Force the partial sync to be queued */
+    prepareClientToWrite(c);
+    c->fPendingReplicaWrite = true;
+
+    return len;
 }
 
 /* Return the offset to provide as reply to the PSYNC command received
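The addReplyReplicationBacklog rewrite is the behavioral pivot for partial resync: instead of copying the requested backlog range into the replica's reply buffer with addReplySds, the master now just records the window on the client (repl_curr_off is set to offset - 1, so the next byte sent is the requested offset, and repl_end_off marks the current end of the stream) and flags fPendingReplicaWrite, leaving writeToClient's new backlog path to stream the bytes directly.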
@@ -735,6 +757,10 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
 
     replica->psync_initial_offset = offset;
     replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
 
+    replica->repl_curr_off = offset;
+    replica->repl_end_off = g_pserver->master_repl_offset;
+
     /* We are going to accumulate the incremental changes for this
      * replica as well. Set replicaseldb to -1 in order to force to re-emit
      * a SELECT statement in the replication stream. */
@@ -1357,6 +1383,7 @@ void replconfCommand(client *c) {
  * 4) Update the count of "good replicas". */
 void putSlaveOnline(client *replica) {
     replica->replstate = SLAVE_STATE_ONLINE;
+
     replica->repl_put_online_on_ack = 0;
     replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */
 
@@ -3059,6 +3086,11 @@ void syncWithMaster(connection *conn) {
 
     if (psync_result == PSYNC_CONTINUE) {
         serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
+        /* Reset the bulklen information in case it is lingering from the last connection
+         * The partial sync will start from the beginning of a command so these should be reset */
+        mi->master->reqtype = 0;
+        mi->master->multibulklen = 0;
+        mi->master->bulklen = -1;
         if (cserver.supervised_mode == SUPERVISED_SYSTEMD) {
             redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
         }
@@ -4898,15 +4930,19 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
 }
 
 void _clientAsyncReplyBufferReserve(client *c, size_t len);
 
 void flushReplBacklogToClients()
 {
     serverAssert(GlobalLocksAcquired());
+    /* If we have the repl backlog lock, we will deadlock */
+    serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
     if (g_pserver->repl_batch_offStart < 0)
         return;
 
     if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
         bool fAsyncWrite = false;
+        long long min_offset = LONG_LONG_MAX;
+        // Ensure no overflow
         serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
         if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
             // We overflowed
@@ -4930,33 +4966,36 @@ void flushReplBacklogToClients()
         listIter li;
         listNode *ln;
         listRewind(g_pserver->slaves, &li);
+        /* We don't actually write any data in this function since we send data
+         * directly from the replication backlog to replicas in writeToClient.
+         *
+         * What we do however, is set the end offset of each replica here. This way,
+         * future calls to writeToClient will know up to where in the replication
+         * backlog is valid for writing. */
         while ((ln = listNext(&li))) {
             client *replica = (client*)listNodeValue(ln);
 
             if (!canFeedReplicaReplBuffer(replica)) continue;
             if (replica->flags & CLIENT_CLOSE_ASAP) continue;
 
-            std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
-            if (FCorrectThread(replica))
-                ul.lock();
-            else
+            std::unique_lock<fastlock> ul(replica->lock);
+            if (!FCorrectThread(replica))
                 fAsyncWrite = true;
 
-            if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
-                long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
-                serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
-                serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
-                serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
-
-                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
-            } else {
-                auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
-                if (fAsyncWrite)
-                    _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
-                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
-                addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
-                serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
+            /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
+            serverAssert(replica->repl_curr_off != -1);
+
+            min_offset = std::min(min_offset, replica->repl_curr_off);
+
+            replica->repl_end_off = g_pserver->master_repl_offset;
+
+            /* Only if the there isn't already a pending write do we prepare the client to write */
+            if (!replica->fPendingReplicaWrite){
+                serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
+                prepareClientToWrite(replica);
+                replica->fPendingReplicaWrite = true;
             }
 
         }
         if (fAsyncWrite)
             ProcessPendingAsyncWrites();
@@ -4965,7 +5004,8 @@ LDone:
     // This may be called multiple times per "frame" so update with our progress flushing to clients
     g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
     g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
-    }
+    g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
+    }
 }
 
 
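flushReplBacklogToClients is also where repl_lowest_off is maintained: each flush takes the minimum repl_curr_off across all replicas and publishes it atomically, with -1 meaning no replicas; feedReplicationBacklog and resizeReplicationBacklog then treat it as the lower bound of backlog data that must be preserved. A minimal sketch of that bookkeeping (names and values hypothetical):

    // lowest_off_demo.cpp -- sketch of the repl_lowest_off bookkeeping; KeyDB
    // derives the offsets from its replica client list rather than a vector.
    #include <algorithm>
    #include <atomic>
    #include <climits>
    #include <cstdio>
    #include <vector>

    std::atomic<long long> repl_lowest_off{-1};   // -1 == no replicas

    void publishLowestOffset(const std::vector<long long> &replica_offsets) {
        long long min_offset = LLONG_MAX;
        for (long long off : replica_offsets)
            min_offset = std::min(min_offset, off);
        repl_lowest_off.store(replica_offsets.empty() ? -1 : min_offset,
                              std::memory_order_seq_cst);
    }

    int main() {
        publishLowestOffset({5120, 4096, 8192});
        std::printf("%lld\n", repl_lowest_off.load());   // prints 4096
    }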
@@ -2021,7 +2021,6 @@ void clientsCron(int iel) {
     while(listLength(g_pserver->clients) && iterations--) {
         client *c;
         listNode *head;
-
         /* Rotate the list, take the current head, process.
          * This way if the client must be removed from the list it's the
          * first element and we don't incur into O(N) computation. */
@@ -3261,6 +3260,7 @@ void initServerConfig(void) {
     g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER;
     g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
     g_pserver->master_repl_offset = 0;
+    g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst);
 
     /* Replication partial resync backlog */
     g_pserver->repl_backlog = NULL;
@@ -7012,9 +7012,10 @@ void OnTerminate()
 void wakeTimeThread() {
     updateCachedTime();
     std::lock_guard<std::mutex> lock(time_thread_mutex);
+    if (sleeping_threads >= cserver.cthreads)
+        time_thread_cv.notify_one();
     sleeping_threads--;
     serverAssert(sleeping_threads >= 0);
-    time_thread_cv.notify_one();
 }
 
 void *timeThreadMain(void*) {
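These three hunks appear to come from src/server.cpp (clientsCron, initServerConfig, wakeTimeThread): repl_lowest_off gets its -1 "no replicas" default at startup, and wakeTimeThread now signals the time thread only when every worker thread is asleep (sleeping_threads >= cserver.cthreads) instead of notifying unconditionally on every call.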
src/server.h (17 changes)
@@ -1589,6 +1589,13 @@ struct client {
     long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                        copying this replica output buffer
                                        should use. */
+
+    long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from
+                                  * when sending data to this replica. */
+    long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset
+                                  * to prevent needing the global lock */
+    int fPendingReplicaWrite; /* Is there a write queued for this replica? */
+
     char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
     int slave_listening_port; /* As configured with: REPLCONF listening-port */
     char *slave_addr; /* Optionally given by REPLCONF ip-address */
@@ -2357,6 +2364,9 @@ struct redisServer {
                                    that is the next byte will'll write to.*/
     long long repl_backlog_off; /* Replication "master offset" of first
                                    byte in the replication backlog buffer.*/
+    long long repl_backlog_start; /* Used to compute indicies from offsets
+                                     basically, index = (offset - start) % size */
+    fastlock repl_backlog_lock {"replication backlog"};
     time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                        gets released. */
     time_t repl_no_slaves_since; /* We have no slaves since that time.
@@ -2368,6 +2378,8 @@ struct redisServer {
     int repl_diskless_load; /* Slave parse RDB directly from the socket.
                              * see REPL_DISKLESS_LOAD_* enum */
     int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
+    std::atomic <long long> repl_lowest_off; /* The lowest offset amongst all replicas
+                                                -1 if there are no replicas */
     /* Replication (replica) */
     list *masters;
     int enable_multimaster;
@@ -3713,6 +3725,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len);
 void xorDigest(unsigned char *digest, const void *ptr, size_t len);
 int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
 
+
+
 int moduleGILAcquiredByModule(void);
 extern int g_fInCrash;
 static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
@@ -3780,6 +3794,7 @@ void tlsCleanup(void);
 int tlsConfigure(redisTLSContextConfig *ctx_config);
 
 
+
 class ShutdownException
 {};
 
@@ -3791,3 +3806,5 @@ class ShutdownException
 int iAmMaster(void);
 
 #endif
+
+
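The remaining hunks update the maxmemory tests (tests/unit/maxmemory.tcl, judging by the start_server {tags {"maxmemory"}} context). Because the replication backlog is now treated as overhead that must not count toward eviction, each test subtracts the server's mem_not_counted_for_evict figure (the INFO counterpart of freeMemoryGetNotCountedMemory) from used_memory before sizing and checking the maxmemory limit.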
@@ -33,7 +33,8 @@ start_server {tags {"maxmemory"}} {
         # Get the current memory limit and calculate a new limit.
         # We just add 100k to the current memory size so that it is
         # fast for us to reach that limit.
-        set used [s used_memory]
+        set overhead [s mem_not_counted_for_evict]
+        set used [expr [s used_memory] - $overhead]
         set limit [expr {$used+100*1024}]
         r config set maxmemory $limit
         r config set maxmemory-policy $policy
@@ -42,7 +43,7 @@ start_server {tags {"maxmemory"}} {
         while 1 {
             r setex [randomKey] 10000 x
            incr numkeys
-            if {[s used_memory]+4096 > $limit} {
+            if {[expr {[s used_memory] - $overhead + 4096}] > $limit} {
                assert {$numkeys > 10}
                break
            }
@@ -52,7 +53,8 @@ start_server {tags {"maxmemory"}} {
        for {set j 0} {$j < $numkeys} {incr j} {
            r setex [randomKey] 10000 x
        }
-        assert {[s used_memory] < ($limit+4096)}
+        set used_amt [expr [s used_memory] - $overhead]
+        assert {$used_amt < ($limit+4096)}
    }
 }
 
@@ -65,7 +67,8 @@ start_server {tags {"maxmemory"}} {
         # Get the current memory limit and calculate a new limit.
         # We just add 100k to the current memory size so that it is
         # fast for us to reach that limit.
-        set used [s used_memory]
+        set overhead [s mem_not_counted_for_evict]
+        set used [expr [s used_memory] - $overhead]
         set limit [expr {$used+100*1024}]
         r config set maxmemory $limit
         r config set maxmemory-policy $policy
@@ -74,7 +77,7 @@ start_server {tags {"maxmemory"}} {
         while 1 {
             r set [randomKey] x
            incr numkeys
-            if {[s used_memory]+4096 > $limit} {
+            if {[expr [s used_memory] - $overhead]+4096 > $limit} {
                assert {$numkeys > 10}
                break
            }
@@ -91,7 +94,7 @@ start_server {tags {"maxmemory"}} {
            }
        }
        if {[string match allkeys-* $policy]} {
-            assert {[s used_memory] < ($limit+4096)}
+            assert {[expr [s used_memory] - $overhead] < ($limit+4096)}
        } else {
            assert {$err == 1}
        }
@@ -107,7 +110,8 @@ start_server {tags {"maxmemory"}} {
         # Get the current memory limit and calculate a new limit.
         # We just add 100k to the current memory size so that it is
         # fast for us to reach that limit.
-        set used [s used_memory]
+        set overhead [s mem_not_counted_for_evict]
+        set used [expr [s used_memory] - $overhead]
         set limit [expr {$used+100*1024}]
         r config set maxmemory $limit
         r config set maxmemory-policy $policy
@@ -121,7 +125,7 @@ start_server {tags {"maxmemory"}} {
            } else {
                r set "key:$numkeys" x
            }
-            if {[s used_memory]+4096 > $limit} {
+            if {[expr [s used_memory] - $overhead]+4096 > $limit} {
                assert {$numkeys > 10}
                break
            }
@@ -135,7 +139,7 @@ start_server {tags {"maxmemory"}} {
            catch {r setex "foo:$j" 10000 x}
        }
        # We should still be under the limit.
-        assert {[s used_memory] < ($limit+4096)}
+        assert {[expr [s used_memory] - $overhead] < ($limit+4096)}
        # However all our non volatile keys should be here.
        for {set j 0} {$j < $numkeys} {incr j 2} {
            assert {[r exists "key:$j"]}
@@ -305,7 +309,8 @@ start_server {tags {"maxmemory"} overrides {server-threads 1}} {
        # we need to make sure to evict keynames of a total size of more than
        # 16kb since the (PROTO_REPLY_CHUNK_BYTES), only after that the
        # invalidation messages have a chance to trigger further eviction.
-        set used [s used_memory]
+        set overhead [s mem_not_counted_for_evict]
+        set used [expr [s used_memory] - $overhead]
        set limit [expr {$used - 40000}]
        r config set maxmemory $limit
 