diff --git a/runtest-moduleapi b/runtest-moduleapi index 9301002c9..e48535126 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/propagate \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ +--single unit/moduleapi/blockonkeys \ "${@}" diff --git a/src/aof.c b/src/aof.c index 0e3648ff0..dda9579e3 100644 --- a/src/aof.c +++ b/src/aof.c @@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createAOFClient(); - startLoadingFile(fp, filename); + startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ @@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,NULL,1) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(1); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; - stopLoading(); + stopLoading(1); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; @@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); + startSaving(RDBFLAGS_AOF_PREAMBLE); + if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } @@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) { if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } diff --git a/src/blocked.c b/src/blocked.c index 867f03de6..14c2ff830 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -174,6 +174,7 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { + if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); } else { serverPanic("Unknown btype in unblockClient()."); @@ -430,6 +431,49 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { } } +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * in order to check if we can serve clients blocked by modules using + * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: + * our goal here is to call the RedisModuleBlockedClient reply() callback to + * see if the key is really able to serve the client, and in that case, + * unblock it. */ +void serveClientsBlockedOnKeyByModule(readyList *rl) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + /* Put at the tail, so that at the next call + * we'll not run into it again: clients here may not be + * ready to be served, so they'll remain in the list + * sometimes. We want also be able to skip clients that are + * not blocked for the MODULE type safely. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + + if (receiver->btype != BLOCKED_MODULE) continue; + + /* Note that if *this* client cannot be served by this key, + * it does not mean that another client that is next into the + * list cannot be served as well: they may be blocked by + * different modules with different triggers to consider if a key + * is ready or not. This means we can't exit the loop but need + * to continue after the first failure. */ + if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; + + moduleUnblockClient(receiver); + } + } +} + /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after * being called by a client. It handles serving clients blocked in @@ -480,6 +524,10 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnSortedSetKey(o,rl); else if (o->type == OBJ_STREAM) serveClientsBlockedOnStreamKey(o,rl); + /* We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. */ + serveClientsBlockedOnKeyByModule(rl); } /* Free this item. */ diff --git a/src/db.c b/src/db.c index 14f163a8f..e75a4d429 100644 --- a/src/db.c +++ b/src/db.c @@ -465,6 +465,29 @@ int getFlushCommandFlags(client *c, int *flags) { return C_OK; } +/* Flushes the whole server data set. */ +void flushAllDataAndResetRDB(int flags) { + server.dirty += emptyDb(-1,flags,NULL); + if (server.rdb_child_pid != -1) killRDBChild(); + if (server.saveparamslen > 0) { + /* Normally rdbSave() will reset dirty, but we don't want this here + * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ + int saved_dirty = server.dirty; + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSave(server.rdb_filename,rsiptr); + server.dirty = saved_dirty; + } + server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif +} + /* FLUSHDB [ASYNC] * * Flushes the currently SELECTed Redis DB. */ @@ -488,28 +511,9 @@ void flushdbCommand(client *c) { * Flushes the whole server data set. */ void flushallCommand(client *c) { int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - server.dirty += emptyDb(-1,flags,NULL); + flushAllDataAndResetRDB(flags); addReply(c,shared.ok); - if (server.rdb_child_pid != -1) killRDBChild(); - if (server.saveparamslen > 0) { - /* Normally rdbSave() will reset dirty, but we don't want this here - * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ - int saved_dirty = server.dirty; - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(server.rdb_filename,rsiptr); - server.dirty = saved_dirty; - } - server.dirty++; -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchroneus. */ - if (!(flags & EMPTYDB_ASYNC)) - jemalloc_purge(); -#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/debug.c b/src/debug.c index 179f6d2c9..a2d37337d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -417,7 +417,7 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(server.rdb_filename,NULL); + int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/module.c b/src/module.c index 09cf6ac7a..3a161c05f 100644 --- a/src/module.c +++ b/src/module.c @@ -140,6 +140,9 @@ struct RedisModuleCtx { void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ void *blocked_privdata; /* Privdata set when unblocking a client. */ + RedisModuleString *blocked_ready_key; /* Key ready when the reply callback + gets called for clients blocked + on keys. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -153,7 +156,7 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) @@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient { client *reply_client; /* Fake client used to accumulate replies in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ + int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ + int unblocked; /* Already on the moduleUnblocked list. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -1620,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { return REDISMODULE_OK; } +/* This is an helper for moduleFireServerEvent() and other functions: + * It populates the replication info structure with the appropriate + * fields depending on the version provided. If the version is not valid + * then REDISMODULE_ERR is returned. Otherwise the function returns + * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */ +int modulePopulateReplicationInfoStructure(void *ri, int structver) { + if (structver != 1) return REDISMODULE_ERR; + + RedisModuleReplicationInfoV1 *ri1 = ri; + memset(ri1,0,sizeof(*ri1)); + ri1->version = structver; + ri1->master = server.masterhost==NULL; + ri1->masterhost = server.masterhost? server.masterhost: ""; + ri1->masterport = server.masterport; + ri1->replid1 = server.replid; + ri1->replid2 = server.replid2; + ri1->repl1_offset = server.master_repl_offset; + ri1->repl2_offset = server.second_replid_offset; + return REDISMODULE_OK; +} + /* Return information about the client with the specified ID (that was * previously obtained via the RedisModule_GetClientId() API). If the * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR @@ -1672,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { return modulePopulateClientInfoStructure(ci,client,structver); } +/* Publish a message to subscribers (see PUBLISH command). */ +int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + int receivers = pubsubPublishMessage(channel, message); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message); + return receivers; +} + /* Return the currently selected DB. */ int RM_GetSelectedDb(RedisModuleCtx *ctx) { return ctx->client->db->id; @@ -1960,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_OK; } +/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled) + * If restart_aof is true, you must make sure the command that triggered this call is not + * propagated to the AOF file. + * When async is set to true, db contents will be freed by a background thread. */ +void RM_ResetDataset(int restart_aof, int async) { + if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); + flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); +} + +/* Returns the number of keys in the current db. */ +unsigned long long RM_DbSize(RedisModuleCtx *ctx) { + return dictSize(ctx->client->db->dict); +} + +/* Returns a name of a random key, or NULL if current db is empty. */ +RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { + robj *key = dbRandomKey(ctx->client->db); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key); + return key; +} + /* -------------------------------------------------------------------------- * Key API for String type * -------------------------------------------------------------------------- */ @@ -4014,6 +4071,94 @@ void unblockClientFromModule(client *c) { resetClient(c); } +/* Block a client in the context of a module: this function implements both + * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the + * keys are passed or not. + * + * When not blocking for keys, the keys, numkeys, and privdata parameters are + * not needed. The privdata in that case must be NULL, since later is + * RM_UnblockClient() that will provide some private data that the reply + * callback will receive. + * + * Instead when blocking for keys, normally RM_UnblockClient() will not be + * called (because the client will unblock when the key is modified), so + * 'privdata' should be provided in that case, so that once the client is + * unlocked and the reply callback is called, it will receive its associated + * private data. + * + * Even when blocking on keys, RM_UnblockClient() can be called however, but + * in that case the privdata argument is disregarded, because we pass the + * reply callback the privdata that is set here while blocking. + */ +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + client *c = ctx->client; + int islua = c->flags & CLIENT_LUA; + int ismulti = c->flags & CLIENT_MULTI; + + c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + ctx->module->blocked_clients++; + + /* We need to handle the invalid operation of calling modules blocking + * commands from Lua or MULTI. We actually create an already aborted + * (client set to NULL) blocked client handle, and actually reply with + * an error. */ + mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0; + bc->client = (islua || ismulti) ? NULL : c; + bc->module = ctx->module; + bc->reply_callback = reply_callback; + bc->timeout_callback = timeout_callback; + bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ + bc->free_privdata = free_privdata; + bc->privdata = privdata; + bc->reply_client = createClient(NULL); + bc->reply_client->flags |= CLIENT_MODULE; + bc->dbid = c->db->id; + bc->blocked_on_keys = keys != NULL; + bc->unblocked = 0; + c->bpop.timeout = timeout; + + if (islua || ismulti) { + c->bpop.module_blocked_handle = NULL; + addReplyError(c, islua ? + "Blocking module command called from Lua script" : + "Blocking module command called from transaction"); + } else { + if (keys) { + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL); + } else { + blockClient(c,BLOCKED_MODULE); + } + } + return bc; +} + +/* This function is called from module.c in order to check if a module + * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) + * can really be unblocked, since the module was able to serve the client. + * If the callback returns REDISMODULE_OK, then the client can be unblocked, + * otherwise the client remains blocked and we'll retry again when one of + * the keys it blocked for becomes "ready" again. */ +int moduleTryServeClientBlockedOnKey(client *c, robj *key) { + int served = 0; + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already + * in the unblocking list for any reason (including RM_UnblockClient() + * explicit call). */ + if (bc->unblocked) return REDISMODULE_ERR; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + ctx.blocked_ready_key = key; + ctx.blocked_privdata = bc->privdata; + ctx.module = bc->module; + ctx.client = bc->client; + ctx.blocked_client = bc; + if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK) + served = 1; + moduleFreeContext(&ctx); + return served; +} + /* Block a client in the context of a blocking command, returning an handle * which will be used, later, in order to unblock the client with a call to * RedisModule_UnblockClient(). The arguments specify callback functions @@ -4031,39 +4176,96 @@ void unblockClientFromModule(client *c) { * by RedisModule_UnblockClient() call. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { - client *c = ctx->client; - int islua = c->flags & CLIENT_LUA; - int ismulti = c->flags & CLIENT_MULTI; + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); +} - c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; - ctx->module->blocked_clients++; +/* This call is similar to RedisModule_BlockClient(), however in this case we + * don't just block the client, but also ask Redis to unblock it automatically + * once certain keys become "ready", that is, contain more data. + * + * Basically this is similar to what a typical Redis command usually does, + * like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP, + * and later when the key receives new data (a list push for instance), the + * client is unblocked and served. + * + * However in the case of this module API, when the client is unblocked? + * + * 1. If you block ok a key of a type that has blocking operations associated, + * like a list, a sorted set, a stream, and so forth, the client may be + * unblocked once the relevant key is targeted by an operation that normally + * unblocks the native blocking operations for that type. So if we block + * on a list key, an RPUSH command may unblock our client and so forth. + * 2. If you are implementing your native data type, or if you want to add new + * unblocking conditions in addition to "1", you can call the modules API + * RedisModule_SignalKeyAsReady(). + * + * Anyway we can't be sure if the client should be unblocked just because the + * key is signaled as ready: for instance a successive operation may change the + * key, or a client in queue before this one can be served, modifying the key + * as well and making it empty again. So when a client is blocked with + * RedisModule_BlockClientOnKeys() the reply callback is not called after + * RM_UnblockCLient() is called, but every time a key is signaled as ready: + * if the reply callback can serve the client, it returns REDISMODULE_OK + * and the client is unblocked, otherwise it will return REDISMODULE_ERR + * and we'll try again later. + * + * The reply callback can access the key that was signaled as ready by + * calling the API RedisModule_GetBlockedClientReadyKey(), that returns + * just the string name of the key as a RedisModuleString object. + * + * Thanks to this system we can setup complex blocking scenarios, like + * unblocking a client only if a list contains at least 5 items or other + * more fancy logics. + * + * Note that another difference with RedisModule_BlockClient(), is that here + * we pass the private data directly when blocking the client: it will + * be accessible later in the reply callback. Normally when blocking with + * RedisModule_BlockClient() the private data to reply to the client is + * passed when calling RedisModule_UnblockClient() but here the unblocking + * is performed by Redis itself, so we need to have some private data before + * hand. The private data is used to store any information about the specific + * unblocking operation that you are implementing. Such information will be + * freed using the free_privdata callback provided by the user. + * + * However the reply callback will be able to access the argument vector of + * the command, so the private data is often not needed. */ +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); +} - /* We need to handle the invalid operation of calling modules blocking - * commands from Lua or MULTI. We actually create an already aborted - * (client set to NULL) blocked client handle, and actually reply with - * an error. */ - bc->client = (islua || ismulti) ? NULL : c; - bc->module = ctx->module; - bc->reply_callback = reply_callback; - bc->timeout_callback = timeout_callback; - bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ - bc->free_privdata = free_privdata; - bc->privdata = NULL; - bc->reply_client = createClient(NULL); - bc->reply_client->flags |= CLIENT_MODULE; - bc->dbid = c->db->id; - c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; +/* This function is used in order to potentially unblock a client blocked + * on keys with RedisModule_BlockClientOnKeys(). When this function is called, + * all the clients blocked for this key will get their reply callback called, + * and if the callback returns REDISMODULE_OK the client will be unblocked. */ +void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { + signalKeyAsReady(ctx->client->db, key); +} - if (islua || ismulti) { - c->bpop.module_blocked_handle = NULL; - addReplyError(c, islua ? - "Blocking module command called from Lua script" : - "Blocking module command called from transaction"); - } else { - blockClient(c,BLOCKED_MODULE); +/* Implements RM_UnblockClient() and moduleUnblockClient(). */ +int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { + pthread_mutex_lock(&moduleUnblockedClientsMutex); + if (!bc->blocked_on_keys) bc->privdata = privdata; + bc->unblocked = 1; + listAddNodeTail(moduleUnblockedClients,bc); + if (write(server.module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ } - return bc; + pthread_mutex_unlock(&moduleUnblockedClientsMutex); + return REDISMODULE_OK; +} + +/* This API is used by the Redis core to unblock a client that was blocked + * by a module. */ +void moduleUnblockClient(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + moduleUnblockClientByHandle(bc,NULL); +} + +/* Return true if the client 'c' was blocked by a module using + * RM_BlockClientOnKeys(). */ +int moduleClientIsBlockedOnKeys(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + return bc->blocked_on_keys; } /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger @@ -4076,15 +4278,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * needs to be passed to the client, included but not limited some slow * to compute reply or some reply obtained via networking. * - * Note: this function can be called from threads spawned by the module. */ + * Note 1: this function can be called from threads spawned by the module. + * + * Note 2: when we unblock a client that is blocked for keys using + * the API RedisModule_BlockClientOnKeys(), the privdata argument here is + * not used, and the reply callback is called with the privdata pointer that + * was passed when blocking the client. + * + * Unblocking a client that was blocked for keys using this API will still + * require the client to get some reply, so the function will use the + * "timeout" handler in order to do so. */ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { - pthread_mutex_lock(&moduleUnblockedClientsMutex); - bc->privdata = privdata; - listAddNodeTail(moduleUnblockedClients,bc); - if (write(server.module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ + if (bc->blocked_on_keys) { + /* In theory the user should always pass the timeout handler as an + * argument, but better to be safe than sorry. */ + if (bc->timeout_callback == NULL) return REDISMODULE_ERR; + if (bc->unblocked) return REDISMODULE_OK; + if (bc->client) moduleBlockedClientTimedOut(bc->client); } - pthread_mutex_unlock(&moduleUnblockedClientsMutex); + moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; } @@ -4144,16 +4356,19 @@ void moduleHandleBlockedClients(void) { * touch the shared list. */ /* Call the reply callback if the client is valid and we have - * any callback. */ - if (c && bc->reply_callback) { + * any callback. However the callback is not called if the client + * was blocked on keys (RM_BlockClientOnKeys()), because we already + * called such callback in moduleTryServeClientBlockedOnKey() when + * the key was signaled as ready. */ + if (c && !bc->blocked_on_keys && bc->reply_callback) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_privdata = bc->privdata; + ctx.blocked_ready_key = NULL; ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; bc->reply_callback(&ctx,(void**)c->argv,c->argc); - moduleHandlePropagationAfterCommandCallback(&ctx); moduleFreeContext(&ctx); } @@ -4241,6 +4456,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { return ctx->blocked_privdata; } +/* Get the key that is ready when the reply callback is called in the context + * of a client blocked by RedisModule_BlockClientOnKeys(). */ +RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) { + return ctx->blocked_ready_key; +} + /* Get the blocked client associated with a given context. * This is useful in the reply and timeout callbacks of blocked clients, * before sometimes the module has the blocked client handle references @@ -4295,7 +4516,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { ctx->client = createClient(NULL); if (bc) { selectDb(ctx->client,bc->dbid); - ctx->client->id = bc->client->id; + if (bc->client) ctx->client->id = bc->client->id; } return ctx; } @@ -5781,8 +6002,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The following sub events are available: * - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA * * The 'data' field can be casted by the callback to a * RedisModuleReplicationInfo structure with the following fields: @@ -5792,24 +6013,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int masterport; // master instance port for NOW_REPLICA * char *replid1; // Main replication ID * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset * uint64_t repl2_offset; // Offset of replid2 validity - * uint64_t main_repl_offset; // Replication offset * * RedisModuleEvent_Persistence * * This event is called when RDB saving or AOF rewriting starts * and ends. The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start - * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end - * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start - * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end + * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START + * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED + * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED * * The above events are triggered not just when the user calls the * relevant commands like BGSAVE, but also when a saving operation * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * * RedisModuleEvent_FlushDB * @@ -5817,8 +6044,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * because of replication, after the replica synchronization) * happened. The following sub events are available: * - * REDISMODULE_EVENT_FLUSHDB_START - * REDISMODULE_EVENT_FLUSHDB_END + * REDISMODULE_SUBEVENT_FLUSHDB_START + * REDISMODULE_SUBEVENT_FLUSHDB_END * * The data pointer can be casted to a RedisModuleFlushInfo * structure with the following fields: @@ -5842,12 +6069,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica is loading the RDB file from the master. * The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START - * REDISMODULE_EVENT_LOADING_RDB_END - * REDISMODULE_EVENT_LOADING_MASTER_RDB_START - * REDISMODULE_EVENT_LOADING_MASTER_RDB_END - * REDISMODULE_EVENT_LOADING_AOF_START - * REDISMODULE_EVENT_LOADING_AOF_END + * REDISMODULE_SUBEVENT_LOADING_RDB_START + * REDISMODULE_SUBEVENT_LOADING_AOF_START + * REDISMODULE_SUBEVENT_LOADING_REPL_START + * REDISMODULE_SUBEVENT_LOADING_ENDED + * REDISMODULE_SUBEVENT_LOADING_FAILED + * + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only recieve an AOF_START event. + * * * RedisModuleEvent_ClientChange * @@ -5856,8 +6086,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * structure, documented in RedisModule_GetClientInfoById(). * The following sub events are available: * - * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED * * RedisModuleEvent_Shutdown * @@ -5870,8 +6100,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica since it gets disconnected. * The following sub events are availble: * - * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE * * No additional information is available so far: future versions * of Redis will have an API in order to enumerate the replicas @@ -5886,6 +6116,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * this changes depending on the "hz" configuration. * No sub events are available. * + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * * RedisModuleEvent_MasterLinkChange * * This is called for replicas in order to notify when the @@ -5895,8 +6130,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replication is happening correctly. * The following sub events are available: * - * REDISMODULE_EVENT_MASTER_LINK_UP - * REDISMODULE_EVENT_MASTER_LINK_DOWN + * REDISMODULE_SUBEVENT_MASTER_LINK_UP + * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * + * RedisModuleEvent_ModuleChange + * + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_MODULE_LOADED + * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: + * + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. + * + * RedisModuleEvent_LoadingProgress + * + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * or -1 if unknown. * * The function returns REDISMODULE_OK if the module was successfully subscrived * for the specified event. If the API is called from a wrong context then @@ -5955,7 +6220,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { listRewind(RedisModule_EventListeners,&li); while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; - if (el->event.id == eid && !el->module->in_hook) { + if (el->event.id == eid) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = el->module; @@ -5969,6 +6234,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { void *moduledata = NULL; RedisModuleClientInfoV1 civ1; + RedisModuleReplicationInfoV1 riv1; + RedisModuleModuleChangeV1 mcv1; /* Start at DB zero by default when calling the handler. It's * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct @@ -5980,11 +6247,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { modulePopulateClientInfoStructure(&civ1,data, el->event.dataver); moduledata = &civ1; + } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) { + modulePopulateReplicationInfoStructure(&riv1,el->event.dataver); + moduledata = &riv1; } else if (eid == REDISMODULE_EVENT_FLUSHDB) { moduledata = data; RedisModuleFlushInfoV1 *fi = data; if (fi->dbnum != -1) selectDb(ctx.client, fi->dbnum); + } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { + RedisModule *m = data; + if (m == el->module) + continue; + mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; + mcv1.module_name = m->name; + mcv1.module_version = m->ver; + moduledata = &mcv1; + } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) { + moduledata = data; + } else if (eid == REDISMODULE_EVENT_CRON_LOOP) { + moduledata = data; } ModulesInHooks++; @@ -6016,6 +6298,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } } +void processModuleLoadingProgressEvent(int is_aof) { + long long now = ustime(); + static long long next_event = 0; + if (now >= next_event) { + /* Fire the loading progress modules end event. */ + int progress = -1; + if (server.loading_total_bytes) + progress = (server.loading_total_bytes<<10) / server.loading_total_bytes; + RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION, + server.hz, + progress}; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS, + is_aof? + REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF: + REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB, + &fi); + /* decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -6184,6 +6487,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); + /* Fire the loaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_LOADED, + ctx.module); + moduleFreeContext(&ctx); return C_OK; } @@ -6246,6 +6554,11 @@ int moduleUnload(sds name) { module->name, error); } + /* Fire the unloaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_UNLOADED, + module); + /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); @@ -6494,6 +6807,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringTruncate); REGISTER_API(SetExpire); REGISTER_API(GetExpire); + REGISTER_API(ResetDataset); + REGISTER_API(DbSize); + REGISTER_API(RandomKey); REGISTER_API(ZsetAdd); REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); @@ -6621,7 +6937,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldLongLong); REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); + REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); + REGISTER_API(BlockClientOnKeys); + REGISTER_API(SignalKeyAsReady); + REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c index ba634c4a1..4f2d1d730 100644 --- a/src/modules/hellotype.c +++ b/src/modules/hellotype.c @@ -129,6 +129,7 @@ int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, /* Insert the new element. */ HelloTypeInsert(hto,value); + RedisModule_SignalKeyAsReady(ctx,argv[1]); RedisModule_ReplyWithLongLong(ctx,hto->len); RedisModule_ReplicateVerbatim(ctx); @@ -190,6 +191,77 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return REDISMODULE_OK; } +/* ====================== Example of a blocking command ==================== */ + +/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get + * called when the key we blocked for is ready: we need to check if we + * can really serve the client, and reply OK or ERR accordingly. */ +int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_MODULE || + RedisModule_ModuleTypeGetType(key) != HelloType) + { + RedisModule_CloseKey(key); + return REDISMODULE_ERR; + } + + /* In case the key is able to serve our blocked client, let's directly + * use our original command implementation to make this example simpler. */ + RedisModule_CloseKey(key); + return HelloTypeRange_RedisCommand(ctx,argv,argc-1); +} + +/* Timeout callback for blocking command HELLOTYPE.BRANGE */ +int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); +} + +/* Private data freeing callback for HELLOTYPE.BRANGE command. */ +void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { + REDISMODULE_NOT_USED(ctx); + RedisModule_Free(privdata); +} + +/* HELLOTYPE.BRANGE key first count timeout -- This is a blocking verison of + * the RANGE operation, in order to show how to use the API + * RedisModule_BlockClientOnKeys(). */ +int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) return RedisModule_WrongArity(ctx); + RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_ModuleTypeGetType(key) != HelloType) + { + return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); + } + + /* Parse the timeout before even trying to serve the client synchronously, + * so that we always fail ASAP on syntax errors. */ + long long timeout; + if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, + "ERR invalid timeout parameter"); + } + + /* Can we serve the reply synchronously? */ + if (type != REDISMODULE_KEYTYPE_EMPTY) { + return HelloTypeRange_RedisCommand(ctx,argv,argc-1); + } + + /* Otherwise let's block on the key. */ + void *privdata = RedisModule_Alloc(100); + RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata); + return REDISMODULE_OK; +} /* ========================== "hellotype" type methods ======================= */ @@ -282,5 +354,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hellotype.brange", + HelloTypeBRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/src/networking.c b/src/networking.c index e7cc561fa..9336c177c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1118,6 +1118,11 @@ void freeClient(client *c) { if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + if (c->replstate == SLAVE_STATE_ONLINE) + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, + NULL); } /* Master/slave cleanup Case 2: diff --git a/src/rdb.c b/src/rdb.c index f530219a4..b569edfea 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { +int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; + int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; @@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { @@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && + if (rdbflags & RDBFLAGS_AOF_PREAMBLE && rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = rdb->processed_bytes; @@ -1254,18 +1254,21 @@ werr: int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; + startSaving(RDBFLAGS_REPLICATION); getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); if (error) *error = 0; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; + stopSaving(1); return C_OK; werr: /* Write error. */ /* Set 'error' only if not already set by rdbSaveRio() call. */ if (error && *error == 0) *error = errno; + stopSaving(0); return C_ERR; } @@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { } rioInitWithFile(&rdb,fp); + startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ -void startLoading(size_t size) { +void startLoading(size_t size, int rdbflags) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; + + /* Fire the loading modules start event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START; + else if(rdbflags & RDBFLAGS_REPLICATION) + subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START; + else + subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL); } /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. * 'filename' is optional and used for rdb-check on error */ -void startLoadingFile(FILE *fp, char* filename) { +void startLoadingFile(FILE *fp, char* filename, int rdbflags) { struct stat sb; if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; rdbFileBeingLoaded = filename; - startLoading(sb.st_size); + startLoading(sb.st_size, rdbflags); } /* Refresh the loading progress info */ @@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) { } /* Loading finished */ -void stopLoading(void) { +void stopLoading(int success) { server.loading = 0; rdbFileBeingLoaded = NULL; + + /* Fire the loading modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_LOADING, + success? + REDISMODULE_SUBEVENT_LOADING_ENDED: + REDISMODULE_SUBEVENT_LOADING_FAILED, + NULL); +} + +void startSaving(int rdbflags) { + /* Fire the persistence modules end event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; + else if (getpid()!=server.pid) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; + else + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL); +} + +void stopSaving(int success) { + /* Fire the persistence modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, + success? + REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: + REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, + NULL); } /* Track loading progress in order to serve client's from time to time @@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(0); } } /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { + if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { @@ -2243,17 +2289,17 @@ eoferr: * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi) { +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { FILE *fp; rio rdb; int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoadingFile(fp, filename); + startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rsi,0); + retval = rdbLoadRio(&rdb,rdbflags,rsi); fclose(fp); - stopLoading(); + stopLoading(retval==C_OK); return retval; } diff --git a/src/rdb.h b/src/rdb.h index 40a50f7ba..4229beea8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -121,8 +121,10 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) -#define RDB_SAVE_NONE 0 -#define RDB_SAVE_AOF_PREAMBLE (1<<0) +/* flags on the purpose of rdb save or load */ +#define RDBFLAGS_NONE 0 +#define RDBFLAGS_AOF_PREAMBLE (1<<0) +#define RDBFLAGS_REPLICATION (1<<1) int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); @@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); @@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 5e7415046..1210d49b4 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoadingFile(fp, rdbfilename); + startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE); while(1) { robj *key, *val; @@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); - stopLoading(); + stopLoading(1); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); - stopLoading(); + stopLoading(0); return 1; } diff --git a/src/redismodule.h b/src/redismodule.h index 96e9fb4fa..26604d63c 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -185,6 +185,8 @@ typedef uint64_t RedisModuleTimerID; #define REDISMODULE_EVENT_REPLICA_CHANGE 6 #define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7 #define REDISMODULE_EVENT_CRON_LOOP 8 +#define REDISMODULE_EVENT_MODULE_CHANGE 9 +#define REDISMODULE_EVENT_LOADING_PROGRESS 10 typedef struct RedisModuleEvent { uint64_t id; /* REDISMODULE_EVENT_... defines. */ @@ -230,18 +232,28 @@ static const RedisModuleEvent RedisModuleEvent_MasterLinkChange = { REDISMODULE_EVENT_MASTER_LINK_CHANGE, 1 + }, + RedisModuleEvent_ModuleChange = { + REDISMODULE_EVENT_MODULE_CHANGE, + 1 + }, + RedisModuleEvent_LoadingProgress = { + REDISMODULE_EVENT_LOADING_PROGRESS, + 1 }; /* Those are values that are used for the 'subevent' callback argument. */ #define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0 -#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1 +#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2 +#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4 #define REDISMODULE_SUBEVENT_LOADING_RDB_START 0 -#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1 -#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2 -#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3 +#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1 +#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2 +#define REDISMODULE_SUBEVENT_LOADING_ENDED 3 +#define REDISMODULE_SUBEVENT_LOADING_FAILED 4 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1 @@ -249,12 +261,21 @@ static const RedisModuleEvent #define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0 #define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1 + +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0 +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1 #define REDISMODULE_SUBEVENT_FLUSHDB_START 0 #define REDISMODULE_SUBEVENT_FLUSHDB_END 1 +#define REDISMODULE_SUBEVENT_MODULE_LOADED 0 +#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1 + +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0 +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1 + /* RedisModuleClientInfo flags. */ #define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0) #define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1) @@ -290,6 +311,22 @@ typedef struct RedisModuleClientInfo { #define RedisModuleClientInfo RedisModuleClientInfoV1 +#define REDISMODULE_REPLICATIONINFO_VERSION 1 +typedef struct RedisModuleReplicationInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int master; /* true if master, false if replica */ + char *masterhost; /* master instance hostname for NOW_REPLICA */ + int masterport; /* master instance port for NOW_REPLICA */ + char *replid1; /* Main replication ID */ + char *replid2; /* Secondary replication ID */ + uint64_t repl1_offset; /* Main replication offset */ + uint64_t repl2_offset; /* Offset of replid2 validity */ +} RedisModuleReplicationInfoV1; + +#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1 + #define REDISMODULE_FLUSHINFO_VERSION 1 typedef struct RedisModuleFlushInfo { uint64_t version; /* Not used since this structure is never passed @@ -301,6 +338,39 @@ typedef struct RedisModuleFlushInfo { #define RedisModuleFlushInfo RedisModuleFlushInfoV1 +#define REDISMODULE_MODULE_CHANGE_VERSION 1 +typedef struct RedisModuleModuleChange { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + const char* module_name;/* Name of module loaded or unloaded. */ + int32_t module_version; /* Module version. */ +} RedisModuleModuleChangeV1; + +#define RedisModuleModuleChange RedisModuleModuleChangeV1 + +#define REDISMODULE_CRON_LOOP_VERSION 1 +typedef struct RedisModuleCronLoopInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ +} RedisModuleCronLoopV1; + +#define RedisModuleCronLoop RedisModuleCronLoopV1 + +#define REDISMODULE_LOADING_PROGRESS_VERSION 1 +typedef struct RedisModuleLoadingProgressInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ + int32_t progress; /* Approximate progress between 0 and 1024, or -1 + * if unknown. */ +} RedisModuleLoadingProgressV1; + +#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1 + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -420,6 +490,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen); mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire); +void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async); +unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr); int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); @@ -439,6 +512,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id); +int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message); int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); @@ -509,6 +583,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx * int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); +RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); +void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -622,6 +699,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringTruncate); REDISMODULE_GET_API(GetExpire); REDISMODULE_GET_API(SetExpire); + REDISMODULE_GET_API(ResetDataset); + REDISMODULE_GET_API(DbSize); + REDISMODULE_GET_API(RandomKey); REDISMODULE_GET_API(ZsetAdd); REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); @@ -708,9 +788,13 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldLongLong); REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); + REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(SetLRUOrLFU); REDISMODULE_GET_API(GetLRUOrLFU); + REDISMODULE_GET_API(BlockClientOnKeys); + REDISMODULE_GET_API(SignalKeyAsReady); + REDISMODULE_GET_API(GetBlockedClientReadyKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/replication.c b/src/replication.c index 4550e6a83..c9a2e0fe1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) { * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); + + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); + return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: @@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) { return; } refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); } @@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) { * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION); - if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) { /* RDB loading failed. */ - stopLoading(); + stopLoading(0); serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " "from socket"); @@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) { * gets promoted. */ return; } - stopLoading(); + stopLoading(1); /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(); return; } - if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk"); @@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) { server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* Fire the master link modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_UP, + NULL); + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ @@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) { replicationDiscardCachedMaster(); replicationCacheMasterUsingMyself(); } + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, + NULL); + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.repl_state = REPL_STATE_CONNECT; } /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + sdsfree(server.masterhost); server.masterhost = NULL; /* When a slave is turned into a master, the current replication ID @@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, + NULL); } /* This function is called when the slave lose the connection with the * master into an unexpected way. */ void replicationHandleMasterDisconnection(void) { + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.master = NULL; server.repl_state = REPL_STATE_CONNECT; server.repl_down_since = server.unixtime; diff --git a/src/server.c b/src/server.c index 8f165113d..f42924764 100644 --- a/src/server.c +++ b/src/server.c @@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { server.rdb_bgsave_scheduled = 0; } + /* Fire the cron loop modules event. */ + RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; + moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, + 0, + &ei); + server.cronloops++; return 1000/server.hz; } @@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) { } } + /* Fire the shutdown modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL); + /* Remove the pid file if possible and needed. */ if (server.daemonize || server.pidfile) { serverLog(LL_NOTICE,"Removing the pid file."); @@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) { serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); diff --git a/src/server.h b/src/server.h index 30b25a918..8063dc101 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,10 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +void processModuleLoadingProgressEvent(int is_aof); +int moduleTryServeClientBlockedOnKey(client *c, robj *key); +void moduleUnblockClient(client *c); +int moduleClientIsBlockedOnKeys(client *c); /* Utils */ long long ustime(void); @@ -1831,10 +1835,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); /* Generic persistence functions */ -void startLoadingFile(FILE* fp, char* filename); -void startLoading(size_t size); +void startLoadingFile(FILE* fp, char* filename, int rdbflags); +void startLoading(size_t size, int rdbflags); void loadingProgress(off_t pos); -void stopLoading(void); +void stopLoading(int success); +void startSaving(int rdbflags); +void stopSaving(int success); #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */ @@ -1843,7 +1849,6 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */ @@ -1859,6 +1864,7 @@ void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); ssize_t aofReadDiffFromParent(void); void killAppendOnlyChild(void); +void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); @@ -2099,6 +2105,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); int selectDb(client *c, int id); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 71c0b5ef8..9e27758a2 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -18,7 +18,8 @@ TEST_MODULES = \ infotest.so \ propagate.so \ misc.so \ - hooks.so + hooks.so \ + blockonkeys.so .PHONY: all diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c new file mode 100644 index 000000000..959918b1c --- /dev/null +++ b/tests/modules/blockonkeys.c @@ -0,0 +1,261 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define LIST_SIZE 1024 + +typedef struct { + long long list[LIST_SIZE]; + long long length; +} fsl_t; /* Fixed-size list */ + +static RedisModuleType *fsltype = NULL; + +fsl_t *fsl_type_create() { + fsl_t *o; + o = RedisModule_Alloc(sizeof(*o)); + o->length = 0; + return o; +} + +void fsl_type_free(fsl_t *o) { + RedisModule_Free(o); +} + +/* ========================== "fsltype" type methods ======================= */ + +void *fsl_rdb_load(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + return NULL; + } + fsl_t *fsl = fsl_type_create(); + fsl->length = RedisModule_LoadUnsigned(rdb); + for (long long i = 0; i < fsl->length; i++) + fsl->list[i] = RedisModule_LoadSigned(rdb); + return fsl; +} + +void fsl_rdb_save(RedisModuleIO *rdb, void *value) { + fsl_t *fsl = value; + RedisModule_SaveUnsigned(rdb,fsl->length); + for (long long i = 0; i < fsl->length; i++) + RedisModule_SaveSigned(rdb, fsl->list[i]); +} + +void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) { + fsl_t *fsl = value; + for (long long i = 0; i < fsl->length; i++) + RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]); +} + +void fsl_free(void *value) { + fsl_type_free(value); +} + +/* ========================== helper methods ======================= */ + +int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) { + RedisModule_CloseKey(key); + if (reply_on_failure) + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + return 0; + } + + /* Create an empty value object if the key is currently empty. */ + if (type == REDISMODULE_KEYTYPE_EMPTY) { + if (!create) { + /* Key is empty but we cannot create */ + RedisModule_CloseKey(key); + *fsl = NULL; + return 1; + } + *fsl = fsl_type_create(); + RedisModule_ModuleTypeSetValue(key, fsltype, *fsl); + } else { + *fsl = RedisModule_ModuleTypeGetValue(key); + } + + RedisModule_CloseKey(key); + return 1; +} + +/* ========================== commands ======================= */ + +/* FSL.PUSH - Push an integer to the fixed-size list (to the right). + * It must be greater than the element in the head of the list. */ +int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long ele; + if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1)) + return REDISMODULE_OK; + + if (fsl->length == LIST_SIZE) + return RedisModule_ReplyWithError(ctx,"ERR list is full"); + + if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele) + return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); + + fsl->list[fsl->length++] = ele; + + if (fsl->length >= 2) + RedisModule_SignalKeyAsReady(ctx, argv[1]); + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->length < 2) + return REDISMODULE_ERR; + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + + +/* FSL.BPOP2 - Block clients until list has two or more elements. + * When that happens, unblock client and pop the last two elements (from the right). */ +int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->length < 2) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + NULL, timeout, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->list[fsl->length-1] <= gt) + return REDISMODULE_ERR; + + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { + /* Nothing to do because privdata is actually a 'long long', + * not a pointer to the heap */ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(privdata); +} + +/* FSL.BPOPGT - Block clients until list has an element greater than . + * When that happens, unblock client and pop the last element (from the right). */ +int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long gt; + if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->list[fsl->length-1] <= gt) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); + } else { + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = fsl_rdb_load, + .rdb_save = fsl_rdb_save, + .aof_rewrite = fsl_aofrw, + .mem_usage = NULL, + .free = fsl_free, + .digest = NULL + }; + + fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); + if (fsltype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c index 33b690b2f..665a20481 100644 --- a/tests/modules/hooks.c +++ b/tests/modules/hooks.c @@ -30,36 +30,227 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" +#include +#include + +/* We need to store events to be able to test and see what we got, and we can't + * store them in the key-space since that would mess up rdb loading (duplicates) + * and be lost of flushdb. */ +RedisModuleDict *event_log = NULL; + +typedef struct EventElement { + long count; + RedisModuleString *last_val_string; + long last_val_int; +} EventElement; + +void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) { + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data)); + event->count++; +} + +void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) { + REDISMODULE_NOT_USED(ctx); + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + event->last_val_int = data; + event->count++; +} + +void FreeEvent(RedisModuleCtx *ctx, EventElement *event) { + if (event->last_val_string) + RedisModule_FreeString(ctx, event->last_val_string); + RedisModule_Free(event); +} + +int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + RedisModule_ReplyWithLongLong(ctx, event? event->count: 0); + return REDISMODULE_OK; +} + +int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + if (event && event->last_val_string) + RedisModule_ReplyWithString(ctx, event->last_val_string); + else if (event) + RedisModule_ReplyWithLongLong(ctx, event->last_val_int); + else + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; +} + +void clearEvents(RedisModuleCtx *ctx) +{ + RedisModuleString *key; + EventElement *event; + RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL); + while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) { + event->count = 0; + event->last_val_int = 0; + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = NULL; + RedisModule_DictDel(event_log, key, NULL); + RedisModule_Free(event); + } + RedisModule_DictIteratorStop(iter); +} + +int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argc); + REDISMODULE_NOT_USED(argv); + clearEvents(ctx); + return REDISMODULE_OK; +} /* Client state change callback. */ void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleClientInfo *ci = data; char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ? - "connected" : "disconnected"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id); - RedisModule_FreeCallReply(reply); + "client-connected" : "client-disconnected"; + LogNumericEvent(ctx, keyname, ci->id); } void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleFlushInfo *fi = data; char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? "flush-start" : "flush-end"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum); - RedisModule_FreeCallReply(reply); + LogNumericEvent(ctx, keyname, fi->dbnum); +} + +void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + RedisModuleReplicationInfo *ri = data; + char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ? + "role-master" : "role-replica"; + LogStringEvent(ctx, keyname, ri->masterhost); +} + +void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ? + "replica-online" : "replica-offline"; + LogNumericEvent(ctx, keyname, 0); +} + +void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ? + "masterlink-up" : "masterlink-down"; + LogNumericEvent(ctx, keyname, 0); +} + +void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break; + } + /* modifying the keyspace from the fork child is not an option, using log instead */ + RedisModule_Log(ctx, "warning", "module-event-%s", keyname); + if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START) + LogNumericEvent(ctx, keyname, 0); +} + +void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break; + case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break; + case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break; + case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break; + case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break; + } + LogNumericEvent(ctx, keyname, 0); +} + +void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleLoadingProgress *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ? + "loading-progress-rdb" : "loading-progress-aof"; + LogNumericEvent(ctx, keyname, ei->progress); +} + +void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + REDISMODULE_NOT_USED(sub); + + RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown"); +} + +void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(sub); + + RedisModuleCronLoop *ei = data; + LogNumericEvent(ctx, "cron-loop", ei->hz); +} + +void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleModuleChange *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ? + "module-loaded" : "module-unloaded"; + LogStringEvent(ctx, keyname, ei->module_name); } /* This function must be present on each Redis module. It is used in order to @@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + /* replication related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicaChange, replicationChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback); + + /* persistence related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Persistence, persistenceCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Loading, loadingCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_LoadingProgress, loadingProgressCallback); + + /* other hooks */ RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClientChange, clientChangeCallback); RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Shutdown, shutdownCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_CronLoop, cronLoopCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ModuleChange, moduleChangeCallback); + + event_log = RedisModule_CreateDict(ctx); + + if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + clearEvents(ctx); + RedisModule_FreeDict(ctx, event_log); + event_log = NULL; + return REDISMODULE_OK; +} + diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl new file mode 100644 index 000000000..cb99ab1c9 --- /dev/null +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -0,0 +1,85 @@ +set testmodule [file normalize tests/modules/blockonkeys.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module client blocked on keys (no metadata): No block} { + r del k + r fsl.push k 33 + r fsl.push k 34 + r fsl.bpop2 k 0 + } {34 33} + + test {Module client blocked on keys (no metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 0 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + r fsl.push k 34 + $rd fsl.bpop2 k 0 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (with metadata): No block} { + r del k + r fsl.push k 34 + r fsl.bpopgt k 30 0 + } {34} + + test {Module client blocked on keys (with metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 35 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 33 0 + r fsl.push k 34 + assert_equal {34} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + r fsl.push k 33 + r fsl.push k 34 + r fsl.push k 35 + r fsl.push k 36 + assert_equal {36} [$rd read] + } + + test {Module client blocked on keys does not wake up on wrong type} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpop2 k 0 + r lpush k 12 + r lpush k 13 + r lpush k 14 + r del k + r fsl.push k 33 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } +} diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index 7a727902d..cbca9c3eb 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so] tags "modules" { start_server {} { r module load $testmodule + r config set appendonly yes + test {Test clients connection / disconnection hooks} { for {set j 0} {$j < 2} {incr j} { set rd1 [redis_deferring_client] $rd1 close } - assert {[r llen connected] > 1} - assert {[r llen disconnected] > 1} + assert {[r hooks.event_count client-connected] > 1} + assert {[r hooks.event_count client-disconnected] > 1} + } + + test {Test module cron hook} { + after 100 + assert {[r hooks.event_count cron-loop] > 0} + set hz [r hooks.event_last cron-loop] + assert_equal $hz 10 + } + + test {Test module loaded / unloaded hooks} { + set othermodule [file normalize tests/modules/infotest.so] + r module load $othermodule + r module unload infotest + assert_equal [r hooks.event_last module-loaded] "infotest" + assert_equal [r hooks.event_last module-unloaded] "infotest" + } + + test {Test module aofrw hook} { + r debug populate 1000 foo 10000 ;# 10mb worth of data + r config set rdbcompression no ;# rdb progress is only checked once in 2mb + r BGREWRITEAOF + waitForBgrewriteaof r + assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1 + assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1 + } + + test {Test module aof load and rdb/aof progress hooks} { + # create some aof tail (progress is checked only once in 1000 commands) + for {set j 0} {$j < 4000} {incr j} { + r set "bar$j" x + } + # set some configs that will cause many loading progress events during aof loading + r config set key-load-delay 1 + r config set dynamic-hz no + r config set hz 500 + r DEBUG LOADAOF + assert_equal [r hooks.event_last loading-aof-start] 0 + assert_equal [r hooks.event_last loading-end] 0 + assert {[r hooks.event_count loading-rdb-start] == 0} + assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section + assert {[r hooks.event_count loading-progress-aof] >= 2} + } + # undo configs before next test + r config set dynamic-hz yes + r config set key-load-delay 0 + + test {Test module rdb save hook} { + # debug reload does: save, flush, load: + assert {[r hooks.event_count persistence-syncrdb-start] == 0} + assert {[r hooks.event_count loading-rdb-start] == 0} + r debug reload + assert {[r hooks.event_count persistence-syncrdb-start] == 1} + assert {[r hooks.event_count loading-rdb-start] == 1} } test {Test flushdb hooks} { - r flushall ;# Note: only the "end" RPUSH will survive - r select 1 r flushdb - r select 2 - r flushdb - r select 9 - assert {[r llen flush-start] == 2} - assert {[r llen flush-end] == 3} - assert {[r lrange flush-start 0 -1] eq {1 2}} - assert {[r lrange flush-end 0 -1] eq {-1 1 2}} + assert_equal [r hooks.event_last flush-start] 9 + assert_equal [r hooks.event_last flush-end] 9 + r flushall + assert_equal [r hooks.event_last flush-start] -1 + assert_equal [r hooks.event_last flush-end] -1 } + + # replication related tests + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server {} { + r module load $testmodule + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [r info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + test {Test master link up hook} { + assert_equal [r hooks.event_count masterlink-up] 1 + assert_equal [r hooks.event_count masterlink-down] 0 + } + + test {Test role-replica hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 0 + assert_equal [r hooks.event_last role-replica] [s 0 master_host] + } + + test {Test replica-online hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 0 + } + + test {Test master link down hook} { + r client kill type master + assert_equal [r hooks.event_count masterlink-down] 1 + } + + $replica replicaof no one + + test {Test role-master hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 1 + assert_equal [r hooks.event_last role-master] {} + } + + test {Test replica-offline hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 1 + } + # get the replica stdout, to be used by the next test + set replica_stdout [srv 0 stdout] + } + + + # look into the log file of the server that just exited + test {Test shutdown hook} { + assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1 + } + } }