
# Background

The RDB file is usually generated and used once and seldom used again, but its content stays in the page cache until the OS evicts it. A potential problem is that once free memory is exhausted, the OS has to reclaim memory from the page cache or swap anonymous pages out, which may cause jitter in the Redis service.

Consider a concrete scenario: a high-capacity machine hosts many Redis instances, and we are upgrading them all together. The page cache on the host machine grows as the RDB files are generated. Once free memory drops to the low watermark (which is more likely to happen on older Linux kernels such as 3.10: before [watermark_scale_factor](https://lore.kernel.org/lkml/1455813719-2395-1-git-send-email-hannes@cmpxchg.org/) was introduced, the `low watermark` was linear in the `min watermark`, which left little buffer space for `kswapd` to be woken up to reclaim memory), a `direct reclaim` happens, which means the process stalls while waiting for memory allocation.

# What the PR does

The PR introduces the capability to reclaim the cache when the RDB is operated on. Generally there are two cases: reading and writing the RDB. For reads it is a little messy to implement incremental reclaim, so the reclaim is done in one go, in the background, after the load has finished, to avoid blocking the working thread. For writes, incremental reclaim amortizes the cost of reclaiming, so there is no need to move it to the background, and the peak cache watermark is reduced this way.

Two cases are handled specially: replication and restart. In both of them the cache is leveraged to speed up processing, so the reclaim is postponed to an appropriate time. To do this, a flag is added to `rdbSave` and `rdbLoad` to control whether the cache needs to be kept, with a default value of false.

# Things worth noting

1. Although `posix_fadvise` is part of the POSIX standard, only a few platforms support it, e.g. Linux and FreeBSD 10.0.
2. On Linux, `posix_fadvise` only takes effect on pages that have already been written back, so a `sync` (or `fsync`, `fdatasync`) is needed to flush the dirty pages before calling `posix_fadvise` when reclaiming the write cache.

# About the tests

A unit test is added to verify the effect of `posix_fadvise`. In the integration tests the overall cache increase is checked, as well as the cache backed by the RDB, with a specific TCL test executed in an isolated GitHub Actions job.
186 lines
6.8 KiB
C
186 lines
6.8 KiB
C
/*
 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
 * Copyright (c) 2009-2019, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
|
|
|
|
|
|
#ifndef __REDIS_RIO_H
|
|
#define __REDIS_RIO_H
|
|
|
|
#include <stdio.h>
|
|
#include <stdint.h>
|
|
#include "sds.h"
|
|
#include "connection.h"
|
|
|
|
/* Error bits kept in the 'flags' field of a rio. rioRead() and rioWrite()
 * set the matching bit when the backend reports a failure, and
 * rioClearErrors() resets both. */
#define RIO_FLAG_READ_ERROR (1<<0)
#define RIO_FLAG_WRITE_ERROR (1<<1)

/* Backend type identifiers, one per variant of the rio 'io' union.
 * NOTE(review): presumably the values reported by rioCheckType() -- confirm
 * against the implementation file. */
#define RIO_TYPE_FILE (1<<0)
#define RIO_TYPE_BUFFER (1<<1)
#define RIO_TYPE_CONN (1<<2)
#define RIO_TYPE_FD (1<<3)
|
|
|
|
struct _rio {
|
|
/* Backend functions.
|
|
* Since this functions do not tolerate short writes or reads the return
|
|
* value is simplified to: zero on error, non zero on complete success. */
|
|
size_t (*read)(struct _rio *, void *buf, size_t len);
|
|
size_t (*write)(struct _rio *, const void *buf, size_t len);
|
|
off_t (*tell)(struct _rio *);
|
|
int (*flush)(struct _rio *);
|
|
/* The update_cksum method if not NULL is used to compute the checksum of
|
|
* all the data that was read or written so far. The method should be
|
|
* designed so that can be called with the current checksum, and the buf
|
|
* and len fields pointing to the new block of data to add to the checksum
|
|
* computation. */
|
|
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
|
|
|
|
/* The current checksum and flags (see RIO_FLAG_*) */
|
|
uint64_t cksum, flags;
|
|
|
|
/* number of bytes read or written */
|
|
size_t processed_bytes;
|
|
|
|
/* maximum single read or write chunk size */
|
|
size_t max_processing_chunk;
|
|
|
|
/* Backend-specific vars. */
|
|
union {
|
|
/* In-memory buffer target. */
|
|
struct {
|
|
sds ptr;
|
|
off_t pos;
|
|
} buffer;
|
|
/* Stdio file pointer target. */
|
|
struct {
|
|
FILE *fp;
|
|
off_t buffered; /* Bytes written since last fsync. */
|
|
off_t autosync; /* fsync after 'autosync' bytes written. */
|
|
unsigned reclaim_cache:1; /* A flag to indicate reclaim cache after fsync */
|
|
} file;
|
|
/* Connection object (used to read from socket) */
|
|
struct {
|
|
connection *conn; /* Connection */
|
|
off_t pos; /* pos in buf that was returned */
|
|
sds buf; /* buffered data */
|
|
size_t read_limit; /* don't allow to buffer/read more than that */
|
|
size_t read_so_far; /* amount of data read from the rio (not buffered) */
|
|
} conn;
|
|
/* FD target (used to write to pipe). */
|
|
struct {
|
|
int fd; /* File descriptor. */
|
|
off_t pos;
|
|
sds buf;
|
|
} fd;
|
|
} io;
|
|
};
|
|
|
|
typedef struct _rio rio;
|
|
|
|
/* The following functions are our interface with the stream. They'll call the
|
|
* actual implementation of read / write / tell, and will update the checksum
|
|
* if needed. */
|
|
|
|
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
|
|
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
|
|
while (len) {
|
|
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
|
|
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
|
|
if (r->write(r,buf,bytes_to_write) == 0) {
|
|
r->flags |= RIO_FLAG_WRITE_ERROR;
|
|
return 0;
|
|
}
|
|
buf = (char*)buf + bytes_to_write;
|
|
len -= bytes_to_write;
|
|
r->processed_bytes += bytes_to_write;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
/* Read exactly 'len' bytes from the stream into 'buf'.
 *
 * The request is split into chunks of at most 'max_processing_chunk' bytes
 * when that field is non-zero. For every chunk the checksum hook (if set) is
 * invoked after the backend read succeeded, so only data that was actually
 * read is added to the checksum.
 *
 * Returns 1 on complete success. Returns 0 if the read error flag was already
 * set on entry, or if the backend reports a failure, in which case
 * RIO_FLAG_READ_ERROR is set on the stream. */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
    if (r->flags & RIO_FLAG_READ_ERROR) return 0;
    while (len) {
        size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        if (r->read(r,buf,bytes_to_read) == 0) {
            r->flags |= RIO_FLAG_READ_ERROR;
            return 0;
        }
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
        buf = (char*)buf + bytes_to_read;
        len -= bytes_to_read;
        r->processed_bytes += bytes_to_read;
    }
    return 1;
}
|
|
|
|
/* Return whatever the backend's 'tell' callback reports for this stream. */
static inline off_t rioTell(rio *r) {
    return r->tell(r);
}
|
|
|
|
/* Invoke the backend's 'flush' callback and return its result unchanged. */
static inline int rioFlush(rio *r) {
    return r->flush(r);
}
|
|
|
|
/* This function allows to know if there was a read error in any past
|
|
* operation, since the rio stream was created or since the last call
|
|
* to rioClearError(). */
|
|
static inline int rioGetReadError(rio *r) {
|
|
return (r->flags & RIO_FLAG_READ_ERROR) != 0;
|
|
}
|
|
|
|
/* Like rioGetReadError() but for write errors. */
|
|
static inline int rioGetWriteError(rio *r) {
|
|
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
|
|
}
|
|
|
|
/* Reset both the read and the write error flags, leaving every other bit of
 * 'flags' untouched. After this call rioRead() and rioWrite() no longer
 * short-circuit on a previously recorded error. */
static inline void rioClearErrors(rio *r) {
    r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
}
|
|
|
|
void rioInitWithFile(rio *r, FILE *fp);
|
|
void rioInitWithBuffer(rio *r, sds s);
|
|
void rioInitWithConn(rio *r, connection *conn, size_t read_limit);
|
|
void rioInitWithFd(rio *r, int fd);
|
|
|
|
void rioFreeFd(rio *r);
|
|
void rioFreeConn(rio *r, sds* out_remainingBufferedData);
|
|
|
|
size_t rioWriteBulkCount(rio *r, char prefix, long count);
|
|
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
|
|
size_t rioWriteBulkLongLong(rio *r, long long l);
|
|
size_t rioWriteBulkDouble(rio *r, double d);
|
|
|
|
struct redisObject;
|
|
int rioWriteBulkObject(rio *r, struct redisObject *obj);
|
|
|
|
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
|
|
void rioSetAutoSync(rio *r, off_t bytes);
|
|
void rioSetReclaimCache(rio *r, int enabled);
|
|
uint8_t rioCheckType(rio *r);
|
|
#endif
|