Optimize stream id sds creation on XADD key * (~20% saved cpu cycles) (#10574)
we can observe that when adding to a stream without ID there is a duplicate work on sds creation/freeing/sdslen that costs ~11% of the CPU cycles. This PR avoids it by not freeing the sds after the first reply. The expected reduction in CPU cycles is around 9-10% Additionally, we now pre-allocate the sds to the right size, to avoid realloc. this brought another ~10% improvement Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
bb7891f080
commit
a642947e04
@ -1370,24 +1370,35 @@ void streamLastValidID(stream *s, streamID *maxid)
|
||||
streamIteratorStop(&si);
|
||||
}
|
||||
|
||||
/* Maximum size for a stream ID string. In theory 20*2+1 should be enough,
|
||||
* But to avoid chance for off by one issues and null-term, in case this will
|
||||
* be used as parsing buffer, we use a slightly larger buffer. On the other
|
||||
* hand considering sds header is gonna add 4 bytes, we wanna keep below the
|
||||
* allocator's 48 bytes bin. */
|
||||
#define STREAM_ID_STR_LEN 44
|
||||
|
||||
sds createStreamIDString(streamID *id) {
|
||||
/* Optimization: pre-allocate a big enough buffer to avoid reallocs. */
|
||||
sds str = sdsnewlen(SDS_NOINIT, STREAM_ID_STR_LEN);
|
||||
sdssetlen(str, 0);
|
||||
return sdscatfmt(str,"%U-%U", id->ms,id->seq);
|
||||
}
|
||||
|
||||
/* Emit a reply in the client output buffer by formatting a Stream ID
|
||||
* in the standard <ms>-<seq> format, using the simple string protocol
|
||||
* of REPL. */
|
||||
void addReplyStreamID(client *c, streamID *id) {
|
||||
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
||||
addReplyBulkSds(c,replyid);
|
||||
addReplyBulkSds(c,createStreamIDString(id));
|
||||
}
|
||||
|
||||
void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
|
||||
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
||||
setDeferredReplyBulkSds(c, dr, replyid);
|
||||
setDeferredReplyBulkSds(c, dr, createStreamIDString(id));
|
||||
}
|
||||
|
||||
/* Similar to the above function, but just creates an object, usually useful
|
||||
* for replication purposes to create arguments. */
|
||||
robj *createObjectFromStreamID(streamID *id) {
|
||||
return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
|
||||
id->ms,id->seq));
|
||||
return createObject(OBJ_STRING, createStreamIDString(id));
|
||||
}
|
||||
|
||||
/* Returns non-zero if the ID is 0-0. */
|
||||
@ -2025,7 +2036,8 @@ void xaddCommand(client *c) {
|
||||
addReplyError(c,"Elements are too large to be stored");
|
||||
return;
|
||||
}
|
||||
addReplyStreamID(c,&id);
|
||||
sds replyid = createStreamIDString(&id);
|
||||
addReplyBulkCBuffer(c, replyid, sdslen(replyid));
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
|
||||
@ -2050,9 +2062,11 @@ void xaddCommand(client *c) {
|
||||
/* Let's rewrite the ID argument with the one actually generated for
|
||||
* AOF/replication propagation. */
|
||||
if (!parsed_args.id_given || !parsed_args.seq_given) {
|
||||
robj *idarg = createObjectFromStreamID(&id);
|
||||
robj *idarg = createObject(OBJ_STRING, replyid);
|
||||
rewriteClientCommandArgument(c, idpos, idarg);
|
||||
decrRefCount(idarg);
|
||||
} else {
|
||||
sdsfree(replyid);
|
||||
}
|
||||
|
||||
/* We need to signal to blocked clients that there is new data on this
|
||||
|
Loading…
x
Reference in New Issue
Block a user