Adds listnode to client struct for clients_pending_write list (#11220)

This commit is contained in:
Adi Pinsky 2022-09-15 06:39:47 +03:00 committed by GitHub
parent 42e4241ece
commit d144dc927a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 12 deletions

View File

@ -93,6 +93,14 @@ list *listAddNodeHead(list *list, void *value)
if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
listLinkNodeHead(list, node);
return list;
}
/*
* Add a node that has already been allocated to the head of list
*/
void listLinkNodeHead(list* list, listNode *node) {
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
@ -103,7 +111,6 @@ list *listAddNodeHead(list *list, void *value)
list->head = node;
}
list->len++;
return list;
}
/* Add a new node to the list, to tail, containing the specified 'value'
@ -162,11 +169,20 @@ list *listInsertNode(list *list, listNode *old_node, void *value, int after) {
}
/* Remove the specified node from the specified list.
* It's up to the caller to free the private value of the node.
* The node is freed. If free callback is provided the value is freed as well.
*
* This function can't fail. */
void listDelNode(list *list, listNode *node)
{
listUnlinkNode(list, node);
if (list->free) list->free(node->value);
zfree(node);
}
/*
* Remove the specified node from the list without freeing it.
*/
void listUnlinkNode(list *list, listNode *node) {
if (node->prev)
node->prev->next = node->next;
else
@ -175,8 +191,10 @@ void listDelNode(list *list, listNode *node)
node->next->prev = node->prev;
else
list->tail = node->prev;
if (list->free) list->free(node->value);
zfree(node);
node->next = NULL;
node->prev = NULL;
list->len--;
}
@ -381,3 +399,12 @@ void listJoin(list *l, list *o) {
o->head = o->tail = NULL;
o->len = 0;
}
/* Initializes the node's value and sets its pointers
* so that it is initially not a member of any list.
*/
void listInitNode(listNode *node, void *value) {
node->prev = NULL;
node->next = NULL;
node->value = value;
}

View File

@ -88,6 +88,9 @@ void listRewindTail(list *list, listIter *li);
void listRotateTailToHead(list *list);
void listRotateHeadToTail(list *list);
void listJoin(list *l, list *o);
void listInitNode(listNode *node, void *value);
void listLinkNodeHead(list *list, listNode *node);
void listUnlinkNode(list *list, listNode *node);
/* Directions for iterators */
#define AL_START_HEAD 0

View File

@ -7542,7 +7542,7 @@ void moduleHandleBlockedClients(void) {
!(c->flags & CLIENT_PENDING_WRITE))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}

View File

@ -207,6 +207,7 @@ client *createClient(connection *conn) {
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listInitNode(&c->clients_pending_write_node, c);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
c->mem_usage_bucket = NULL;
@ -255,7 +256,7 @@ void putClientInPendingWriteQueue(client *c) {
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}
@ -1439,9 +1440,9 @@ void unlinkClient(client *c) {
/* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) {
ln = listSearchKey(server.clients_pending_write,c);
serverAssert(ln != NULL);
listDelNode(server.clients_pending_write,ln);
serverAssert(&c->clients_pending_write_node.next != NULL ||
&c->clients_pending_write_node.prev != NULL);
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flags &= ~CLIENT_PENDING_WRITE;
}
@ -1975,7 +1976,7 @@ int handleClientsWithPendingWrites(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
listUnlinkNode(server.clients_pending_write,ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
@ -4156,7 +4157,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
@ -4215,7 +4216,9 @@ int handleClientsWithPendingWritesUsingThreads(void) {
installClientWriteHandler(c);
}
}
listEmpty(server.clients_pending_write);
while(listLength(server.clients_pending_write) > 0) {
listUnlinkNode(server.clients_pending_write, server.clients_pending_write->head);
}
/* Update processed count on server */
server.stat_io_writes_processed += processed;

View File

@ -1176,6 +1176,8 @@ typedef struct client {
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
/* list node in clients_pending_write list */
listNode clients_pending_write_node;
/* Response buffer */
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */