Merge branch 'unstable' into keydbpro
Former-commit-id: fec8209c1996112976d927d65dbc2b4492131681
This commit is contained in:
commit
3aa271fe38
@ -189,9 +189,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
|
|||||||
* Returns the linked value object if the key exists or NULL if the key
|
* Returns the linked value object if the key exists or NULL if the key
|
||||||
* does not exist in the specified DB. */
|
* does not exist in the specified DB. */
|
||||||
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
|
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
|
||||||
|
expireIfNeeded(db,key);
|
||||||
robj *o = lookupKey(db,key,flags|LOOKUP_UPDATEMVCC);
|
robj *o = lookupKey(db,key,flags|LOOKUP_UPDATEMVCC);
|
||||||
if (expireIfNeeded(db,key))
|
|
||||||
o = nullptr;
|
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -736,8 +736,26 @@ NULL
|
|||||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) {
|
||||||
stringmatchlen_fuzz_test();
|
stringmatchlen_fuzz_test();
|
||||||
addReplyStatus(c,"Apparently Redis did not crash: test passed");
|
addReplyStatus(c,"Apparently Redis did not crash: test passed");
|
||||||
} else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) {
|
||||||
c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY;
|
c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY;
|
||||||
|
if (!strcasecmp(szFromObj(c->argv[2]), "yes"))
|
||||||
|
{
|
||||||
|
redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL);
|
||||||
|
mi->master = c;
|
||||||
|
listAddNodeHead(g_pserver->masters, mi);
|
||||||
|
}
|
||||||
|
else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset
|
||||||
|
{
|
||||||
|
serverAssert(c->flags & CLIENT_MASTER);
|
||||||
|
if (listLength(g_pserver->masters))
|
||||||
|
{
|
||||||
|
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
|
||||||
|
serverAssert(mi->master == c);
|
||||||
|
listDelNode(g_pserver->masters, listFirst(g_pserver->masters));
|
||||||
|
zfree(mi);
|
||||||
|
}
|
||||||
|
c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY);
|
||||||
|
}
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
#ifdef USE_JEMALLOC
|
#ifdef USE_JEMALLOC
|
||||||
} else if(!strcasecmp(szFromObj(c->argv[1]),"mallctl") && c->argc >= 3) {
|
} else if(!strcasecmp(szFromObj(c->argv[1]),"mallctl") && c->argc >= 3) {
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
#include "rdb.h"
|
#include "rdb.h"
|
||||||
|
#include "aelocker.h"
|
||||||
#include <dlfcn.h>
|
#include <dlfcn.h>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) {
|
|||||||
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
|
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyLongLong(c,ll);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyLongLongAsync(c,ll);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
|
|||||||
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
|
int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyProto(c,prefix,strlen(prefix));
|
AeLocker locker;
|
||||||
addReplyProto(c,msg,strlen(msg));
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
addReplyProto(c,"\r\n",2);
|
locker.arm(c);
|
||||||
|
addReplyProtoAsync(c,prefix,strlen(prefix));
|
||||||
|
addReplyProtoAsync(c,msg,strlen(msg));
|
||||||
|
addReplyProtoAsync(c,"\r\n",2);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
|
|||||||
* The function always returns REDISMODULE_OK. */
|
* The function always returns REDISMODULE_OK. */
|
||||||
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
|
int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
|
AeLocker locker;
|
||||||
|
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
|
if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
|
||||||
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
|
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
|
||||||
(ctx->postponed_arrays_count+1), MALLOC_LOCAL);
|
(ctx->postponed_arrays_count+1), MALLOC_LOCAL);
|
||||||
ctx->postponed_arrays[ctx->postponed_arrays_count] =
|
ctx->postponed_arrays[ctx->postponed_arrays_count] =
|
||||||
addReplyDeferredLen(c);
|
addReplyDeferredLenAsync(c);
|
||||||
ctx->postponed_arrays_count++;
|
ctx->postponed_arrays_count++;
|
||||||
} else {
|
} else {
|
||||||
addReplyArrayLen(c,len);
|
addReplyArrayLenAsync(c,len);
|
||||||
}
|
}
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
|
|||||||
int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
|
int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyNullArray(c);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyNullArrayAsync(c);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
|
|||||||
int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
|
int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReply(c,shared.emptyarray);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyAsync(c,shared.emptyarray);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
|
|||||||
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
|
void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return;
|
if (c == NULL) return;
|
||||||
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
if (ctx->postponed_arrays_count == 0) {
|
if (ctx->postponed_arrays_count == 0) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"API misuse detected in module %s: "
|
"API misuse detected in module %s: "
|
||||||
@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ctx->postponed_arrays_count--;
|
ctx->postponed_arrays_count--;
|
||||||
setDeferredArrayLen(c,
|
setDeferredArrayLenAsync(c,
|
||||||
ctx->postponed_arrays[ctx->postponed_arrays_count],
|
ctx->postponed_arrays[ctx->postponed_arrays_count],
|
||||||
len);
|
len);
|
||||||
if (ctx->postponed_arrays_count == 0) {
|
if (ctx->postponed_arrays_count == 0) {
|
||||||
@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
|
|||||||
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyBulkCBuffer(c,(char*)buf,len);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyBulkCBufferAsync(c,(char*)buf,len);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
|||||||
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
|
int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyBulkCString(c,(char*)buf);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyBulkCStringAsync(c,(char*)buf);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
|
|||||||
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyBulk(c,str);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyBulkAsync(c,str);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|||||||
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
|
int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReply(c,shared.emptybulk);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyAsync(c,shared.emptybulk);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
|
|||||||
int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyVerbatim(c, buf, len, "txt");
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyVerbatimAsync(c, buf, len, "txt");
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
|
|||||||
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
|
int RM_ReplyWithNull(RedisModuleCtx *ctx) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyNull(c);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyNullAsync(c);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
|
|||||||
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
|
int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
sds proto = sdsnewlen(reply->proto, reply->protolen);
|
sds proto = sdsnewlen(reply->proto, reply->protolen);
|
||||||
addReplySds(c,proto);
|
addReplySdsAsync(c,proto);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
|
|||||||
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
|
int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyDouble(c,d);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyDoubleAsync(c,d);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
|
|||||||
int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
|
int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
|
||||||
client *c = moduleGetReplyClient(ctx);
|
client *c = moduleGetReplyClient(ctx);
|
||||||
if (c == NULL) return REDISMODULE_OK;
|
if (c == NULL) return REDISMODULE_OK;
|
||||||
addReplyHumanLongDouble(c, ld);
|
AeLocker locker;
|
||||||
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
|
locker.arm(c);
|
||||||
|
addReplyHumanLongDoubleAsync(c, ld);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -663,23 +663,33 @@ void addReplyDoubleAsync(client *c, double d) {
|
|||||||
addReplyDoubleCore(c, d, true);
|
addReplyDoubleCore(c, d, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync);
|
||||||
|
|
||||||
/* Add a long double as a bulk reply, but uses a human readable formatting
|
/* Add a long double as a bulk reply, but uses a human readable formatting
|
||||||
* of the double instead of exposing the crude behavior of doubles to the
|
* of the double instead of exposing the crude behavior of doubles to the
|
||||||
* dear user. */
|
* dear user. */
|
||||||
void addReplyHumanLongDouble(client *c, long double d) {
|
void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) {
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
robj *o = createStringObjectFromLongDouble(d,1);
|
robj *o = createStringObjectFromLongDouble(d,1);
|
||||||
addReplyBulk(c,o);
|
addReplyBulkCore(c,o,fAsync);
|
||||||
decrRefCount(o);
|
decrRefCount(o);
|
||||||
} else {
|
} else {
|
||||||
char buf[MAX_LONG_DOUBLE_CHARS];
|
char buf[MAX_LONG_DOUBLE_CHARS];
|
||||||
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
|
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
|
||||||
addReplyProto(c,",",1);
|
addReplyProtoCore(c,",",1,fAsync);
|
||||||
addReplyProto(c,buf,len);
|
addReplyProtoCore(c,buf,len,fAsync);
|
||||||
addReplyProto(c,"\r\n",2);
|
addReplyProtoCore(c,"\r\n",2,fAsync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyHumanLongDouble(client *c, long double d) {
|
||||||
|
addReplyHumanLongDoubleCore(c, d, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyHumanLongDoubleAsync(client *c, long double d) {
|
||||||
|
addReplyHumanLongDoubleCore(c, d, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add a long long as integer reply or bulk len / multi bulk count.
|
/* Add a long long as integer reply or bulk len / multi bulk count.
|
||||||
* Basically this is used to output <prefix><long long><crlf>. */
|
* Basically this is used to output <prefix><long long><crlf>. */
|
||||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
|
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
|
||||||
@ -914,6 +924,10 @@ void addReplyBulkCString(client *c, const char *s) {
|
|||||||
addReplyBulkCStringCore(c, s, false);
|
addReplyBulkCStringCore(c, s, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyBulkCStringAsync(client *c, const char *s) {
|
||||||
|
addReplyBulkCStringCore(c, s, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add a long long as a bulk reply */
|
/* Add a long long as a bulk reply */
|
||||||
void addReplyBulkLongLong(client *c, long long ll) {
|
void addReplyBulkLongLong(client *c, long long ll) {
|
||||||
char buf[64];
|
char buf[64];
|
||||||
@ -932,9 +946,9 @@ void addReplyBulkLongLong(client *c, long long ll) {
|
|||||||
* three first characters of the extension are used, and if the
|
* three first characters of the extension are used, and if the
|
||||||
* provided one is shorter than that, the remaining is filled with
|
* provided one is shorter than that, the remaining is filled with
|
||||||
* spaces. */
|
* spaces. */
|
||||||
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) {
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyBulkCBuffer(c,s,len);
|
addReplyBulkCBufferCore(c,s,len,fAsync);
|
||||||
} else {
|
} else {
|
||||||
char buf[32];
|
char buf[32];
|
||||||
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
|
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
|
||||||
@ -946,12 +960,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
|||||||
p[i] = *ext++;
|
p[i] = *ext++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addReplyProto(c,buf,preflen);
|
addReplyProtoCore(c,buf,preflen,fAsync);
|
||||||
addReplyProto(c,s,len);
|
addReplyProtoCore(c,s,len,fAsync);
|
||||||
addReplyProto(c,"\r\n",2);
|
addReplyProtoCore(c,"\r\n",2,fAsync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
||||||
|
addReplyVerbatimCore(c, s, len, ext, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) {
|
||||||
|
addReplyVerbatimCore(c, s, len, ext, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add an array of C strings as status replies with a heading.
|
/* Add an array of C strings as status replies with a heading.
|
||||||
* This function is typically invoked by from commands that support
|
* This function is typically invoked by from commands that support
|
||||||
* subcommands in response to the 'help' subcommand. The help array
|
* subcommands in response to the 'help' subcommand. The help array
|
||||||
|
@ -2528,10 +2528,12 @@ void addReplyNullArray(client *c);
|
|||||||
void addReplyNullArrayAsync(client *c);
|
void addReplyNullArrayAsync(client *c);
|
||||||
void addReplyBool(client *c, int b);
|
void addReplyBool(client *c, int b);
|
||||||
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
|
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
|
||||||
|
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext);
|
||||||
void addReplyProto(client *c, const char *s, size_t len);
|
void addReplyProto(client *c, const char *s, size_t len);
|
||||||
void addReplyBulk(client *c, robj_roptr obj);
|
void addReplyBulk(client *c, robj_roptr obj);
|
||||||
void AddReplyFromClient(client *c, client *src);
|
void AddReplyFromClient(client *c, client *src);
|
||||||
void addReplyBulkCString(client *c, const char *s);
|
void addReplyBulkCString(client *c, const char *s);
|
||||||
|
void addReplyBulkCStringAsync(client *c, const char *s);
|
||||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
||||||
void addReplyBulkLongLong(client *c, long long ll);
|
void addReplyBulkLongLong(client *c, long long ll);
|
||||||
void addReply(client *c, robj_roptr obj);
|
void addReply(client *c, robj_roptr obj);
|
||||||
@ -2541,6 +2543,7 @@ void addReplyError(client *c, const char *err);
|
|||||||
void addReplyStatus(client *c, const char *status);
|
void addReplyStatus(client *c, const char *status);
|
||||||
void addReplyDouble(client *c, double d);
|
void addReplyDouble(client *c, double d);
|
||||||
void addReplyHumanLongDouble(client *c, long double d);
|
void addReplyHumanLongDouble(client *c, long double d);
|
||||||
|
void addReplyHumanLongDoubleAsync(client *c, long double d);
|
||||||
void addReplyLongLong(client *c, long long ll);
|
void addReplyLongLong(client *c, long long ll);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
||||||
|
@ -37,6 +37,7 @@ set ::all_tests {
|
|||||||
unit/acl
|
unit/acl
|
||||||
unit/rreplay
|
unit/rreplay
|
||||||
unit/cron
|
unit/cron
|
||||||
|
unit/replication
|
||||||
integration/block-repl
|
integration/block-repl
|
||||||
integration/replication
|
integration/replication
|
||||||
integration/replication-2
|
integration/replication-2
|
||||||
|
0
tests/unit/replication.tcl
Normal file
0
tests/unit/replication.tcl
Normal file
@ -1,7 +1,7 @@
|
|||||||
start_server {tags {"rreplay"}} {
|
start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
||||||
|
|
||||||
test {RREPLAY use current db} {
|
test {RREPLAY use current db} {
|
||||||
r debug force-master
|
r debug force-master flagonly
|
||||||
r select 4
|
r select 4
|
||||||
r set dbnum invalid
|
r set dbnum invalid
|
||||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
|
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
|
||||||
@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} {
|
|||||||
reconnect
|
reconnect
|
||||||
|
|
||||||
test {RREPLAY db different} {
|
test {RREPLAY db different} {
|
||||||
r debug force-master
|
r debug force-master flagonly
|
||||||
r select 4
|
r select 4
|
||||||
r set testkey four
|
r set testkey four
|
||||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
|
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
|
||||||
|
Loading…
x
Reference in New Issue
Block a user