Modules: block on keys: fix bugs in processing order.
This commit is contained in:
parent
629081f839
commit
66f55bc5c1
@ -450,6 +450,22 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
|||||||
listNode *clientnode = listFirst(clients);
|
listNode *clientnode = listFirst(clients);
|
||||||
client *receiver = clientnode->value;
|
client *receiver = clientnode->value;
|
||||||
|
|
||||||
|
/* Put at the tail, so that at the next call
|
||||||
|
* we'll not run into it again: clients here may not be
|
||||||
|
* ready to be served, so they'll remain in the list
|
||||||
|
* sometimes. We want also be able to skip clients that are
|
||||||
|
* not blocked for the MODULE type safely. */
|
||||||
|
listDelNode(clients,clientnode);
|
||||||
|
listAddNodeTail(clients,receiver);
|
||||||
|
|
||||||
|
if (receiver->btype != BLOCKED_MODULE) continue;
|
||||||
|
|
||||||
|
/* Note that if *this* client cannot be served by this key,
|
||||||
|
* it does not mean that another client that is next into the
|
||||||
|
* list cannot be served as well: they may be blocked by
|
||||||
|
* different modules with different triggers to consider if a key
|
||||||
|
* is ready or not. This means we can't exit the loop but need
|
||||||
|
* to continue after the first failure. */
|
||||||
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
||||||
|
|
||||||
moduleUnblockClient(receiver);
|
moduleUnblockClient(receiver);
|
||||||
|
@ -249,6 +249,7 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
in thread safe contexts. */
|
in thread safe contexts. */
|
||||||
int dbid; /* Database number selected by the original client. */
|
int dbid; /* Database number selected by the original client. */
|
||||||
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
||||||
|
int unblocked; /* Already on the moduleUnblocked list. */
|
||||||
} RedisModuleBlockedClient;
|
} RedisModuleBlockedClient;
|
||||||
|
|
||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
@ -4037,6 +4038,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
|||||||
bc->reply_client->flags |= CLIENT_MODULE;
|
bc->reply_client->flags |= CLIENT_MODULE;
|
||||||
bc->dbid = c->db->id;
|
bc->dbid = c->db->id;
|
||||||
bc->blocked_on_keys = keys != NULL;
|
bc->blocked_on_keys = keys != NULL;
|
||||||
|
bc->unblocked = 0;
|
||||||
c->bpop.timeout = timeout;
|
c->bpop.timeout = timeout;
|
||||||
|
|
||||||
if (islua || ismulti) {
|
if (islua || ismulti) {
|
||||||
@ -4063,6 +4065,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
|||||||
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
||||||
int served = 0;
|
int served = 0;
|
||||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
/* Protect against re-processing: don't serve clients that are already
|
||||||
|
* in the unblocking list for any reason (including RM_UnblockClient()
|
||||||
|
* explicit call). */
|
||||||
|
if (bc->unblocked) return REDISMODULE_ERR;
|
||||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
||||||
ctx.blocked_ready_key = key;
|
ctx.blocked_ready_key = key;
|
||||||
@ -4162,6 +4168,7 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
|
|||||||
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
|
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
|
||||||
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
||||||
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||||
|
bc->unblocked = 1;
|
||||||
listAddNodeTail(moduleUnblockedClients,bc);
|
listAddNodeTail(moduleUnblockedClients,bc);
|
||||||
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
||||||
/* Ignore the error, this is best-effort. */
|
/* Ignore the error, this is best-effort. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user