Optimize linear search of WAIT and WAITAOF when unblocking the client (#787)
Currently, if the client enters a blocked state, it will be added to the server.clients_waiting_acks list. When the client is unblocked, that is, when unblockClient is called, we will need to linearly traverse server.clients_waiting_acks to delete the client, and this search is O(N). When WAIT (or WAITAOF) is used extensively in some cases, this O(N) search may be time-consuming. We can remember the list node and store it in the blockingState struct and it can avoid the linear search in unblockClientWaitingReplicas. Signed-off-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
parent
33c7ca41be
commit
70b9285802
@ -76,10 +76,13 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
|
||||
void initClientBlockingState(client *c) {
|
||||
c->bstate.btype = BLOCKED_NONE;
|
||||
c->bstate.timeout = 0;
|
||||
c->bstate.unblock_on_nokey = 0;
|
||||
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
|
||||
c->bstate.numreplicas = 0;
|
||||
c->bstate.numlocal = 0;
|
||||
c->bstate.reploffset = 0;
|
||||
c->bstate.unblock_on_nokey = 0;
|
||||
c->bstate.client_waiting_acks_list_node = NULL;
|
||||
c->bstate.module_blocked_handle = NULL;
|
||||
c->bstate.async_rm_call_handle = NULL;
|
||||
}
|
||||
|
||||
@ -595,6 +598,11 @@ void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, lon
|
||||
c->bstate.numreplicas = numreplicas;
|
||||
c->bstate.numlocal = numlocal;
|
||||
listAddNodeHead(server.clients_waiting_acks, c);
|
||||
/* Note that we remember the linked list node where the client is stored,
|
||||
* this way removing the client in unblockClientWaitingReplicas() will not
|
||||
* require a linear scan, but just a constant time operation. */
|
||||
serverAssert(c->bstate.client_waiting_acks_list_node == NULL);
|
||||
c->bstate.client_waiting_acks_list_node = listFirst(server.clients_waiting_acks);
|
||||
blockClient(c, BLOCKED_WAIT);
|
||||
}
|
||||
|
||||
|
@ -4358,9 +4358,9 @@ void waitaofCommand(client *c) {
|
||||
* waiting for replica acks. Never call it directly, call unblockClient()
|
||||
* instead. */
|
||||
void unblockClientWaitingReplicas(client *c) {
|
||||
listNode *ln = listSearchKey(server.clients_waiting_acks, c);
|
||||
serverAssert(ln != NULL);
|
||||
listDelNode(server.clients_waiting_acks, ln);
|
||||
serverAssert(c->bstate.client_waiting_acks_list_node);
|
||||
listDelNode(server.clients_waiting_acks, c->bstate.client_waiting_acks_list_node);
|
||||
c->bstate.client_waiting_acks_list_node = NULL;
|
||||
updateStatsOnUnblock(c, 0, 0, 0);
|
||||
}
|
||||
|
||||
|
10
src/server.h
10
src/server.h
@ -1002,7 +1002,7 @@ typedef struct multiState {
|
||||
} multiState;
|
||||
|
||||
/* This structure holds the blocking operation state for a client.
|
||||
* The fields used depend on client->btype. */
|
||||
* The fields used depend on client->bstate.btype. */
|
||||
typedef struct blockingState {
|
||||
/* Generic fields. */
|
||||
blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */
|
||||
@ -1010,13 +1010,15 @@ typedef struct blockingState {
|
||||
* is > timeout then the operation timed out. */
|
||||
int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys
|
||||
is deleted or does not exist anymore */
|
||||
|
||||
/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */
|
||||
dict *keys; /* The keys we are blocked on */
|
||||
|
||||
/* BLOCKED_WAIT and BLOCKED_WAITAOF */
|
||||
int numreplicas; /* Number of replicas we are waiting for ACK. */
|
||||
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
|
||||
long long reploffset; /* Replication offset to reach. */
|
||||
int numreplicas; /* Number of replicas we are waiting for ACK. */
|
||||
int numlocal; /* Indication if WAITAOF is waiting for local fsync. */
|
||||
long long reploffset; /* Replication offset to reach. */
|
||||
listNode *client_waiting_acks_list_node; /* list node in server.clients_waiting_acks list. */
|
||||
|
||||
/* BLOCKED_MODULE */
|
||||
void *module_blocked_handle; /* ValkeyModuleBlockedClient structure.
|
||||
|
Loading…
x
Reference in New Issue
Block a user