diskless replication rdb transfer uses pipe, and writes to sockets from the parent process.

misc:
- handle SSL_has_pending by iterating through these in beforeSleep, and setting timeout of 0 to aeProcessEvents
- fix issue with epoll signaling EPOLLHUP and EPOLLERR only to the write handlers. (needed to detect the rdb pipe was closed)
- add key-load-delay config for testing
- trim connShutdown which is no longer needed
- rioFdsetWrite -> rioFdWrite - simplified since there's no longer need to write to multiple FDs
- don't detect rdb child exited (don't call wait3) until we detect the pipe is closed
- Cleanup bad optimization from rio.c, add another one
This commit is contained in:
Oran Agra 2019-08-11 16:07:53 +03:00 committed by Yossi Gottlieb
parent b087dd1db6
commit 5a47794606
17 changed files with 582 additions and 355 deletions

21
TLS.md
View File

@ -81,23 +81,6 @@ implementation details between TLS and TCP.
difficult, but there are probably other good reasons to improve that part difficult, but there are probably other good reasons to improve that part
anyway. anyway.
5. A mechanism to re-trigger read callbacks for connections with unread buffers
(the case of reading partial TLS frames):
a) Before sleep should iterate connections looking for those with a read handler,
SSL_pending() != 0 and no read event.
b) If found, trigger read handler for these conns.
c) After iteration if this state persists, epoll should be called in a way
that won't block so the process continues and this behave the same as a
level trigerred epoll.
Replication
-----------
Diskless master replication is broken, until child/parent connection proxying is
implemented.
TLS Features TLS Features
------------ ------------
@ -119,6 +102,10 @@ most actions.
This will need to be cleaned up for proper TLS support. The best approach is This will need to be cleaned up for proper TLS support. The best approach is
probably to migrate to hiredis async mode. probably to migrate to hiredis async mode.
redis-cli
---------
1. Support tls in --slave and --rdb
Others Others
------ ------

View File

@ -76,6 +76,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1; eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL; eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL; eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err; if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the /* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */ * vector with it. */
@ -97,6 +98,14 @@ int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize; return eventLoop->setsize;
} }
/* Toggle the "don't wait" mode of the event loop. While AE_DONT_WAIT is set
 * in eventLoop->flags, aeProcessEvents() polls with a timeout of 0 instead of
 * blocking. A non-zero 'noWait' sets the flag, zero clears it. */
void aeDontWait(aeEventLoop *eventLoop, int noWait) {
    if (!noWait) {
        eventLoop->flags &= ~AE_DONT_WAIT;
        return;
    }
    eventLoop->flags |= AE_DONT_WAIT;
}
/* Resize the maximum set size of the event loop. /* Resize the maximum set size of the event loop.
* If the requested set size is smaller than the current set size, but * If the requested set size is smaller than the current set size, but
* there is already a file descriptor in use that is >= the requested * there is already a file descriptor in use that is >= the requested
@ -406,6 +415,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
} }
} }
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
/* Call the multiplexing API, will return only on timeout or when /* Call the multiplexing API, will return only on timeout or when
* some event fires. */ * some event fires. */
numevents = aeApiPoll(eventLoop, tvp); numevents = aeApiPoll(eventLoop, tvp);

View File

@ -106,6 +106,7 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */ void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep; aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop; } aeEventLoop;
/* Prototypes */ /* Prototypes */
@ -128,5 +129,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop); int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeDontWait(aeEventLoop *eventLoop, int noWait);
#endif #endif

View File

@ -121,8 +121,8 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask; eventLoop->fired[j].mask = mask;
} }

View File

@ -836,6 +836,8 @@ int loadAppendOnlyFile(char *filename) {
freeFakeClientArgv(fakeClient); freeFakeClientArgv(fakeClient);
fakeClient->cmd = NULL; fakeClient->cmd = NULL;
if (server.aof_load_truncated) valid_up_to = ftello(fp); if (server.aof_load_truncated) valid_up_to = ftello(fp);
if (server.key_load_delay)
usleep(server.key_load_delay);
} }
/* This point can only be reached when EOF is reached without errors. /* This point can only be reached when EOF is reached without errors.

View File

@ -522,6 +522,12 @@ void loadServerConfigFromString(char *config) {
err = "rdb-key-save-delay can't be negative"; err = "rdb-key-save-delay can't be negative";
goto loaderr; goto loaderr;
} }
} else if (!strcasecmp(argv[0],"key-load-delay") && argc==2) {
server.key_load_delay = atoi(argv[1]);
if (server.key_load_delay < 0) {
err = "key-load-delay can't be negative";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@ -1191,6 +1197,8 @@ void configSetCommand(client *c) {
"replica-priority",server.slave_priority,0,INT_MAX) { "replica-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field( } config_set_numerical_field(
"rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) { "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
} config_set_numerical_field(
"key-load-delay",server.key_load_delay,0,LLONG_MAX) {
} config_set_numerical_field( } config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) { "slave-announce-port",server.slave_announce_port,0,65535) {
} config_set_numerical_field( } config_set_numerical_field(
@ -1452,6 +1460,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay); config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
config_get_numerical_field("key-load-delay",server.key_load_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive); config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */ /* Bool (yes/no) values */
@ -2272,6 +2281,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY); rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
rewriteConfigNumericalOption(state,"key-load-delay",server.key_load_delay,CONFIG_DEFAULT_KEY_LOAD_DELAY);
rewriteConfigStringOption(state,"tls-cert-file",server.tls_cert_file,NULL); rewriteConfigStringOption(state,"tls-cert-file",server.tls_cert_file,NULL);
rewriteConfigStringOption(state,"tls-key-file",server.tls_key_file,NULL); rewriteConfigStringOption(state,"tls-key-file",server.tls_key_file,NULL);
rewriteConfigStringOption(state,"tls-dh-params-file",server.tls_dh_params_file,NULL); rewriteConfigStringOption(state,"tls-dh-params-file",server.tls_dh_params_file,NULL);

View File

@ -140,10 +140,6 @@ void *connGetPrivateData(connection *conn) {
* move here as we implement additional connection types. * move here as we implement additional connection types.
*/ */
/* Shut down the socket behind 'conn'. 'how' is passed through unchanged to
 * shutdown(), whose return value is returned to the caller. */
static int connSocketShutdown(connection *conn, int how) {
    int rc = shutdown(conn->fd, how);
    return rc;
}
/* Close the connection and free resources. */ /* Close the connection and free resources. */
static void connSocketClose(connection *conn) { static void connSocketClose(connection *conn) {
if (conn->fd != -1) { if (conn->fd != -1) {
@ -298,7 +294,6 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size,
ConnectionType CT_Socket = { ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler, .ae_handler = connSocketEventHandler,
.close = connSocketClose, .close = connSocketClose,
.shutdown = connSocketShutdown,
.write = connSocketWrite, .write = connSocketWrite,
.read = connSocketRead, .read = connSocketRead,
.accept = connSocketAccept, .accept = connSocketAccept,

View File

@ -55,7 +55,6 @@ typedef struct ConnectionType {
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler); int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len); int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len); int (*read)(struct connection *conn, void *buf, size_t buf_len);
int (*shutdown)(struct connection *conn, int how);
void (*close)(struct connection *conn); void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler); int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
@ -159,10 +158,6 @@ static inline void connClose(connection *conn) {
conn->type->close(conn); conn->type->close(conn);
} }
/* Dispatch to the 'shutdown' method of the connection's type and return
 * whatever that method returns. */
static inline int connShutdown(connection *conn, int how) {
    int rc = conn->type->shutdown(conn, how);
    return rc;
}
/* Returns the last error encountered by the connection, as a string. If no error, /* Returns the last error encountered by the connection, as a string. If no error,
* a NULL is returned. * a NULL is returned.
*/ */
@ -208,4 +203,8 @@ int connFormatPeer(connection *conn, char *buf, size_t buf_len);
int connSockName(connection *conn, char *ip, size_t ip_len, int *port); int connSockName(connection *conn, char *ip, size_t ip_len, int *port);
const char *connGetInfo(connection *conn, char *buf, size_t buf_len); const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
/* Helpers for tls special considerations */
int tlsHasPendingData();
void tlsProcessPendingData();
#endif /* __REDIS_CONNECTION_H */ #endif /* __REDIS_CONNECTION_H */

View File

@ -881,7 +881,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)", "Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); freeClient(connGetPrivateData(conn));
return; return;
} }
} }
@ -984,14 +984,21 @@ void unlinkClient(client *c) {
c->client_list_node = NULL; c->client_list_node = NULL;
} }
/* In the case of diskless replication the fork is writing to the /* Check if this is a replica waiting for diskless replication (rdb pipe),
* sockets and just closing the fd isn't enough, if we don't also * in which case it needs to be cleaned from that list */
* shutdown the socket the fork will continue to write to the slave if (c->flags & CLIENT_SLAVE &&
* and the salve will only find out that it was disconnected when c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
* it will finish reading the rdb. */ server.rdb_pipe_conns)
int need_shutdown = ((c->flags & CLIENT_SLAVE) && {
(c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)); int i;
if (need_shutdown) connShutdown(c->conn, SHUT_RDWR); for (i=0; i < server.rdb_pipe_numconns; i++) {
if (server.rdb_pipe_conns[i] == c->conn) {
rdbPipeWriteHandlerConnRemoved(c->conn);
server.rdb_pipe_conns[i] = NULL;
break;
}
}
}
connClose(c->conn); connClose(c->conn);
c->conn = NULL; c->conn = NULL;
} }
@ -1309,7 +1316,7 @@ int handleClientsWithPendingWrites(void) {
{ {
ae_flags |= AE_BARRIER; ae_flags |= AE_BARRIER;
} }
/* TODO: Handle write barriers in connection */ /* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) { if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
freeClientAsync(c); freeClientAsync(c);
} }

192
src/rdb.c
View File

@ -2211,6 +2211,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* own reference. */ * own reference. */
decrRefCount(key); decrRefCount(key);
} }
if (server.key_load_delay)
usleep(server.key_load_delay);
/* Reset the state that is key-specified and is populated by /* Reset the state that is key-specified and is populated by
* opcodes before the key, so that we start from scratch again. */ * opcodes before the key, so that we start from scratch again. */
@ -2306,8 +2308,6 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
* This function covers the case of RDB -> Salves socket transfers for * This function covers the case of RDB -> Salves socket transfers for
* diskless replication. */ * diskless replication. */
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
uint64_t *ok_slaves;
if (!bysignal && exitcode == 0) { if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE, serverLog(LL_NOTICE,
"Background RDB transfer terminated with success"); "Background RDB transfer terminated with success");
@ -2321,79 +2321,6 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_start = -1; server.rdb_save_time_start = -1;
/* If the child returns an OK exit code, read the set of slave client
* IDs and the associated status code. We'll terminate all the slaves
* in error state.
*
* If the process returned an error, consider the list of slaves that
* can continue to be empty, so that it's just a special case of the
* normal code path. */
ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
ok_slaves[0] = 0;
if (!bysignal && exitcode == 0) {
int readlen = sizeof(uint64_t);
if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
readlen)
{
readlen = ok_slaves[0]*sizeof(uint64_t)*2;
/* Make space for enough elements as specified by the first
* uint64_t element in the array. */
ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
if (readlen &&
read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
readlen) != readlen)
{
ok_slaves[0] = 0;
}
}
}
close(server.rdb_pipe_read_result_from_child);
close(server.rdb_pipe_write_result_to_parent);
/* We can continue the replication process with all the slaves that
* correctly received the full payload. Others are terminated. */
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
uint64_t j;
int errorcode = 0;
/* Search for the slave ID in the reply. In order for a slave to
* continue the replication process, we need to find it in the list,
* and it must have an error code set to 0 (which means success). */
for (j = 0; j < ok_slaves[0]; j++) {
if (slave->id == ok_slaves[2*j+1]) {
errorcode = ok_slaves[2*j+2];
break; /* Found in slaves list. */
}
}
if (j == ok_slaves[0] || errorcode != 0) {
serverLog(LL_WARNING,
"Closing slave %s: child->slave RDB transfer failed: %s",
replicationGetSlaveName(slave),
(errorcode == 0) ? "RDB transfer child aborted"
: strerror(errorcode));
freeClient(slave);
} else {
serverLog(LL_WARNING,
"Slave %s correctly received the streamed RDB file.",
replicationGetSlaveName(slave));
/* Restore the socket as non-blocking. */
connNonBlock(slave->conn);
connSendTimeout(slave->conn,0);
}
}
}
zfree(ok_slaves);
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET); updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
} }
@ -2425,9 +2352,6 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves /* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
connection **conns;
uint64_t *clientids;
int numconns;
listNode *ln; listNode *ln;
listIter li; listIter li;
pid_t childpid; pid_t childpid;
@ -2436,35 +2360,30 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
/* Before to fork, create a pipe that will be used in order to /* Even if the previous fork child exited, don't start a new one until we
* send back to the parent the IDs of the slaves that successfully * drained the pipe. */
* received all the writes. */ if (server.rdb_pipe_conns) return C_ERR;
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parant, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a contineous TLS state when the
* child terminates and parent takes over. */
if (pipe(pipefds) == -1) return C_ERR; if (pipe(pipefds) == -1) return C_ERR;
server.rdb_pipe_read_result_from_child = pipefds[0]; server.rdb_pipe_read = pipefds[0];
server.rdb_pipe_write_result_to_parent = pipefds[1]; server.rdb_pipe_write = pipefds[1];
anetNonBlock(NULL, server.rdb_pipe_read);
/* Collect the file descriptors of the slaves we want to transfer /* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */ * the RDB to, which are i WAIT_BGSAVE_START state. */
conns = zmalloc(sizeof(connection *)*listLength(server.slaves)); server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
/* We also allocate an array of corresponding client IDs. This will server.rdb_pipe_numconns = 0;
* be useful for the child process in order to build the report server.rdb_pipe_numconns_writing = 0;
* (sent via unix pipe) that will be sent to the parent. */
clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
numconns = 0;
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = ln->value; client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numconns] = slave->id; server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
conns[numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset()); replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
connBlock(slave->conn);
connSendTimeout(slave->conn,server.repl_timeout*1000);
} }
} }
@ -2474,16 +2393,15 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
if ((childpid = fork()) == 0) { if ((childpid = fork()) == 0) {
/* Child */ /* Child */
int retval; int retval;
rio slave_sockets; rio rdb;
rioInitWithConnset(&slave_sockets,conns,numconns); rioInitWithFd(&rdb,server.rdb_pipe_write);
zfree(conns);
closeListeningSockets(0); closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves"); redisSetProcTitle("redis-rdb-to-slaves");
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0) if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR; retval = C_ERR;
if (retval == C_OK) { if (retval == C_OK) {
@ -2497,48 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
server.child_info_data.cow_size = private_dirty; server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB); sendChildInfo(CHILD_INFO_TYPE_RDB);
/* If we are returning OK, at least one slave was served
* with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is:
*
* <len> <slave[0].id> <slave[0].error> ...
*
* len, slave IDs, and slave errors, are all uint64_t integers,
* so basically the reply is composed of 64 bits for the len field
* plus 2 additional 64 bit integers for each entry, for a total
* of 'len' entries.
*
* The 'id' represents the slave's client ID, so that the master
* can match the report with a specific slave, and 'error' is
* set to 0 if the replication process terminated with a success
* or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numconns));
uint64_t *len = msg;
uint64_t *ids = len+1;
int j, msglen;
*len = numconns;
for (j = 0; j < numconns; j++) {
*ids++ = clientids[j];
*ids++ = slave_sockets.io.connset.state[j];
}
/* Write the message to the parent. If we have no good slaves or
* we are unable to transfer the message to the parent, we exit
* with an error so that the parent will abort the replication
* process with all the childre that were waiting. */
msglen = sizeof(uint64_t)*(1+2*numconns);
if (*len == 0 ||
write(server.rdb_pipe_write_result_to_parent,msg,msglen)
!= msglen)
{
retval = C_ERR;
}
zfree(msg);
} }
zfree(clientids); rioFreeFd(&rdb);
rioFreeConnset(&slave_sockets); close(server.rdb_pipe_write); /* wake up the reader, tell it we're done. */
exitFromChild((retval == C_OK) ? 0 : 1); exitFromChild((retval == C_OK) ? 0 : 1);
} else { } else {
/* Parent */ /* Parent */
@ -2552,17 +2431,16 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = ln->value; client *slave = ln->value;
int j; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
for (j = 0; j < numconns; j++) {
if (slave->id == clientids[j]) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break;
}
} }
} }
close(pipefds[0]); close(server.rdb_pipe_write);
close(pipefds[1]); close(server.rdb_pipe_read);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
closeChildInfoPipe(); closeChildInfoPipe();
} else { } else {
server.stat_fork_time = ustime()-start; server.stat_fork_time = ustime()-start;
@ -2574,10 +2452,12 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
server.rdb_save_time_start = time(NULL); server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid; server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(server.rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
updateDictResizePolicy(); updateDictResizePolicy();
} }
zfree(clientids);
zfree(conns);
return (childpid == -1) ? C_ERR : C_OK; return (childpid == -1) ? C_ERR : C_OK;
} }
return C_OK; /* Unreached. */ return C_OK; /* Unreached. */

View File

@ -884,7 +884,7 @@ void sendBulkToSlave(connection *conn) {
nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble)); nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) { if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s", serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
strerror(errno)); connGetLastError(conn));
freeClient(slave); freeClient(slave);
return; return;
} }
@ -911,7 +911,7 @@ void sendBulkToSlave(connection *conn) {
if ((nwritten = connWrite(conn,buf,buflen)) == -1) { if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) { if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s", serverLog(LL_WARNING,"Write error sending DB to replica: %s",
strerror(errno)); connGetLastError(conn));
freeClient(slave); freeClient(slave);
} }
return; return;
@ -926,6 +926,152 @@ void sendBulkToSlave(connection *conn) {
} }
} }
/* Take 'conn' out of the set of replica connections that still have a pending
 * write of the current rdb pipe buffer. Nothing happens if the connection has
 * no write handler installed. When the last pending writer goes away, the
 * pipe read handler is installed again so the transfer can continue. */
void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
    if (connHasWriteHandler(conn)) {
        connSetWriteHandler(conn, NULL);
        server.rdb_pipe_numconns_writing--;

        /* Either this conn has nothing more to write for now, or it hit a
         * write error. If it was the last one, resume reading the pipe. */
        int resume_reading = (server.rdb_pipe_numconns_writing == 0);
        if (resume_reading &&
            aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE,
                              rdbPipeReadHandler, NULL) == AE_ERR)
        {
            serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
        }
    }
}
/* Write handler used by the diskless master while streaming the rdb pipe
 * buffer: it runs when a replica that could not take the whole buffer becomes
 * writable again. slave->repldboff is the number of bytes of
 * server.rdb_pipe_buff already sent to this replica. */
void rdbPipeWriteHandler(struct connection *conn) {
    serverAssert(server.rdb_pipe_bufflen>0);
    client *slave = connGetPrivateData(conn);

    /* Try to push the part of the buffer this replica has not received yet. */
    int nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
                             server.rdb_pipe_bufflen - slave->repldboff);
    if (nwritten == -1) {
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING,"Write error sending DB to replica: %s",
                connGetLastError(conn));
            freeClient(slave);
        }
        /* Still connected: equivalent to EAGAIN, wait for the next
         * writable event. */
        return;
    }

    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;

    /* Keep the write handler installed while part of the buffer is still
     * unsent; once everything went out, this conn is done for this buffer. */
    if (slave->repldboff >= server.rdb_pipe_bufflen)
        rdbPipeWriteHandlerConnRemoved(conn);
}
/* Release everything that was set up for a diskless rdb transfer over the
 * pipe: the read end of the pipe, the transfer buffer and the array of
 * replica connections. Meant to be called once the pipe is drained (its write
 * end was closed); it finishes by checking for terminated children. */
void RdbPipeCleanup() {
    close(server.rdb_pipe_read);

    /* Drop the transfer buffer. */
    zfree(server.rdb_pipe_buff);
    server.rdb_pipe_buff = NULL;
    server.rdb_pipe_bufflen = 0;

    /* Drop the array of replica connections and reset its counters. */
    zfree(server.rdb_pipe_conns);
    server.rdb_pipe_conns = NULL;
    server.rdb_pipe_numconns_writing = 0;
    server.rdb_pipe_numconns = 0;

    /* Detecting that the child exited is postponed for as long as the pipe
     * is not drained, so this is the moment to do that check. */
    checkChildrenDone();
}
/* Called in diskless master, when there's data to read from the child's rdb
 * pipe (this is the AE_READABLE handler registered for server.rdb_pipe_read).
 *
 * Repeatedly reads up to PROTO_IOBUF_LEN bytes from 'fd' into
 * server.rdb_pipe_buff and writes each chunk to every connection still listed
 * in server.rdb_pipe_conns. The loop ends when:
 *  - the read would block (EAGAIN / EWOULDBLOCK);
 *  - the read fails: every listed replica is freed and the rdb child is
 *    killed;
 *  - EOF is reached (write end closed): the read handler is removed and
 *    RdbPipeCleanup() is called;
 *  - at least one replica could not take the whole chunk: a write handler is
 *    installed for it and the pipe read handler is removed;
 *  - no replica is left: the read handler is removed, the rdb child is killed
 *    and RdbPipeCleanup() is called.
 *
 * 'eventLoop', 'clientData' and 'mask' are unused. */
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);
    int i;
    /* The transfer buffer is allocated lazily on the first call. */
    if (!server.rdb_pipe_buff)
        server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    /* This handler must not run while a replica still has a pending write of
     * the previous chunk. */
    serverAssert(server.rdb_pipe_numconns_writing==0);

    while (1) {
        server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
        if (server.rdb_pipe_bufflen < 0) {
            /* Nothing to read right now: wait for the next readable event. */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return;
            serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
            /* Unrecoverable read error: drop every replica of this transfer. */
            for (i=0; i < server.rdb_pipe_numconns; i++) {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                client *slave = connGetPrivateData(conn);
                freeClient(slave);
                server.rdb_pipe_conns[i] = NULL;
            }
            killRDBChild();
            return;
        }

        if (server.rdb_pipe_bufflen == 0) {
            /* EOF - write end was closed. */
            int stillUp = 0;
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            /* Count the replicas that made it to the end, for the log only. */
            for (i=0; i < server.rdb_pipe_numconns; i++)
            {
                connection *conn = server.rdb_pipe_conns[i];
                if (!conn)
                    continue;
                stillUp++;
            }
            serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
            RdbPipeCleanup();
            return;
        }

        /* Fan the chunk out to every replica that is still attached. */
        int stillAlive = 0;
        for (i=0; i < server.rdb_pipe_numconns; i++)
        {
            int nwritten;
            connection *conn = server.rdb_pipe_conns[i];
            if (!conn)
                continue;

            client *slave = connGetPrivateData(conn);
            if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
                if (connGetState(conn) != CONN_STATE_CONNECTED) {
                    serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
                        connGetLastError(conn));
                    freeClient(slave);
                    server.rdb_pipe_conns[i] = NULL;
                    continue;
                }
                /* An error and still in connected state, is equivalent to EAGAIN.
                 * nwritten stays -1, so it differs from the buffer length and a
                 * write handler is installed below. */
                slave->repldboff = 0;
            } else {
                /* repldboff is the offset inside the pipe buffer already sent
                 * to this replica. */
                slave->repldboff = nwritten;
                server.stat_net_output_bytes += nwritten;
            }
            /* If we were unable to write all the data to one of the replicas,
             * setup write handler (and disable pipe read handler, below) */
            if (nwritten != server.rdb_pipe_bufflen) {
                server.rdb_pipe_numconns_writing++;
                connSetWriteHandler(conn, rdbPipeWriteHandler);
            }
            stillAlive++;
        }

        if (stillAlive == 0) {
            serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
            /* Unregister the read handler while the descriptor is still open,
             * in the same order as the EOF path above. RdbPipeCleanup() closes
             * server.rdb_pipe_read and then checks for terminated children, so
             * deleting the event afterwards would act on a closed descriptor
             * number (which might already have been reused). */
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            killRDBChild();
            RdbPipeCleanup();
            return;
        }
        /* Remove the pipe read handler if at least one write handler was set. */
        if (server.rdb_pipe_numconns_writing) {
            aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
            break;
        }
    }
}
/* This function is called at the end of every background saving, /* This function is called at the end of every background saving,
* or when the replication RDB transfer strategy is modified from * or when the replication RDB transfer strategy is modified from
* disk to socket or the other way around. * disk to socket or the other way around.

146
src/rio.c
View File

@ -272,85 +272,67 @@ void rioFreeConn(rio *r, sds *remaining) {
r->io.conn.buf = NULL; r->io.conn.buf = NULL;
} }
/* ------------------- File descriptors set implementation ------------------ /* ------------------- File descriptor implementation ------------------
* This target is used to write the RDB file to N different replicas via * This target is used to write the RDB file to pipe, when the master just
* sockets, when the master just streams the data to the replicas without * streams the data to the replicas without creating an RDB on-disk image
* creating an RDB on-disk image (diskless replication option). * (diskless replication option).
* It only implements writes. */ * It only implements writes. */
/* Returns 1 or 0 for success/failure. /* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write
* to at least one file descriptor.
* *
* When buf is NULL and len is 0, the function performs a flush operation * When buf is NULL and len is 0, the function performs a flush operation
* if there is some pending buffer, so this function is also used in order * if there is some pending buffer, so this function is also used in order
* to implement rioFdsetFlush(). */ * to implement rioFdFlush(). */
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
ssize_t retval; ssize_t retval;
int j;
unsigned char *p = (unsigned char*) buf; unsigned char *p = (unsigned char*) buf;
int doflush = (buf == NULL && len == 0); int doflush = (buf == NULL && len == 0);
/* To start we always append to our buffer. If it gets larger than /* For small writes, we rather keep the data in user-space buffer, and flush
* a given size, we actually write to the sockets. */ * it only when it grows. however for larger writes, we prefer to flush
if (len) { * any pre-existing buffer, and write the new one directly without reallocs
r->io.connset.buf = sdscatlen(r->io.connset.buf,buf,len); * and memory copying. */
len = 0; /* Prevent entering the while below if we don't flush. */ if (len > PROTO_IOBUF_LEN) {
if (sdslen(r->io.connset.buf) > PROTO_IOBUF_LEN) doflush = 1; /* First, flush any pre-existing buffered data. */
} if (sdslen(r->io.fd.buf)) {
if (rioFdWrite(r, NULL, 0) == 0)
if (doflush) { return 0;
p = (unsigned char*) r->io.connset.buf;
len = sdslen(r->io.connset.buf);
}
/* Write in little chunchs so that when there are big writes we
* parallelize while the kernel is sending data in background to
* the TCP socket. */
while(len) {
size_t count = len < 1024 ? len : 1024;
int broken = 0;
for (j = 0; j < r->io.connset.numconns; j++) {
if (r->io.connset.state[j] != 0) {
/* Skip FDs alraedy in error. */
broken++;
continue;
}
/* Make sure to write 'count' bytes to the socket regardless
* of short writes. */
size_t nwritten = 0;
while(nwritten != count) {
retval = connWrite(r->io.connset.conns[j],p+nwritten,count-nwritten);
if (retval <= 0) {
/* With blocking sockets, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
break;
}
nwritten += retval;
}
if (nwritten != count) {
/* Mark this FD as broken. */
r->io.connset.state[j] = errno;
if (r->io.connset.state[j] == 0) r->io.connset.state[j] = EIO;
}
} }
if (broken == r->io.connset.numconns) return 0; /* All the FDs in error. */ /* Write the new data, keeping 'p' and 'len' from the input. */
p += count; } else {
len -= count; if (len) {
r->io.connset.pos += count; r->io.fd.buf = sdscatlen(r->io.fd.buf,buf,len);
if (sdslen(r->io.fd.buf) > PROTO_IOBUF_LEN)
doflush = 1;
if (!doflush)
return 1;
}
/* Flushing the buffered data. Set 'p' and 'len' accordingly. */
p = (unsigned char*) r->io.fd.buf;
len = sdslen(r->io.fd.buf);
} }
if (doflush) sdsclear(r->io.connset.buf); size_t nwritten = 0;
while(nwritten != len) {
retval = write(r->io.fd.fd,p+nwritten,len-nwritten);
if (retval <= 0) {
/* With blocking io, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
return 0; /* error. */
}
nwritten += retval;
}
r->io.fd.pos += len;
sdsclear(r->io.fd.buf);
return 1; return 1;
} }
/* Returns 1 or 0 for success/failure. */ /* Returns 1 or 0 for success/failure. */
static size_t rioFdsetRead(rio *r, void *buf, size_t len) { static size_t rioFdRead(rio *r, void *buf, size_t len) {
UNUSED(r); UNUSED(r);
UNUSED(buf); UNUSED(buf);
UNUSED(len); UNUSED(len);
@ -358,23 +340,23 @@ static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
} }
/* Returns read/write position in file. */ /* Returns read/write position in file. */
static off_t rioFdsetTell(rio *r) { static off_t rioFdTell(rio *r) {
return r->io.connset.pos; return r->io.fd.pos;
} }
/* Flushes any buffer to target device if applicable. Returns 1 on success /* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */ * and 0 on failures. */
static int rioFdsetFlush(rio *r) { static int rioFdFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a /* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */ * buffer set to NULL with a count of zero as a flush request. */
return rioFdsetWrite(r,NULL,0); return rioFdWrite(r,NULL,0);
} }
static const rio rioFdsetIO = { static const rio rioFdIO = {
rioFdsetRead, rioFdRead,
rioFdsetWrite, rioFdWrite,
rioFdsetTell, rioFdTell,
rioFdsetFlush, rioFdFlush,
NULL, /* update_checksum */ NULL, /* update_checksum */
0, /* current checksum */ 0, /* current checksum */
0, /* flags */ 0, /* flags */
@ -383,24 +365,16 @@ static const rio rioFdsetIO = {
{ { NULL, 0 } } /* union for io-specific vars */ { { NULL, 0 } } /* union for io-specific vars */
}; };
void rioInitWithConnset(rio *r, connection **conns, int numconns) { void rioInitWithFd(rio *r, int fd) {
int j; *r = rioFdIO;
r->io.fd.fd = fd;
*r = rioFdsetIO; r->io.fd.pos = 0;
r->io.connset.conns = zmalloc(sizeof(connection *)*numconns); r->io.fd.buf = sdsempty();
r->io.connset.state = zmalloc(sizeof(int)*numconns);
memcpy(r->io.connset.conns,conns,sizeof(connection *)*numconns);
for (j = 0; j < numconns; j++) r->io.connset.state[j] = 0;
r->io.connset.numconns = numconns;
r->io.connset.pos = 0;
r->io.connset.buf = sdsempty();
} }
/* release the rio stream. */ /* release the rio stream. */
void rioFreeConnset(rio *r) { void rioFreeFd(rio *r) {
zfree(r->io.connset.conns); sdsfree(r->io.fd.buf);
zfree(r->io.connset.state);
sdsfree(r->io.connset.buf);
} }
/* ---------------------------- Generic functions ---------------------------- */ /* ---------------------------- Generic functions ---------------------------- */

View File

@ -77,7 +77,7 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */ off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */ off_t autosync; /* fsync after 'autosync' bytes written. */
} file; } file;
/* Connection object */ /* Connection object (used to read from socket) */
struct { struct {
connection *conn; /* Connection */ connection *conn; /* Connection */
off_t pos; /* pos in buf that was returned */ off_t pos; /* pos in buf that was returned */
@ -85,14 +85,12 @@ struct _rio {
size_t read_limit; /* don't allow to buffer/read more than that */ size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */ size_t read_so_far; /* amount of data read from the rio (not buffered) */
} conn; } conn;
/* Multiple FDs target (used to write to N sockets). */ /* FD target (used to write to pipe). */
struct { struct {
connection **conns; /* Connections */ int fd; /* File descriptor. */
int *state; /* Error state of each fd. 0 (if ok) or errno. */
int numconns;
off_t pos; off_t pos;
sds buf; sds buf;
} connset; } fd;
} io; } io;
}; };
@ -161,9 +159,9 @@ static inline void rioClearErrors(rio *r) {
void rioInitWithFile(rio *r, FILE *fp); void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s); void rioInitWithBuffer(rio *r, sds s);
void rioInitWithConn(rio *r, connection *conn, size_t read_limit); void rioInitWithConn(rio *r, connection *conn, size_t read_limit);
void rioInitWithConnset(rio *r, connection **conns, int numconns); void rioInitWithFd(rio *r, int fd);
void rioFreeConnset(rio *r); void rioFreeFd(rio *r);
void rioFreeConn(rio *r, sds* out_remainingBufferedData); void rioFreeConn(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count); size_t rioWriteBulkCount(rio *r, char prefix, long count);

View File

@ -1746,6 +1746,48 @@ void updateCachedTime(void) {
server.daylight_active = tm.tm_isdst; server.daylight_active = tm.tm_isdst;
} }
/* Reap one terminated child process, if any, without blocking, and dispatch
 * its exit status to the matching completion handler:
 *
 *  - the RDB child  -> backgroundSaveDoneHandler()
 *  - the AOF child  -> backgroundRewriteDoneHandler()
 *  - anything else  -> ldbRemoveChild(), logging a warning if it is unknown.
 *
 * Nothing is reaped while a diskless RDB child is still running and its
 * pipe connections are still set (see the comment below).
 *
 * Takes no arguments and returns nothing; it acts on the global 'server'. */
void checkChildrenDone(void) {
    /* Zero-initialized on purpose: wait3() does not store a status when it
     * fails (returns -1), yet the status macros below are evaluated before
     * 'pid' is inspected. Without the initializer they would read an
     * indeterminate value. */
    int statloc = 0;
    pid_t pid;

    /* If we have a diskless rdb child (note that we support only one concurrent
     * child), we want to avoid collecting its exit status and acting on it
     * as long as we didn't finish to drain the pipe, since then we're at risk
     * of starting a new fork and a new pipe before we're done with the previous
     * one. */
    if (server.rdb_child_pid != -1 && server.rdb_pipe_conns)
        return;

    /* WNOHANG: wait3() returns 0 when no child has changed state yet. */
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;

        /* 'bysignal' holds the terminating signal number, or 0 if the child
         * was not killed by a signal. */
        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        if (pid == -1) {
            serverLog(LL_WARNING,"wait3() returned an error: %s. "
                "rdb_child_pid = %d, aof_child_pid = %d",
                strerror(errno),
                (int) server.rdb_child_pid,
                (int) server.aof_child_pid);
        } else if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);
            if (!bysignal && exitcode == 0) receiveChildInfo();
        } else if (pid == server.aof_child_pid) {
            backgroundRewriteDoneHandler(exitcode,bysignal);
            if (!bysignal && exitcode == 0) receiveChildInfo();
        } else {
            if (!ldbRemoveChild(pid)) {
                serverLog(LL_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
        }
        updateDictResizePolicy();
        closeChildInfoPipe();
    }
}
/* This is our timer interrupt, called server.hz times per second. /* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously. * Here is where we do a number of things that need to be done asynchronously.
* For instance: * For instance:
@ -1898,37 +1940,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) ldbPendingChildren())
{ {
int statloc; checkChildrenDone();
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
} else { } else {
/* If there is not a background saving/rewrite in progress check if /* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */ * we have to save/rewrite now. */
@ -2081,6 +2093,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads(); handleClientsWithPendingWritesUsingThreads();
/* TODO: How do i handle write barriers flag */
tlsProcessPendingData();
/* If tls already has pending unread data don't sleep at all. */
aeDontWait(server.el, tlsHasPendingData());
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(); freeClientsInAsyncFreeQueue();
@ -2280,6 +2297,7 @@ void initServerConfig(void) {
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
server.key_load_delay = CONFIG_DEFAULT_KEY_LOAD_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL; server.pidfile = NULL;
@ -2813,6 +2831,11 @@ void initServer(void) {
server.rdb_child_pid = -1; server.rdb_child_pid = -1;
server.aof_child_pid = -1; server.aof_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
server.rdb_pipe_buff = NULL;
server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0; server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1; server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1; server.child_info_pipe[1] = -1;

View File

@ -135,6 +135,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 #define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
#define CONFIG_DEFAULT_KEY_LOAD_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1 #define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1 #define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1
@ -1236,10 +1237,17 @@ struct redisServer {
int rdb_child_type; /* Type of save by active child. */ int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* C_OK or C_ERR */ int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_write; /* RDB pipes used to transfer the rdb */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ int rdb_pipe_read; /* data to the parent process in diskless repl. */
connection **rdb_pipe_conns; /* Connections which are currently the */
int rdb_pipe_numconns; /* target of diskless rdb fork child. */
int rdb_pipe_numconns_writing; /* Number of rdb conns with pending writes. */
char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */
int rdb_pipe_bufflen; /* that was read from the the rdb pipe. */
int rdb_key_save_delay; /* Delay in microseconds between keys while int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing the RDB. (for testings) */ * writing the RDB. (for testings) */
int key_load_delay; /* Delay in microseconds between keys while
* loading aof or rdb. (for testings) */
/* Pipe and data structures for child -> parent info sharing. */ /* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct { struct {
@ -1779,6 +1787,8 @@ void clearReplicationId2(void);
void chopReplicationBacklog(void); void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void); void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len); void feedReplicationBacklog(void *ptr, size_t len);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename); void startLoadingFile(FILE* fp, char* filename);
@ -1944,6 +1954,7 @@ unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void); const char *evictPolicyToString(void);
struct redisMemOverhead *getMemoryOverheadData(void); struct redisMemOverhead *getMemoryOverheadData(void);
void freeMemoryOverheadData(struct redisMemOverhead *mh); void freeMemoryOverheadData(struct redisMemOverhead *mh);
void checkChildrenDone(void);
#define RESTART_SERVER_NONE 0 #define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */ #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */

View File

@ -30,6 +30,7 @@
#include "server.h" #include "server.h"
#include "connhelpers.h" #include "connhelpers.h"
#include "adlist.h"
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
@ -41,6 +42,10 @@ extern ConnectionType CT_Socket;
SSL_CTX *redis_tls_ctx; SSL_CTX *redis_tls_ctx;
/* list of connections with pending data already read from the socket, but not
* served to the reader yet. */
static list *pending_list = NULL;
void tlsInit(void) { void tlsInit(void) {
ERR_load_crypto_strings(); ERR_load_crypto_strings();
SSL_load_error_strings(); SSL_load_error_strings();
@ -49,6 +54,8 @@ void tlsInit(void) {
if (!RAND_poll()) { if (!RAND_poll()) {
serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator.");
} }
pending_list = listCreate();
} }
int tlsConfigureServer(void) { int tlsConfigureServer(void) {
@ -188,6 +195,7 @@ typedef struct tls_connection {
int flags; int flags;
SSL *ssl; SSL *ssl;
char *ssl_error; char *ssl_error;
listNode *pending_list_node;
} tls_connection; } tls_connection;
connection *connCreateTLS(void) { connection *connCreateTLS(void) {
@ -288,11 +296,7 @@ void updateSSLEvent(tls_connection *conn) {
aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE);
} }
static void tlsHandleEvent(tls_connection *conn, int mask) {
static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
UNUSED(el);
UNUSED(fd);
tls_connection *conn = clientData;
int ret; int ret;
TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d", TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d",
@ -369,6 +373,15 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in
if ((mask & AE_READABLE) && conn->c.read_handler) { if ((mask & AE_READABLE) && conn->c.read_handler) {
if (!callHandler((connection *) conn, conn->c.read_handler)) return; if (!callHandler((connection *) conn, conn->c.read_handler)) return;
if (SSL_has_pending(conn->ssl)) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
conn->pending_list_node = listLast(pending_list);
}
} else if (conn->pending_list_node) {
listDelNode(pending_list, conn->pending_list_node);
conn->pending_list_node = NULL;
}
} }
if ((mask & AE_WRITABLE) && conn->c.write_handler) { if ((mask & AE_WRITABLE) && conn->c.write_handler) {
@ -382,6 +395,13 @@ static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, in
updateSSLEvent(conn); updateSSLEvent(conn);
} }
static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
UNUSED(el);
UNUSED(fd);
tls_connection *conn = clientData;
tlsHandleEvent(conn, mask);
}
static void connTLSClose(connection *conn_) { static void connTLSClose(connection *conn_) {
tls_connection *conn = (tls_connection *) conn_; tls_connection *conn = (tls_connection *) conn_;
@ -395,6 +415,11 @@ static void connTLSClose(connection *conn_) {
conn->ssl_error = NULL; conn->ssl_error = NULL;
} }
if (conn->pending_list_node) {
listDelNode(pending_list, conn->pending_list_node);
conn->pending_list_node = NULL;
}
CT_Socket.close(conn_); CT_Socket.close(conn_);
} }
@ -610,17 +635,6 @@ exit:
return nread; return nread;
} }
/* TODO: This is probably not the right thing to do, but as we handle proxying from child
* processes we'll probably not need any shutdown mechanism anyway so this is just a
* place holder for now.
*/
static int connTLSShutdown(connection *conn_, int how) {
UNUSED(how);
tls_connection *conn = (tls_connection *) conn_;
return SSL_shutdown(conn->ssl);
}
ConnectionType CT_TLS = { ConnectionType CT_TLS = {
.ae_handler = tlsEventHandler, .ae_handler = tlsEventHandler,
.accept = connTLSAccept, .accept = connTLSAccept,
@ -635,9 +649,25 @@ ConnectionType CT_TLS = {
.sync_write = connTLSSyncWrite, .sync_write = connTLSSyncWrite,
.sync_read = connTLSSyncRead, .sync_read = connTLSSyncRead,
.sync_readline = connTLSSyncReadLine, .sync_readline = connTLSSyncReadLine,
.shutdown = connTLSShutdown
}; };
int tlsHasPendingData() {
if (!pending_list)
return 0;
return listLength(pending_list) > 0;
}
/* Runs the read path (tlsHandleEvent() with AE_READABLE) for every
 * connection currently queued on pending_list, i.e. connections whose data
 * was already read from the socket but not yet served to the reader.
 *
 * Does nothing when pending_list has not been created. */
void tlsProcessPendingData() {
    listIter li;
    listNode *ln;

    /* Same guard as tlsHasPendingData(): the list only exists after
     * tlsInit() ran, while this function is called unconditionally from
     * beforeSleep(). */
    if (!pending_list)
        return;

    listRewind(pending_list,&li);
    while((ln = listNext(&li))) {
        tls_connection *conn = listNodeValue(ln);
        tlsHandleEvent(conn, AE_READABLE);
    }
}
#else /* USE_OPENSSL */ #else /* USE_OPENSSL */
void tlsInit(void) { void tlsInit(void) {
@ -666,4 +696,11 @@ connection *connCreateAcceptedTLS(int fd, int require_auth) {
return NULL; return NULL;
} }
/* Variant compiled in the #else branch of USE_OPENSSL (no TLS support):
 * always reports that there is no pending TLS data. */
int tlsHasPendingData() {
return 0;
}
/* Variant compiled in the #else branch of USE_OPENSSL (no TLS support):
 * intentionally empty, there is nothing to process. */
void tlsProcessPendingData() {
}
#endif #endif

View File

@ -29,9 +29,6 @@ start_server {tags {"repl"}} {
$slave slaveof $master_host $master_port $slave slaveof $master_host $master_port
test {Slave enters handshake} { test {Slave enters handshake} {
if {$::tls} {
fail "TLS with repl-diskless-sync not supported yet."
}
wait_for_condition 50 1000 { wait_for_condition 50 1000 {
[string match *handshake* [$slave role]] [string match *handshake* [$slave role]]
} else { } else {
@ -187,10 +184,6 @@ start_server {tags {"repl"}} {
} }
foreach mdl {no yes} { foreach mdl {no yes} {
if {$::tls && $mdl eq "yes"} {
puts "** Skipping test: TLS with repl-diskless-sync not supported yet."
continue
}
foreach sdl {disabled swapdb} { foreach sdl {disabled swapdb} {
start_server {tags {"repl"}} { start_server {tags {"repl"}} {
set master [srv 0 client] set master [srv 0 client]
@ -327,9 +320,6 @@ start_server {tags {"repl"}} {
} }
test {slave fails full sync and diskless load swapdb recoveres it} { test {slave fails full sync and diskless load swapdb recoveres it} {
if {$::tls} {
fail ""
}
start_server {tags {"repl"}} { start_server {tags {"repl"}} {
set slave [srv 0 client] set slave [srv 0 client]
set slave_host [srv 0 host] set slave_host [srv 0 host]
@ -397,10 +387,6 @@ test {slave fails full sync and diskless load swapdb recoveres it} {
} }
test {diskless loading short read} { test {diskless loading short read} {
if {$::tls} {
fail "TLS with repl-diskless-sync not supported yet."
}
start_server {tags {"repl"}} { start_server {tags {"repl"}} {
set replica [srv 0 client] set replica [srv 0 client]
set replica_host [srv 0 host] set replica_host [srv 0 host]
@ -480,3 +466,159 @@ test {diskless loading short read} {
} }
} }
# get current stime and utime metrics for a thread (since its creation)
# Reads up to 1024 bytes of the given stat file, splits them on whitespace
# and returns a three element list:
#   {current time in milliseconds, field 13 (utime), field 14 (stime)}
# Any failure while opening, reading or parsing the file is re-raised as an
# "assertion:" error.
proc get_cpu_metrics { statfile } {
    set failed [catch {
        set fid [open $statfile r]
        set fields [split [read $fid 1024]]
        ::close $fid

        ;## number of jiffies it has been scheduled...
        set utime [lindex $fields 13]
        set stime [lindex $fields 14]
    } err]
    if {$failed} {
        error "assertion:can't parse /proc: $err"
    }

    return [list [clock milliseconds] $utime $stime]
}
# compute %utime and %stime of a thread between two measurements
# Takes two {mstime utime stime} samples as produced by get_cpu_metrics and
# returns {user% system%}: the share of the elapsed wall-clock time (expressed
# in jiffies, using `getconf CLK_TCK`) spent in user and in system mode.
proc compute_cpu_usage {start end} {
    set clock_ticks [exec getconf CLK_TCK]
    lassign $start start_ms start_utime start_stime
    lassign $end end_ms end_utime end_stime

    # convert ms time to jiffies and calc delta
    set dtime [expr { ($end_ms - $start_ms) * double($clock_ticks) / 1000 }]
    set utime [expr { $end_utime - $start_utime }]
    set stime [expr { $end_stime - $start_stime }]

    set pucpu [expr { ($utime / $dtime) * 100 }]
    set pscpu [expr { ($stime / $dtime) * 100 }]
    return [list $pucpu $pscpu]
}
# test diskless rdb pipe with multiple replicas, which may drop half way
start_server {tags {"repl"}} {
    set master [srv 0 client]
    $master config set repl-diskless-sync yes
    $master config set repl-diskless-sync-delay 1
    set master_host [srv 0 host]
    set master_port [srv 0 port]
    # put enough data in the db that the rdb file will be bigger than the socket buffers
    # and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second
    # we also need the replica to process requests during transfer (which it does only once in 2mb)
    $master debug populate 10000 test 10000
    $master config set rdbcompression no

    # One scenario per value: no replica drops, only the slow one drops,
    # only the fast one drops, or all of them drop.
    foreach all_drop {no slow fast all} {
        test "diskless $all_drop replicas drop during rdb pipe" {
            set replicas {}
            set replicas_alive {}
            # start one replica that will read the rdb fast, and one that will be slow
            start_server {} {
                lappend replicas [srv 0 client]
                lappend replicas_alive [srv 0 client]
                start_server {} {
                    lappend replicas [srv 0 client]
                    lappend replicas_alive [srv 0 client]

                    # start replication
                    # it's enough for just one replica to be slow, and have its write handler enabled
                    # so that the whole rdb generation process is bound to that
                    [lindex $replicas 0] config set repl-diskless-load swapdb
                    [lindex $replicas 0] config set key-load-delay 100
                    [lindex $replicas 0] replicaof $master_host $master_port
                    [lindex $replicas 1] replicaof $master_host $master_port

                    # wait for the replicas to start reading the rdb
                    # using the log file since the replica only responds to INFO once in 2mb
                    wait_for_log_message -1 "*Loading DB in memory*" 8 800 10

                    set master_statfile [format "/proc/%s/stat" [srv -2 pid]]
                    set master_start_metrics [get_cpu_metrics $master_statfile]
                    set start_time [clock seconds]

                    # wait a while so that the pipe socket writer will be
                    # blocked on write (since replica 0 is slow to read from the socket)
                    after 500

                    # add some command to be present in the command stream after the rdb.
                    $master incr $all_drop

                    # disconnect replicas depending on the current test
                    if {$all_drop == "all" || $all_drop == "fast"} {
                        exec kill [srv 0 pid]
                        set replicas_alive [lreplace $replicas_alive 1 1]
                    }
                    if {$all_drop == "all" || $all_drop == "slow"} {
                        exec kill [srv -1 pid]
                        set replicas_alive [lreplace $replicas_alive 0 0]
                    }

                    # wait for rdb child to exit
                    wait_for_condition 500 100 {
                        [s -2 rdb_bgsave_in_progress] == 0
                    } else {
                        fail "rdb child didn't terminate"
                    }

                    # make sure we got what we were aiming for, by looking for the message in the log file
                    if {$all_drop == "all"} {
                        wait_for_log_message -2 "*Diskless rdb transfer, last replica dropped, killing fork child*" 12 1 1
                    }
                    if {$all_drop == "no"} {
                        wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 2 replicas still up*" 12 1 1
                    }
                    if {$all_drop == "slow" || $all_drop == "fast"} {
                        wait_for_log_message -2 "*Diskless rdb transfer, done reading from pipe, 1 replicas still up*" 12 1 1
                    }

                    # make sure we don't have a busy loop going through epoll_wait
                    set master_end_metrics [get_cpu_metrics $master_statfile]
                    set time_elapsed [expr {[clock seconds]-$start_time}]
                    set master_cpu [compute_cpu_usage $master_start_metrics $master_end_metrics]
                    set master_utime [lindex $master_cpu 0]
                    set master_stime [lindex $master_cpu 1]
                    if {$::verbose} {
                        puts "elapsed: $time_elapsed"
                        puts "master utime: $master_utime"
                        puts "master stime: $master_stime"
                    }
                    if {$all_drop == "all" || $all_drop == "slow"} {
                        assert {$master_utime < 30}
                        assert {$master_stime < 30}
                    }
                    # The scenario names come from the foreach list above, where
                    # the no-drop case is spelled "no"; comparing against any
                    # other spelling would silently skip this CPU check.
                    if {$all_drop == "no" || $all_drop == "fast"} {
                        assert {$master_utime < 15}
                        assert {$master_stime < 15}
                    }

                    # verify the data integrity
                    foreach replica $replicas_alive {
                        # Wait that replicas acknowledge they are online so
                        # we are sure that DBSIZE and DEBUG DIGEST will not
                        # fail because of timing issues.
                        wait_for_condition 50 100 {
                            [lindex [$replica role] 3] eq {connected}
                        } else {
                            fail "replicas still not connected after some time"
                        }

                        # Make sure that replicas and master have same
                        # number of keys
                        wait_for_condition 50 100 {
                            [$master dbsize] == [$replica dbsize]
                        } else {
                            fail "Different number of keys between master and replicas after too long time."
                        }

                        # Check digests
                        set digest [$master debug digest]
                        set digest0 [$replica debug digest]
                        assert {$digest ne 0000000000000000000000000000000000000000}
                        assert {$digest eq $digest0}
                    }
                }
            }
        }
    }
}