Diskless replica: a few aesthetic changes to rio.c

This commit is contained in:
antirez 2019-07-08 18:39:59 +02:00
parent 81b18fa3a0
commit dfcbeaf115

View File

@ -157,7 +157,11 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0; r->io.file.autosync = 0;
} }
/* ------------------- File descriptor implementation ------------------- */ /* ------------------- File descriptor implementation -------------------
* We use this RIO implemetnation when reading an RDB file directly from
* the socket to the memory via rdbLoadRio(), thus this implementation
* only implements reading from a file descriptor that is, normally,
* just a socket. */
static size_t rioFdWrite(rio *r, const void *buf, size_t len) { static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
UNUSED(r); UNUSED(r);
@ -170,27 +174,28 @@ static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
static size_t rioFdRead(rio *r, void *buf, size_t len) { static size_t rioFdRead(rio *r, void *buf, size_t len) {
size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
/* if the buffer is too small for the entire request: realloc */ /* If the buffer is too small for the entire request: realloc. */
if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
/* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ /* If the remaining unused buffer is not large enough: memmove so that we
* can read the rest. */
if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
sdsrange(r->io.fd.buf, r->io.fd.pos, -1); sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
r->io.fd.pos = 0; r->io.fd.pos = 0;
} }
/* if we don't already have all the data in the sds, read more */ /* If we don't already have all the data in the sds, read more */
while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
size_t toread = len - buffered; size_t toread = len - buffered;
/* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */ /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of
if (toread < PROTO_IOBUF_LEN) * the two. */
toread = PROTO_IOBUF_LEN; if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN;
if (toread > sdsavail(r->io.fd.buf)) if (toread > sdsavail(r->io.fd.buf)) toread = sdsavail(r->io.fd.buf);
toread = sdsavail(r->io.fd.buf);
if (r->io.fd.read_limit != 0 && if (r->io.fd.read_limit != 0 &&
r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) { r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit)
{
if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered)
toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
else { else {
@ -198,7 +203,9 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) {
return 0; return 0;
} }
} }
int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread); int retval = read(r->io.fd.fd,
(char*)r->io.fd.buf + sdslen(r->io.fd.buf),
toread);
if (retval <= 0) { if (retval <= 0) {
if (errno == EWOULDBLOCK) errno = ETIMEDOUT; if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
return 0; return 0;
@ -237,8 +244,8 @@ static const rio rioFdIO = {
{ { NULL, 0 } } /* union for io-specific vars */ { { NULL, 0 } } /* union for io-specific vars */
}; };
/* create an rio that implements a buffered read from an fd /* Create an RIO that implements a buffered read from an fd
* read_limit argument stops buffering when the reaching the limit */ * read_limit argument stops buffering when the reaching the limit. */
void rioInitWithFd(rio *r, int fd, size_t read_limit) { void rioInitWithFd(rio *r, int fd, size_t read_limit) {
*r = rioFdIO; *r = rioFdIO;
r->io.fd.fd = fd; r->io.fd.fd = fd;
@ -249,24 +256,24 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) {
sdsclear(r->io.fd.buf); sdsclear(r->io.fd.buf);
} }
/* release the rio stream. /* Release the RIO tream. Optionally returns the unread buffered data
* optionally returns the unread buffered data. */ * when the SDS pointer 'remaining' is passed. */
void rioFreeFd(rio *r, sds *out_remainingBufferedData) { void rioFreeFd(rio *r, sds *remaining) {
if (out_remainingBufferedData && if (remaining && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
(size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
{ *remaining = r->io.fd.buf;
if (r->io.fd.pos > 0)
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
*out_remainingBufferedData = r->io.fd.buf;
} else { } else {
sdsfree(r->io.fd.buf); sdsfree(r->io.fd.buf);
if (out_remainingBufferedData) if (out_remainingBufferedData) *remaining = NULL;
*out_remainingBufferedData = NULL;
} }
r->io.fd.buf = NULL; r->io.fd.buf = NULL;
} }
/* ------------------- File descriptors set implementation ------------------ */ /* ------------------- File descriptors set implementation ------------------
* This target is used to write the RDB file to N different replicas via
* sockets, when the master just streams the data to the replicas without
* creating an RDB on-disk image (diskless replication option).
* It only implements writes. */
/* Returns 1 or 0 for success/failure. /* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write * The function returns success as long as we are able to correctly write