diff --git a/redis.conf b/redis.conf index 108b9ff63..368cf3a73 100644 --- a/redis.conf +++ b/redis.conf @@ -1836,6 +1836,13 @@ activerehashing yes # Instead there is a default limit for pubsub and replica clients, since # subscribers and replicas receive data in a push fashion. # +# Note that it doesn't make sense to set the replica clients output buffer +# limit lower than the repl-backlog-size config (partial sync will succeed +# and then replica will get disconnected). +# Such a configuration is ignored (the size of repl-backlog-size will be used). +# This doesn't have memory consumption implications since the replica client +# will share the backlog buffers memory. +# # Both the hard or the soft limit can be disabled by setting them to zero. client-output-buffer-limit normal 0 0 0 client-output-buffer-limit replica 256mb 64mb 60 diff --git a/src/config.c b/src/config.c index c044ca516..909929147 100644 --- a/src/config.c +++ b/src/config.c @@ -2399,10 +2399,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) { } static int updateReplBacklogSize(long long val, long long prev, const char **err) { - /* resizeReplicationBacklog sets server.cfg_repl_backlog_size, and relies on + /* resizeReplicationBacklog sets server.repl_backlog_size, and relies on * being able to tell when the size changes, so restore prev before calling it. */ UNUSED(err); - server.cfg_repl_backlog_size = prev; + server.repl_backlog_size = prev; resizeReplicationBacklog(val); return 1; } @@ -2684,7 +2684,7 @@ standardConfig configs[] = { createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("proto-max-bulk-len", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */ createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), - createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.cfg_repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ + createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), diff --git a/src/evict.c b/src/evict.c index 954f6f402..4186378a2 100644 --- a/src/evict.c +++ b/src/evict.c @@ -325,22 +325,44 @@ unsigned long LFUDecrAndReturn(robj *o) { } /* We don't want to count AOF buffers and slaves output buffers as - * used memory: the eviction should use mostly data size. This function - * returns the sum of AOF and slaves buffer. */ + * used memory: the eviction should use mostly data size, because + * it can cause feedback-loop when we push DELs into them, putting + * more and more DELs will make them bigger, if we count them, we + * need to evict more keys, and then generate more DELs, maybe cause + * massive eviction loop, even all keys are evicted. + * + * This function returns the sum of AOF and replication buffer. 
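+ * Note that for the shared replication buffer we only count the part that
+ * exceeds the configured repl-backlog-size (see the calculation below).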
*/ size_t freeMemoryGetNotCountedMemory(void) { size_t overhead = 0; - int slaves = listLength(server.slaves); - if (slaves) { - listIter li; - listNode *ln; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = listNodeValue(ln); - overhead += getClientOutputBufferMemoryUsage(slave); + /* Since all replicas and replication backlog share global replication + * buffer, we think only the part of exceeding backlog size is the extra + * separate consumption of replicas. + * + * Note that although the backlog is also initially incrementally grown + * (pushing DELs consumes memory), it'll eventually stop growing and + * remain constant in size, so even if its creation will cause some + * eviction, it's capped, and also here to stay (no resonance effect) + * + * Note that, because we trim backlog incrementally in the background, + * backlog size may exceeds our setting if slow replicas that reference + * vast replication buffer blocks disconnect. To avoid massive eviction + * loop, we don't count the delayed freed replication backlog into used + * memory even if there are no replicas, i.e. we still regard this memory + * as replicas'. */ + if ((long long)server.repl_buffer_mem > server.repl_backlog_size) { + /* We use list structure to manage replication buffer blocks, so backlog + * also occupies some extra memory, we can't know exact blocks numbers, + * we only get approximate size according to per block size. */ + size_t extra_approx_size = + (server.repl_backlog_size/PROTO_REPLY_CHUNK_BYTES + 1) * + (sizeof(replBufBlock)+sizeof(listNode)); + size_t counted_mem = server.repl_backlog_size + extra_approx_size; + if (server.repl_buffer_mem > counted_mem) { + overhead += (server.repl_buffer_mem - counted_mem); } } + if (server.aof_state != AOF_OFF) { overhead += sdsAllocSize(server.aof_buf)+aofRewriteBufferMemoryUsage(); } diff --git a/src/lazyfree.c b/src/lazyfree.c index 10f1ab39f..6127abe77 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -46,6 +46,18 @@ void lazyFreeLuaScripts(void *args[]) { atomicIncr(lazyfreed_objects,len); } +/* Release replication backlog referencing memory. */ +void lazyFreeReplicationBacklogRefMem(void *args[]) { + list *blocks = args[0]; + rax *index = args[1]; + long long len = listLength(blocks); + len += raxSize(index); + listRelease(blocks); + raxFree(index); + atomicDecr(lazyfree_objects,len); + atomicIncr(lazyfreed_objects,len); +} + /* Return the number of currently pending objects to free. */ size_t lazyfreeGetPendingObjectsCount(void) { size_t aux; @@ -180,3 +192,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) { dictRelease(lua_scripts); } } + +/* Free replication backlog referencing buffer blocks and rax index. */ +void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { + if (listLength(blocks) > LAZYFREE_THRESHOLD || + raxSize(index) > LAZYFREE_THRESHOLD) + { + atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index)); + bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); + } else { + listRelease(blocks); + raxFree(index); + } +} diff --git a/src/multi.c b/src/multi.c index 5cc846637..e1a1df93f 100644 --- a/src/multi.c +++ b/src/multi.c @@ -276,7 +276,7 @@ void execCommand(client *c) { * backlog with the final EXEC. 
*/ if (server.repl_backlog && was_master && !is_master) { char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; - feedReplicationBacklog(execcmd,strlen(execcmd)); + feedReplicationBuffer(execcmd,strlen(execcmd)); } afterPropagateExec(); } diff --git a/src/networking.c b/src/networking.c index d514c11a5..9bb12787b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -140,6 +140,8 @@ client *createClient(connection *conn) { c->name = NULL; c->bufpos = 0; c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf); + c->ref_repl_buf_node = NULL; + c->ref_block_pos = 0; c->qb_pos = 0; c->querybuf = sdsempty(); c->pending_querybuf = sdsempty(); @@ -467,7 +469,7 @@ void afterErrorReply(client *c, const char *s, size_t len) { "to its %s: '%.*s' after processing the command " "'%s'", from, to, (int)len, s, cmdname); if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog && - server.repl_backlog_histlen > 0) + server.repl_backlog->histlen > 0) { showLatestBacklog(); } @@ -985,30 +987,37 @@ void AddReplyFromClient(client *dst, client *src) { closeClientOnOutputBufferLimitReached(dst, 1); } -/* Copy 'src' client output buffers into 'dst' client output buffers. - * The function takes care of freeing the old output buffers of the - * destination client. */ -void copyClientOutputBuffer(client *dst, client *src) { - listEmpty(dst->reply); - dst->sentlen = 0; - dst->bufpos = 0; - dst->reply_bytes = 0; +/* Logically copy 'src' replica client buffers info to 'dst' replica. + * Basically increase referenced buffer block node reference count. */ +void copyReplicaOutputBuffer(client *dst, client *src) { + serverAssert(src->bufpos == 0 && listLength(src->reply) == 0); - /* First copy src static buffer into dst (either static buffer or reply - * list, maybe clients have different 'usable_buffer_size'). */ - _addReplyToBufferOrList(dst,src->buf,src->bufpos); - - /* Copy src reply list into the dest. */ - list* reply = listDup(src->reply); - listJoin(dst->reply,reply); - dst->reply_bytes += src->reply_bytes; - listRelease(reply); + if (src->ref_repl_buf_node == NULL) return; + dst->ref_repl_buf_node = src->ref_repl_buf_node; + dst->ref_block_pos = src->ref_block_pos; + ((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++; } /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { - return c->bufpos || listLength(c->reply); + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + /* Replicas use global shared replication buffer instead of + * private output buffer. */ + serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); + if (c->ref_repl_buf_node == NULL) return 0; + + /* If the last replication buffer block content is totally sent, + * we have nothing to send. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = listNodeValue(ln); + if (ln == c->ref_repl_buf_node && + c->ref_block_pos == tail->used) return 0; + + return 1; + } else { + return c->bufpos || listLength(c->reply); + } } void clientAcceptHandler(connection *conn) { @@ -1395,6 +1404,7 @@ void freeClient(client *c) { /* Free data structures. */ listRelease(c->reply); + freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); @@ -1542,6 +1552,77 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } +/* This function does actual writing output buffers to different types of + * clients, it is called by writeToClient. 
+ * If we write successfully, it return C_OK, otherwise, C_ERR is returned, + * And 'nwritten' is a output parameter, it means how many bytes server write + * to client. */ +int _writeToClient(client *c, ssize_t *nwritten) { + *nwritten = 0; + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + serverAssert(c->bufpos == 0 && listLength(c->reply) == 0); + + replBufBlock *o = listNodeValue(c->ref_repl_buf_node); + serverAssert(o->used >= c->ref_block_pos); + /* Send current block if it is not fully sent. */ + if (o->used > c->ref_block_pos) { + *nwritten = connWrite(c->conn, o->buf+c->ref_block_pos, + o->used-c->ref_block_pos); + if (*nwritten <= 0) return C_ERR; + c->ref_block_pos += *nwritten; + } + + /* If we fully sent the object on head, go to the next one. */ + listNode *next = listNextNode(c->ref_repl_buf_node); + if (next && c->ref_block_pos == o->used) { + o->refcount--; + ((replBufBlock *)(listNodeValue(next)))->refcount++; + c->ref_repl_buf_node = next; + c->ref_block_pos = 0; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + return C_OK; + } + + if (c->bufpos > 0) { + *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (*nwritten <= 0) return C_ERR; + c->sentlen += *nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); + size_t objlen = o->used; + + if (objlen == 0) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + return C_OK; + } + + *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); + if (*nwritten <= 0) return C_ERR; + c->sentlen += *nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. */ + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } + } + return C_OK; +} + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some * error. If handler_installed is set, it will attempt to clear the @@ -1555,48 +1636,11 @@ int writeToClient(client *c, int handler_installed) { atomicIncr(server.stat_total_writes_processed, 1); ssize_t nwritten = 0, totwritten = 0; - size_t objlen; - clientReplyBlock *o; while(clientHasPendingReplies(c)) { - if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. 
*/ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } else { - o = listNodeValue(listFirst(c->reply)); - objlen = o->used; - - if (objlen == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - continue; - } - - nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); - } - } + int ret = _writeToClient(c, &nwritten); + if (ret == C_ERR) break; + totwritten += nwritten; /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT * bytes, in a single threaded server it's a good idea to serve * other clients as well, even if a very large request comes from @@ -2077,8 +2121,7 @@ void commandProcessed(client *c) { if (c->flags & CLIENT_MASTER) { long long applied = c->reploff - prev_offset; if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); + replicationFeedStreamFromMasterStream(c->pending_querybuf,applied); sdsrange(c->pending_querybuf,applied,-1); } } @@ -2399,6 +2442,13 @@ sds catClientInfoString(sds s, client *client) { /* Compute the total memory consumed by this client. */ size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); + size_t used_blocks_of_repl_buf = 0; + if (client->ref_repl_buf_node) { + replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); + replBufBlock *cur = listNodeValue(client->ref_repl_buf_node); + used_blocks_of_repl_buf = last->id - cur->id + 1; + } + sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL; sds ret = sdscatfmt(s, "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", @@ -2419,7 +2469,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) client->argv_len_sum, (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, - (unsigned long long) listLength(client->reply), + (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ (unsigned long long) total_mem, events, @@ -3247,8 +3297,21 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. 
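+ * For replica clients the reply is served from the shared replication buffer,
+ * so the usage is estimated from the range of buffer blocks the replica still
+ * references (see the CLIENT_TYPE_SLAVE branch below).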
*/ size_t getClientOutputBufferMemoryUsage(client *c) { - size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)); + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + size_t repl_buf_size = 0; + size_t repl_node_num = 0; + size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock); + if (c->ref_repl_buf_node) { + replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks)); + replBufBlock *cur = listNodeValue(c->ref_repl_buf_node); + repl_buf_size = last->repl_offset + last->size - cur->repl_offset; + repl_node_num = last->id - cur->id + 1; + } + return repl_buf_size + (repl_node_size*repl_node_num); + } else { + size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); + return c->reply_bytes + (list_item_size*listLength(c->reply)); + } } /* Returns the total client's memory usage. @@ -3332,8 +3395,18 @@ int checkClientOutputBufferLimits(client *c) { * like normal clients. */ if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL; + /* Note that it doesn't make sense to set the replica clients output buffer + * limit lower than the repl-backlog-size config (partial sync will succeed + * and then replica will get disconnected). + * Such a configuration is ignored (the size of repl-backlog-size will be used). + * This doesn't have memory consumption implications since the replica client + * will share the backlog buffers memory. */ + size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes; + if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes && + (long long)hard_limit_bytes < server.repl_backlog_size) + hard_limit_bytes = server.repl_backlog_size; if (server.client_obuf_limits[class].hard_limit_bytes && - used_mem >= server.client_obuf_limits[class].hard_limit_bytes) + used_mem >= hard_limit_bytes) hard = 1; if (server.client_obuf_limits[class].soft_limit_bytes && used_mem >= server.client_obuf_limits[class].soft_limit_bytes) @@ -3375,7 +3448,10 @@ int checkClientOutputBufferLimits(client *c) { int closeClientOnOutputBufferLimitReached(client *c, int async) { if (!c->conn) return 0; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); - if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0; + /* Note that c->reply_bytes is irrelevant for replica clients + * (they use the global repl buffers). */ + if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) || + c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); @@ -3740,6 +3816,15 @@ int handleClientsWithPendingWritesUsingThreads(void) { continue; } + /* Since all replicas and replication backlog use global replication + * buffer, to guarantee data accessing thread safe, we must put all + * replicas client into io_threads_list[0] i.e. main thread handles + * sending the output buffer of all replicas. 
*/ + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + listAddNodeTail(io_threads_list[0],c); + continue; + } + int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; diff --git a/src/object.c b/src/object.c index 4c62df3f0..c2d917ae4 100644 --- a/src/object.c +++ b/src/object.c @@ -1172,20 +1172,34 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mem_total += server.initial_memory_usage; - mem = 0; - if (server.repl_backlog) - mem += zmalloc_size(server.repl_backlog); - mh->repl_backlog = mem; - mem_total += mem; + /* Replication backlog and replicas share one global replication buffer, + * only if replication buffer memory is more than the repl backlog setting, + * we consider the excess as replicas' memory. Otherwise, replication buffer + * memory is the consumption of repl backlog. */ + if (listLength(server.slaves) && + (long long)server.repl_buffer_mem > server.repl_backlog_size) + { + mh->clients_slaves = server.repl_buffer_mem - server.repl_backlog_size; + mh->repl_backlog = server.repl_backlog_size; + } else { + mh->clients_slaves = 0; + mh->repl_backlog = server.repl_buffer_mem; + } + if (server.repl_backlog) { + /* The approximate memory of rax tree for indexed blocks. */ + mh->repl_backlog += + server.repl_backlog->blocks_index->numnodes * sizeof(raxNode) + + raxSize(server.repl_backlog->blocks_index) * sizeof(void*); + } + mem_total += mh->repl_backlog; + mem_total += mh->clients_slaves; /* Computing the memory used by the clients would be O(N) if done * here online. We use our values computed incrementally by * updateClientMemUsage(). */ - mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE]; mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+ server.stat_clients_type_memory[CLIENT_TYPE_NORMAL]; - mem_total += mh->clients_slaves; mem_total += mh->clients_normal; mem = 0; @@ -1312,7 +1326,7 @@ sds getMemoryDoctorReport(void) { } /* Slaves using more than 10 MB each? */ - if (numslaves > 0 && mh->clients_slaves / numslaves > (1024*1024*10)) { + if (numslaves > 0 && mh->clients_slaves > (1024*1024*10)) { big_slave_buf = 1; num_reports++; } diff --git a/src/replication.c b/src/replication.c index 1c8836c02..0629a4ca9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -33,6 +33,7 @@ #include "cluster.h" #include "bio.h" +#include #include #include #include @@ -109,95 +110,59 @@ int bg_unlink(const char *filename) { void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); - server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); - server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - + server.repl_backlog = zmalloc(sizeof(replBacklog)); + server.repl_backlog->ref_repl_buf_node = NULL; + server.repl_backlog->unindexed_count = 0; + server.repl_backlog->blocks_index = raxNew(); + server.repl_backlog->histlen = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ - server.repl_backlog_off = server.master_repl_offset+1; + server.repl_backlog->offset = server.master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. 
It is up to the function to both update the - * server.cfg_repl_backlog_size and to resize the buffer and setup it so that + * server.repl_backlog_size and to resize the buffer and setup it so that * it contains the same data as the previous one (possibly less data, but * the most recent bytes, or the same data and more free space in case the * buffer is enlarged). */ void resizeReplicationBacklog(long long newsize) { if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; - if (server.cfg_repl_backlog_size == newsize) return; + if (server.repl_backlog_size == newsize) return; - server.cfg_repl_backlog_size = newsize; - if (server.repl_backlog != NULL) { - /* What we actually do is to flush the old buffer and realloc a new - * empty one. It will refill with new data incrementally. - * The reason is that copying a few gigabytes adds latency and even - * worse often we need to alloc additional space before freeing the - * old buffer. */ - zfree(server.repl_backlog); - server.repl_backlog = zmalloc(server.cfg_repl_backlog_size); - server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog); - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - /* Next byte we have is... the next since the buffer is empty. */ - server.repl_backlog_off = server.master_repl_offset+1; - } + server.repl_backlog_size = newsize; + if (server.repl_backlog) + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); + if (server.repl_backlog == NULL) return; + + /* Decrease the start buffer node reference count. */ + if (server.repl_backlog->ref_repl_buf_node) { + replBufBlock *o = listNodeValue( + server.repl_backlog->ref_repl_buf_node); + serverAssert(o->refcount == 1); /* Last reference. */ + o->refcount--; + } + + /* Replication buffer blocks are completely released when we free the + * backlog, since the backlog is released only when there are no replicas + * and the backlog keeps the last reference of all blocks. */ + freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, + server.repl_backlog->blocks_index); + resetReplicationBuffer(); zfree(server.repl_backlog); server.repl_backlog = NULL; } -/* Add data to the replication backlog. - * This function also increments the global replication offset stored at - * server.master_repl_offset, because there is no case where we want to feed - * the backlog without incrementing the offset. */ -void feedReplicationBacklog(void *ptr, size_t len) { - unsigned char *p = ptr; - - server.master_repl_offset += len; - - /* This is a circular buffer, so write as much data we can at every - * iteration and rewind the "idx" index if we reach the limit. */ - while(len) { - size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; - if (thislen > len) thislen = len; - memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); - server.repl_backlog_idx += thislen; - if (server.repl_backlog_idx == server.repl_backlog_size) - server.repl_backlog_idx = 0; - len -= thislen; - p += thislen; - server.repl_backlog_histlen += thislen; - } - if (server.repl_backlog_histlen > server.repl_backlog_size) - server.repl_backlog_histlen = server.repl_backlog_size; - /* Set the offset of the first byte we have in the backlog. */ - server.repl_backlog_off = server.master_repl_offset - - server.repl_backlog_histlen + 1; -} - -/* Wrapper for feedReplicationBacklog() that takes Redis string objects - * as input. 
*/ -void feedReplicationBacklogWithObject(robj *o) { - char llstr[LONG_STR_SIZE]; - void *p; - size_t len; - - if (o->encoding == OBJ_ENCODING_INT) { - len = ll2string(llstr,sizeof(llstr),(long)o->ptr); - p = llstr; - } else { - len = sdslen(o->ptr); - p = o->ptr; - } - feedReplicationBacklog(p,len); +void resetReplicationBuffer(void) { + server.repl_buffer_mem = 0; + server.repl_buffer_blocks = listCreate(); + listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree); } int canFeedReplicaReplBuffer(client *replica) { @@ -210,14 +175,231 @@ int canFeedReplicaReplBuffer(client *replica) { return 1; } -/* Propagate write commands to slaves, and populate the replication backlog - * as well. This function is used if the instance is a master: we use - * the commands received by our clients in order to create the replication - * stream. Instead if the instance is a slave and has sub-slaves attached, - * we use replicationFeedSlavesFromMasterStream() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln; +/* Similar with 'prepareClientToWrite', note that we must call this function + * before feeding replication stream into global replication buffer, since + * clientHasPendingReplies in prepareClientToWrite will access the global + * replication buffer to make judgements. */ +int prepareReplicasToWrite(void) { listIter li; + listNode *ln; + int prepared = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + if (prepareClientToWrite(slave) == C_ERR) continue; + prepared++; + } + + return prepared; +} + +/* Wrapper for feedReplicationBuffer() that takes Redis string objects + * as input. */ +void feedReplicationBufferWithObject(robj *o) { + char llstr[LONG_STR_SIZE]; + void *p; + size_t len; + + if (o->encoding == OBJ_ENCODING_INT) { + len = ll2string(llstr,sizeof(llstr),(long)o->ptr); + p = llstr; + } else { + len = sdslen(o->ptr); + p = o->ptr; + } + feedReplicationBuffer(p,len); +} + +/* Generally, we only have one replication buffer block to trim when replication + * backlog size exceeds our setting and no replica reference it. But if replica + * clients disconnect, we need to free many replication buffer blocks that are + * referenced. It would cost much time if there are a lots blocks to free, that + * will freeze server, so we trim replication backlog incrementally. */ +void incrementalTrimReplicationBacklog(size_t max_blocks) { + serverAssert(server.repl_backlog != NULL); + + size_t trimmed_blocks = 0, trimmed_bytes = 0; + while (server.repl_backlog->histlen > server.repl_backlog_size && + trimmed_blocks < max_blocks) + { + /* We never trim backlog to less than one block. */ + if (listLength(server.repl_buffer_blocks) <= 1) break; + + /* Replicas increment the refcount of the first replication buffer block + * they refer to, in that case, we don't trim the backlog even if + * backlog_histlen exceeds backlog_size. This implicitly makes backlog + * bigger than our setting, but makes the master accept partial resync as + * much as possible. So that backlog must be the last reference of + * replication buffer blocks. 
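+         * Trimming therefore stops at the first block whose refcount is not 1,
+         * i.e. a block that some replica still references.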
*/ + listNode *first = listFirst(server.repl_buffer_blocks); + serverAssert(first == server.repl_backlog->ref_repl_buf_node); + replBufBlock *fo = listNodeValue(first); + if (fo->refcount != 1) break; + + /* We don't try trim backlog if backlog valid size will be lessen than + * setting backlog size once we release the first repl buffer block. */ + if (server.repl_backlog->histlen - (long long)fo->size <= + server.repl_backlog_size) break; + + /* Decr refcount and release the first block later. */ + fo->refcount--; + trimmed_bytes += fo->size; + trimmed_blocks++; + + /* Go to use next replication buffer block node. */ + listNode *next = listNextNode(first); + server.repl_backlog->ref_repl_buf_node = next; + serverAssert(server.repl_backlog->ref_repl_buf_node != NULL); + /* Incr reference count to keep the new head node. */ + ((replBufBlock *)listNodeValue(next))->refcount++; + + /* Remove the node in recorded blocks. */ + uint64_t encoded_offset = htonu64(fo->repl_offset); + raxRemove(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL); + + /* Delete the first node from global replication buffer. */ + serverAssert(fo->refcount == 0 && fo->used == fo->size); + server.repl_buffer_mem -= (fo->size + + sizeof(listNode) + sizeof(replBufBlock)); + listDelNode(server.repl_buffer_blocks, first); + } + + server.repl_backlog->histlen -= trimmed_bytes; + /* Set the offset of the first byte we have in the backlog. */ + server.repl_backlog->offset = server.master_repl_offset - + server.repl_backlog->histlen + 1; +} + +/* Free replication buffer blocks that are referenced by this client. */ +void freeReplicaReferencedReplBuffer(client *replica) { + if (replica->ref_repl_buf_node != NULL) { + /* Decrease the start buffer node reference count. */ + replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); + serverAssert(o->refcount > 0); + o->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + replica->ref_repl_buf_node = NULL; + replica->ref_block_pos = 0; +} + +/* Append bytes into the global replication buffer list, replication backlog and + * all replica clients use replication buffers collectively, this function replace + * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, + * First we add buffer into global replication buffer block list, and then + * update replica / replication-backlog referenced node and block position. */ +void feedReplicationBuffer(char *s, size_t len) { + static long long repl_block_id = 0; + + if (server.repl_backlog == NULL) return; + server.master_repl_offset += len; + server.repl_backlog->histlen += len; + + /* Install write handler for all replicas. */ + prepareReplicasToWrite(); + + size_t start_pos = 0; /* The position of referenced blok to start sending. */ + listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ + int add_new_block = 0; /* Create new block if current block is total used. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + + /* Append to tail string when possible. */ + if (tail && tail->size > tail->used) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = tail->used; + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = (avail >= len) ? 
len : avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t usable_size; + size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len; + tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); + /* Take over the allocation's internal fragmentation */ + tail->size = usable_size - sizeof(replBufBlock); + tail->used = len; + tail->refcount = 0; + tail->repl_offset = server.master_repl_offset - tail->used + 1; + tail->id = repl_block_id++; + memcpy(tail->buf, s, len); + listAddNodeTail(server.repl_buffer_blocks, tail); + /* We also count the list node memory into replication buffer memory. */ + server.repl_buffer_mem += (usable_size + sizeof(listNode)); + add_new_block = 1; + if (start_node == NULL) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = 0; + } + } + + /* For output buffer of replicas. */ + listIter li; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + + /* Update shared replication buffer start position. */ + if (slave->ref_repl_buf_node == NULL) { + slave->ref_repl_buf_node = start_node; + slave->ref_block_pos = start_pos; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + } + + /* Check output buffer limit only when add new block. */ + if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); + } + + /* For replication backlog */ + if (server.repl_backlog->ref_repl_buf_node == NULL) { + server.repl_backlog->ref_repl_buf_node = start_node; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + + /* Replication buffer must be empty before adding replication stream + * into replication backlog. */ + serverAssert(add_new_block == 1 && start_pos == 0); + } + if (add_new_block) { + /* To make search offset from replication buffer blocks quickly + * when replicas ask partial resynchronization, we create one index + * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ + server.repl_backlog->unindexed_count++; + if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { + uint64_t encoded_offset = htonu64(tail->repl_offset); + raxInsert(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), + listLast(server.repl_buffer_blocks), NULL); + server.repl_backlog->unindexed_count = 0; + } + } + /* Try to trim replication backlog since replication backlog may exceed + * our setting when we add replication stream. Note that it is important to + * try to trim at least one node since in the common case this is where one + * new backlog node is added and one should be removed. See also comments + * in freeMemoryGetNotCountedMemory for details. */ + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); +} + +/* Propagate write commands to replication stream. + * + * This function is used if the instance is a master: we use the commands + * received by our clients in order to create the replication stream. 
+ * Instead if the instance is a replica and has sub-replicas attached, we use + * replicationFeedStreamFromMasterStream() */ +void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; @@ -252,68 +434,36 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { dictid_len, llstr)); } - /* Add the SELECT command into the backlog. */ - if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); - - /* Send it to slaves. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (!canFeedReplicaReplBuffer(slave)) continue; - addReply(slave,selectcmd); - } + feedReplicationBufferWithObject(selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); } server.slaveseldb = dictid; - /* Write the command to the replication backlog if any. */ - if (server.repl_backlog) { - char aux[LONG_STR_SIZE+3]; + /* Write the command to the replication buffer if any. */ + char aux[LONG_STR_SIZE+3]; - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); + /* Add the multi bulk reply length. */ + aux[0] = '*'; + len = ll2string(aux+1,sizeof(aux)-1,argc); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBuffer(aux,len+3); + + for (j = 0; j < argc; j++) { + long objlen = stringObjectLen(argv[j]); + + /* We need to feed the buffer with the object as a bulk reply + * not just as a plain string, so create the $..CRLF payload len + * and add the final CRLF */ + aux[0] = '$'; + len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - - for (j = 0; j < argc; j++) { - long objlen = stringObjectLen(argv[j]); - - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - feedReplicationBacklogWithObject(argv[j]); - feedReplicationBacklog(aux+len+1,2); - } - } - - /* Write the command to every slave. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (!canFeedReplicaReplBuffer(slave)) continue; - - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ - - /* Add the multi bulk length. */ - addReplyArrayLen(slave,argc); - - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (j = 0; j < argc; j++) - addReplyBulk(slave,argv[j]); + feedReplicationBuffer(aux,len+3); + feedReplicationBufferWithObject(argv[j]); + feedReplicationBuffer(aux+len+1,2); } } @@ -323,26 +473,24 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * guess what kind of bug it could be. */ void showLatestBacklog(void) { if (server.repl_backlog == NULL) return; + if (listLength(server.repl_buffer_blocks) == 0) return; - long long dumplen = 256; - if (server.repl_backlog_histlen < dumplen) - dumplen = server.repl_backlog_histlen; + size_t dumplen = 256; + if (server.repl_backlog->histlen < (long long)dumplen) + dumplen = server.repl_backlog->histlen; - /* Identify the first byte to dump. 
*/ - long long idx = - (server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) % - server.repl_backlog_size; - - /* Scan the circular buffer to collect 'dumplen' bytes. */ sds dump = sdsempty(); + listNode *node = listLast(server.repl_buffer_blocks); while(dumplen) { - long long thislen = - ((server.repl_backlog_size - idx) < dumplen) ? - (server.repl_backlog_size - idx) : dumplen; - - dump = sdscatrepr(dump,server.repl_backlog+idx,thislen); + if (node == NULL) break; + replBufBlock *o = listNodeValue(node); + size_t thislen = o->used >= dumplen ? dumplen : o->used; + sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); + sds tmp = sdscatsds(head, dump); + sdsfree(dump); + dump = tmp; dumplen -= thislen; - idx = 0; + node = listPrevNode(node); } /* Finally log such bytes: this is vital debugging info to @@ -354,10 +502,7 @@ void showLatestBacklog(void) { /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { - listNode *ln; - listIter li; - +void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { @@ -368,14 +513,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle printf("\n"); } - if (server.repl_backlog) feedReplicationBacklog(buf,buflen); - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (!canFeedReplicaReplBuffer(slave)) continue; - addReplyProto(slave,buf,buflen); - } + /* There must be replication backlog if having attached slaves. */ + if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); + if (server.repl_backlog) feedReplicationBuffer(buf,buflen); } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -422,11 +562,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, /* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; + long long skip; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); - if (server.repl_backlog_histlen == 0) { + if (server.repl_backlog->histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } @@ -434,41 +574,58 @@ long long addReplyReplicationBacklog(client *c, long long offset) { serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", - server.repl_backlog_off); + server.repl_backlog->offset); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", - server.repl_backlog_histlen); - serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", - server.repl_backlog_idx); + server.repl_backlog->histlen); /* Compute the amount of bytes we need to discard. */ - skip = offset - server.repl_backlog_off; + skip = offset - server.repl_backlog->offset; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actually our - * server.repl_backlog_off byte. */ - j = (server.repl_backlog_idx + - (server.repl_backlog_size-server.repl_backlog_histlen)) % - server.repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); - - /* Discard the amount of data to seek to the specified 'offset'. 
*/ - j = (j + skip) % server.repl_backlog_size; - - /* Feed slave with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. */ - len = server.repl_backlog_histlen - skip; - serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - while(len) { - long long thislen = - ((server.repl_backlog_size - j) < len) ? - (server.repl_backlog_size - j) : len; - - serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); - addReplyProto(c,server.repl_backlog + j, thislen); - len -= thislen; - j = 0; + /* Iterate recorded blocks, quickly search the approximate node. */ + listNode *node = NULL; + if (raxSize(server.repl_backlog->blocks_index) > 0) { + uint64_t encoded_offset = htonu64(offset); + raxIterator ri; + raxStart(&ri, server.repl_backlog->blocks_index); + raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t)); + if (raxEOF(&ri)) { + /* No found, so search from the last recorded node. */ + raxSeek(&ri, "$", NULL, 0); + raxPrev(&ri); + node = (listNode *)ri.data; + } else { + raxPrev(&ri); /* Skip the sought node. */ + /* We should search from the prev node since the offset of current + * sought node exceeds searching offset. */ + if (raxPrev(&ri)) + node = (listNode *)ri.data; + else + node = server.repl_backlog->ref_repl_buf_node; + } + raxStop(&ri); + } else { + /* No recorded blocks, just from the start node to search. */ + node = server.repl_backlog->ref_repl_buf_node; } - return server.repl_backlog_histlen - skip; + + /* Search the exact node. */ + while (node != NULL) { + replBufBlock *o = listNodeValue(node); + if (o->repl_offset + (long long)o->used >= offset) break; + node = listNextNode(node); + } + serverAssert(node != NULL); + + /* Install a writer handler first.*/ + prepareClientToWrite(c); + /* Setting output buffer of the replica. */ + replBufBlock *o = listNodeValue(node); + o->refcount++; + c->ref_repl_buf_node = node; + c->ref_block_pos = offset - o->repl_offset; + + return server.repl_backlog->histlen - skip; } /* Return the offset to provide as reply to the PSYNC command received @@ -569,8 +726,8 @@ int masterTryPartialResynchronization(client *c) { /* We still have the data our slave is asking for? */ if (!server.repl_backlog || - psync_offset < server.repl_backlog_off || - psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) + psync_offset < server.repl_backlog->offset || + psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen)) { serverLog(LL_NOTICE, "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); @@ -853,7 +1010,8 @@ void syncCommand(client *c) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. * We don't copy buffer if clients don't want. */ - if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); + if (!(c->flags & CLIENT_REPL_RDBONLY)) + copyReplicaOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -3488,6 +3646,16 @@ void replicationCron(void) { * with any persistence. */ removeRDBUsedToSyncReplicas(); + /* Sanity check replication buffer, the first block of replication buffer blocks + * must be referenced by someone, since it will be freed when not referenced, + * otherwise, server will OOM. 
also, its refcount must not be more than + * replicas number + 1(replication backlog). */ + if (listLength(server.repl_buffer_blocks) > 0) { + replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks)); + serverAssert(o->refcount > 0 && + o->refcount <= (int)listLength(server.slaves)+1); + } + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ diff --git a/src/server.c b/src/server.c index ee1b689a6..dcce3d177 100644 --- a/src/server.c +++ b/src/server.c @@ -3450,6 +3450,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); + /* Incrementally trim replication backlog, 10 times the normal speed is + * to free replication backlog as much as possible. */ + if (server.repl_backlog) + incrementalTrimReplicationBacklog(10*REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + /* Disconnect some clients if they are consuming too much memory. */ evictClients(); @@ -3717,9 +3722,6 @@ void initServerConfig(void) { /* Replication partial resync backlog */ server.repl_backlog = NULL; - server.repl_backlog_histlen = 0; - server.repl_backlog_idx = 0; - server.repl_backlog_off = 0; server.repl_no_slaves_since = time(NULL); /* Failover related */ @@ -4171,6 +4173,7 @@ void initServer(void) { server.blocked_last_cron = 0; server.blocking_op_nesting = 0; server.thp_enabled = 0; + resetReplicationBuffer(); if ((server.tls_port || server.tls_replication || server.tls_cluster) && tlsConfigure(&server.tls_ctx_config) == C_ERR) { @@ -6330,6 +6333,7 @@ sds genRedisInfoString(const char *section) { "mem_fragmentation_bytes:%zd\r\n" "mem_not_counted_for_evict:%zu\r\n" "mem_replication_backlog:%zu\r\n" + "mem_total_replication_buffers:%zu\r\n" "mem_clients_slaves:%zu\r\n" "mem_clients_normal:%zu\r\n" "mem_aof_buffer:%zu\r\n" @@ -6374,6 +6378,7 @@ sds genRedisInfoString(const char *section) { mh->total_frag_bytes, freeMemoryGetNotCountedMemory(), mh->repl_backlog, + server.repl_buffer_mem, mh->clients_slaves, mh->clients_normal, mh->aof_buffer, @@ -6762,8 +6767,8 @@ sds genRedisInfoString(const char *section) { server.second_replid_offset, server.repl_backlog != NULL, server.repl_backlog_size, - server.repl_backlog_off, - server.repl_backlog_histlen); + server.repl_backlog ? server.repl_backlog->offset : 0, + server.repl_backlog ? server.repl_backlog->histlen : 0); } /* CPU */ @@ -7515,15 +7520,19 @@ void dismissMemoryInChild(void) { /* Currently we use zmadvise_dontneed only when we use jemalloc with Linux. * so we avoid these pointless loops when they're not going to do anything. */ #if defined(USE_JEMALLOC) && defined(__linux__) + listIter li; + listNode *ln; - /* Dismiss replication backlog. */ - if (server.repl_backlog != NULL) { - dismissMemory(server.repl_backlog, server.repl_backlog_size); + /* Dismiss replication buffer. We don't need to separately dismiss replication + * backlog and replica' output buffer, because they just reference the global + * replication buffer but don't cost real memory. */ + listRewind(server.repl_buffer_blocks, &li); + while((ln = listNext(&li))) { + replBufBlock *o = listNodeValue(ln); + dismissMemory(o, o->size); } /* Dismiss all clients memory. 
*/ - listIter li; - listNode *ln; listRewind(server.clients, &li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); @@ -7592,15 +7601,34 @@ void loadDataFromDisk(void) { server.second_replid_offset = rsi.repl_offset+1; /* Rebase master_repl_offset from rsi.repl_offset. */ server.master_repl_offset += rsi.repl_offset; - server.repl_backlog_off = server.master_repl_offset - - server.repl_backlog_histlen + 1; + serverAssert(server.repl_backlog); + server.repl_backlog->offset = server.master_repl_offset - + server.repl_backlog->histlen + 1; server.repl_no_slaves_since = time(NULL); + + /* Rebase replication buffer blocks' offset since the previous + * setting offset starts from 0. */ + listIter li; + listNode *ln; + listRewind(server.repl_buffer_blocks, &li); + while ((ln = listNext(&li))) { + replBufBlock *o = listNodeValue(ln); + o->repl_offset += rsi.repl_offset; + } } } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } + + /* We always create replication backlog if server is a master, we need + * it because we put DELs in it when loading expired keys in RDB, but + * if RDB doesn't have replication info or there is no rdb, it is not + * possible to support partial resynchronization, to avoid extra memory + * of replication backlog, we drop it. */ + if (server.master_repl_offset == 0 && server.repl_backlog) + freeReplicationBacklog(); } } diff --git a/src/server.h b/src/server.h index b911697a1..21d5fcd65 100644 --- a/src/server.h +++ b/src/server.h @@ -377,6 +377,13 @@ typedef enum { /* Synchronous read timeout - slave side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 +/* The default number of replication backlog blocks to trim per call. */ +#define REPL_BACKLOG_TRIM_BLOCKS_PER_CALL 64 + +/* In order to quickly find the requested offset for PSYNC requests, + * we index some nodes in the replication buffer linked list into a rax. */ +#define REPL_BACKLOG_INDEX_PER_BLOCKS 64 + /* List related stuff */ #define LIST_HEAD 0 #define LIST_TAIL 1 @@ -767,6 +774,33 @@ typedef struct clientReplyBlock { char buf[]; } clientReplyBlock; +/* Replication buffer blocks is the list of replBufBlock. + * + * +--------------+ +--------------+ +--------------+ + * | refcount = 1 | ... | refcount = 0 | ... | refcount = 2 | + * +--------------+ +--------------+ +--------------+ + * | / \ + * | / \ + * | / \ + * Repl Backlog Replia_A Replia_B + * + * Each replica or replication backlog increments only the refcount of the + * 'ref_repl_buf_node' which it points to. So when replica walks to the next + * node, it should first increase the next node's refcount, and when we trim + * the replication buffer nodes, we remove node always from the head node which + * refcount is 0. If the refcount of the head node is not 0, we must stop + * trimming and never iterate the next node. */ + +/* Similar with 'clientReplyBlock', it is used for shared buffers between + * all replica clients and replication backlog. */ +typedef struct replBufBlock { + int refcount; /* Number of replicas or repl backlog using. */ + long long id; /* The unique incremental number. */ + long long repl_offset; /* Start replication offset of the block. */ + size_t size, used; + char buf[]; +} replBufBlock; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. 
*/ @@ -929,6 +963,24 @@ typedef struct { need more reserved IDs use UINT64_MAX-1, -2, ... and so forth. */ +/* Replication backlog is not separate memory, it just is one consumer of + * the global replication buffer. This structure records the reference of + * replication buffers. Since the replication buffer block list may be very long, + * it would cost much time to search replication offset on partial resync, so + * we use one rax tree to index some blocks every REPL_BACKLOG_INDEX_PER_BLOCKS + * to make searching offset from replication buffer blocks list faster. */ +typedef struct replBacklog { + listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, + * see the definition of replBufBlock. */ + size_t unindexed_count; /* The count from last creating index block. */ + rax *blocks_index; /* The index of reocrded blocks of replication + * buffer for quickly searching replication + * offset on partial resynchronization. */ + long long histlen; /* Backlog actual data length */ + long long offset; /* Replication "master offset" of first + * byte in the replication backlog buffer.*/ +} replBacklog; + typedef struct { list *clients; size_t mem_usage_sum; @@ -1029,6 +1081,11 @@ typedef struct client { listNode *mem_usage_bucket_node; clientMemUsageBucket *mem_usage_bucket; + listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, + * see the definition of replBufBlock. */ + size_t ref_block_pos; /* Access position of referenced buffer block, + * i.e. the next offset to send. */ + /* Response buffer */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ @@ -1528,14 +1585,8 @@ struct redisServer { long long second_replid_offset; /* Accept offsets up to this for replid2. */ int slaveseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ - char *repl_backlog; /* Replication backlog for partial syncs */ + replBacklog *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ - long long cfg_repl_backlog_size;/* Backlog circular buffer size in config */ - long long repl_backlog_histlen; /* Backlog actual data length */ - long long repl_backlog_idx; /* Backlog circular buffer current offset, - that is the next byte will'll write to.*/ - long long repl_backlog_off; /* Replication "master offset" of first - byte in the replication backlog buffer.*/ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -1547,6 +1598,9 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + size_t repl_buffer_mem; /* The memory of replication buffer. 
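+                                   * It counts each replBufBlock allocation plus
+                                   * its list node, see feedReplicationBuffer().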
*/ + list *repl_buffer_blocks; /* Replication buffers blocks list + * (serving replica clients and repl backlog) */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ sds masterauth; /* AUTH with this password with master */ @@ -2031,6 +2085,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(connection *conn); +int prepareClientToWrite(client *c); void addReplyNull(client *c); void addReplyNullArray(client *c); void addReplyBool(client *c, int b); @@ -2063,8 +2118,8 @@ void addReplyPushLen(client *c, long length); void addReplyHelp(client *c, const char **help); void addReplySubcommandSyntaxError(client *c); void addReplyLoadedModules(client *c); +void copyReplicaOutputBuffer(client *dst, client *src); void addListRangeReply(client *c, robj *o, long start, long end, int reverse); -void copyClientOutputBuffer(client *dst, client *src); size_t sdsZmallocSize(sds s); size_t getStringObjectSdsUsedMemory(robj *o); void freeClientReplyValue(void *o); @@ -2238,7 +2293,10 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); +void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); +void resetReplicationBuffer(void); +void feedReplicationBuffer(char *buf, size_t len); +void freeReplicaReferencedReplBuffer(client *replica); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -2264,8 +2322,11 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset); void changeReplicationId(void); void clearReplicationId2(void); void createReplicationBacklog(void); +void freeReplicationBacklog(void); void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); +void incrementalTrimReplicationBacklog(size_t blocks); +int canFeedReplicaReplBuffer(client *replica); void showLatestBacklog(void); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); @@ -2613,7 +2674,7 @@ size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); void lazyfreeResetStats(void); void freeObjAsync(robj *key, robj *obj, int dbid); - +void freeReplicationBacklogRefMemAsync(list *blocks, rax *index); /* API to get key arguments from commands */ int *getKeysPrepareResult(getKeysResult *result, int numkeys); diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 301a4f63d..672143102 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -118,7 +118,8 @@ start_server {} { $master config rewrite $master debug set-active-expire 0 - for {set j 0} {$j < 1024} {incr j} { + # Make sure replication backlog is full and will be trimmed. 
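+    # (double the previous 1024 keys so the generated stream exceeds the backlog size)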
+    for {set j 0} {$j < 2048} {incr j} {
         $master select [expr $j%16]
         $master set $j somevalue px 10
     }
@@ -149,7 +150,7 @@ start_server {} {
     assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
     assert {[status $master sync_partial_ok] == 0}
     assert {[status $master sync_full] == 1}
-    assert {[status $master rdb_last_load_keys_expired] == 1024}
+    assert {[status $master rdb_last_load_keys_expired] == 2048}
     assert {[status $replica sync_full] == 1}
 
     set digest [$master debug digest]
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
new file mode 100644
index 000000000..905766ae2
--- /dev/null
+++ b/tests/integration/replication-buffer.tcl
@@ -0,0 +1,218 @@
+# This test group aims to verify that all replicas share one global replication
+# buffer: a second replica doesn't double the replication buffer size, and when
+# no replica needs it, the replication buffer shrinks.
+start_server {tags {"repl external:skip"}} {
+start_server {} {
+start_server {} {
+start_server {} {
+    set replica1 [srv -3 client]
+    set replica2 [srv -2 client]
+    set replica3 [srv -1 client]
+
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+
+    $master config set save ""
+    $master config set repl-backlog-size 16384
+    $master config set client-output-buffer-limit "replica 0 0 0"
+
+    # Make sure replica3 is synchronized with master
+    $replica3 replicaof $master_host $master_port
+    wait_for_sync $replica3
+
+    # Generating RDB will take about 100 seconds
+    $master config set rdb-key-save-delay 1000000
+    populate 100 "" 16
+
+    # Make sure replica1 and replica2 are waiting for bgsave
+    $replica1 replicaof $master_host $master_port
+    $replica2 replicaof $master_host $master_port
+    wait_for_condition 50 100 {
+        ([s rdb_bgsave_in_progress] == 1) &&
+        [lindex [$replica1 role] 3] eq {sync} &&
+        [lindex [$replica2 role] 3] eq {sync}
+    } else {
+        fail "fail to sync with replicas"
+    }
+
+    test {All replicas share one global replication buffer} {
+        set before_used [s used_memory]
+        populate 1024 "" 1024 ; # Write extra 1M data
+        # The new data uses 1M memory, but all replicas share one replication
+        # buffer, so the total replica output memory is not more than double
+        # the replication buffer size.
+        set repl_buf_mem [s mem_total_replication_buffers]
+        set extra_mem [expr {[s used_memory]-$before_used-1024*1024}]
+        assert {$extra_mem < 2*$repl_buf_mem}
+
+        # Kill replica1, the replication buffer will not become smaller
+        catch {$replica1 shutdown nosave}
+        wait_for_condition 50 100 {
+            [s connected_slaves] eq {2}
+        } else {
+            fail "replica1 didn't disconnect from master"
+        }
+        assert_equal $repl_buf_mem [s mem_total_replication_buffers]
+    }
+
+    test {Replication buffer will become smaller when no replica uses it} {
+        # Make sure replica3 catches up with the master
+        wait_for_ofs_sync $master $replica3
+
+        set repl_buf_mem [s mem_total_replication_buffers]
+        # Kill replica2, the replication buffer will become smaller
+        catch {$replica2 shutdown nosave}
+        wait_for_condition 50 100 {
+            [s connected_slaves] eq {1}
+        } else {
+            fail "replica2 didn't disconnect from master"
+        }
+        assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
+    }
+}
+}
+}
+}
+
+# This test group aims to verify that the replication backlog can outgrow the
+# configured backlog limit when a slow replica keeps a large replication buffer
+# alive, and that replicas can use this buffer (beyond the backlog config) for
+# partial resynchronization. Of course, replication backlog memory can also
+# become smaller when the master disconnects slow replicas because their output
+# buffer limit is reached.
+start_server {tags {"repl external:skip"}} {
+start_server {} {
+start_server {} {
+    set replica1 [srv -2 client]
+    set replica1_pid [s -2 process_id]
+    set replica2 [srv -1 client]
+    set replica2_pid [s -1 process_id]
+
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+
+    $master config set save ""
+    $master config set repl-backlog-size 16384
+    $master config set client-output-buffer-limit "replica 0 0 0"
+    $replica1 replicaof $master_host $master_port
+    wait_for_sync $replica1
+
+    test {Replication backlog size can outgrow the backlog limit config} {
+        # Generating RDB will take 1000 seconds
+        $master config set rdb-key-save-delay 1000000
+        populate 1000 master 10000
+        $replica2 replicaof $master_host $master_port
+        # Make sure replica2 is waiting for bgsave
+        wait_for_condition 5000 100 {
+            ([s rdb_bgsave_in_progress] == 1) &&
+            [lindex [$replica2 role] 3] eq {sync}
+        } else {
+            fail "fail to sync with replicas"
+        }
+        # The actual replication backlog grows beyond the backlog setting
+        # since the slow replica2 keeps referencing the replication buffer.
+        populate 10000 master 10000
+        assert {[s repl_backlog_histlen] > [expr 10000*10000]}
+    }
+
+    # Wait until replica1 catches up with the master
+    wait_for_condition 1000 100 {
+        [s -2 master_repl_offset] eq [s master_repl_offset]
+    } else {
+        fail "Replica offset didn't catch up with the master after too long time"
+    }
+
+    test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} {
+        # replica1 disconnects from master
+        $replica1 replicaof [srv -1 host] [srv -1 port]
+        # Write a mass of data that exceeds repl-backlog-size
+        populate 10000 master 10000
+        # replica1 reconnects to master
+        $replica1 replicaof $master_host $master_port
+        wait_for_condition 1000 100 {
+            [s -2 master_repl_offset] eq [s master_repl_offset]
+        } else {
+            fail "Replica offset didn't catch up with the master after too long time"
+        }
+
+        # replica2 is still waiting for bgsave to finish
+        assert {[s rdb_bgsave_in_progress] eq {1} && [lindex [$replica2 role] 3] eq {sync}}
+        # master accepted replica1's partial resync
+        assert_equal [s sync_partial_ok] {1}
+        assert_equal [$master debug digest] [$replica1 debug digest]
+    }
+
+    test {Replication backlog memory will become smaller after disconnecting a slow replica} {
+        assert {[s repl_backlog_histlen] > [expr 2*10000*10000]}
+        assert_equal [s connected_slaves] {2}
+
+        exec kill -SIGSTOP $replica2_pid
+        r config set client-output-buffer-limit "replica 128k 0 0"
+        # Trigger output buffer limit check
+        r set key [string repeat A [expr 64*1024]]
+        # The master will close replica2's connection since replica2's output
+        # buffer limit is reached, so only replica1 remains.
+        wait_for_condition 100 100 {
+            [s connected_slaves] eq {1}
+        } else {
+            fail "master didn't disconnect replica2"
+        }
+
+        # Since we trim the replication backlog incrementally, the backlog
+        # memory may take some time to be reclaimed.
+        wait_for_condition 1000 100 {
+            [s repl_backlog_histlen] < [expr 10000*10000]
+        } else {
+            fail "Replication backlog memory did not become smaller"
+        }
+        exec kill -SIGCONT $replica2_pid
+    }
+}
+}
+}
+
+test {Partial resynchronization is successful even when client-output-buffer-limit is less than repl-backlog-size} {
+    start_server {tags {"repl external:skip"}} {
+        start_server {} {
+            r config set save ""
+            r config set repl-backlog-size 100mb
+            r config set client-output-buffer-limit "replica 512k 0 0"
+
+            set replica [srv -1 client]
+            $replica replicaof [srv 0 host] [srv 0 port]
+            wait_for_sync $replica
+
+            set big_str [string repeat A [expr 10*1024*1024]] ;# 10mb big string
+            r multi
+            r client kill type replica
+            r set key $big_str
+            r set key $big_str
+            r debug sleep 2 ;# wait for replica reconnecting
+            r exec
+            # When the replica reconnects to the master, the master accepts the
+            # partial resync and doesn't close the replica client even if the
+            # client output buffer limit is reached.
+            r set key $big_str ;# trigger output buffer limit check
+            wait_for_ofs_sync r $replica
+            # master accepted the replica's partial resync
+            assert_equal [s sync_full] {1}
+            assert_equal [s sync_partial_ok] {1}
+
+            r multi
+            r set key $big_str
+            r set key $big_str
+            r exec
+            # The replica's reply buffer size exceeds client-output-buffer-limit
+            # but not repl-backlog-size, so we don't close the replica client.
+            wait_for_condition 1000 100 {
+                [s -1 master_repl_offset] eq [s master_repl_offset]
+            } else {
+                fail "Replica offset didn't catch up with the master after too long time"
+            }
+            assert_equal [s sync_full] {1}
+            assert_equal [s sync_partial_ok] {1}
+        }
+    }
+}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 3169245b1..e0529dcc3 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -527,8 +527,11 @@ test {diskless loading short read} {
                 $master multi
                 $master client kill type replica
                 $master set asdf asdf
-                # the side effect of resizing the backlog is that it is flushed (16k is the min size)
-                $master config set repl-backlog-size [expr {16384 + $i}]
+                # fill replication backlog with new content
+                $master config set repl-backlog-size 16384
+                for {set keyid 0} {$keyid < 10} {incr keyid} {
+                    $master set "$keyid string_$keyid" [string repeat A 16384]
+                }
                 $master exec
             }
             # wait for loading to stop (fail)
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 48f8d7d47..dd2f1c970 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -43,6 +43,7 @@ set ::all_tests {
     integration/replication-3
     integration/replication-4
     integration/replication-psync
+    integration/replication-buffer
     integration/aof
     integration/rdb
     integration/corrupt-dump
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index a98bfdb37..2e84a908d 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -355,7 +355,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
             $rd_master setrange key:0 0 [string repeat A $payload_len]
         }
         for {set k 0} {$k < $cmd_count} {incr k} {
-            #$rd_master read
+            $rd_master read
         }
     } else {
         for {set k 0} {$k < $cmd_count} {incr k} {
@@ -382,12 +382,14 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
     assert {$delta < $delta_max && $delta > -$delta_max}
 
     $master client kill type slave
-    set killed_used [s -1 used_memory]
+    set info_str [$master info memory]
+    set killed_used [getInfoProperty $info_str used_memory]
+    set killed_mem_not_counted_for_evict [getInfoProperty $info_str mem_not_counted_for_evict]
     set killed_slave_buf [s -1 mem_clients_slaves]
-    set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
     # we need to exclude replies buffer and query buffer of slave from used memory after kill slave
     set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict - [slave_query_buffer $master]}]
     set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
+    assert {[$master dbsize] == 100}
     assert {$killed_slave_buf == 0}
     assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max}
 
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
index 5ba102284..8bc0f7cd4 100644
--- a/tests/unit/moduleapi/testrdb.tcl
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -107,8 +107,11 @@ tags "modules" {
                 $master multi
                 $master client kill type replica
                 $master set asdf asdf
-                # the side effect of resizing the backlog is that it is flushed (16k is the min size)
-                $master config set repl-backlog-size [expr {16384 + $i}]
+                # fill replication backlog with new content
+                $master config set repl-backlog-size 16384
+                for {set keyid 0} {$keyid < 10} {incr keyid} {
+                    $master set "$keyid string_$keyid" [string repeat A 16384]
+                }
                 $master exec
             }
             # wait for loading to stop (fail)
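
The replBacklog comment in the server.h hunk above describes indexing one replication buffer block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks so that a PSYNC offset can be located without walking the whole block list. The following standalone sketch illustrates only that lookup idea; it is not the patch's implementation: a sorted array stands in for the rax tree, and bufBlockRef, INDEX_PER_BLOCKS and locate_block are made-up names.

/* Minimal sketch of offset indexing for partial resync (assumed names,
 * sorted array instead of the rax used by the real code). */
#include <stdio.h>
#include <stdlib.h>

#define INDEX_PER_BLOCKS 64   /* index one block out of every 64 (assumption) */

typedef struct bufBlockRef {
    long long repl_offset;    /* master offset of the first byte in the block */
    size_t block_idx;         /* position of the block in the block list */
} bufBlockRef;

/* Binary search: return the indexed block with the largest starting offset
 * that is <= wanted. The caller then scans forward at most INDEX_PER_BLOCKS
 * blocks to find the exact block, instead of scanning the entire list. */
static const bufBlockRef *locate_block(const bufBlockRef *index, size_t n,
                                       long long wanted) {
    if (n == 0 || wanted < index[0].repl_offset) return NULL;
    size_t lo = 0, hi = n - 1;
    while (lo < hi) {
        size_t mid = (lo + hi + 1) / 2;
        if (index[mid].repl_offset <= wanted) lo = mid; else hi = mid - 1;
    }
    return &index[lo];
}

int main(void) {
    /* Pretend every block holds 16 KiB of the replication stream and we have
     * indexed every 64th block, starting at master offset 1000. */
    enum { NBLOCKS = 1024, BLOCK_BYTES = 16384 };
    size_t nindex = NBLOCKS / INDEX_PER_BLOCKS;
    bufBlockRef *index = malloc(nindex * sizeof(*index));
    for (size_t i = 0; i < nindex; i++) {
        index[i].block_idx = i * INDEX_PER_BLOCKS;
        index[i].repl_offset = 1000 + (long long)index[i].block_idx * BLOCK_BYTES;
    }

    long long psync_offset = 1000 + 300LL * BLOCK_BYTES + 123; /* inside block 300 */
    const bufBlockRef *ref = locate_block(index, nindex, psync_offset);
    if (ref)
        printf("start scanning from block %zu (offset %lld) for PSYNC offset %lld\n",
               ref->block_idx, ref->repl_offset, psync_offset);
    free(index);
    return 0;
}

The trade-off this illustrates is the same one the struct comment states: indexing only every Nth block keeps the index small while bounding the linear scan after the search to at most N blocks.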