CLIENT LIST speedup via peerid caching + smart allocation.
This commit adds peer ID caching in the client structure plus an API change and the use of sdsMakeRoomFor() in order to improve the reallocation pattern to generate the CLIENT LIST output. Both the changes account for a very significant speedup.
This commit is contained in:
parent
49c543415b
commit
d4a180bbc1
@ -507,6 +507,7 @@ struct redisClient *createFakeClient(void) {
|
|||||||
c->reply_bytes = 0;
|
c->reply_bytes = 0;
|
||||||
c->obuf_soft_limit_reached_time = 0;
|
c->obuf_soft_limit_reached_time = 0;
|
||||||
c->watched_keys = listCreate();
|
c->watched_keys = listCreate();
|
||||||
|
c->peerid = NULL;
|
||||||
listSetFreeMethod(c->reply,decrRefCountVoid);
|
listSetFreeMethod(c->reply,decrRefCountVoid);
|
||||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||||
initClientMultiState(c);
|
initClientMultiState(c);
|
||||||
|
@ -701,7 +701,7 @@ void logCurrentClient(void) {
|
|||||||
int j;
|
int j;
|
||||||
|
|
||||||
redisLog(REDIS_WARNING, "--- CURRENT CLIENT INFO");
|
redisLog(REDIS_WARNING, "--- CURRENT CLIENT INFO");
|
||||||
client = getClientInfoString(cc);
|
client = catClientInfoString(sdsempty(),cc);
|
||||||
redisLog(REDIS_WARNING,"client: %s", client);
|
redisLog(REDIS_WARNING,"client: %s", client);
|
||||||
sdsfree(client);
|
sdsfree(client);
|
||||||
for (j = 0; j < cc->argc; j++) {
|
for (j = 0; j < cc->argc; j++) {
|
||||||
|
@ -118,6 +118,7 @@ redisClient *createClient(int fd) {
|
|||||||
c->watched_keys = listCreate();
|
c->watched_keys = listCreate();
|
||||||
c->pubsub_channels = dictCreate(&setDictType,NULL);
|
c->pubsub_channels = dictCreate(&setDictType,NULL);
|
||||||
c->pubsub_patterns = listCreate();
|
c->pubsub_patterns = listCreate();
|
||||||
|
c->peerid = NULL;
|
||||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||||
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
||||||
if (fd != -1) listAddNodeTail(server.clients,c);
|
if (fd != -1) listAddNodeTail(server.clients,c);
|
||||||
@ -763,6 +764,7 @@ void freeClient(redisClient *c) {
|
|||||||
if (c->name) decrRefCount(c->name);
|
if (c->name) decrRefCount(c->name);
|
||||||
zfree(c->argv);
|
zfree(c->argv);
|
||||||
freeClientMultiState(c);
|
freeClientMultiState(c);
|
||||||
|
sdsfree(c->peerid);
|
||||||
zfree(c);
|
zfree(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -946,7 +948,7 @@ int processInlineBuffer(redisClient *c) {
|
|||||||
* multi bulk requests idempotent. */
|
* multi bulk requests idempotent. */
|
||||||
static void setProtocolError(redisClient *c, int pos) {
|
static void setProtocolError(redisClient *c, int pos) {
|
||||||
if (server.verbosity >= REDIS_VERBOSE) {
|
if (server.verbosity >= REDIS_VERBOSE) {
|
||||||
sds client = getClientInfoString(c);
|
sds client = catClientInfoString(sdsempty(),c);
|
||||||
redisLog(REDIS_VERBOSE,
|
redisLog(REDIS_VERBOSE,
|
||||||
"Protocol error from client: %s", client);
|
"Protocol error from client: %s", client);
|
||||||
sdsfree(client);
|
sdsfree(client);
|
||||||
@ -1184,7 +1186,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
|
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
|
||||||
sds ci = getClientInfoString(c), bytes = sdsempty();
|
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
|
||||||
|
|
||||||
bytes = sdscatrepr(bytes,c->querybuf,64);
|
bytes = sdscatrepr(bytes,c->querybuf,64);
|
||||||
redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
||||||
@ -1215,7 +1217,7 @@ void getClientsMaxBuffers(unsigned long *longest_output_list,
|
|||||||
*biggest_input_buffer = bib;
|
*biggest_input_buffer = bib;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is a helper function for getClientPeerId().
|
/* This is a helper function for genClientPeerId().
|
||||||
* It writes the specified ip/port to "peerid" as a null termiated string
|
* It writes the specified ip/port to "peerid" as a null termiated string
|
||||||
* in the form ip:port if ip does not contain ":" itself, otherwise
|
* in the form ip:port if ip does not contain ":" itself, otherwise
|
||||||
* [ip]:port format is used (for IPv6 addresses basically). */
|
* [ip]:port format is used (for IPv6 addresses basically). */
|
||||||
@ -1239,7 +1241,7 @@ void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) {
|
|||||||
* On failure the function still populates 'peerid' with the "?:0" string
|
* On failure the function still populates 'peerid' with the "?:0" string
|
||||||
* in case you want to relax error checking or need to display something
|
* in case you want to relax error checking or need to display something
|
||||||
* anyway (see anetPeerToString implementation for more info). */
|
* anyway (see anetPeerToString implementation for more info). */
|
||||||
int getClientPeerId(redisClient *client, char *peerid, size_t peerid_len) {
|
int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) {
|
||||||
char ip[REDIS_IP_STR_LEN];
|
char ip[REDIS_IP_STR_LEN];
|
||||||
int port;
|
int port;
|
||||||
|
|
||||||
@ -1255,12 +1257,26 @@ int getClientPeerId(redisClient *client, char *peerid, size_t peerid_len) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Turn a Redis client into an sds string representing its state. */
|
/* This function returns the client peer id, by creating and caching it
|
||||||
sds getClientInfoString(redisClient *client) {
|
* if client->perrid is NULL, otherwise returning the cached value.
|
||||||
char peerid[REDIS_PEER_ID_LEN], flags[16], events[3], *p;
|
* The Peer ID never changes during the life of the client, however it
|
||||||
|
* is expensive to compute. */
|
||||||
|
char *getClientPeerId(redisClient *c) {
|
||||||
|
char peerid[REDIS_PEER_ID_LEN];
|
||||||
|
|
||||||
|
if (c->peerid == NULL) {
|
||||||
|
genClientPeerId(c,peerid,sizeof(peerid));
|
||||||
|
c->peerid = sdsnew(peerid);
|
||||||
|
}
|
||||||
|
return c->peerid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Concatenate a string representing the state of a client in an human
|
||||||
|
* readable format, into the sds string 's'. */
|
||||||
|
sds catClientInfoString(sds s, redisClient *client) {
|
||||||
|
char flags[16], events[3], *p;
|
||||||
int emask;
|
int emask;
|
||||||
|
|
||||||
getClientPeerId(client,peerid,sizeof(peerid));
|
|
||||||
p = flags;
|
p = flags;
|
||||||
if (client->flags & REDIS_SLAVE) {
|
if (client->flags & REDIS_SLAVE) {
|
||||||
if (client->flags & REDIS_MONITOR)
|
if (client->flags & REDIS_MONITOR)
|
||||||
@ -1285,9 +1301,9 @@ sds getClientInfoString(redisClient *client) {
|
|||||||
if (emask & AE_READABLE) *p++ = 'r';
|
if (emask & AE_READABLE) *p++ = 'r';
|
||||||
if (emask & AE_WRITABLE) *p++ = 'w';
|
if (emask & AE_WRITABLE) *p++ = 'w';
|
||||||
*p = '\0';
|
*p = '\0';
|
||||||
return sdscatfmt(sdsempty(),
|
return sdscatfmt(s,
|
||||||
"addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
|
"addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
|
||||||
peerid,
|
getClientPeerId(client),
|
||||||
client->fd,
|
client->fd,
|
||||||
client->name ? (char*)client->name->ptr : "",
|
client->name ? (char*)client->name->ptr : "",
|
||||||
(long long)(server.unixtime - client->ctime),
|
(long long)(server.unixtime - client->ctime),
|
||||||
@ -1312,14 +1328,11 @@ sds getAllClientsInfoString(void) {
|
|||||||
redisClient *client;
|
redisClient *client;
|
||||||
sds o = sdsempty();
|
sds o = sdsempty();
|
||||||
|
|
||||||
|
o = sdsMakeRoomFor(o,200*listLength(server.clients));
|
||||||
listRewind(server.clients,&li);
|
listRewind(server.clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
sds cs;
|
|
||||||
|
|
||||||
client = listNodeValue(ln);
|
client = listNodeValue(ln);
|
||||||
cs = getClientInfoString(client);
|
o = catClientInfoString(o,client);
|
||||||
o = sdscatsds(o,cs);
|
|
||||||
sdsfree(cs);
|
|
||||||
o = sdscatlen(o,"\n",1);
|
o = sdscatlen(o,"\n",1);
|
||||||
}
|
}
|
||||||
return o;
|
return o;
|
||||||
@ -1337,11 +1350,10 @@ void clientCommand(redisClient *c) {
|
|||||||
} else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
|
||||||
listRewind(server.clients,&li);
|
listRewind(server.clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
char peerid[REDIS_PEER_ID_LEN];
|
char *peerid;
|
||||||
|
|
||||||
client = listNodeValue(ln);
|
client = listNodeValue(ln);
|
||||||
if (getClientPeerId(client,peerid,sizeof(peerid)) == REDIS_ERR)
|
peerid = getClientPeerId(client);
|
||||||
continue;
|
|
||||||
if (strcmp(peerid,c->argv[2]->ptr) == 0) {
|
if (strcmp(peerid,c->argv[2]->ptr) == 0) {
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
if (c == client) {
|
if (c == client) {
|
||||||
@ -1547,7 +1559,7 @@ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
|
|||||||
redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
|
redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
|
||||||
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
|
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
|
||||||
if (checkClientOutputBufferLimits(c)) {
|
if (checkClientOutputBufferLimits(c)) {
|
||||||
sds client = getClientInfoString(c);
|
sds client = catClientInfoString(sdsempty(),c);
|
||||||
|
|
||||||
freeClientAsync(c);
|
freeClientAsync(c);
|
||||||
redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
|
redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
|
||||||
|
@ -530,6 +530,7 @@ typedef struct redisClient {
|
|||||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||||
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
|
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
|
||||||
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
|
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
|
||||||
|
sds peerid; /* Cached peer ID. */
|
||||||
|
|
||||||
/* Response buffer */
|
/* Response buffer */
|
||||||
int bufpos;
|
int bufpos;
|
||||||
@ -991,8 +992,8 @@ void *dupClientReplyValue(void *o);
|
|||||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||||
unsigned long *biggest_input_buffer);
|
unsigned long *biggest_input_buffer);
|
||||||
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port);
|
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port);
|
||||||
int getClientPeerId(redisClient *client, char *peerid, size_t peerid_len);
|
char *getClientPeerId(redisClient *client);
|
||||||
sds getClientInfoString(redisClient *client);
|
sds catClientInfoString(sds s, redisClient *client);
|
||||||
sds getAllClientsInfoString(void);
|
sds getAllClientsInfoString(void);
|
||||||
void rewriteClientCommandVector(redisClient *c, int argc, ...);
|
void rewriteClientCommandVector(redisClient *c, int argc, ...);
|
||||||
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
|
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
|
||||||
|
@ -239,7 +239,6 @@ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **
|
|||||||
int j;
|
int j;
|
||||||
sds cmdrepr = sdsnew("+");
|
sds cmdrepr = sdsnew("+");
|
||||||
robj *cmdobj;
|
robj *cmdobj;
|
||||||
char peerid[REDIS_PEER_ID_LEN];
|
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
|
|
||||||
gettimeofday(&tv,NULL);
|
gettimeofday(&tv,NULL);
|
||||||
@ -249,8 +248,7 @@ void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **
|
|||||||
} else if (c->flags & REDIS_UNIX_SOCKET) {
|
} else if (c->flags & REDIS_UNIX_SOCKET) {
|
||||||
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
|
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
|
||||||
} else {
|
} else {
|
||||||
getClientPeerId(c,peerid,sizeof(peerid));
|
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
|
||||||
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,peerid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (j = 0; j < argc; j++) {
|
for (j = 0; j < argc; j++) {
|
||||||
@ -1387,6 +1385,12 @@ void replicationCacheMaster(redisClient *c) {
|
|||||||
/* Set fd to -1 so that we can safely call freeClient(c) later. */
|
/* Set fd to -1 so that we can safely call freeClient(c) later. */
|
||||||
c->fd = -1;
|
c->fd = -1;
|
||||||
|
|
||||||
|
/* Invalidate the Peer ID cache. */
|
||||||
|
if (c->peerid) {
|
||||||
|
sdsfree(c->peerid);
|
||||||
|
c->peerid = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Caching the master happens instead of the actual freeClient() call,
|
/* Caching the master happens instead of the actual freeClient() call,
|
||||||
* so make sure to adjust the replication state. This function will
|
* so make sure to adjust the replication state. This function will
|
||||||
* also set server.master to NULL. */
|
* also set server.master to NULL. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user