TLS: Implement support for write barrier.
This commit is contained in:
parent
5a47794606
commit
6b6294807c
10
TLS.md
10
TLS.md
@ -57,22 +57,18 @@ Connections
|
||||
Connection abstraction API is mostly done and seems to hold well for hiding
|
||||
implementation details between TLS and TCP.
|
||||
|
||||
1. Still need to implement the equivalent of AE_BARRIER. Because TLS
|
||||
socket-level read/write events don't correspond to logical operations, this
|
||||
should probably be done at the Read/Write handler level.
|
||||
|
||||
2. Multi-threading I/O is not supported. The main issue to address is the need
|
||||
1. Multi-threading I/O is not supported. The main issue to address is the need
|
||||
to manipulate AE based on OpenSSL return codes. We can either propagate this
|
||||
out of the thread, or explore ways of further optimizing MT I/O by having
|
||||
event loops that live inside the thread and borrow connections in/out.
|
||||
|
||||
3. Finish cleaning up the implementation. Make sure all error cases are handled
|
||||
2. Finish cleaning up the implementation. Make sure all error cases are handled
|
||||
and reflected into connection state, connection state validated before
|
||||
certain operations, etc.
|
||||
- Clean (non-errno) interface to report would-block.
|
||||
- Consistent error reporting.
|
||||
|
||||
4. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
|
||||
3. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
|
||||
socket blocking and configuring socket-level timeout. This means the timeout
|
||||
value may not be so accurate, and there would be a lot of syscall overhead.
|
||||
However I believe that getting rid of syncio completely in favor of pure
|
||||
|
@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) {
|
||||
* there is much to do about the whole server stopping for power problems
|
||||
* or alike */
|
||||
|
||||
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
|
||||
usleep(server.aof_flush_sleep);
|
||||
}
|
||||
|
||||
latencyStartMonitor(latency);
|
||||
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
|
||||
latencyEndMonitor(latency);
|
||||
|
@ -2277,7 +2277,7 @@ void clusterReadHandler(connection *conn) {
|
||||
* from event handlers that will do stuff with the same link later. */
|
||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||
connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */
|
||||
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
|
||||
|
||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||
|
||||
|
@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
|
||||
/* Register a write handler, to be called when the connection is writable.
|
||||
* If NULL, the existing handler is removed.
|
||||
*/
|
||||
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
|
||||
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
|
||||
if (func == conn->write_handler) return C_OK;
|
||||
|
||||
conn->write_handler = func;
|
||||
if (barrier)
|
||||
conn->flags |= CONN_FLAG_WRITE_BARRIER;
|
||||
else
|
||||
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
|
||||
if (!conn->write_handler)
|
||||
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
|
||||
else
|
||||
@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
|
||||
conn->conn_handler = NULL;
|
||||
}
|
||||
|
||||
/* Normally we execute the readable event first, and the writable
|
||||
* event later. This is useful as sometimes we may be able
|
||||
* to serve the reply of a query immediately after processing the
|
||||
* query.
|
||||
*
|
||||
* However if WRITE_BARRIER is set in the connection flags, our application is
|
||||
* asking us to do the reverse: never fire the writable event
|
||||
* after the readable. In such a case, we invert the calls.
|
||||
* This is useful when, for instance, we want to do things
|
||||
* in the beforeSleep() hook, like fsyncing a file to disk,
|
||||
* before replying to a client. */
|
||||
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
|
||||
|
||||
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
|
||||
int call_read = (mask & AE_READABLE) && conn->read_handler;
|
||||
|
||||
/* Handle normal I/O flows */
|
||||
if ((mask & AE_READABLE) && conn->read_handler) {
|
||||
if (!invert && call_read) {
|
||||
if (!callHandler(conn, conn->read_handler)) return;
|
||||
}
|
||||
if ((mask & AE_WRITABLE) && conn->write_handler) {
|
||||
/* Fire the writable event. */
|
||||
if (call_write) {
|
||||
if (!callHandler(conn, conn->write_handler)) return;
|
||||
}
|
||||
/* If we have to invert the call, fire the readable event now
|
||||
* after the writable one. */
|
||||
if (invert && call_read) {
|
||||
if (!callHandler(conn, conn->read_handler)) return;
|
||||
}
|
||||
}
|
||||
|
||||
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
|
||||
|
@ -47,6 +47,7 @@ typedef enum {
|
||||
|
||||
#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
|
||||
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
|
||||
#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */
|
||||
|
||||
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
|
||||
|
||||
@ -57,7 +58,7 @@ typedef struct ConnectionType {
|
||||
int (*read)(struct connection *conn, void *buf, size_t buf_len);
|
||||
void (*close)(struct connection *conn);
|
||||
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
|
||||
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
|
||||
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
|
||||
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
|
||||
const char *(*get_last_error)(struct connection *conn);
|
||||
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
|
||||
@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
|
||||
* If NULL, the existing handler is removed.
|
||||
*/
|
||||
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
|
||||
return conn->type->set_write_handler(conn, func);
|
||||
return conn->type->set_write_handler(conn, func, 0);
|
||||
}
|
||||
|
||||
/* Register a read handler, to be called when the connection is readable.
|
||||
@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
|
||||
return conn->type->set_read_handler(conn, func);
|
||||
}
|
||||
|
||||
/* Set a write handler, and possibly enable a write barrier; this flag is
|
||||
* cleared when write handler is changed or removed.
|
||||
* With barrier enabled, we never fire the event if the read handler already
|
||||
* fired in the same event loop iteration. Useful when you want to persist
|
||||
* things to disk before sending replies, and want to do that in a group fashion. */
|
||||
static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
|
||||
return conn->type->set_write_handler(conn, func, barrier);
|
||||
}
|
||||
|
||||
static inline void connClose(connection *conn) {
|
||||
conn->type->close(conn);
|
||||
}
|
||||
|
@ -319,6 +319,7 @@ void debugCommand(client *c) {
|
||||
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
|
||||
"SEGFAULT -- Crash the server with sigsegv.",
|
||||
"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.",
|
||||
"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing",
|
||||
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
|
||||
"STRUCTSIZE -- Return the size of different Redis core C structures.",
|
||||
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
|
||||
@ -595,6 +596,11 @@ NULL
|
||||
{
|
||||
server.active_expire_enabled = atoi(c->argv[2]->ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") &&
|
||||
c->argc == 3)
|
||||
{
|
||||
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
|
||||
c->argc == 3)
|
||||
{
|
||||
|
@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) {
|
||||
/* If after the synchronous writes above we still have data to
|
||||
* output to the client, we need to install the writable handler. */
|
||||
if (clientHasPendingReplies(c)) {
|
||||
int ae_flags = AE_WRITABLE;
|
||||
int ae_barrier = 0;
|
||||
/* For the fsync=always policy, we want that a given FD is never
|
||||
* served for reading and writing in the same event loop iteration,
|
||||
* so that in the middle of receiving the query, and serving it
|
||||
* to the client, we'll call beforeSleep() that will do the
|
||||
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
||||
* actual fsync of AOF to disk. the write barrier ensures that. */
|
||||
if (server.aof_state == AOF_ON &&
|
||||
server.aof_fsync == AOF_FSYNC_ALWAYS)
|
||||
{
|
||||
ae_flags |= AE_BARRIER;
|
||||
ae_barrier = 1;
|
||||
}
|
||||
/* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
|
||||
if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
|
||||
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
|
||||
freeClientAsync(c);
|
||||
}
|
||||
}
|
||||
|
11
src/server.c
11
src/server.c
@ -2048,6 +2048,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
UNUSED(eventLoop);
|
||||
|
||||
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
|
||||
tlsProcessPendingData();
|
||||
/* If tls still has pending unread data don't sleep at all. */
|
||||
aeDontWait(server.el, tlsHasPendingData());
|
||||
|
||||
/* Call the Redis Cluster before sleep function. Note that this function
|
||||
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
||||
* so it's a good idea to call it before serving the unblocked clients
|
||||
@ -2093,11 +2098,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
/* Handle writes with pending output buffers. */
|
||||
handleClientsWithPendingWritesUsingThreads();
|
||||
|
||||
/* TODO: How do i handle write barriers flag */
|
||||
tlsProcessPendingData();
|
||||
/* If tls already has pending unread data don't sleep at all. */
|
||||
aeDontWait(server.el, tlsHasPendingData());
|
||||
|
||||
/* Close clients that need to be closed asynchronously */
|
||||
freeClientsInAsyncFreeQueue();
|
||||
|
||||
@ -2286,6 +2286,7 @@ void initServerConfig(void) {
|
||||
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
|
||||
server.aof_rewrite_base_size = 0;
|
||||
server.aof_rewrite_scheduled = 0;
|
||||
server.aof_flush_sleep = 0;
|
||||
server.aof_last_fsync = time(NULL);
|
||||
server.aof_rewrite_time_last = -1;
|
||||
server.aof_rewrite_time_start = -1;
|
||||
|
@ -1192,6 +1192,7 @@ struct redisServer {
|
||||
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
|
||||
off_t aof_current_size; /* AOF current size. */
|
||||
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
|
||||
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
|
||||
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
|
||||
pid_t aof_child_pid; /* PID if rewriting process */
|
||||
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
|
||||
|
51
src/tls.c
51
src/tls.c
@ -361,18 +361,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
|
||||
conn->c.conn_handler = NULL;
|
||||
break;
|
||||
case CONN_STATE_CONNECTED:
|
||||
if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) {
|
||||
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
|
||||
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
|
||||
}
|
||||
{
|
||||
int call_read = ((mask & AE_READABLE) && conn->c.read_handler) ||
|
||||
((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE));
|
||||
int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) ||
|
||||
((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ));
|
||||
|
||||
if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) {
|
||||
/* Normally we execute the readable event first, and the writable
|
||||
* event later. This is useful as sometimes we may be able
|
||||
* to serve the reply of a query immediately after processing the
|
||||
* query.
|
||||
*
|
||||
* However if WRITE_BARRIER is set in the connection flags, our application is
|
||||
* asking us to do the reverse: never fire the writable event
|
||||
* after the readable. In such a case, we invert the calls.
|
||||
* This is useful when, for instance, we want to do things
|
||||
* in the beforeSleep() hook, like fsyncing a file to disk,
|
||||
* before replying to a client. */
|
||||
int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER;
|
||||
|
||||
if (!invert && call_read) {
|
||||
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
|
||||
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
|
||||
}
|
||||
|
||||
if ((mask & AE_READABLE) && conn->c.read_handler) {
|
||||
/* Fire the writable event. */
|
||||
if (call_write) {
|
||||
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
|
||||
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
|
||||
}
|
||||
|
||||
/* If we have to invert the call, fire the readable event now
|
||||
* after the writable one. */
|
||||
if (invert && call_read) {
|
||||
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
|
||||
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
|
||||
}
|
||||
|
||||
/* If SSL has pending data, already read from the socket, we're at
|
||||
* risk of not calling the read handler again, make sure to add it
|
||||
* to a list of pending connections that should be handled anyway. */
|
||||
if ((mask & AE_READABLE)) {
|
||||
if (SSL_has_pending(conn->ssl)) {
|
||||
if (!conn->pending_list_node) {
|
||||
listAddNodeTail(pending_list, conn);
|
||||
@ -384,10 +413,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
|
||||
}
|
||||
}
|
||||
|
||||
if ((mask & AE_WRITABLE) && conn->c.write_handler) {
|
||||
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -535,8 +562,12 @@ static const char *connTLSGetLastError(connection *conn_) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
|
||||
int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
|
||||
conn->write_handler = func;
|
||||
if (barrier)
|
||||
conn->flags |= CONN_FLAG_WRITE_BARRIER;
|
||||
else
|
||||
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
|
||||
updateSSLEvent((tls_connection *) conn);
|
||||
return C_OK;
|
||||
}
|
||||
|
@ -257,4 +257,35 @@ tags {"aof"} {
|
||||
r expire x -1
|
||||
}
|
||||
}
|
||||
|
||||
start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} appendfsync always}} {
|
||||
test {AOF fsync always barrier issue} {
|
||||
set rd [redis_deferring_client]
|
||||
# Set a sleep when aof is flushed, so that we have a chance to look
|
||||
# at the aof size and detect if the response of an incr command
|
||||
# arrives before the data was written (and hopefully fsynced)
|
||||
# We create a big reply, which will hopefully not have room in the
|
||||
# socket buffers, and will install a write handler, then we sleep
|
||||
# a bit and issue the incr command, hoping that the last portion of
|
||||
# the output buffer write, and the processing of the incr will happen
|
||||
# in the same event loop cycle.
|
||||
# Since the socket buffers and timing are unpredictable, we fuzz this
|
||||
# test with slightly different sizes and sleeps a few times.
|
||||
for {set i 0} {$i < 10} {incr i} {
|
||||
r debug aof-flush-sleep 0
|
||||
r del x
|
||||
r setrange x [expr {int(rand()*5000000)+10000000}] x
|
||||
r debug aof-flush-sleep 500000
|
||||
set aof [file join [lindex [r config get dir] 1] appendonly.aof]
|
||||
set size1 [file size $aof]
|
||||
$rd get x
|
||||
after [expr {int(rand()*30)}]
|
||||
$rd incr new_value
|
||||
$rd read
|
||||
$rd read
|
||||
set size2 [file size $aof]
|
||||
assert {$size1 != $size2}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user