Reply offload

Signed-off-by: Alexander Shabanov <alexander.shabanov@gmail.com>
This commit is contained in:
Alexander Shabanov 2024-12-17 16:44:17 +00:00
parent 26c6f1af9b
commit 48b3740bee
19 changed files with 793 additions and 119 deletions

View File

@ -132,22 +132,27 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}
static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
static int canAddNetworkBytesOut(int slot) {
return clusterSlotStatsEnabled() && slot != -1;
}
/* Accumulates egress bytes for the slot. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
if (!canAddNetworkBytesOut(slot)) return;
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
}
/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
}
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
if (c == NULL || !canAddNetworkBytesOut(c->slot)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsPrimary(server.cluster->myself));
@ -174,24 +179,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}
if (!canAddNetworkBytesOut(slot)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;
/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}
/* Adds reply for the ORDERBY variant.
@ -219,8 +214,7 @@ void clusterSlotStatResetAll(void) {
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
return clusterSlotStatsEnabled() &&
c->slot != -1 && /* Command should be slot specific. */
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
@ -248,7 +242,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}
@ -343,3 +337,8 @@ void clusterSlotStatsCommand(client *c) {
addReplySubcommandSyntaxError(c);
}
}
int clusterSlotStatsEnabled(void) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled; /* Cluster mode should be enabled. */
}

View File

@ -6,6 +6,7 @@
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
int clusterSlotStatsEnabled(void);
/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);
/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);

View File

@ -3192,6 +3192,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL),
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),

View File

@ -398,9 +398,13 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
/* If reply offload enabled force new header */
block->last_header = NULL;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
c->last_header = NULL;
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

View File

@ -119,6 +119,9 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
/* If reply offload enabled no need to prefetch value because main thread will not access it */
} else if (server.reply_offload_enabled) {
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}

View File

@ -42,6 +42,7 @@
#include <sys/uio.h>
#include <math.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdatomic.h>
/* This struct is used to encapsulate filtering criteria for operations on clients
@ -66,6 +67,20 @@ typedef struct {
int skipme;
} clientFilter;
typedef enum {
CLIENT_REPLY_PAYLOAD_DATA = 0,
CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD,
} clientReplyPayloadType;
/* Reply payload header */
typedef struct __attribute__((__packed__)) payloadHeader {
size_t len; /* payload length in a reply buffer */
size_t actual_len; /* actual reply length after offload expanding */
uint8_t type; /* one of clientReplyPayloadType */
int16_t slot; /* to report network-bytes-out for offloads */
} payloadHeader;
static void setProtocolError(const char *errstr, client *c);
static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
@ -149,6 +164,24 @@ static inline int isReplicaReadyForReplData(client *replica) {
!(replica->flag.close_asap);
}
/*
* Reply offload can be allowed only for regular Valkey clients
* that use _writeToClient handler to write replies to client connection
*/
static bool isReplyOffloadAllowable(client *c) {
if (c->flag.fake) {
return false;
}
switch (getClientType(c)) {
case CLIENT_TYPE_NORMAL:
case CLIENT_TYPE_PUBSUB:
return true;
default:
return false;
}
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
@ -178,6 +211,7 @@ client *createClient(connection *conn) {
c->lib_name = NULL;
c->lib_ver = NULL;
c->bufpos = 0;
c->last_header = NULL;
c->buf_peak = c->buf_usable_size;
c->buf_peak_last_reset_time = server.unixtime;
c->qb_pos = 0;
@ -197,7 +231,6 @@ client *createClient(connection *conn) {
c->cur_script = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->raw_flag = 0;
c->capa = 0;
c->slot = -1;
@ -236,6 +269,9 @@ client *createClient(connection *conn) {
c->commands_processed = 0;
c->io_last_reply_block = NULL;
c->io_last_bufpos = 0;
c->io_last_written_buf = NULL;
c->io_last_written_bufpos = 0;
c->io_last_written_data_len = 0;
return c;
}
@ -280,6 +316,18 @@ void putClientInPendingWriteQueue(client *c) {
}
}
/*
* Activate/deactivate reply offload for the client
* according to server config
*/
static void updateReplyOffloadFlag(client *c) {
if (server.reply_offload_enabled && !c->flag.reply_offload && isReplyOffloadAllowable(c)) {
c->flag.reply_offload = 1;
} else if (!server.reply_offload_enabled && c->flag.reply_offload) {
c->flag.reply_offload = 0;
}
}
/* This function is called every time we are going to transmit new data
* to the client. The behavior is the following:
*
@ -326,7 +374,11 @@ int prepareClientToWrite(client *c) {
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
if (!clientHasPendingReplies(c)) {
/* We can change reply offload mode for the client only when its reply buffers are empty. */
updateReplyOffloadFlag(c);
putClientInPendingWriteQueue(c);
}
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
@ -378,6 +430,49 @@ void deleteCachedResponseClient(client *recording_client) {
/* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
static inline void insertPayloadHeader(char *buf, size_t *bufpos, uint8_t type, size_t len, int slot, payloadHeader **last_header) {
/* Save the latest header */
*last_header = (payloadHeader *)(buf + *bufpos);
(*last_header)->type = type;
(*last_header)->len = len;
(*last_header)->slot = slot;
(*last_header)->actual_len = 0;
*bufpos += sizeof(payloadHeader);
}
static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type, size_t len, int slot) {
if (last_header->type == type && last_header->slot == slot) {
last_header->len += len;
return C_OK;
}
return C_ERR;
}
static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) {
/* Enforce min len for offloads as whole pointers must be written to the buffer */
size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1);
if (min_len > available) return 0;
size_t reply_len = min(available, len);
// If cluster slots stats disabled set slot to -1 to prevent excessive per slot headers
if (!clusterSlotStatsEnabled()) slot = -1;
/* Try to add payload to last chunk if possible */
if (*last_header != NULL) {
if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len;
}
/* Recheck min len condition and recalcuate allowed len with a new header to be added */
if (sizeof(payloadHeader) + min_len > available) return 0;
available -= sizeof(payloadHeader);
if (len > available) reply_len = available;
/* Start a new payload chunk */
insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header);
return reply_len;
}
/* Attempts to add the reply to the static buffer in the client struct.
* Returns the length of data that is added to the reply buffer.
@ -386,24 +481,36 @@ void deleteCachedResponseClient(client *recording_client) {
* zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses
* sanitizer and generates a false positive out-of-bounds error */
VALKEY_NO_SANITIZE("bounds")
size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = c->buf_usable_size - c->bufpos;
static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t len, uint8_t payload_type) {
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return 0;
size_t reply_len = len > available ? available : len;
memcpy(c->buf + c->bufpos, s, reply_len);
size_t available = c->buf_usable_size - c->bufpos;
size_t reply_len = min(available, len);
if (c->flag.reply_offload) {
reply_len = upsertPayloadHeader(c->buf, &c->bufpos, &c->last_header, payload_type, len, c->slot, available);
}
if (!reply_len) return 0;
memcpy(c->buf + c->bufpos, payload, reply_len);
c->bufpos += reply_len;
/* We update the buffer peak after appending the reply to the buffer */
if (c->buf_peak < (size_t)c->bufpos) c->buf_peak = (size_t)c->bufpos;
return reply_len;
}
size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA);
}
size_t _addBulkOffloadToBuffer(client *c, robj *obj) {
return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
}
/* Adds the reply to the reply linked list.
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
static void _addReplyPayloadToList(client *c, list *reply_list, const char *payload, size_t len, uint8_t payload_type) {
listNode *ln = listLast(reply_list);
clientReplyBlock *tail = ln ? listNodeValue(ln) : NULL;
@ -417,21 +524,32 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len
* new node */
size_t avail = tail->size - tail->used;
size_t copy = avail >= len ? len : avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
if (c->flag.reply_offload) {
copy = upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, avail);
}
if (copy) {
memcpy(tail->buf + tail->used, payload, copy);
tail->used += copy;
payload += copy;
len -= copy;
}
}
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
size_t size = len < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : len;
size_t required_size = c->flag.reply_offload ? len + sizeof(payloadHeader) : len;
size_t size = required_size < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : required_size;
tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
/* take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
tail->used = 0;
tail->last_header = NULL;
if (c->flag.reply_offload) {
upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, tail->size);
}
memcpy(tail->buf + tail->used, payload, len);
tail->used += len;
listAddNodeTail(reply_list, tail);
c->reply_bytes += tail->size;
@ -439,6 +557,14 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len
}
}
void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
_addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA);
}
void _addBulkOffloadToList(client *c, robj *obj) {
_addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
}
/* The subscribe / unsubscribe command family has a push as a reply,
* or in other words, it responds with a push (or several of them
* depending on how many arguments it got), and has no reply. */
@ -484,6 +610,17 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
if (len > reply_len) _addReplyProtoToList(c, c->reply, s + reply_len, len - reply_len);
}
void _addBulkOffloadToBufferOrList(client *c, robj *obj) {
if (c->flag.close_after_reply) return;
/* Refcount will be decremented in post write handler (i.e. in _postWriteToClient) */
incrRefCount(obj);
if (!_addBulkOffloadToBuffer(c, obj)) {
_addBulkOffloadToList(c, obj);
}
}
/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
@ -776,7 +913,7 @@ void trimReplyUnusedTailSpace(client *c) {
* Also, to avoid large memmove which happens as part of realloc, we only do
* that if the used part is small. */
if (tail->size - tail->used > tail->size / 4 && tail->used < PROTO_REPLY_CHUNK_BYTES &&
c->io_write_state != CLIENT_PENDING_IO) {
c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) {
size_t usable_size;
size_t old_size = tail->size;
tail = zrealloc_usable(tail, tail->used + sizeof(clientReplyBlock), &usable_size);
@ -838,7 +975,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
* - And not too large (avoid large memmove)
* - And the client is not in a pending I/O state */
if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 &&
c->io_write_state != CLIENT_PENDING_IO) {
c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) {
size_t len_to_copy = prev->size - prev->used;
if (len_to_copy > length) len_to_copy = length;
memcpy(prev->buf + prev->used, s, len_to_copy);
@ -852,7 +989,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
}
if (ln->next != NULL && (next = listNodeValue(ln->next)) && next->size - next->used >= length &&
next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO) {
next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) {
memmove(next->buf + length, next->buf, next->used);
memcpy(next->buf, s, length);
next->used += length;
@ -860,11 +997,18 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
} else {
/* Create a new node */
size_t usable_size;
clientReplyBlock *buf = zmalloc_usable(length + sizeof(clientReplyBlock), &usable_size);
size_t required_size = c->flag.reply_offload ? length + sizeof(payloadHeader) : length;
clientReplyBlock *buf = zmalloc_usable(required_size + sizeof(clientReplyBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
buf->size = usable_size - sizeof(clientReplyBlock);
buf->used = length;
memcpy(buf->buf, s, length);
buf->used = 0;
buf->last_header = 0;
if (c->flag.reply_offload) {
upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size);
}
memcpy(buf->buf + buf->used, s, length);
buf->used += length;
listNodeValue(ln) = buf;
c->reply_bytes += buf->size;
@ -1127,8 +1271,20 @@ void addReplyBulkLen(client *c, robj *obj) {
_addReplyLongLongWithPrefix(c, len, '$');
}
int tryOffloadBulkReply(client *c, robj *obj) {
if (!c->flag.reply_offload) return C_ERR;
if (obj->encoding != OBJ_ENCODING_RAW) return C_ERR;
if (obj->refcount == OBJ_STATIC_REFCOUNT) return C_ERR;
if (prepareClientToWrite(c) != C_OK) return C_ERR;
_addBulkOffloadToBufferOrList(c, obj);
return C_OK;
}
/* Add an Object as a bulk reply */
void addReplyBulk(client *c, robj *obj) {
if (tryOffloadBulkReply(c, obj) == C_OK) return;
addReplyBulkLen(c, obj);
addReply(c, obj);
addReplyProto(c, "\r\n", 2);
@ -1729,6 +1885,7 @@ void freeClient(client *c) {
freeClientPubSubData(c);
/* Free data structures. */
if (c->flag.reply_offload) releaseReplyOffloads(c);
listRelease(c->reply);
c->reply = NULL;
zfree_with_size(c->buf, c->buf_usable_size);
@ -1966,19 +2123,215 @@ void writeToReplica(client *c) {
}
}
/*
* The replyIOV struct is used by writevToClient to prepare iovec array
* for submitting to connWritev
*/
typedef struct replyIOV {
int cnt;
int max;
struct iovec *iov;
ssize_t iov_len_total; /* Total length of data pointed by iov array */
size_t last_written_len; /* Length of data in the last written buffer
* partially written in previous writevToClient invocation */
int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit
* reached during iov prepearation */
int offload_active;
int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields
* for expanding reply offloads */
char (*prefixes)[LONG_STR_SIZE + 3];
char *crlf;
} replyIOV;
/*
* The bufWriteMetadata struct is used by writevToClient to record metadata
* about client reply buffers submitted to connWritev
*/
typedef struct bufWriteMetadata {
char *buf;
size_t bufpos;
uint64_t data_len; /* Actual bytes out, differ from bufpos in case of reply offload */
int complete; /* Was the buffer completely scattered to iov or
process stopped due encountered limit */
} bufWriteMetadata;
static void initReplyIOV(client *c, int iovmax, struct iovec *iov_arr, char (*prefixes)[], char *crlf, replyIOV *reply) {
reply->cnt = 0;
reply->max = iovmax;
reply->limit_reached = 0;
reply->iov = iov_arr;
reply->iov_len_total = 0;
reply->last_written_len = c->io_last_written_data_len;
reply->offload_active = c->flag.reply_offload;
if (reply->offload_active) {
reply->prfxcnt = 0;
reply->prefixes = prefixes;
reply->crlf = crlf;
}
}
static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
if (reply->limit_reached) return;
if (reply->cnt == reply->max || reply->iov_len_total > NET_MAX_WRITES_PER_EVENT) {
reply->limit_reached = 1;
return;
}
/* Aggregate data len from the beginning of the buffer even though
* part of the data should be skipped in this round due to last_written_len */
metadata->data_len += buf_len;
if (reply->last_written_len >= buf_len) {
reply->last_written_len -= buf_len;
return;
}
reply->iov[reply->cnt].iov_base = buf + reply->last_written_len;
reply->iov[reply->cnt].iov_len = buf_len - reply->last_written_len;
reply->last_written_len = 0;
reply->iov_len_total += reply->iov[reply->cnt++].iov_len;
}
static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
while (buf_len > 0 && !reply->limit_reached) {
robj **obj = (robj **)buf;
char *str = (*obj)->ptr;
size_t str_len = stringObjectLen(*obj);
char* prefix = reply->prefixes[reply->prfxcnt];
prefix[0] = '$';
size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len);
prefix[num_len + 1] = '\r';
prefix[num_len + 2] = '\n';
int cnt = reply->cnt;
addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata);
/* Increment prfxcnt only if prefix was added to reply in this round */
if (reply->cnt > cnt) reply->prfxcnt++;
addPlainBufferToReplyIOV(str, str_len, reply, metadata);
addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata);
buf += sizeof(void*);
buf_len -= sizeof(void*);
}
}
static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) {
char *ptr = buf;
while (ptr < buf + bufpos && !reply->limit_reached) {
payloadHeader *header = (payloadHeader*)ptr;
ptr += sizeof(payloadHeader);
if (header->type == CLIENT_REPLY_PAYLOAD_DATA) {
addPlainBufferToReplyIOV(ptr, header->len, reply, metadata);
} else {
serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
uint64_t data_len = metadata->data_len;
addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata);
/* Store actual reply len for cluster slot stats */
header->actual_len = metadata->data_len - data_len;
}
ptr += header->len;
}
serverAssert(ptr <= buf + bufpos);
}
static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) {
metadata->data_len = 0;
if (reply->offload_active) {
addCompoundBufferToReplyIOV(buf, bufpos, reply, metadata);
metadata->complete = !reply->limit_reached;
} else {
addPlainBufferToReplyIOV(buf, bufpos, reply, metadata);
metadata->complete = 1;
}
metadata->buf = buf;
metadata->bufpos = bufpos;
}
static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) {
int last = bufcnt - 1;
if (totwritten == totlen) {
c->io_last_written_buf = metadata[last].buf;
/* Zero io_last_written_bufpos indicates buffer written incompletely */
c->io_last_written_bufpos = (metadata[last].complete ? metadata[last].bufpos : 0);
c->io_last_written_data_len = metadata[last].data_len;
return;
}
last = -1;
int64_t remaining = totwritten + c->io_last_written_data_len;
while (remaining > 0) remaining -= metadata[++last].data_len;
serverAssert(last < bufcnt);
c->io_last_written_buf = metadata[last].buf;
/* Zero io_last_written_bufpos indicates buffer written incompletely */
c->io_last_written_bufpos = (metadata[last].complete && remaining == 0 ? metadata[last].bufpos : 0);
c->io_last_written_data_len = (size_t)(metadata[last].data_len + remaining);
}
void proceedToUnwritten(replyIOV *reply, int nwritten) {
while (nwritten > 0) {
if ((size_t)nwritten < reply->iov[0].iov_len) {
reply->iov[0].iov_base = (char *)reply->iov[0].iov_base + nwritten;
reply->iov[0].iov_len -= nwritten;
break;
}
nwritten -= reply->iov[0].iov_len;
reply->iov++;
reply->cnt--;
}
}
/* This function should be called from _writeToClient when the reply list is not empty,
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned.
* Sets the c->nwritten to the number of bytes the server wrote to the client.
* Can be called from the main thread or an I/O thread */
* Can be called from the main thread or an I/O thread
*
* INTERNALS
* The writevToClient strives to write all client reply buffers to the client connection.
* However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case,
* some client reply buffers will be written completely and some partially.
* In next invocation writevToClient should resume from the exact position where it stopped.
* Also writevToClient should communicate to _postWriteToClient which buffers written completely
* and can be released. It is intricate in case of reply offloading as length of reply buffer does not match
* to network bytes out.
*
* For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters:
* io_last_written_buf - Last buffer that has been written to the client connection
* io_last_written_bufpos - The buffer has been written until this position
* io_last_written_data_len - The actual length of the data written from this buffer
* This length differs from written bufpos in case of reply offload
*
* The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV
* to build reply iovec array. These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV
*
* In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len
* and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that
* helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply
* from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly
*
* The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers
* and release them
*
* */
static int writevToClient(client *c) {
int iovcnt = 0;
int iovmax = min(IOV_MAX, c->conn->iovcnt);
struct iovec iov_arr[iovmax];
struct iovec *iov = iov_arr;
ssize_t bufpos, iov_bytes_len = 0;
listNode *lastblock;
char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3];
char crlf[2] = {'\r', '\n'};
int bufcnt = 0;
bufWriteMetadata metadata[listLength(c->reply) + 1];
replyIOV reply;
initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply);
size_t bufpos = 0;
listNode *lastblock;
if (inMainThread()) {
lastblock = listLast(c->reply);
bufpos = c->bufpos;
@ -1990,22 +2343,16 @@ static int writevToClient(client *c) {
/* If the static reply buffer is not empty,
* add it to the iov array for writev() as well. */
if (bufpos > 0) {
iov[iovcnt].iov_base = c->buf + c->sentlen;
iov[iovcnt].iov_len = bufpos - c->sentlen;
iov_bytes_len += iov[iovcnt++].iov_len;
addBufferToReplyIOV(c->buf, bufpos, &reply, &metadata[bufcnt++]);
}
/* The first node of reply list might be incomplete from the last call,
* thus it needs to be calibrated to get the actual data address and length. */
size_t sentlen = bufpos > 0 ? 0 : c->sentlen;
listIter iter;
listNode *next;
clientReplyBlock *o;
size_t used;
listRewind(c->reply, &iter);
while ((next = listNext(&iter)) && iovcnt < iovmax && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) {
o = listNodeValue(next);
while ((next = listNext(&iter)) && !reply.limit_reached) {
clientReplyBlock *o = listNodeValue(next);
used = o->used;
size_t used = o->used;
/* Use c->io_last_bufpos as the currently used portion of the block.
* We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the
* current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data
@ -2014,23 +2361,22 @@ static int writevToClient(client *c) {
if (used == 0) { /* empty node, skip over it. */
if (next == lastblock) break;
sentlen = 0;
continue;
}
iov[iovcnt].iov_base = o->buf + sentlen;
iov[iovcnt].iov_len = used - sentlen;
iov_bytes_len += iov[iovcnt++].iov_len;
addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]);
if (!metadata[bufcnt].data_len) break;
bufcnt++;
sentlen = 0;
if (next == lastblock) break;
}
serverAssert(iovcnt != 0);
serverAssert(reply.last_written_len == 0);
serverAssert(reply.cnt != 0);
ssize_t totwritten = 0;
while (1) {
int nwritten = connWritev(c->conn, iov, iovcnt);
int nwritten = connWritev(c->conn, reply.iov, reply.cnt);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
totwritten = totwritten > 0 ? totwritten : nwritten;
@ -2038,7 +2384,7 @@ static int writevToClient(client *c) {
}
totwritten += nwritten;
if (totwritten == iov_bytes_len) break;
if (totwritten == reply.iov_len_total) break;
if (totwritten > NET_MAX_WRITES_PER_EVENT) {
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
@ -2055,20 +2401,13 @@ static int writevToClient(client *c) {
}
}
/* proceed to the unwritten blocks */
while (nwritten > 0) {
if ((size_t)nwritten < iov[0].iov_len) {
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
iov[0].iov_len -= nwritten;
break;
}
nwritten -= iov[0].iov_len;
iov++;
iovcnt--;
}
proceedToUnwritten(&reply, nwritten);
}
c->nwritten = totwritten;
if (totwritten > 0) {
saveLastWrittenBuf(c, metadata, bufcnt, reply.iov_len_total, totwritten);
}
return totwritten > 0 ? C_OK : C_ERR;
}
@ -2090,13 +2429,14 @@ int _writeToClient(client *c) {
}
/* If the reply list is not empty, use writev to save system calls and TCP packets */
if (lastblock) return writevToClient(c);
if (lastblock || c->flag.reply_offload) return writevToClient(c);
ssize_t bytes_to_write = bufpos - c->sentlen;
serverAssert(c->io_last_written_data_len == 0 || c->io_last_written_buf == c->buf);
ssize_t bytes_to_write = bufpos - c->io_last_written_data_len;
ssize_t tot_written = 0;
while (tot_written < bytes_to_write) {
int nwritten = connWrite(c->conn, c->buf + c->sentlen + tot_written, bytes_to_write - tot_written);
int nwritten = connWrite(c->conn, c->buf + c->io_last_written_data_len + tot_written, bytes_to_write - tot_written);
if (nwritten <= 0) {
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
tot_written = tot_written > 0 ? tot_written : nwritten;
@ -2106,44 +2446,89 @@ int _writeToClient(client *c) {
}
c->nwritten = tot_written;
if (tot_written > 0) {
c->io_last_written_buf = c->buf;
c->io_last_written_bufpos = (tot_written == bytes_to_write ? bufpos : 0);
c->io_last_written_data_len = c->io_last_written_data_len + tot_written;
}
return tot_written > 0 ? C_OK : C_ERR;
}
static void _postWriteToClient(client *c) {
if (c->nwritten <= 0) return;
void resetLastWrittenBuf(client *c) {
c->io_last_written_buf = NULL;
c->io_last_written_bufpos = 0;
c->io_last_written_data_len = 0;
}
static void releaseBufOffloads(char *buf, size_t bufpos) {
char *ptr = buf;
while (ptr < buf + bufpos) {
payloadHeader *header = (payloadHeader *)ptr;
ptr += sizeof(payloadHeader);
if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) {
clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len);
robj** obj_ptr = (robj**)ptr;
size_t len = header->len;
while (len > 0) {
decrRefCount(*obj_ptr);
obj_ptr++;
len -= sizeof(obj_ptr);
}
}
ptr += header->len;
}
}
void releaseReplyOffloads(client *c) {
if (c->bufpos > 0) {
releaseBufOffloads(c->buf, c->bufpos);
}
listIter iter;
listNode *next;
clientReplyBlock *o;
listRewind(c->reply, &iter);
while ((next = listNext(&iter))) {
clientReplyBlock *o = (clientReplyBlock *)listNodeValue(next);
releaseBufOffloads(o->buf, o->used);
}
}
/*
* See INTERNALS note on writevToClient for explanation about
* io_last_written_buf and io_last_written_bufpos
*/
static void _postWriteToClient(client *c) {
if (c->nwritten <= 0) return;
server.stat_net_output_bytes += c->nwritten;
/* Locate the new node which has leftover data and
* release all nodes in front of it. */
ssize_t remaining = c->nwritten;
if (c->bufpos > 0) { /* Deal with static reply buffer first. */
int buf_len = c->bufpos - c->sentlen;
c->sentlen += c->nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if (c->nwritten >= buf_len) {
int last_written = 0;
if (c->bufpos > 0) {
last_written = (c->buf == c->io_last_written_buf);
if (!last_written || c->bufpos == c->io_last_written_bufpos) {
if (c->flag.reply_offload) releaseBufOffloads(c->buf, c->bufpos);
c->bufpos = 0;
c->sentlen = 0;
c->last_header = 0;
if (last_written) resetLastWrittenBuf(c);
}
remaining -= buf_len;
if (last_written) return;
}
listIter iter;
listNode *next;
listRewind(c->reply, &iter);
while (remaining > 0) {
next = listNext(&iter);
o = listNodeValue(next);
if (remaining < (ssize_t)(o->used - c->sentlen)) {
c->sentlen += remaining;
break;
while ((next = listNext(&iter))) {
clientReplyBlock *o = listNodeValue(next);
last_written = (o->buf == c->io_last_written_buf);
if (!last_written || o->used == c->io_last_written_bufpos) {
c->reply_bytes -= o->size;
if (c->flag.reply_offload) releaseBufOffloads(o->buf, o->used);
listDelNode(c->reply, next);
if (last_written) resetLastWrittenBuf(c);
}
remaining -= (ssize_t)(o->used - c->sentlen);
c->reply_bytes -= o->size;
listDelNode(c->reply, next);
c->sentlen = 0;
if (last_written) return;
}
}
@ -2177,7 +2562,7 @@ int postWriteToClient(client *c) {
if (!c->flag.primary) c->last_interaction = server.unixtime;
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
resetLastWrittenBuf(c);
if (connHasWriteHandler(c->conn)) {
connSetWriteHandler(c->conn, NULL);
}

View File

@ -889,6 +889,7 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) {
* 4) Send the backlog data (from the offset to the end) to the replica. */
waitForClientIO(c);
c->flag.replica = 1;
c->flag.reply_offload = 0;
if (c->repl_data->associated_rdb_client_id && lookupRdbClientByID(c->repl_data->associated_rdb_client_id)) {
c->repl_data->repl_state = REPLICA_STATE_BG_RDB_LOAD;
removeReplicaFromPsyncWait(c);
@ -1155,6 +1156,7 @@ void syncCommand(client *c) {
if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repl_data->repldbfd = -1;
c->flag.replica = 1;
c->flag.reply_offload = 0;
listAddNodeTail(server.replicas, c);
/* Create the replication backlog if needed. */
@ -4220,8 +4222,11 @@ void replicationCachePrimary(client *c) {
server.primary->repl_data->repl_applied = 0;
server.primary->repl_data->read_reploff = server.primary->repl_data->reploff;
if (c->flag.multi) discardTransaction(c);
if (c->flag.reply_offload) {
releaseReplyOffloads(c);
resetLastWrittenBuf(c);
}
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);

View File

@ -782,10 +782,13 @@ char *getObjectTypeName(robj *);
struct evictionPoolEntry; /* Defined in evict.c */
typedef struct payloadHeader payloadHeader; /* Defined in networking.c */
/* This structure is used in order to represent the output buffer of a client,
* which is actually a linked list of blocks like that, that is: client->reply. */
typedef struct clientReplyBlock {
size_t size, used;
payloadHeader* last_header;
char buf[];
} clientReplyBlock;
@ -1054,6 +1057,7 @@ typedef struct ClientFlags {
uint64_t prevent_prop : 1; /* Don't propagate to AOF or replicas. */
uint64_t pending_write : 1; /* Client has output to send but a write handler is yet not installed. */
uint64_t pending_read : 1; /* Client has output to send but a write handler is yet not installed. */
uint64_t reply_offload : 1; /* Client is in reply offload mode */
uint64_t reply_off : 1; /* Don't send replies to client. */
uint64_t reply_skip_next : 1; /* Set CLIENT_REPLY_SKIP for next cmd */
uint64_t reply_skip : 1; /* Don't send just this reply. */
@ -1106,7 +1110,7 @@ typedef struct ClientFlags {
* flag, we won't cache the primary in freeClient. */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */
uint64_t reserved : 4; /* Reserved for future use */
uint64_t reserved : 3; /* Reserved for future use */
} ClientFlags;
typedef struct ClientPubSubData {
@ -1207,10 +1211,14 @@ typedef struct client {
list *reply; /* List of reply objects to send to the client. */
listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */
size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */
char* io_last_written_buf; /* Last buffer that has been written to the client connection */
size_t io_last_written_bufpos; /* The buffer has been written until this position */
size_t io_last_written_data_len; /* The actual length of the data written from this buffer
This length differs from written bufpos in case of reply offload */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */
listNode clients_pending_write_node; /* list node in clients_pending_write or in clients_pending_io_write list */
int bufpos;
size_t bufpos;
payloadHeader* last_header; /* Pointer to the last header in a buffer in reply offload mode */
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
/* Client flags and state indicators */
@ -1660,6 +1668,7 @@ struct valkeyServer {
int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */
int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int reply_offload_enabled; /* Reply offload enabled or not */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */
@ -2762,6 +2771,9 @@ void ioThreadWriteToClient(void *data);
int canParseCommand(client *c);
int processIOThreadsReadDone(void);
int processIOThreadsWriteDone(void);
void releaseReplyOffloads(client *c);
void resetLastWrittenBuf(client *c);
/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);

File diff suppressed because one or more lines are too long

View File

@ -129,3 +129,241 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) {
return 0;
}
static client* createTestClient(void) {
client *c = zcalloc(sizeof(client));
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
c->reply = listCreate();
listSetFreeMethod(c->reply, freeClientReplyValue);
listSetDupMethod(c->reply, dupClientReplyValue);
c->flag.reply_offload = 1;
c->flag.fake = 1;
return c;
}
static void freeReplyOffloadClient(client *c) {
listRelease(c->reply);
zfree(c->buf);
zfree(c);
}
int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
client * c = createTestClient();
/* Test 1: Add bulk offloads to the buffer */
robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
_addBulkOffloadToBufferOrList(c, obj);
TEST_ASSERT(obj->refcount == 2);
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void*));
payloadHeader *header1 = c->last_header;
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->len == sizeof(void*));
robj **ptr = (robj **)(c->buf + sizeof(payloadHeader));
TEST_ASSERT(obj == *ptr);
robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2"));
_addBulkOffloadToBufferOrList(c, obj2);
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void*));
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));
ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void*));
TEST_ASSERT(obj2 == *ptr);
/* Test 2: Add plain reply to the buffer */
const char* plain = "+OK\r\n";
size_t plain_len = strlen(plain);
_addReplyToBufferOrList(c, plain, plain_len);
TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + plain_len);
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));
payloadHeader *header2 = c->last_header;
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT(header2->len == plain_len);
for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len);
TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + 10 * plain_len);
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT(header2->len == plain_len * 10);
/* Test 3: Add one more bulk offload to the buffer */
_addBulkOffloadToBufferOrList(c, obj);
TEST_ASSERT(obj->refcount == 3);
TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void*) + 10 * plain_len);
payloadHeader *header3 = c->last_header;
TEST_ASSERT(header3->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
ptr = (robj **)((char*)c->last_header + sizeof(payloadHeader));
TEST_ASSERT(obj == *ptr);
decrRefCount(obj);
decrRefCount(obj);
decrRefCount(obj);
decrRefCount(obj2);
decrRefCount(obj2);
freeReplyOffloadClient(c);
return 0;
}
int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
client *c = createTestClient();
/* Test 1: Add bulk offloads to the reply list */
/* Reply len to fill the buffer almost completely */
size_t reply_len = c->buf_usable_size - 2 * sizeof(payloadHeader) - 4;
char *reply = zmalloc(reply_len);
memset(reply, 'a', reply_len);
_addReplyToBufferOrList(c, reply, reply_len);
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len);
TEST_ASSERT(listLength(c->reply) == 0);
robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
_addBulkOffloadToBufferOrList(c, obj);
TEST_ASSERT(obj->refcount == 2);
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len);
TEST_ASSERT(listLength(c->reply) == 1);
listIter iter;
listRewind(c->reply, &iter);
listNode *next = listNext(&iter);
clientReplyBlock *blk = listNodeValue(next);
TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void*));
payloadHeader *header1 = blk->last_header;
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->len == sizeof(void*));
robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader));
TEST_ASSERT(obj == *ptr);
/* Test 2: Add one more bulk offload to the reply list */
_addBulkOffloadToBufferOrList(c, obj);
TEST_ASSERT(obj->refcount == 3);
TEST_ASSERT(listLength(c->reply) == 1);
TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * sizeof(void*));
TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD);
TEST_ASSERT(header1->len == 2 * sizeof(void*));
/* Test 3: Add plain replies to cause reply list grow */
while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len);
_addReplyToBufferOrList(c, reply, reply_len);
TEST_ASSERT(listLength(c->reply) == 2);
/* last header in 1st block */
payloadHeader *header2 = blk->last_header;
listRewind(c->reply, &iter);
listNext(&iter);
next = listNext(&iter);
clientReplyBlock *blk2 = listNodeValue(next);
/* last header in 2nd block */
payloadHeader *header3 = blk2->last_header;
TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA && header3->type == CLIENT_REPLY_PAYLOAD_DATA);
TEST_ASSERT((header2->len + header3->len) % reply_len == 0);
decrRefCount(obj);
decrRefCount(obj);
decrRefCount(obj);
freeReplyOffloadClient(c);
return 0;
}
int test_addBufferToReplyIOV(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
const char* expected_reply = "$5\r\nhello\r\n";
ssize_t total_len = strlen(expected_reply);
const int iovmax = 16;
char crlf[2] = {'\r', '\n'};
/* Test 1: 1st writevToclient invocation */
client *c = createTestClient();
robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "hello"));
_addBulkOffloadToBufferOrList(c, obj);
struct iovec iov_arr[iovmax];
char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3];
bufWriteMetadata metadata[1];
replyIOV reply;
initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply);
addBufferToReplyIOV(c->buf, c->bufpos, &reply, &metadata[0]);
TEST_ASSERT(reply.iov_len_total == total_len);
TEST_ASSERT(reply.cnt == 3);
const char* ptr = expected_reply;
for (int i = 0; i < reply.cnt; ++i) {
TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0);
ptr += reply.iov[i].iov_len;
}
/* Test 2: Last written buf/pos/data_len after 1st invocation */
saveLastWrittenBuf(c, metadata, 1, reply.iov_len_total, 1); /* only 1 byte has been written */
TEST_ASSERT(c->io_last_written_buf == c->buf);
TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */
TEST_ASSERT(c->io_last_written_data_len == 1);
/* Test 3: 2nd writevToclient invocation */
struct iovec iov_arr2[iovmax];
char prefixes2[iovmax / 3 + 1][LONG_STR_SIZE + 3];
bufWriteMetadata metadata2[1];
replyIOV reply2;
initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2);
addBufferToReplyIOV(c->buf, c->bufpos, &reply2, &metadata2[0]);
TEST_ASSERT(reply2.iov_len_total == total_len - 1);
TEST_ASSERT((*(char*)reply2.iov[0].iov_base) == '5');
/* Test 4: Last written buf/pos/data_len after 2nd invocation */
saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes has been written */
TEST_ASSERT(c->io_last_written_buf == c->buf);
TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */
TEST_ASSERT(c->io_last_written_data_len == 5); /* 1 + 4 */
/* Test 5: 3rd writevToclient invocation */
struct iovec iov_arr3[iovmax];
char prefixes3[iovmax / 3 + 1][LONG_STR_SIZE + 3];
bufWriteMetadata metadata3[1];
replyIOV reply3;
initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3);
addBufferToReplyIOV(c->buf, c->bufpos, &reply3, &metadata3[0]);
TEST_ASSERT(reply3.iov_len_total == total_len - 5);
TEST_ASSERT((*(char*)reply3.iov[0].iov_base) == 'e');
/* Test 6: Last written buf/pos/data_len after 3rd invocation */
saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */
TEST_ASSERT(c->io_last_written_buf == c->buf);
TEST_ASSERT(c->io_last_written_bufpos == c->bufpos);
TEST_ASSERT(c->io_last_written_data_len == (size_t)total_len);
decrRefCount(obj);
decrRefCount(obj);
freeReplyOffloadClient(c);
return 0;
}

View File

@ -256,6 +256,11 @@ proc tags_acceptable {tags err_return} {
return 0
}
if {$::reply_offload && [lsearch $tags "reply-offload:skip"] >= 0} {
set err "Not supported in reply-offload mode"
return 0
}
if {$::tcl_version < 8.6 && [lsearch $tags "ipv6"] >= 0} {
set err "TCL version is too low and does not support this"
return 0
@ -536,6 +541,10 @@ proc start_server {options {code undefined}} {
dict set config "events-per-io-thread" 0
}
if {$::reply_offload} {
dict set config "reply-offload" "yes"
}
foreach line $data {
if {[string length $line] > 0 && [string index $line 0] ne "#"} {
set elements [split $line " "]

View File

@ -55,6 +55,7 @@ set ::valgrind 0
set ::durable 0
set ::tls 0
set ::io_threads 0
set ::reply_offload 0
set ::tls_module 0
set ::stack_logging 0
set ::verbose 0
@ -610,6 +611,7 @@ proc print_help_screen {} {
" debugger)."
"--dump-logs Dump server log on test failure."
"--io-threads Run tests with IO threads."
"--reply-offload Run tests with reply offload enabled."
"--tls Run tests in TLS mode."
"--tls-module Run tests in TLS mode with Valkey module."
"--host <addr> Run tests against an external host."
@ -674,6 +676,8 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
set ::quiet 1
} elseif {$opt eq {--io-threads}} {
set ::io_threads 1
} elseif {$opt eq {--reply-offload}} {
set ::reply_offload 1
} elseif {$opt eq {--tls} || $opt eq {--tls-module}} {
package require tls 1.6
set ::tls 1

View File

@ -1,4 +1,4 @@
tags {"external:skip logreqres:skip"} {
tags {"external:skip logreqres:skip reply-offload:skip"} {
# Get info about a server client connection:
# name - name of client we want to query

View File

@ -524,9 +524,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
R 0 SET $key value
# +OK\r\n --> 5 bytes
R 0 GET $key
# $3\r\nvalue\r\n -> 11 bytes
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 5
dict create network-bytes-out 16
]
]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]

View File

@ -409,7 +409,7 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} {
r config set client-output-buffer-limit $org_outbuf_limit
set info [r info stats]
assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1}
} {} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres
} {} {logreqres:skip reply-offload:skip} ;# same as obuf-limits.tcl, skip logreqres
test {clients: pubsub clients} {
set info [r info clients]

View File

@ -1,4 +1,4 @@
start_server {tags {"maxmemory" "external:skip"}} {
start_server {tags {"maxmemory external:skip reply-offload:skip"}} {
r config set maxmemory 11mb
r config set maxmemory-policy allkeys-lru
set server_pid [s process_id]

View File

@ -1,4 +1,4 @@
start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
start_server {tags {"obuf-limits external:skip logreqres:skip reply-offload:skip"}} {
test {CONFIG SET client-output-buffer-limit} {
set oldval [lindex [r config get client-output-buffer-limit] 1]

View File

@ -8,7 +8,7 @@ proc get_reply_buffer_size {cname} {
return $rbufsize
}
start_server {tags {"replybufsize"}} {
start_server {tags {"replybufsize reply-offload:skip"}} {
test {verify reply buffer limits} {
# In order to reduce test time we can set the peak reset time very low

View File

@ -1448,6 +1448,12 @@ lazyfree-lazy-user-flush yes
#
# prefetch-batch-max-size 16
#
# For use cases where command replies include Bulk strings (e.g. GET, MGET)
# reply offload can be enabled to eliminate espensive memory access
# and redundant data copy performed by main thread
#
# reply-offload yes
#
# NOTE:
# 1. The 'io-threads-do-reads' config is deprecated and has no effect. Please
# avoid using this config if possible.