Add support to notify modules of keys loaded by flash on startup (#536)

This commit is contained in:
Malavan Sotheeswaran 2023-02-06 12:52:32 -05:00 committed by GitHub
parent 5123e2b3a1
commit a1978ce04c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 133 additions and 9 deletions

View File

@ -37,5 +37,6 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/hash \
--single unit/moduleapi/zset \
--single unit/moduleapi/stream \
--single unit/moduleapi/load \
--config server-threads 3 \
"${@}"

View File

@ -2607,6 +2607,14 @@ void clusterStorageLoadCallback(const char *rgchkey, size_t cch, void *)
slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/);
}
void moduleLoadCallback(const char * rgchKey, size_t cchKey, void *data) {
if (g_pserver->cluster_enabled) {
clusterStorageLoadCallback(rgchKey, cchKey, data);
}
robj *keyobj = createEmbeddedStringObject(rgchKey, cchKey);
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", keyobj, *(int *)data);
}
void redisDb::initialize(int id)
{
redisDbPersistentData::initialize();
@ -2625,8 +2633,8 @@ void redisDb::storageProviderInitialize()
{
if (g_pserver->m_pstorageFactory != nullptr)
{
IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr;
this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, nullptr));
IStorageFactory::key_load_iterator itr = moduleLoadCallback;
this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, &id));
}
}

View File

@ -326,6 +326,7 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_LOADING_ENDED 3
#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define _REDISMODULE_SUBEVENT_LOADING_NEXT 5
#define REDISMODULE_SUBEVENT_LOADING_FLASH_START 6
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1

View File

@ -4068,12 +4068,6 @@ void initServer(void) {
}
}
/* We have to initialize storage providers after the cluster has been initialized */
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->storageProviderInitialize();
}
saveMasterStatusToStorage(false); // eliminate the repl-offset field
/* Initialize ACL default password if it exists */
@ -4086,6 +4080,15 @@ void initServer(void) {
* Thread Local Storage initialization collides with dlopen call.
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
/* We have to initialize storage providers after the cluster has been initialized */
moduleFireServerEvent(REDISMODULE_EVENT_LOADING, REDISMODULE_SUBEVENT_LOADING_FLASH_START, NULL);
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->storageProviderInitialize();
}
moduleFireServerEvent(REDISMODULE_EVENT_LOADING, REDISMODULE_SUBEVENT_LOADING_ENDED, NULL);
bioInit();
set_jemalloc_bg_thread(cserver.jemalloc_bg_thread);
g_pserver->initial_memory_usage = zmalloc_used_memory();

View File

@ -39,7 +39,8 @@ TEST_MODULES = \
defragtest.so \
hash.so \
zset.so \
stream.so
stream.so \
load.so
.PHONY: all

View File

@ -212,6 +212,7 @@ void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void
case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break;
case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break;
case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break;
case REDISMODULE_SUBEVENT_LOADING_FLASH_START: keyname = "loading-flash-start"; break;
case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break;
case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break;
}

94
tests/modules/load.c Normal file
View File

@ -0,0 +1,94 @@
/* Server hooks API example
*
* -----------------------------------------------------------------------------
*
* Copyright (c) 2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
size_t count, finalCount;
/* Client state change callback. */
void loadCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
if (sub == REDISMODULE_SUBEVENT_LOADING_FLASH_START || sub == REDISMODULE_SUBEVENT_LOADING_RDB_START || sub == REDISMODULE_SUBEVENT_LOADING_AOF_START || sub == REDISMODULE_SUBEVENT_LOADING_REPL_START) {
count = 0;
finalCount = 0;
} else if (sub == REDISMODULE_SUBEVENT_LOADING_ENDED) {
finalCount = count;
}
}
int loadKeyCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *keyname = RedisModule_StringPtrLen(key, NULL);
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "Loaded key: %s", keyname);
count++;
return 0;
}
int LoadCount_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
if (argc != 1) return RedisModule_WrongArity(ctx);
RedisModule_ReplyWithLongLong(ctx, finalCount);
return REDISMODULE_OK;
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"load",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Loading, loadCallback);
RedisModule_SubscribeToKeyspaceEvents(ctx,
REDISMODULE_NOTIFY_LOADED, loadKeyCallback);
if (RedisModule_CreateCommand(ctx, "load.count",
LoadCount_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -0,0 +1,15 @@
set testmodule [file normalize tests/modules/load.so]
if {$::flash_enabled} {
start_server [list tags [list "modules"] overrides [list storage-provider {flash ./rocks.db.master} databases 256 loadmodule $testmodule]] {
test "Module is notified of keys loaded from flash" {
r flushall
r set foo bar
r set bar foo
r set foobar barfoo
assert_equal [r load.count] 0
r debug reload
assert_equal [r load.count] 3
}
}
}