Precise timeouts: cleaup the table on unblock.
Now that this mechanism is the sole one used for blocked clients timeouts, it is more wise to cleanup the table when the client unblocks for any reason. We use a flag: CLIENT_IN_TO_TABLE, in order to avoid a radix tree lookup when the client was already removed from the table because we processed it by scanning the radix tree.
This commit is contained in:
parent
0e5723ff18
commit
c881ba1680
@ -186,6 +186,7 @@ void unblockClient(client *c) {
|
|||||||
server.blocked_clients_by_type[c->btype]--;
|
server.blocked_clients_by_type[c->btype]--;
|
||||||
c->flags &= ~CLIENT_BLOCKED;
|
c->flags &= ~CLIENT_BLOCKED;
|
||||||
c->btype = BLOCKED_NONE;
|
c->btype = BLOCKED_NONE;
|
||||||
|
removeClientFromTimeoutTable(c);
|
||||||
queueClientForReprocessing(c);
|
queueClientForReprocessing(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
20
src/server.c
20
src/server.c
@ -1560,7 +1560,20 @@ void addClientToTimeoutTable(client *c) {
|
|||||||
uint64_t id = c->id;
|
uint64_t id = c->id;
|
||||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||||
encodeTimeoutKey(buf,timeout,id);
|
encodeTimeoutKey(buf,timeout,id);
|
||||||
raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL);
|
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
|
||||||
|
c->flags |= CLIENT_IN_TO_TABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Remove the client from the table when it is unblocked for reasons
|
||||||
|
* different than timing out. */
|
||||||
|
void removeClientFromTimeoutTable(client *c) {
|
||||||
|
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
|
||||||
|
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||||
|
uint64_t timeout = c->bpop.timeout;
|
||||||
|
uint64_t id = c->id;
|
||||||
|
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||||
|
encodeTimeoutKey(buf,timeout,id);
|
||||||
|
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called in beforeSleep() in order to unblock clients
|
/* This function is called in beforeSleep() in order to unblock clients
|
||||||
@ -1577,7 +1590,10 @@ void clientsHandleTimeout(void) {
|
|||||||
decodeTimeoutKey(ri.key,&timeout,&id);
|
decodeTimeoutKey(ri.key,&timeout,&id);
|
||||||
if (timeout >= now) break; /* All the timeouts are in the future. */
|
if (timeout >= now) break; /* All the timeouts are in the future. */
|
||||||
client *c = lookupClientByID(id);
|
client *c = lookupClientByID(id);
|
||||||
if (c) checkBlockedClientTimeout(c,now);
|
if (c) {
|
||||||
|
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||||
|
checkBlockedClientTimeout(c,now);
|
||||||
|
}
|
||||||
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||||
raxSeek(&ri,"^",NULL,0);
|
raxSeek(&ri,"^",NULL,0);
|
||||||
}
|
}
|
||||||
|
@ -252,6 +252,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
|||||||
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
|
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
|
||||||
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
|
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
|
||||||
depending on optin/optout mode. */
|
depending on optin/optout mode. */
|
||||||
|
#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -2139,6 +2140,7 @@ void handleClientsBlockedOnKeys(void);
|
|||||||
void signalKeyAsReady(redisDb *db, robj *key);
|
void signalKeyAsReady(redisDb *db, robj *key);
|
||||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||||
void addClientToTimeoutTable(client *c);
|
void addClientToTimeoutTable(client *c);
|
||||||
|
void removeClientFromTimeoutTable(client *c);
|
||||||
|
|
||||||
/* expire.c -- Handling of expired keys */
|
/* expire.c -- Handling of expired keys */
|
||||||
void activeExpireCycle(int type);
|
void activeExpireCycle(int type);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user