Introduce pending data for connection type

Introduce .has_pending_data and .process_pending_data for connection
type, and hide tlsHasPendingData() and tlsProcessPendingData(). Also
set .has_pending_data and .process_pending_data as NULL explicitly in
socket.c.

Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
This commit is contained in:
zhenwei pi 2022-07-27 10:39:49 +08:00
parent 8234a5123d
commit 709b55b09d
5 changed files with 57 additions and 17 deletions

View File

@ -100,3 +100,35 @@ int connTypeConfigure(int type, void *priv, int reconfigure) {
return C_ERR;
}
/* walk all the connection types until has pending data */
int connTypeHasPendingData(void) {
ConnectionType *ct;
int type;
int ret = 0;
for (type = 0; type < CONN_TYPE_MAX; type++) {
ct = connTypes[type];
if (ct && ct->has_pending_data && (ret = ct->has_pending_data())) {
return ret;
}
}
return ret;
}
/* walk all the connection types and process pending data for each connection type */
int connTypeProcessPendingData(void) {
ConnectionType *ct;
int type;
int ret = 0;
for (type = 0; type < CONN_TYPE_MAX; type++) {
ct = connTypes[type];
if (ct && ct->process_pending_data) {
ret += ct->process_pending_data();
}
}
return ret;
}

View File

@ -91,6 +91,10 @@ typedef struct ConnectionType {
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
/* pending data */
int (*has_pending_data)(void);
int (*process_pending_data)(void);
} ConnectionType;
struct connection {
@ -332,8 +336,6 @@ int connRecvTimeout(connection *conn, long long ms);
/* Helpers for tls special considerations */
sds connTLSGetPeerCert(connection *conn);
int tlsHasPendingData();
int tlsProcessPendingData();
/* Initialize the redis connection framework */
int connTypeInitialize();
@ -352,6 +354,12 @@ void connTypeCleanup(int type);
/* Walk all the connection type, and cleanup them all if possible */
void connTypeCleanupAll();
/* Test all the connection type has pending data or not. */
int connTypeHasPendingData(void);
/* walk all the connection types and process pending data for each connection type */
int connTypeProcessPendingData(void);
int RedisRegisterConnectionTypeSocket();
int RedisRegisterConnectionTypeTLS();

View File

@ -1535,7 +1535,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (ProcessingEventsWhileBlocked) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
processed += connTypeProcessPendingData();
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
@ -1550,11 +1550,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
tlsProcessPendingData();
/* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */
connTypeProcessPendingData();
/* If tls still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, tlsHasPendingData());
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, connTypeHasPendingData());
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),

View File

@ -379,6 +379,10 @@ ConnectionType CT_Socket = {
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
/* pending data */
.has_pending_data = NULL,
.process_pending_data = NULL,
};
int connBlock(connection *conn) {

View File

@ -1021,13 +1021,13 @@ static int connTLSGetType(connection *conn_) {
return CONN_TYPE_TLS;
}
int tlsHasPendingData() {
static int tlsHasPendingData() {
if (!pending_list)
return 0;
return listLength(pending_list) > 0;
}
int tlsProcessPendingData() {
static int tlsProcessPendingData() {
listIter li;
listNode *ln;
@ -1095,6 +1095,10 @@ ConnectionType CT_TLS = {
.sync_write = connTLSSyncWrite,
.sync_read = connTLSSyncRead,
.sync_readline = connTLSSyncReadLine,
/* pending data */
.has_pending_data = tlsHasPendingData,
.process_pending_data = tlsProcessPendingData,
};
int RedisRegisterConnectionTypeTLS()
@ -1120,14 +1124,6 @@ connection *connCreateAcceptedTLS(int fd, int require_auth) {
return NULL;
}
int tlsHasPendingData() {
return 0;
}
int tlsProcessPendingData() {
return 0;
}
sds connTLSGetPeerCert(connection *conn_) {
(void) conn_;
return NULL;