module fixes

Former-commit-id: ef4e11ecb8a6f1a05bb21f014120b0ef9e771b60
This commit is contained in:
John Sully 2020-02-04 01:22:25 -05:00
parent 771787273a
commit 6bb1429a46
3 changed files with 100 additions and 28 deletions

View File

@ -30,6 +30,7 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "rdb.h" #include "rdb.h"
#include "aelocker.h"
#include <dlfcn.h> #include <dlfcn.h>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) {
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyLongLong(c,ll); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyLongLongAsync(c,ll);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyProto(c,prefix,strlen(prefix)); AeLocker locker;
addReplyProto(c,msg,strlen(msg)); std::unique_lock<fastlock> lock(c->lock);
addReplyProto(c,"\r\n",2); locker.arm(c);
addReplyProtoAsync(c,prefix,strlen(prefix));
addReplyProtoAsync(c,msg,strlen(msg));
addReplyProtoAsync(c,"\r\n",2);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
* The function always returns REDISMODULE_OK. */ * The function always returns REDISMODULE_OK. */
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
AeLocker locker;
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
(ctx->postponed_arrays_count+1), MALLOC_LOCAL); (ctx->postponed_arrays_count+1), MALLOC_LOCAL);
ctx->postponed_arrays[ctx->postponed_arrays_count] = ctx->postponed_arrays[ctx->postponed_arrays_count] =
addReplyDeferredLen(c); addReplyDeferredLenAsync(c);
ctx->postponed_arrays_count++; ctx->postponed_arrays_count++;
} else { } else {
addReplyArrayLen(c,len); addReplyArrayLenAsync(c,len);
} }
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyNullArray(c); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullArrayAsync(c);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptyarray); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptyarray);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return; if (c == NULL) return;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
if (ctx->postponed_arrays_count == 0) { if (ctx->postponed_arrays_count == 0) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"API misuse detected in module %s: " "API misuse detected in module %s: "
@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
return; return;
} }
ctx->postponed_arrays_count--; ctx->postponed_arrays_count--;
setDeferredArrayLen(c, setDeferredArrayLenAsync(c,
ctx->postponed_arrays[ctx->postponed_arrays_count], ctx->postponed_arrays[ctx->postponed_arrays_count],
len); len);
if (ctx->postponed_arrays_count == 0) { if (ctx->postponed_arrays_count == 0) {
@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulkCBuffer(c,(char*)buf,len); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCBufferAsync(c,(char*)buf,len);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulkCString(c,(char*)buf); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkCStringAsync(c,(char*)buf);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyBulk(c,str); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyBulkAsync(c,str);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReply(c,shared.emptybulk); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyAsync(c,shared.emptybulk);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyVerbatim(c, buf, len, "txt"); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyVerbatimAsync(c, buf, len, "txt");
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithNull(RedisModuleCtx *ctx) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyNull(c); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyNullAsync(c);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
sds proto = sdsnewlen(reply->proto, reply->protolen); sds proto = sdsnewlen(reply->proto, reply->protolen);
addReplySds(c,proto); addReplySdsAsync(c,proto);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyDouble(c,d); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyDoubleAsync(c,d);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
client *c = moduleGetReplyClient(ctx); client *c = moduleGetReplyClient(ctx);
if (c == NULL) return REDISMODULE_OK; if (c == NULL) return REDISMODULE_OK;
addReplyHumanLongDouble(c, ld); AeLocker locker;
std::unique_lock<fastlock> lock(c->lock);
locker.arm(c);
addReplyHumanLongDoubleAsync(c, ld);
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -661,23 +661,33 @@ void addReplyDoubleAsync(client *c, double d) {
addReplyDoubleCore(c, d, true); addReplyDoubleCore(c, d, true);
} }
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync);
/* Add a long double as a bulk reply, but uses a human readable formatting /* Add a long double as a bulk reply, but uses a human readable formatting
* of the double instead of exposing the crude behavior of doubles to the * of the double instead of exposing the crude behavior of doubles to the
* dear user. */ * dear user. */
void addReplyHumanLongDouble(client *c, long double d) { void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) {
if (c->resp == 2) { if (c->resp == 2) {
robj *o = createStringObjectFromLongDouble(d,1); robj *o = createStringObjectFromLongDouble(d,1);
addReplyBulk(c,o); addReplyBulkCore(c,o,fAsync);
decrRefCount(o); decrRefCount(o);
} else { } else {
char buf[MAX_LONG_DOUBLE_CHARS]; char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
addReplyProto(c,",",1); addReplyProtoCore(c,",",1,fAsync);
addReplyProto(c,buf,len); addReplyProtoCore(c,buf,len,fAsync);
addReplyProto(c,"\r\n",2); addReplyProtoCore(c,"\r\n",2,fAsync);
} }
} }
void addReplyHumanLongDouble(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, false);
}
void addReplyHumanLongDoubleAsync(client *c, long double d) {
addReplyHumanLongDoubleCore(c, d, true);
}
/* Add a long long as integer reply or bulk len / multi bulk count. /* Add a long long as integer reply or bulk len / multi bulk count.
* Basically this is used to output <prefix><long long><crlf>. */ * Basically this is used to output <prefix><long long><crlf>. */
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
@ -912,6 +922,10 @@ void addReplyBulkCString(client *c, const char *s) {
addReplyBulkCStringCore(c, s, false); addReplyBulkCStringCore(c, s, false);
} }
void addReplyBulkCStringAsync(client *c, const char *s) {
addReplyBulkCStringCore(c, s, true);
}
/* Add a long long as a bulk reply */ /* Add a long long as a bulk reply */
void addReplyBulkLongLong(client *c, long long ll) { void addReplyBulkLongLong(client *c, long long ll) {
char buf[64]; char buf[64];
@ -930,9 +944,9 @@ void addReplyBulkLongLong(client *c, long long ll) {
* three first characters of the extension are used, and if the * three first characters of the extension are used, and if the
* provided one is shorter than that, the remaining is filled with * provided one is shorter than that, the remaining is filled with
* spaces. */ * spaces. */
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) {
if (c->resp == 2) { if (c->resp == 2) {
addReplyBulkCBuffer(c,s,len); addReplyBulkCBufferCore(c,s,len,fAsync);
} else { } else {
char buf[32]; char buf[32];
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
@ -944,12 +958,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
p[i] = *ext++; p[i] = *ext++;
} }
} }
addReplyProto(c,buf,preflen); addReplyProtoCore(c,buf,preflen,fAsync);
addReplyProto(c,s,len); addReplyProtoCore(c,s,len,fAsync);
addReplyProto(c,"\r\n",2); addReplyProtoCore(c,"\r\n",2,fAsync);
} }
} }
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, false);
}
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) {
addReplyVerbatimCore(c, s, len, ext, true);
}
/* Add an array of C strings as status replies with a heading. /* Add an array of C strings as status replies with a heading.
* This function is typically invoked by from commands that support * This function is typically invoked by from commands that support
* subcommands in response to the 'help' subcommand. The help array * subcommands in response to the 'help' subcommand. The help array

View File

@ -2121,10 +2121,12 @@ void addReplyNullArray(client *c);
void addReplyNullArrayAsync(client *c); void addReplyNullArrayAsync(client *c);
void addReplyBool(client *c, int b); void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len); void addReplyProto(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj_roptr obj); void addReplyBulk(client *c, robj_roptr obj);
void AddReplyFromClient(client *c, client *src); void AddReplyFromClient(client *c, client *src);
void addReplyBulkCString(client *c, const char *s); void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCStringAsync(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll); void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj_roptr obj); void addReply(client *c, robj_roptr obj);
@ -2134,6 +2136,7 @@ void addReplyError(client *c, const char *err);
void addReplyStatus(client *c, const char *status); void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d); void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d); void addReplyHumanLongDouble(client *c, long double d);
void addReplyHumanLongDoubleAsync(client *c, long double d);
void addReplyLongLong(client *c, long long ll); void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus #ifdef __cplusplus
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);