diff --git a/src/blocked.c b/src/blocked.c index 443daec7f..795985ea1 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -186,6 +186,7 @@ void unblockClient(client *c) { server.blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } diff --git a/src/server.c b/src/server.c index cb3c143f8..9a89290ca 100644 --- a/src/server.c +++ b/src/server.c @@ -1560,7 +1560,20 @@ void addClientToTimeoutTable(client *c) { uint64_t id = c->id; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,id); - raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); + if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); } /* This function is called in beforeSleep() in order to unblock clients @@ -1577,7 +1590,10 @@ void clientsHandleTimeout(void) { decodeTimeoutKey(ri.key,&timeout,&id); if (timeout >= now) break; /* All the timeouts are in the future. */ client *c = lookupClientByID(id); - if (c) checkBlockedClientTimeout(c,now); + if (c) { + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); raxSeek(&ri,"^",NULL,0); } diff --git a/src/server.h b/src/server.h index 4a8426bab..03a153c7b 100644 --- a/src/server.h +++ b/src/server.h @@ -252,6 +252,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, depending on optin/optout mode. */ +#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -2139,6 +2140,7 @@ void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); void addClientToTimeoutTable(client *c); +void removeClientFromTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type);