From 1bca62c4b751b325c726482dd9a7264125a2664c Mon Sep 17 00:00:00 2001
From: antirez <antirez@gmail.com>
Date: Thu, 3 Oct 2019 11:03:46 +0200
Subject: [PATCH] Modules: RM_Replicate() in thread safe contexts.

---
 src/module.c | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/module.c b/src/module.c
index e8fb5adb3..c87f81c5d 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1407,6 +1407,20 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
  *
  * Please refer to RedisModule_Call() for more information.
  *
+ * ## Note about calling this function from a thread safe context:
+ *
+ * Normally when you call this function from the callback implementing a
+ * module command, or any other callback provided by the Redis Module API,
+ * Redis will accumulate all the calls to this function in the context of
+ * the callback, and will propagate all the commands wrapped in a MULTI/EXEC
+ * transaction. However when calling this function from a threaded safe context
+ * that can live an undefined amount of time, and can be locked/unlocked in
+ * at will, the behavior is different: MULTI/EXEC wrapper is not emitted
+ * and the command specified is inserted in the AOF and replication stream
+ * immediately.
+ *
+ * ## Return value
+ *
  * The command returns REDISMODULE_ERR if the format specifiers are invalid
  * or the command name does not belong to a known command. */
 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
@@ -1424,10 +1438,18 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
     va_end(ap);
     if (argv == NULL) return REDISMODULE_ERR;
 
-    /* Replicate! */
-    moduleReplicateMultiIfNeeded(ctx);
-    alsoPropagate(cmd,ctx->client->db->id,argv,argc,
-        PROPAGATE_AOF|PROPAGATE_REPL);
+    /* Replicate! When we are in a threaded context, we want to just insert
+     * the replicated command ASAP, since it is not clear when the context
+     * will stop being used, so accumulating stuff does not make much sense,
+     * nor we could easily use the alsoPropagate() API from threads. */
+    if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
+        propagate(cmd,ctx->client->db->id,argv,argc,
+            PROPAGATE_AOF|PROPAGATE_REPL);
+    } else {
+        moduleReplicateMultiIfNeeded(ctx);
+        alsoPropagate(cmd,ctx->client->db->id,argv,argc,
+            PROPAGATE_AOF|PROPAGATE_REPL);
+    }
 
     /* Release the argv. */
     for (j = 0; j < argc; j++) decrRefCount(argv[j]);