Support empty callback on function and free temp function in async way (#1334)

We have a replicationEmptyDbCallback, it is a callback used by emptyData
while flushing away old data. Previously, we did not add this callback
logic for function, in case of abuse, there may be a lot of functions,
and also to make the code consistent, we add the same callback logic
for function.

Changes around this commit:
1. Extend emptyData / functionsLibCtxClear to support passing callback
   when flushing functions.
2. Added disklessLoad function create and discard helper function, just
   like disklessLoadInitTempDb and disklessLoadDiscardTempDb), we wll
   always flush the temp function in a async way to avoid any block.
3. Cleanup around discardTempDb, remove the callback pointer since in
   async way we don't need the callback.
4. Remove functionsLibCtxClear call in readSyncBulkPayload, because we
   called emptyData in the previous lines, which also empty functions.

We are doing this callback in replication is because during the flush,
replica may block a while if the flush is doing in the sync way, to
avoid the primary to detect the replica is timing out, replica will use this
callback to notify the primary (we also do this callback when loading
a RDB). And in the async way, we empty the data in the bio and there is
no slw operation, so it will ignores the callback.

Signed-off-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
Binbin 2024-11-25 09:59:37 +08:00 committed by GitHub
parent 33f42d7fb5
commit 653d5f7fe3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 22 deletions

View File

@ -574,7 +574,7 @@ long long emptyData(int dbnum, int flags, void(callback)(dict *)) {
if (with_functions) {
serverAssert(dbnum == -1);
functionsLibCtxClearCurrent(async);
functionsLibCtxClearCurrent(async, callback);
}
/* Also fire the end event. Note that this event will fire almost
@ -602,12 +602,10 @@ serverDb *initTempDb(void) {
return tempDb;
}
/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
void discardTempDb(serverDb *tempDb, void(callback)(dict *)) {
int async = 1;
/* Discard tempDb, it's always async. */
void discardTempDb(serverDb *tempDb) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
emptyDbStructure(tempDb, -1, 1, NULL);
for (int i = 0; i < server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);

View File

@ -161,9 +161,9 @@ static void engineLibraryDispose(void *obj) {
}
/* Clear all the functions from the given library ctx */
void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
dictEmpty(lib_ctx->functions, NULL);
dictEmpty(lib_ctx->libraries, NULL);
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) {
dictEmpty(lib_ctx->functions, callback);
dictEmpty(lib_ctx->libraries, callback);
dictIterator *iter = dictGetIterator(lib_ctx->engines_stats);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
@ -175,13 +175,13 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx) {
lib_ctx->cache_memory = 0;
}
void functionsLibCtxClearCurrent(int async) {
void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) {
if (async) {
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
curr_functions_lib_ctx = functionsLibCtxCreate();
freeFunctionsAsync(old_l_ctx);
} else {
functionsLibCtxClear(curr_functions_lib_ctx);
functionsLibCtxClear(curr_functions_lib_ctx, callback);
}
}
@ -196,7 +196,7 @@ static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int a
/* Free the given functions ctx */
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
functionsLibCtxClear(functions_lib_ctx);
functionsLibCtxClear(functions_lib_ctx, NULL);
dictRelease(functions_lib_ctx->functions);
dictRelease(functions_lib_ctx->libraries);
dictRelease(functions_lib_ctx->engines_stats);
@ -380,7 +380,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l
dictReleaseIterator(iter);
iter = NULL;
functionsLibCtxClear(functions_lib_ctx_src);
functionsLibCtxClear(functions_lib_ctx_src, NULL);
if (old_libraries_list) {
listRelease(old_libraries_list);
old_libraries_list = NULL;
@ -820,7 +820,7 @@ void functionFlushCommand(client *c) {
return;
}
functionsLibCtxClearCurrent(async);
functionsLibCtxClearCurrent(async, NULL);
/* Indicate that the command changed the data so it will be replicated and
* counted as a data change (for persistence configuration) */

View File

@ -133,9 +133,9 @@ dict *functionsLibGet(void);
size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
functionsLibCtx *functionsLibCtxGetCurrent(void);
functionsLibCtx *functionsLibCtxCreate(void);
void functionsLibCtxClearCurrent(int async);
void functionsLibCtxClearCurrent(int async, void(callback)(dict *));
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
void functionsLibCtxClear(functionsLibCtx *lib_ctx);
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *));
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);
int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err);

View File

@ -1981,7 +1981,20 @@ serverDb *disklessLoadInitTempDb(void) {
/* Helper function for readSyncBulkPayload() to discard our tempDb
* when the loading succeeded or failed. */
void disklessLoadDiscardTempDb(serverDb *tempDb) {
discardTempDb(tempDb, replicationEmptyDbCallback);
discardTempDb(tempDb);
}
/* Helper function for to initialize temp function lib context.
* The temp ctx may be populated by functionsLibCtxSwapWithCurrent or
* freed by disklessLoadDiscardFunctionsLibCtx later. */
functionsLibCtx *disklessLoadFunctionsLibCtxCreate(void) {
return functionsLibCtxCreate();
}
/* Helper function to discard our temp function lib context
* when the loading succeeded or failed. */
void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) {
freeFunctionsAsync(temp_functions_lib_ctx);
}
/* If we know we got an entirely different data set from our primary
@ -2186,7 +2199,7 @@ void readSyncBulkPayload(connection *conn) {
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Initialize empty tempDb dictionaries. */
diskless_load_tempDb = disklessLoadInitTempDb();
temp_functions_lib_ctx = functionsLibCtxCreate();
temp_functions_lib_ctx = disklessLoadFunctionsLibCtxCreate();
moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
}
@ -2226,7 +2239,6 @@ void readSyncBulkPayload(connection *conn) {
dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
}
rioInitWithConn(&rdb, conn, server.repl_transfer_size);
@ -2264,7 +2276,7 @@ void readSyncBulkPayload(connection *conn) {
NULL);
disklessLoadDiscardTempDb(diskless_load_tempDb);
functionsLibCtxFree(temp_functions_lib_ctx);
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
@ -2289,7 +2301,7 @@ void readSyncBulkPayload(connection *conn) {
swapMainDbWithTempDb(diskless_load_tempDb);
/* swap existing functions ctx with the temporary one */
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 0);
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx, 1);
moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
NULL);

View File

@ -3572,7 +3572,7 @@ long long emptyDbStructure(serverDb *dbarray, int dbnum, int async, void(callbac
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount(void);
serverDb *initTempDb(void);
void discardTempDb(serverDb *tempDb, void(callback)(dict *));
void discardTempDb(serverDb *tempDb);
int selectDb(client *c, int id);