Remove addReply*Async methods since we already know if its async or not. This is just a source of bugs
Former-commit-id: df22cdf6e91a1b9c390b69c4209c719ecf1e44f1
This commit is contained in:
parent
7a9fbad132
commit
19a4d2d358
@ -188,9 +188,9 @@ void replyToBlockedClientTimedOut(client *c) {
|
|||||||
if (c->btype == BLOCKED_LIST ||
|
if (c->btype == BLOCKED_LIST ||
|
||||||
c->btype == BLOCKED_ZSET ||
|
c->btype == BLOCKED_ZSET ||
|
||||||
c->btype == BLOCKED_STREAM) {
|
c->btype == BLOCKED_STREAM) {
|
||||||
addReplyNullArrayAsync(c);
|
addReplyNullArray(c);
|
||||||
} else if (c->btype == BLOCKED_WAIT) {
|
} else if (c->btype == BLOCKED_WAIT) {
|
||||||
addReplyLongLongAsync(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
||||||
} else if (c->btype == BLOCKED_MODULE) {
|
} else if (c->btype == BLOCKED_MODULE) {
|
||||||
moduleBlockedClientTimedOut(c);
|
moduleBlockedClientTimedOut(c);
|
||||||
} else {
|
} else {
|
||||||
@ -216,7 +216,7 @@ void disconnectAllBlockedClients(void) {
|
|||||||
|
|
||||||
fastlock_lock(&c->lock);
|
fastlock_lock(&c->lock);
|
||||||
if (c->flags & CLIENT_BLOCKED) {
|
if (c->flags & CLIENT_BLOCKED) {
|
||||||
addReplySdsAsync(c,sdsnew(
|
addReplySds(c,sdsnew(
|
||||||
"-UNBLOCKED force unblock from blocking operation, "
|
"-UNBLOCKED force unblock from blocking operation, "
|
||||||
"instance state changed (master -> replica?)\r\n"));
|
"instance state changed (master -> replica?)\r\n"));
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
@ -373,7 +373,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
/* If the group was not found, send an error
|
/* If the group was not found, send an error
|
||||||
* to the consumer. */
|
* to the consumer. */
|
||||||
if (!group) {
|
if (!group) {
|
||||||
addReplyErrorAsync(receiver,
|
addReplyError(receiver,
|
||||||
"-NOGROUP the consumer group this client "
|
"-NOGROUP the consumer group this client "
|
||||||
"was blocked on no longer exists");
|
"was blocked on no longer exists");
|
||||||
unblockClient(receiver);
|
unblockClient(receiver);
|
||||||
@ -404,12 +404,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
* extracted from it. Wrapped in a single-item
|
* extracted from it. Wrapped in a single-item
|
||||||
* array, since we have just one key. */
|
* array, since we have just one key. */
|
||||||
if (receiver->resp == 2) {
|
if (receiver->resp == 2) {
|
||||||
addReplyArrayLenAsync(receiver,1);
|
addReplyArrayLen(receiver,1);
|
||||||
addReplyArrayLenAsync(receiver,2);
|
addReplyArrayLen(receiver,2);
|
||||||
} else {
|
} else {
|
||||||
addReplyMapLenAsync(receiver,1);
|
addReplyMapLen(receiver,1);
|
||||||
}
|
}
|
||||||
addReplyBulkAsync(receiver,rl->key);
|
addReplyBulk(receiver,rl->key);
|
||||||
|
|
||||||
streamPropInfo pi = {
|
streamPropInfo pi = {
|
||||||
rl->key,
|
rl->key,
|
||||||
|
@ -1358,7 +1358,7 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyLongLongAsync(c,ll);
|
addReplyLongLong(c,ll);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1371,9 +1371,9 @@ int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyProtoAsync(c,prefix,strlen(prefix));
|
addReplyProto(c,prefix,strlen(prefix));
|
||||||
addReplyProtoAsync(c,msg,strlen(msg));
|
addReplyProto(c,msg,strlen(msg));
|
||||||
addReplyProtoAsync(c,"\r\n",2);
|
addReplyProto(c,"\r\n",2);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1426,10 +1426,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
|
|||||||
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
|
ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)*
|
||||||
(ctx->postponed_arrays_count+1), MALLOC_LOCAL);
|
(ctx->postponed_arrays_count+1), MALLOC_LOCAL);
|
||||||
ctx->postponed_arrays[ctx->postponed_arrays_count] =
|
ctx->postponed_arrays[ctx->postponed_arrays_count] =
|
||||||
addReplyDeferredLenAsync(c);
|
addReplyDeferredLen(c);
|
||||||
ctx->postponed_arrays_count++;
|
ctx->postponed_arrays_count++;
|
||||||
} else {
|
} else {
|
||||||
addReplyArrayLenAsync(c,len);
|
addReplyArrayLen(c,len);
|
||||||
}
|
}
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
@ -1444,7 +1444,7 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyNullArrayAsync(c);
|
addReplyNullArray(c);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1457,7 +1457,7 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyAsync(c,shared.emptyarray);
|
addReply(c,shared.emptyarray);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1502,7 +1502,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ctx->postponed_arrays_count--;
|
ctx->postponed_arrays_count--;
|
||||||
setDeferredArrayLenAsync(c,
|
setDeferredArrayLen(c,
|
||||||
ctx->postponed_arrays[ctx->postponed_arrays_count],
|
ctx->postponed_arrays[ctx->postponed_arrays_count],
|
||||||
len);
|
len);
|
||||||
if (ctx->postponed_arrays_count == 0) {
|
if (ctx->postponed_arrays_count == 0) {
|
||||||
@ -1520,7 +1520,7 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyBulkCBufferAsync(c,(char*)buf,len);
|
addReplyBulkCBuffer(c,(char*)buf,len);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1534,7 +1534,7 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyBulkCStringAsync(c,(char*)buf);
|
addReplyBulkCString(c,(char*)buf);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1547,7 +1547,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyBulkAsync(c,str);
|
addReplyBulk(c,str);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1560,7 +1560,7 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyAsync(c,shared.emptybulk);
|
addReply(c,shared.emptybulk);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1574,7 +1574,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len)
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyVerbatimAsync(c, buf, len, "txt");
|
addReplyVerbatim(c, buf, len, "txt");
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1587,7 +1587,7 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyNullAsync(c);
|
addReplyNull(c);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1604,7 +1604,7 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
|
|||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
sds proto = sdsnewlen(reply->proto, reply->protolen);
|
sds proto = sdsnewlen(reply->proto, reply->protolen);
|
||||||
addReplySdsAsync(c,proto);
|
addReplySds(c,proto);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1620,7 +1620,7 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyDoubleAsync(c,d);
|
addReplyDouble(c,d);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1638,7 +1638,7 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
|
|||||||
AeLocker locker;
|
AeLocker locker;
|
||||||
std::unique_lock<fastlock> lock(c->lock);
|
std::unique_lock<fastlock> lock(c->lock);
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
addReplyHumanLongDoubleAsync(c, ld);
|
addReplyHumanLongDouble(c, ld);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,8 +40,6 @@
|
|||||||
#include "aelocker.h"
|
#include "aelocker.h"
|
||||||
|
|
||||||
static void setProtocolError(const char *errstr, client *c);
|
static void setProtocolError(const char *errstr, client *c);
|
||||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
|
||||||
void addReplyBulkCStringCore(client *c, const char *s, bool fAsync);
|
|
||||||
|
|
||||||
/* Return the size consumed from the allocator, for the specified SDS string,
|
/* Return the size consumed from the allocator, for the specified SDS string,
|
||||||
* including internal fragmentation. This function is used in order to compute
|
* including internal fragmentation. This function is used in order to compute
|
||||||
@ -251,10 +249,10 @@ void clientInstallAsyncWriteHandler(client *c) {
|
|||||||
* Typically gets called every time a reply is built, before adding more
|
* Typically gets called every time a reply is built, before adding more
|
||||||
* data to the clients output buffers. If the function returns C_ERR no
|
* data to the clients output buffers. If the function returns C_ERR no
|
||||||
* data should be appended to the output buffers. */
|
* data should be appended to the output buffers. */
|
||||||
int prepareClientToWrite(client *c, bool fAsync) {
|
int prepareClientToWrite(client *c) {
|
||||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread
|
||||||
serverAssert(FCorrectThread(c) || fAsync);
|
|
||||||
if (FCorrectThread(c)) {
|
if (!fAsync) {
|
||||||
serverAssert(c->conn == nullptr || c->lock.fOwnLock());
|
serverAssert(c->conn == nullptr || c->lock.fOwnLock());
|
||||||
} else {
|
} else {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
@ -290,10 +288,10 @@ int prepareClientToWrite(client *c, bool fAsync) {
|
|||||||
* Low level functions to add more data to output buffers.
|
* Low level functions to add more data to output buffers.
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
|
int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||||
if (c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
|
if (c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
|
||||||
|
|
||||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
bool fAsync = !FCorrectThread(c);
|
||||||
if (fAsync)
|
if (fAsync)
|
||||||
{
|
{
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
@ -377,11 +375,12 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
|||||||
* Higher level functions to queue data on the client output buffer.
|
* Higher level functions to queue data on the client output buffer.
|
||||||
* The following functions are the ones that commands implementations will call.
|
* The following functions are the ones that commands implementations will call.
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
void addReplyCore(client *c, robj_roptr obj, bool fAsync) {
|
/* Add the object 'obj' string representation to the client output buffer. */
|
||||||
if (prepareClientToWrite(c, fAsync) != C_OK) return;
|
void addReply(client *c, robj_roptr obj) {
|
||||||
|
if (prepareClientToWrite(c) != C_OK) return;
|
||||||
|
|
||||||
if (sdsEncodedObject(obj)) {
|
if (sdsEncodedObject(obj)) {
|
||||||
if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)),fAsync) != C_OK)
|
if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))) != C_OK)
|
||||||
_addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)));
|
_addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)));
|
||||||
} else if (obj->encoding == OBJ_ENCODING_INT) {
|
} else if (obj->encoding == OBJ_ENCODING_INT) {
|
||||||
/* For integer encoded strings we just convert it into a string
|
/* For integer encoded strings we just convert it into a string
|
||||||
@ -389,44 +388,26 @@ void addReplyCore(client *c, robj_roptr obj, bool fAsync) {
|
|||||||
* to the output buffer. */
|
* to the output buffer. */
|
||||||
char buf[32];
|
char buf[32];
|
||||||
size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj));
|
size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj));
|
||||||
if (_addReplyToBuffer(c,buf,len,fAsync) != C_OK)
|
if (_addReplyToBuffer(c,buf,len) != C_OK)
|
||||||
_addReplyProtoToList(c,buf,len);
|
_addReplyProtoToList(c,buf,len);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Wrong obj->encoding in addReply()");
|
serverPanic("Wrong obj->encoding in addReply()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add the object 'obj' string representation to the client output buffer. */
|
|
||||||
void addReply(client *c, robj_roptr obj)
|
|
||||||
{
|
|
||||||
addReplyCore(c, obj, false);
|
|
||||||
}
|
|
||||||
void addReplyAsync(client *c, robj_roptr obj)
|
|
||||||
{
|
|
||||||
addReplyCore(c, obj, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add the SDS 's' string to the client output buffer, as a side effect
|
/* Add the SDS 's' string to the client output buffer, as a side effect
|
||||||
* the SDS string is freed. */
|
* the SDS string is freed. */
|
||||||
void addReplySdsCore(client *c, sds s, bool fAsync) {
|
void addReplySds(client *c, sds s) {
|
||||||
if (prepareClientToWrite(c, fAsync) != C_OK) {
|
if (prepareClientToWrite(c) != C_OK) {
|
||||||
/* The caller expects the sds to be free'd. */
|
/* The caller expects the sds to be free'd. */
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (_addReplyToBuffer(c,s,sdslen(s), fAsync) != C_OK)
|
if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
|
||||||
_addReplyProtoToList(c,s,sdslen(s));
|
_addReplyProtoToList(c,s,sdslen(s));
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplySds(client *c, sds s) {
|
|
||||||
addReplySdsCore(c, s, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplySdsAsync(client *c, sds s) {
|
|
||||||
addReplySdsCore(c, s, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This low level function just adds whatever protocol you send it to the
|
/* This low level function just adds whatever protocol you send it to the
|
||||||
* client buffer, trying the static buffer initially, and using the string
|
* client buffer, trying the static buffer initially, and using the string
|
||||||
* of objects if not possible.
|
* of objects if not possible.
|
||||||
@ -435,18 +416,10 @@ void addReplySdsAsync(client *c, sds s) {
|
|||||||
* if not needed. The object will only be created by calling
|
* if not needed. The object will only be created by calling
|
||||||
* _addReplyProtoToList() if we fail to extend the existing tail object
|
* _addReplyProtoToList() if we fail to extend the existing tail object
|
||||||
* in the list of objects. */
|
* in the list of objects. */
|
||||||
void addReplyProtoCore(client *c, const char *s, size_t len, bool fAsync) {
|
|
||||||
if (prepareClientToWrite(c, fAsync) != C_OK) return;
|
|
||||||
if (_addReplyToBuffer(c,s,len,fAsync) != C_OK)
|
|
||||||
_addReplyProtoToList(c,s,len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyProto(client *c, const char *s, size_t len) {
|
void addReplyProto(client *c, const char *s, size_t len) {
|
||||||
addReplyProtoCore(c, s, len, false);
|
if (prepareClientToWrite(c) != C_OK) return;
|
||||||
}
|
if (_addReplyToBuffer(c,s,len) != C_OK)
|
||||||
|
_addReplyProtoToList(c,s,len);
|
||||||
void addReplyProtoAsync(client *c, const char *s, size_t len) {
|
|
||||||
addReplyProtoCore(c, s, len, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string escapeString(sds str)
|
std::string escapeString(sds str)
|
||||||
@ -486,12 +459,12 @@ std::string escapeString(sds str)
|
|||||||
* code provided is used, otherwise the string "-ERR " for the generic
|
* code provided is used, otherwise the string "-ERR " for the generic
|
||||||
* error code is automatically added.
|
* error code is automatically added.
|
||||||
* Note that 's' must NOT end with \r\n. */
|
* Note that 's' must NOT end with \r\n. */
|
||||||
void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) {
|
void addReplyErrorLength(client *c, const char *s, size_t len) {
|
||||||
/* If the string already starts with "-..." then the error code
|
/* If the string already starts with "-..." then the error code
|
||||||
* is provided by the caller. Otherwise we use "-ERR". */
|
* is provided by the caller. Otherwise we use "-ERR". */
|
||||||
if (!len || s[0] != '-') addReplyProtoCore(c,"-ERR ",5,fAsync);
|
if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
|
||||||
addReplyProtoCore(c,s,len,fAsync);
|
addReplyProto(c,s,len);
|
||||||
addReplyProtoCore(c,"\r\n",2,fAsync);
|
addReplyProto(c,"\r\n",2);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
||||||
@ -535,11 +508,6 @@ void afterErrorReply(client *c, const char *s, size_t len) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyErrorLength(client *c, const char *s, size_t len)
|
|
||||||
{
|
|
||||||
addReplyErrorLengthCore(c, s, len, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
|
/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
|
||||||
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
|
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
|
||||||
void addReplyErrorObject(client *c, robj *err) {
|
void addReplyErrorObject(client *c, robj *err) {
|
||||||
@ -547,13 +515,8 @@ void addReplyErrorObject(client *c, robj *err) {
|
|||||||
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */
|
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */
|
||||||
}
|
}
|
||||||
|
|
||||||
/* See addReplyErrorLength for expectations from the input string. */
|
|
||||||
void addReplyError(client *c, const char *err) {
|
void addReplyError(client *c, const char *err) {
|
||||||
addReplyErrorLengthCore(c,err,strlen(err), false);
|
addReplyErrorLength(c, err, strlen(err));
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyErrorAsync(client *c, const char *err) {
|
|
||||||
addReplyErrorLengthCore(c, err, strlen(err), true);
|
|
||||||
afterErrorReply(c,err,strlen(err));
|
afterErrorReply(c,err,strlen(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -629,19 +592,19 @@ void trimReplyUnusedTailSpace(client *c) {
|
|||||||
|
|
||||||
/* Adds an empty object to the reply list that will contain the multi bulk
|
/* Adds an empty object to the reply list that will contain the multi bulk
|
||||||
* length, which is not known when this function is called. */
|
* length, which is not known when this function is called. */
|
||||||
void *addReplyDeferredLen(client *c) {
|
void *addReplyDeferredLenCore(client *c) {
|
||||||
/* Note that we install the write event here even if the object is not
|
/* Note that we install the write event here even if the object is not
|
||||||
* ready to be sent, since we are sure that before returning to the
|
* ready to be sent, since we are sure that before returning to the
|
||||||
* event loop setDeferredAggregateLen() will be called. */
|
* event loop setDeferredAggregateLen() will be called. */
|
||||||
if (prepareClientToWrite(c, false) != C_OK) return NULL;
|
if (prepareClientToWrite(c) != C_OK) return NULL;
|
||||||
trimReplyUnusedTailSpace(c);
|
trimReplyUnusedTailSpace(c);
|
||||||
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
|
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
|
||||||
return listLast(c->reply);
|
return listLast(c->reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *addReplyDeferredLenAsync(client *c) {
|
void *addReplyDeferredLen(client *c) {
|
||||||
if (FCorrectThread(c))
|
if (FCorrectThread(c))
|
||||||
return addReplyDeferredLen(c);
|
return addReplyDeferredLenCore(c);
|
||||||
|
|
||||||
return (void*)((ssize_t)(c->replyAsync ? c->replyAsync->used : 0));
|
return (void*)((ssize_t)(c->replyAsync ? c->replyAsync->used : 0));
|
||||||
}
|
}
|
||||||
@ -718,11 +681,10 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi
|
|||||||
}
|
}
|
||||||
|
|
||||||
void setDeferredArrayLen(client *c, void *node, long length) {
|
void setDeferredArrayLen(client *c, void *node, long length) {
|
||||||
setDeferredAggregateLen(c,node,length,'*');
|
if (FCorrectThread(c))
|
||||||
}
|
setDeferredAggregateLen(c,node,length,'*');
|
||||||
|
else
|
||||||
void setDeferredArrayLenAsync(client *c, void *node, long length) {
|
setDeferredAggregateLenAsync(c, node, length, '*');
|
||||||
setDeferredAggregateLenAsync(c, node, length, '*');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void setDeferredMapLen(client *c, void *node, long length) {
|
void setDeferredMapLen(client *c, void *node, long length) {
|
||||||
@ -748,15 +710,15 @@ void setDeferredPushLen(client *c, void *node, long length) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Add a double as a bulk reply */
|
/* Add a double as a bulk reply */
|
||||||
void addReplyDoubleCore(client *c, double d, bool fAsync) {
|
void addReplyDouble(client *c, double d) {
|
||||||
if (std::isinf(d)) {
|
if (std::isinf(d)) {
|
||||||
/* Libc in odd systems (Hi Solaris!) will format infinite in a
|
/* Libc in odd systems (Hi Solaris!) will format infinite in a
|
||||||
* different way, so better to handle it in an explicit way. */
|
* different way, so better to handle it in an explicit way. */
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyBulkCStringCore(c, d > 0 ? "inf" : "-inf", fAsync);
|
addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
|
||||||
} else {
|
} else {
|
||||||
addReplyProtoCore(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
|
addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
|
||||||
d > 0 ? 6 : 7, fAsync);
|
d > 0 ? 6 : 7);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
char dbuf[MAX_LONG_DOUBLE_CHARS+3],
|
char dbuf[MAX_LONG_DOUBLE_CHARS+3],
|
||||||
@ -765,52 +727,34 @@ void addReplyDoubleCore(client *c, double d, bool fAsync) {
|
|||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
|
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
|
||||||
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
|
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
|
||||||
addReplyProtoCore(c,sbuf,slen,fAsync);
|
addReplyProto(c,sbuf,slen);
|
||||||
} else {
|
} else {
|
||||||
dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
|
dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
|
||||||
addReplyProtoCore(c,dbuf,dlen,fAsync);
|
addReplyProto(c,dbuf,dlen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyDouble(client *c, double d) {
|
|
||||||
addReplyDoubleCore(c, d, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyDoubleAsync(client *c, double d) {
|
|
||||||
addReplyDoubleCore(c, d, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync);
|
|
||||||
|
|
||||||
/* Add a long double as a bulk reply, but uses a human readable formatting
|
/* Add a long double as a bulk reply, but uses a human readable formatting
|
||||||
* of the double instead of exposing the crude behavior of doubles to the
|
* of the double instead of exposing the crude behavior of doubles to the
|
||||||
* dear user. */
|
* dear user. */
|
||||||
void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) {
|
void addReplyHumanLongDouble(client *c, long double d) {
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
robj *o = createStringObjectFromLongDouble(d,1);
|
robj *o = createStringObjectFromLongDouble(d,1);
|
||||||
addReplyBulkCore(c,o,fAsync);
|
addReplyBulk(c,o);
|
||||||
decrRefCount(o);
|
decrRefCount(o);
|
||||||
} else {
|
} else {
|
||||||
char buf[MAX_LONG_DOUBLE_CHARS];
|
char buf[MAX_LONG_DOUBLE_CHARS];
|
||||||
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
|
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
|
||||||
addReplyProtoCore(c,",",1,fAsync);
|
addReplyProto(c,",",1);
|
||||||
addReplyProtoCore(c,buf,len,fAsync);
|
addReplyProto(c,buf,len);
|
||||||
addReplyProtoCore(c,"\r\n",2,fAsync);
|
addReplyProto(c,"\r\n",2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyHumanLongDouble(client *c, long double d) {
|
|
||||||
addReplyHumanLongDoubleCore(c, d, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyHumanLongDoubleAsync(client *c, long double d) {
|
|
||||||
addReplyHumanLongDoubleCore(c, d, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add a long long as integer reply or bulk len / multi bulk count.
|
/* Add a long long as integer reply or bulk len / multi bulk count.
|
||||||
* Basically this is used to output <prefix><long long><crlf>. */
|
* Basically this is used to output <prefix><long long><crlf>. */
|
||||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
|
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
@ -818,10 +762,10 @@ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool f
|
|||||||
* so we have a few shared objects to use if the integer is small
|
* so we have a few shared objects to use if the integer is small
|
||||||
* like it is most of the times. */
|
* like it is most of the times. */
|
||||||
if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
||||||
addReplyCore(c,shared.mbulkhdr[ll], fAsync);
|
addReply(c,shared.mbulkhdr[ll]);
|
||||||
return;
|
return;
|
||||||
} else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
} else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
||||||
addReplyCore(c,shared.bulkhdr[ll], fAsync);
|
addReply(c,shared.bulkhdr[ll]);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -829,65 +773,33 @@ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool f
|
|||||||
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
||||||
buf[len+1] = '\r';
|
buf[len+1] = '\r';
|
||||||
buf[len+2] = '\n';
|
buf[len+2] = '\n';
|
||||||
addReplyProtoCore(c,buf,len+3, fAsync);
|
addReplyProto(c,buf,len+3);
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
|
||||||
addReplyLongLongWithPrefixCore(c, ll, prefix, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyLongLongCore(client *c, long long ll, bool fAsync) {
|
|
||||||
if (ll == 0)
|
|
||||||
addReplyCore(c,shared.czero, fAsync);
|
|
||||||
else if (ll == 1)
|
|
||||||
addReplyCore(c,shared.cone, fAsync);
|
|
||||||
else
|
|
||||||
addReplyLongLongWithPrefixCore(c,ll,':', fAsync);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyLongLong(client *c, long long ll) {
|
void addReplyLongLong(client *c, long long ll) {
|
||||||
addReplyLongLongCore(c, ll, false);
|
if (ll == 0)
|
||||||
}
|
addReply(c,shared.czero);
|
||||||
|
else if (ll == 1)
|
||||||
void addReplyLongLongAsync(client *c, long long ll) {
|
addReply(c,shared.cone);
|
||||||
addReplyLongLongCore(c, ll, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) {
|
|
||||||
if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN)
|
|
||||||
addReplyCore(c,shared.mbulkhdr[length], fAsync);
|
|
||||||
else
|
else
|
||||||
addReplyLongLongWithPrefixCore(c,length,prefix, fAsync);
|
addReplyLongLongWithPrefix(c,ll,':');
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyAggregateLen(client *c, long length, int prefix) {
|
void addReplyAggregateLen(client *c, long length, int prefix) {
|
||||||
addReplyAggregateLenCore(c, length, prefix, false);
|
if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN)
|
||||||
}
|
addReply(c,shared.mbulkhdr[length]);
|
||||||
|
else
|
||||||
void addReplyArrayLenCore(client *c, long length, bool fAsync) {
|
addReplyLongLongWithPrefix(c,length,prefix);
|
||||||
addReplyAggregateLenCore(c,length,'*', fAsync);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyArrayLen(client *c, long length) {
|
void addReplyArrayLen(client *c, long length) {
|
||||||
addReplyArrayLenCore(c, length, false);
|
addReplyAggregateLen(c,length,'*');
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyArrayLenAsync(client *c, long length) {
|
|
||||||
addReplyArrayLenCore(c, length, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyMapLenCore(client *c, long length, bool fAsync) {
|
|
||||||
int prefix = c->resp == 2 ? '*' : '%';
|
|
||||||
if (c->resp == 2) length *= 2;
|
|
||||||
addReplyAggregateLenCore(c,length,prefix,fAsync);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyMapLen(client *c, long length) {
|
void addReplyMapLen(client *c, long length) {
|
||||||
addReplyMapLenCore(c, length, false);
|
int prefix = c->resp == 2 ? '*' : '%';
|
||||||
}
|
if (c->resp == 2) length *= 2;
|
||||||
|
addReplyAggregateLen(c,length,prefix);
|
||||||
void addReplyMapLenAsync(client *c, long length) {
|
|
||||||
addReplyMapLenCore(c, length, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplySetLen(client *c, long length) {
|
void addReplySetLen(client *c, long length) {
|
||||||
@ -901,38 +813,19 @@ void addReplyAttributeLen(client *c, long length) {
|
|||||||
addReplyAggregateLen(c,length,prefix);
|
addReplyAggregateLen(c,length,prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyPushLenCore(client *c, long length, bool fAsync) {
|
|
||||||
int prefix = c->resp == 2 ? '*' : '>';
|
|
||||||
addReplyAggregateLenCore(c,length,prefix, fAsync);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyPushLen(client *c, long length) {
|
void addReplyPushLen(client *c, long length) {
|
||||||
addReplyPushLenCore(c, length, false);
|
int prefix = c->resp == 2 ? '*' : '>';
|
||||||
|
addReplyAggregateLen(c,length,prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyPushLenAsync(client *c, long length) {
|
void addReplyNull(client *c) {
|
||||||
addReplyPushLenCore(c, length, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyNullCore(client *c, bool fAsync) {
|
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyProtoCore(c,"$-1\r\n",5,fAsync);
|
addReplyProto(c,"$-1\r\n",5);
|
||||||
} else {
|
} else {
|
||||||
addReplyProtoCore(c,"_\r\n",3,fAsync);
|
addReplyProto(c,"_\r\n",3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyNull(client *c, robj_roptr objOldProtocol) {
|
|
||||||
if (c->resp < 3 && objOldProtocol != nullptr)
|
|
||||||
addReply(c, objOldProtocol);
|
|
||||||
else
|
|
||||||
addReplyNullCore(c, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyNullAsync(client *c) {
|
|
||||||
addReplyNullCore(c, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBool(client *c, int b) {
|
void addReplyBool(client *c, int b) {
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReply(c, b ? shared.cone : shared.czero);
|
addReply(c, b ? shared.cone : shared.czero);
|
||||||
@ -945,107 +838,58 @@ void addReplyBool(client *c, int b) {
|
|||||||
* RESP2 had it, so API-wise we have this call, that will emit the correct
|
* RESP2 had it, so API-wise we have this call, that will emit the correct
|
||||||
* RESP2 protocol, however for RESP3 the reply will always be just the
|
* RESP2 protocol, however for RESP3 the reply will always be just the
|
||||||
* Null type "_\r\n". */
|
* Null type "_\r\n". */
|
||||||
void addReplyNullArrayCore(client *c, bool fAsync)
|
void addReplyNullArray(client *c)
|
||||||
{
|
{
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyProtoCore(c,"*-1\r\n",5,fAsync);
|
addReplyProto(c,"*-1\r\n",5);
|
||||||
} else {
|
} else {
|
||||||
addReplyProtoCore(c,"_\r\n",3,fAsync);
|
addReplyProto(c,"_\r\n",3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyNullArray(client *c)
|
|
||||||
{
|
|
||||||
addReplyNullArrayCore(c, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyNullArrayAsync(client *c)
|
|
||||||
{
|
|
||||||
addReplyNullArrayCore(c, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create the length prefix of a bulk reply, example: $2234 */
|
/* Create the length prefix of a bulk reply, example: $2234 */
|
||||||
void addReplyBulkLenCore(client *c, robj_roptr obj, bool fAsync) {
|
void addReplyBulkLen(client *c, robj_roptr obj) {
|
||||||
size_t len = stringObjectLen(obj);
|
size_t len = stringObjectLen(obj);
|
||||||
|
|
||||||
if (len < OBJ_SHARED_BULKHDR_LEN)
|
if (len < OBJ_SHARED_BULKHDR_LEN)
|
||||||
addReplyCore(c,shared.bulkhdr[len], fAsync);
|
addReply(c,shared.bulkhdr[len]);
|
||||||
else
|
else
|
||||||
addReplyLongLongWithPrefixCore(c,len,'$', fAsync);
|
addReplyLongLongWithPrefix(c,len,'$');
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkLen(client *c, robj *obj)
|
|
||||||
{
|
|
||||||
addReplyBulkLenCore(c, obj, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a Redis Object as a bulk reply */
|
/* Add a Redis Object as a bulk reply */
|
||||||
void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync) {
|
void addReplyBulk(client *c, robj_roptr obj) {
|
||||||
addReplyBulkLenCore(c,obj,fAsync);
|
addReplyBulkLen(c,obj);
|
||||||
addReplyCore(c,obj,fAsync);
|
addReply(c,obj);
|
||||||
addReplyCore(c,shared.crlf,fAsync);
|
addReply(c,shared.crlf);
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulk(client *c, robj_roptr obj)
|
|
||||||
{
|
|
||||||
addReplyBulkCore(c, obj, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkAsync(client *c, robj_roptr obj)
|
|
||||||
{
|
|
||||||
addReplyBulkCore(c, obj, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a C buffer as bulk reply */
|
/* Add a C buffer as bulk reply */
|
||||||
void addReplyBulkCBufferCore(client *c, const void *p, size_t len, bool fAsync) {
|
|
||||||
addReplyLongLongWithPrefixCore(c,len,'$',fAsync);
|
|
||||||
addReplyProtoCore(c,(const char*)p,len,fAsync);
|
|
||||||
addReplyCore(c,shared.crlf,fAsync);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
||||||
addReplyBulkCBufferCore(c, p, len, false);
|
addReplyLongLongWithPrefix(c,len,'$');
|
||||||
}
|
addReplyProto(c,(const char*)p,len);
|
||||||
|
addReply(c,shared.crlf);
|
||||||
void addReplyBulkCBufferAsync(client *c, const void *p, size_t len) {
|
|
||||||
addReplyBulkCBufferCore(c, p, len, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add sds to reply (takes ownership of sds and frees it) */
|
/* Add sds to reply (takes ownership of sds and frees it) */
|
||||||
void addReplyBulkSdsCore(client *c, sds s, bool fAsync) {
|
void addReplyBulkSds(client *c, sds s) {
|
||||||
addReplyLongLongWithPrefixCore(c,sdslen(s),'$', fAsync);
|
addReplyLongLongWithPrefix(c,sdslen(s),'$');
|
||||||
addReplySdsCore(c,s,fAsync);
|
addReplySds(c,s);
|
||||||
addReplyCore(c,shared.crlf,fAsync);
|
addReply(c,shared.crlf);
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkSds(client *c, sds s) {
|
|
||||||
addReplyBulkSdsCore(c, s, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkSdsAsync(client *c, sds s) {
|
|
||||||
addReplyBulkSdsCore(c, s, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a C null term string as bulk reply */
|
/* Add a C null term string as bulk reply */
|
||||||
void addReplyBulkCStringCore(client *c, const char *s, bool fAsync) {
|
void addReplyBulkCString(client *c, const char *s) {
|
||||||
if (s == NULL) {
|
if (s == NULL) {
|
||||||
if (c->resp < 3)
|
if (c->resp < 3)
|
||||||
addReplyCore(c,shared.nullbulk, fAsync);
|
addReply(c,shared.nullbulk);
|
||||||
else
|
else
|
||||||
addReplyNullCore(c,fAsync);
|
addReplyNull(c);
|
||||||
} else {
|
} else {
|
||||||
addReplyBulkCBufferCore(c,s,strlen(s),fAsync);
|
addReplyBulkCBuffer(c,s,strlen(s));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyBulkCString(client *c, const char *s) {
|
|
||||||
addReplyBulkCStringCore(c, s, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyBulkCStringAsync(client *c, const char *s) {
|
|
||||||
addReplyBulkCStringCore(c, s, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add a long long as a bulk reply */
|
/* Add a long long as a bulk reply */
|
||||||
void addReplyBulkLongLong(client *c, long long ll) {
|
void addReplyBulkLongLong(client *c, long long ll) {
|
||||||
char buf[64];
|
char buf[64];
|
||||||
@ -1064,9 +908,9 @@ void addReplyBulkLongLong(client *c, long long ll) {
|
|||||||
* three first characters of the extension are used, and if the
|
* three first characters of the extension are used, and if the
|
||||||
* provided one is shorter than that, the remaining is filled with
|
* provided one is shorter than that, the remaining is filled with
|
||||||
* spaces. */
|
* spaces. */
|
||||||
void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) {
|
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyBulkCBufferCore(c,s,len,fAsync);
|
addReplyBulkCBuffer(c,s,len);
|
||||||
} else {
|
} else {
|
||||||
char buf[32];
|
char buf[32];
|
||||||
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
|
size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
|
||||||
@ -1078,20 +922,12 @@ void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext,
|
|||||||
p[i] = *ext++;
|
p[i] = *ext++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addReplyProtoCore(c,buf,preflen,fAsync);
|
addReplyProto(c,buf,preflen);
|
||||||
addReplyProtoCore(c,s,len,fAsync);
|
addReplyProto(c,s,len);
|
||||||
addReplyProtoCore(c,"\r\n",2,fAsync);
|
addReplyProto(c,"\r\n",2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
|
||||||
addReplyVerbatimCore(c, s, len, ext, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) {
|
|
||||||
addReplyVerbatimCore(c, s, len, ext, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add an array of C strings as status replies with a heading.
|
/* Add an array of C strings as status replies with a heading.
|
||||||
* This function is typically invoked by from commands that support
|
* This function is typically invoked by from commands that support
|
||||||
* subcommands in response to the 'help' subcommand. The help array
|
* subcommands in response to the 'help' subcommand. The help array
|
||||||
@ -1127,7 +963,7 @@ void addReplySubcommandSyntaxError(client *c) {
|
|||||||
/* Append 'src' client output buffers into 'dst' client output buffers.
|
/* Append 'src' client output buffers into 'dst' client output buffers.
|
||||||
* This function clears the output buffers of 'src' */
|
* This function clears the output buffers of 'src' */
|
||||||
void AddReplyFromClient(client *dst, client *src) {
|
void AddReplyFromClient(client *dst, client *src) {
|
||||||
if (prepareClientToWrite(dst, false) != C_OK)
|
if (prepareClientToWrite(dst) != C_OK)
|
||||||
return;
|
return;
|
||||||
addReplyProto(dst,src->buf, src->bufpos);
|
addReplyProto(dst,src->buf, src->bufpos);
|
||||||
if (listLength(src->reply))
|
if (listLength(src->reply))
|
||||||
@ -1907,7 +1743,7 @@ void ProcessPendingAsyncWrites()
|
|||||||
|
|
||||||
if (FCorrectThread(c))
|
if (FCorrectThread(c))
|
||||||
{
|
{
|
||||||
prepareClientToWrite(c, false); // queue an event
|
prepareClientToWrite(c); // queue an event
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -2898,7 +2734,7 @@ NULL
|
|||||||
if (target && target->flags & CLIENT_BLOCKED) {
|
if (target && target->flags & CLIENT_BLOCKED) {
|
||||||
std::unique_lock<fastlock> ul(target->lock);
|
std::unique_lock<fastlock> ul(target->lock);
|
||||||
if (unblock_error)
|
if (unblock_error)
|
||||||
addReplyErrorAsync(target,
|
addReplyError(target,
|
||||||
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
||||||
else
|
else
|
||||||
replyToBlockedClientTimedOut(target);
|
replyToBlockedClientTimedOut(target);
|
||||||
|
@ -433,7 +433,7 @@ robj *resetRefCount(robj *obj) {
|
|||||||
|
|
||||||
int checkType(client *c, robj_roptr o, int type) {
|
int checkType(client *c, robj_roptr o, int type) {
|
||||||
if (o->type != type) {
|
if (o->type != type) {
|
||||||
addReplyAsync(c,shared.wrongtypeerr);
|
addReply(c,shared.wrongtypeerr);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -43,12 +43,12 @@ int clientSubscriptionsCount(client *c);
|
|||||||
* addReply*() API family. */
|
* addReply*() API family. */
|
||||||
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
||||||
if (c->resp == 2)
|
if (c->resp == 2)
|
||||||
addReplyAsync(c,shared.mbulkhdr[3]);
|
addReply(c,shared.mbulkhdr[3]);
|
||||||
else
|
else
|
||||||
addReplyPushLenAsync(c,3);
|
addReplyPushLen(c,3);
|
||||||
addReplyAsync(c,shared.messagebulk);
|
addReply(c,shared.messagebulk);
|
||||||
addReplyBulkAsync(c,channel);
|
addReplyBulk(c,channel);
|
||||||
if (msg) addReplyBulkAsync(c,msg);
|
if (msg) addReplyBulk(c,msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a pubsub message of type "pmessage" to the client. The difference
|
/* Send a pubsub message of type "pmessage" to the client. The difference
|
||||||
@ -56,13 +56,13 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
|||||||
* this message format also includes the pattern that matched the message. */
|
* this message format also includes the pattern that matched the message. */
|
||||||
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
|
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
|
||||||
if (c->resp == 2)
|
if (c->resp == 2)
|
||||||
addReplyAsync(c,shared.mbulkhdr[4]);
|
addReply(c,shared.mbulkhdr[4]);
|
||||||
else
|
else
|
||||||
addReplyPushLenAsync(c,4);
|
addReplyPushLen(c,4);
|
||||||
addReplyAsync(c,shared.pmessagebulk);
|
addReply(c,shared.pmessagebulk);
|
||||||
addReplyBulkAsync(c,pat);
|
addReplyBulk(c,pat);
|
||||||
addReplyBulkAsync(c,channel);
|
addReplyBulk(c,channel);
|
||||||
addReplyBulkAsync(c,msg);
|
addReplyBulk(c,msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send the pubsub subscription notification to the client. */
|
/* Send the pubsub subscription notification to the client. */
|
||||||
|
@ -315,7 +315,7 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
|
|||||||
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
|
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
|
||||||
|
|
||||||
/* Send it to slaves */
|
/* Send it to slaves */
|
||||||
addReplyAsync(replica,selectcmd);
|
addReply(replica,selectcmd);
|
||||||
|
|
||||||
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
|
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
|
||||||
decrRefCount(selectcmd);
|
decrRefCount(selectcmd);
|
||||||
@ -329,18 +329,18 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
|
|||||||
if (fSendRaw)
|
if (fSendRaw)
|
||||||
{
|
{
|
||||||
/* Add the multi bulk length. */
|
/* Add the multi bulk length. */
|
||||||
addReplyArrayLenAsync(replica,argc);
|
addReplyArrayLen(replica,argc);
|
||||||
|
|
||||||
/* Finally any additional argument that was not stored inside the
|
/* Finally any additional argument that was not stored inside the
|
||||||
* static buffer if any (from j to argc). */
|
* static buffer if any (from j to argc). */
|
||||||
for (int j = 0; j < argc; j++)
|
for (int j = 0; j < argc; j++)
|
||||||
addReplyBulkAsync(replica,argv[j]);
|
addReplyBulk(replica,argv[j]);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
struct redisCommand *cmd = lookupCommand(szFromObj(argv[0]));
|
struct redisCommand *cmd = lookupCommand(szFromObj(argv[0]));
|
||||||
sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc);
|
sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc);
|
||||||
addReplyProtoAsync(replica, buf, sdslen(buf));
|
addReplyProto(replica, buf, sdslen(buf));
|
||||||
sdsfree(buf);
|
sdsfree(buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -516,21 +516,21 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
* or are already in sync with the master. */
|
* or are already in sync with the master. */
|
||||||
|
|
||||||
if (!fSendRaw)
|
if (!fSendRaw)
|
||||||
addReplyProtoAsync(replica, proto, cchProto);
|
addReplyProto(replica, proto, cchProto);
|
||||||
|
|
||||||
addReplyProtoAsync(replica,fake->buf,fake->bufpos);
|
addReplyProto(replica,fake->buf,fake->bufpos);
|
||||||
listRewind(fake->reply, &liReply);
|
listRewind(fake->reply, &liReply);
|
||||||
while ((lnReply = listNext(&liReply)))
|
while ((lnReply = listNext(&liReply)))
|
||||||
{
|
{
|
||||||
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
|
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
|
||||||
addReplyProtoAsync(replica, reply->buf(), reply->used);
|
addReplyProto(replica, reply->buf(), reply->used);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fSendRaw)
|
if (!fSendRaw)
|
||||||
{
|
{
|
||||||
addReplyAsync(replica,shared.crlf);
|
addReply(replica,shared.crlf);
|
||||||
addReplyProtoAsync(replica, szDbNum, cchDbNum);
|
addReplyProto(replica, szDbNum, cchDbNum);
|
||||||
addReplyProtoAsync(replica, szMvcc, cchMvcc);
|
addReplyProto(replica, szMvcc, cchMvcc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -605,7 +605,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
|||||||
|
|
||||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||||
addReplyProtoAsync(replica,buf,buflen);
|
addReplyProto(replica,buf,buflen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (listLength(slaves))
|
if (listLength(slaves))
|
||||||
@ -651,7 +651,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
|
|||||||
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
|
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
|
||||||
if (FCorrectThread(c))
|
if (FCorrectThread(c))
|
||||||
lock.lock();
|
lock.lock();
|
||||||
addReplyAsync(monitor,cmdobj);
|
addReply(monitor,cmdobj);
|
||||||
}
|
}
|
||||||
decrRefCount(cmdobj);
|
decrRefCount(cmdobj);
|
||||||
}
|
}
|
||||||
@ -3267,7 +3267,7 @@ void replicaofCommand(client *c) {
|
|||||||
miNew->masterhost, miNew->masterport, client);
|
miNew->masterhost, miNew->masterport, client);
|
||||||
sdsfree(client);
|
sdsfree(client);
|
||||||
}
|
}
|
||||||
addReplyAsync(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ROLE command: provide information about the role of the instance
|
/* ROLE command: provide information about the role of the instance
|
||||||
@ -3747,7 +3747,7 @@ void processClientsWaitingReplicas(void) {
|
|||||||
last_numreplicas > c->bpop.numreplicas)
|
last_numreplicas > c->bpop.numreplicas)
|
||||||
{
|
{
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
addReplyLongLongAsync(c,last_numreplicas);
|
addReplyLongLong(c,last_numreplicas);
|
||||||
} else {
|
} else {
|
||||||
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
|
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
|
||||||
|
|
||||||
@ -3755,7 +3755,7 @@ void processClientsWaitingReplicas(void) {
|
|||||||
last_offset = c->bpop.reploffset;
|
last_offset = c->bpop.reploffset;
|
||||||
last_numreplicas = numreplicas;
|
last_numreplicas = numreplicas;
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
addReplyLongLongAsync(c,numreplicas);
|
addReplyLongLong(c,numreplicas);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fastlock_unlock(&c->lock);
|
fastlock_unlock(&c->lock);
|
||||||
|
25
src/server.h
25
src/server.h
@ -2004,17 +2004,14 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
|||||||
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
void readQueryFromClient(connection *conn);
|
void readQueryFromClient(connection *conn);
|
||||||
void addReplyNull(client *c, robj_roptr objOldProtocol = nullptr);
|
void addReplyNull(client *c);
|
||||||
void addReplyNullArray(client *c);
|
void addReplyNullArray(client *c);
|
||||||
void addReplyNullArrayAsync(client *c);
|
|
||||||
void addReplyBool(client *c, int b);
|
void addReplyBool(client *c, int b);
|
||||||
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
|
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
|
||||||
void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext);
|
|
||||||
void addReplyProto(client *c, const char *s, size_t len);
|
void addReplyProto(client *c, const char *s, size_t len);
|
||||||
void addReplyBulk(client *c, robj_roptr obj);
|
void addReplyBulk(client *c, robj_roptr obj);
|
||||||
void AddReplyFromClient(client *c, client *src);
|
void AddReplyFromClient(client *c, client *src);
|
||||||
void addReplyBulkCString(client *c, const char *s);
|
void addReplyBulkCString(client *c, const char *s);
|
||||||
void addReplyBulkCStringAsync(client *c, const char *s);
|
|
||||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
|
||||||
void addReplyBulkLongLong(client *c, long long ll);
|
void addReplyBulkLongLong(client *c, long long ll);
|
||||||
void addReply(client *c, robj_roptr obj);
|
void addReply(client *c, robj_roptr obj);
|
||||||
@ -2026,10 +2023,9 @@ void addReplyError(client *c, const char *err);
|
|||||||
void addReplyStatus(client *c, const char *status);
|
void addReplyStatus(client *c, const char *status);
|
||||||
void addReplyDouble(client *c, double d);
|
void addReplyDouble(client *c, double d);
|
||||||
void addReplyHumanLongDouble(client *c, long double d);
|
void addReplyHumanLongDouble(client *c, long double d);
|
||||||
void addReplyHumanLongDoubleAsync(client *c, long double d);
|
|
||||||
void addReplyLongLong(client *c, long long ll);
|
void addReplyLongLong(client *c, long long ll);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix);
|
||||||
#endif
|
#endif
|
||||||
void addReplyArrayLen(client *c, long length);
|
void addReplyArrayLen(client *c, long length);
|
||||||
void addReplyMapLen(client *c, long length);
|
void addReplyMapLen(client *c, long length);
|
||||||
@ -2074,23 +2070,6 @@ void linkClient(client *c);
|
|||||||
void protectClient(client *c);
|
void protectClient(client *c);
|
||||||
void unprotectClient(client *c);
|
void unprotectClient(client *c);
|
||||||
|
|
||||||
// Special Thread-safe addReply() commands for posting messages to clients from a different thread
|
|
||||||
void addReplyAsync(client *c, robj_roptr obj);
|
|
||||||
void addReplyArrayLenAsync(client *c, long length);
|
|
||||||
void addReplyProtoAsync(client *c, const char *s, size_t len);
|
|
||||||
void addReplyBulkAsync(client *c, robj_roptr obj);
|
|
||||||
void addReplyBulkCBufferAsync(client *c, const void *p, size_t len);
|
|
||||||
void addReplyErrorAsync(client *c, const char *err);
|
|
||||||
void addReplyMapLenAsync(client *c, long length);
|
|
||||||
void addReplyNullAsync(client *c);
|
|
||||||
void addReplyDoubleAsync(client *c, double d);
|
|
||||||
void *addReplyDeferredLenAsync(client *c);
|
|
||||||
void setDeferredArrayLenAsync(client *c, void *node, long length);
|
|
||||||
void addReplySdsAsync(client *c, sds s);
|
|
||||||
void addReplyBulkSdsAsync(client *c, sds s);
|
|
||||||
void addReplyPushLenAsync(client *c, long length);
|
|
||||||
void addReplyLongLongAsync(client *c, long long ll);
|
|
||||||
|
|
||||||
void ProcessPendingAsyncWrites(void);
|
void ProcessPendingAsyncWrites(void);
|
||||||
client *lookupClientByID(uint64_t id);
|
client *lookupClientByID(uint64_t id);
|
||||||
|
|
||||||
|
@ -677,7 +677,7 @@ static void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *val
|
|||||||
listTypePush(dstobj,value,LIST_HEAD);
|
listTypePush(dstobj,value,LIST_HEAD);
|
||||||
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
|
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
|
||||||
/* Always send the pushed value to the client. */
|
/* Always send the pushed value to the client. */
|
||||||
addReplyBulkAsync(c,value);
|
addReplyBulk(c,value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpoplpushCommand(client *c) {
|
void rpoplpushCommand(client *c) {
|
||||||
@ -758,9 +758,9 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
|
||||||
/* BRPOP/BLPOP */
|
/* BRPOP/BLPOP */
|
||||||
addReplyArrayLenAsync(receiver,2);
|
addReplyArrayLen(receiver,2);
|
||||||
addReplyBulkAsync(receiver,key);
|
addReplyBulk(receiver,key);
|
||||||
addReplyBulkAsync(receiver,value);
|
addReplyBulk(receiver,value);
|
||||||
|
|
||||||
/* Notify event. */
|
/* Notify event. */
|
||||||
const char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
const char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||||
|
@ -818,7 +818,7 @@ static void addReplyStreamID(client *c, streamID *id) {
|
|||||||
|
|
||||||
static void addReplyStreamIDAsync(client *c, streamID *id) {
|
static void addReplyStreamIDAsync(client *c, streamID *id) {
|
||||||
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
||||||
addReplyBulkSdsAsync(c,replyid);
|
addReplyBulkSds(c,replyid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Similar to the above function, but just creates an object, usually useful
|
/* Similar to the above function, but just creates an object, usually useful
|
||||||
@ -968,7 +968,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!(flags & STREAM_RWR_RAWENTRIES))
|
if (!(flags & STREAM_RWR_RAWENTRIES))
|
||||||
arraylen_ptr = addReplyDeferredLenAsync(c);
|
arraylen_ptr = addReplyDeferredLen(c);
|
||||||
streamIteratorStart(&si,s,start,end,rev);
|
streamIteratorStart(&si,s,start,end,rev);
|
||||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
/* Update the group last_id if needed. */
|
/* Update the group last_id if needed. */
|
||||||
@ -982,18 +982,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
|
|
||||||
/* Emit a two elements array for each item. The first is
|
/* Emit a two elements array for each item. The first is
|
||||||
* the ID, the second is an array of field-value pairs. */
|
* the ID, the second is an array of field-value pairs. */
|
||||||
addReplyArrayLenAsync(c,2);
|
addReplyArrayLen(c,2);
|
||||||
addReplyStreamIDAsync(c,&id);
|
addReplyStreamIDAsync(c,&id);
|
||||||
|
|
||||||
addReplyArrayLenAsync(c,numfields*2);
|
addReplyArrayLen(c,numfields*2);
|
||||||
|
|
||||||
/* Emit the field-value pairs. */
|
/* Emit the field-value pairs. */
|
||||||
while(numfields--) {
|
while(numfields--) {
|
||||||
unsigned char *key, *value;
|
unsigned char *key, *value;
|
||||||
int64_t key_len, value_len;
|
int64_t key_len, value_len;
|
||||||
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
|
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
|
||||||
addReplyBulkCBufferAsync(c,key,key_len);
|
addReplyBulkCBuffer(c,key,key_len);
|
||||||
addReplyBulkCBufferAsync(c,value,value_len);
|
addReplyBulkCBuffer(c,value,value_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If a group is passed, we need to create an entry in the
|
/* If a group is passed, we need to create an entry in the
|
||||||
@ -1052,7 +1052,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
|
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
|
||||||
|
|
||||||
streamIteratorStop(&si);
|
streamIteratorStop(&si);
|
||||||
if (arraylen_ptr) setDeferredArrayLenAsync(c,arraylen_ptr,arraylen);
|
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
|
||||||
return arraylen;
|
return arraylen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3165,11 +3165,11 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *arraylen_ptr = addReplyDeferredLenAsync(c);
|
void *arraylen_ptr = addReplyDeferredLen(c);
|
||||||
long arraylen = 0;
|
long arraylen = 0;
|
||||||
|
|
||||||
/* We emit the key only for the blocking variant. */
|
/* We emit the key only for the blocking variant. */
|
||||||
if (emitkey) addReplyBulkAsync(c,key);
|
if (emitkey) addReplyBulk(c,key);
|
||||||
|
|
||||||
/* Remove the element. */
|
/* Remove the element. */
|
||||||
do {
|
do {
|
||||||
@ -3219,8 +3219,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
signalModifiedKey(c,c->db,key);
|
signalModifiedKey(c,c->db,key);
|
||||||
}
|
}
|
||||||
|
|
||||||
addReplyBulkCBufferAsync(c,ele,sdslen(ele));
|
addReplyBulkCBuffer(c,ele,sdslen(ele));
|
||||||
addReplyDoubleAsync(c,score);
|
addReplyDouble(c,score);
|
||||||
sdsfree(ele);
|
sdsfree(ele);
|
||||||
arraylen += 2;
|
arraylen += 2;
|
||||||
|
|
||||||
@ -3232,7 +3232,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
}
|
}
|
||||||
} while(--count);
|
} while(--count);
|
||||||
|
|
||||||
setDeferredArrayLenAsync(c,arraylen_ptr,arraylen + (emitkey != 0));
|
setDeferredArrayLen(c,arraylen_ptr,arraylen + (emitkey != 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ZPOPMIN key [<count>] */
|
/* ZPOPMIN key [<count>] */
|
||||||
|
@ -179,7 +179,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (tval < 0) {
|
if (tval < 0) {
|
||||||
addReplyErrorAsync(c,"timeout is negative");
|
addReplyError(c,"timeout is negative");
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,9 +215,9 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
|
|||||||
* are unable to send invalidation messages to the redirected
|
* are unable to send invalidation messages to the redirected
|
||||||
* connection, because the client no longer exist. */
|
* connection, because the client no longer exist. */
|
||||||
if (c->resp > 2) {
|
if (c->resp > 2) {
|
||||||
addReplyPushLenAsync(c,3);
|
addReplyPushLen(c,3);
|
||||||
addReplyBulkCBufferAsync(c,"tracking-redir-broken",21);
|
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
|
||||||
addReplyLongLongAsync(c,c->client_tracking_redirection);
|
addReplyLongLong(c,c->client_tracking_redirection);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -232,8 +232,8 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
|
|||||||
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
|
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
|
||||||
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
|
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
|
||||||
if (c->resp > 2) {
|
if (c->resp > 2) {
|
||||||
addReplyPushLenAsync(c,2);
|
addReplyPushLen(c,2);
|
||||||
addReplyBulkCBufferAsync(c,"invalidate",10);
|
addReplyBulkCBuffer(c,"invalidate",10);
|
||||||
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
|
||||||
/* We use a static object to speedup things, however we assume
|
/* We use a static object to speedup things, however we assume
|
||||||
* that addReplyPubsubMessage() will not take a reference. */
|
* that addReplyPubsubMessage() will not take a reference. */
|
||||||
@ -248,10 +248,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
|
|||||||
|
|
||||||
/* Send the "value" part, which is the array of keys. */
|
/* Send the "value" part, which is the array of keys. */
|
||||||
if (proto) {
|
if (proto) {
|
||||||
addReplyProtoAsync(c,keyname,keylen);
|
addReplyProto(c,keyname,keylen);
|
||||||
} else {
|
} else {
|
||||||
addReplyArrayLenAsync(c,1);
|
addReplyArrayLen(c,1);
|
||||||
addReplyBulkCBufferAsync(c,keyname,keylen);
|
addReplyBulkCBuffer(c,keyname,keylen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user