From 19a4d2d358e941c9d2de7dba59304c2f5bfa28af Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 24 Oct 2020 02:18:03 +0000 Subject: [PATCH] Remove addReply*Async methods since we already know if its async or not. This is just a source of bugs Former-commit-id: df22cdf6e91a1b9c390b69c4209c719ecf1e44f1 --- src/blocked.cpp | 16 +- src/module.cpp | 36 ++--- src/networking.cpp | 348 ++++++++++++-------------------------------- src/object.cpp | 2 +- src/pubsub.cpp | 22 +-- src/replication.cpp | 30 ++-- src/server.h | 25 +--- src/t_list.cpp | 8 +- src/t_stream.cpp | 14 +- src/t_zset.cpp | 10 +- src/timeout.cpp | 2 +- src/tracking.cpp | 16 +- 12 files changed, 172 insertions(+), 357 deletions(-) diff --git a/src/blocked.cpp b/src/blocked.cpp index 7f96fcfec..63cce0996 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -188,9 +188,9 @@ void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { - addReplyNullArrayAsync(c); + addReplyNullArray(c); } else if (c->btype == BLOCKED_WAIT) { - addReplyLongLongAsync(c,replicationCountAcksByOffset(c->bpop.reploffset)); + addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); } else if (c->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { @@ -216,7 +216,7 @@ void disconnectAllBlockedClients(void) { fastlock_lock(&c->lock); if (c->flags & CLIENT_BLOCKED) { - addReplySdsAsync(c,sdsnew( + addReplySds(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> replica?)\r\n")); unblockClient(c); @@ -373,7 +373,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { /* If the group was not found, send an error * to the consumer. */ if (!group) { - addReplyErrorAsync(receiver, + addReplyError(receiver, "-NOGROUP the consumer group this client " "was blocked on no longer exists"); unblockClient(receiver); @@ -404,12 +404,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { * extracted from it. Wrapped in a single-item * array, since we have just one key. */ if (receiver->resp == 2) { - addReplyArrayLenAsync(receiver,1); - addReplyArrayLenAsync(receiver,2); + addReplyArrayLen(receiver,1); + addReplyArrayLen(receiver,2); } else { - addReplyMapLenAsync(receiver,1); + addReplyMapLen(receiver,1); } - addReplyBulkAsync(receiver,rl->key); + addReplyBulk(receiver,rl->key); streamPropInfo pi = { rl->key, diff --git a/src/module.cpp b/src/module.cpp index 73e176e7a..028ffd4cc 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1358,7 +1358,7 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyLongLongAsync(c,ll); + addReplyLongLong(c,ll); return REDISMODULE_OK; } @@ -1371,9 +1371,9 @@ int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyProtoAsync(c,prefix,strlen(prefix)); - addReplyProtoAsync(c,msg,strlen(msg)); - addReplyProtoAsync(c,"\r\n",2); + addReplyProto(c,prefix,strlen(prefix)); + addReplyProto(c,msg,strlen(msg)); + addReplyProto(c,"\r\n",2); return REDISMODULE_OK; } @@ -1426,10 +1426,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* (ctx->postponed_arrays_count+1), MALLOC_LOCAL); ctx->postponed_arrays[ctx->postponed_arrays_count] = - addReplyDeferredLenAsync(c); + addReplyDeferredLen(c); ctx->postponed_arrays_count++; } else { - addReplyArrayLenAsync(c,len); + addReplyArrayLen(c,len); } return REDISMODULE_OK; } @@ -1444,7 +1444,7 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyNullArrayAsync(c); + addReplyNullArray(c); return REDISMODULE_OK; } @@ -1457,7 +1457,7 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyAsync(c,shared.emptyarray); + addReply(c,shared.emptyarray); return REDISMODULE_OK; } @@ -1502,7 +1502,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { return; } ctx->postponed_arrays_count--; - setDeferredArrayLenAsync(c, + setDeferredArrayLen(c, ctx->postponed_arrays[ctx->postponed_arrays_count], len); if (ctx->postponed_arrays_count == 0) { @@ -1520,7 +1520,7 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyBulkCBufferAsync(c,(char*)buf,len); + addReplyBulkCBuffer(c,(char*)buf,len); return REDISMODULE_OK; } @@ -1534,7 +1534,7 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyBulkCStringAsync(c,(char*)buf); + addReplyBulkCString(c,(char*)buf); return REDISMODULE_OK; } @@ -1547,7 +1547,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyBulkAsync(c,str); + addReplyBulk(c,str); return REDISMODULE_OK; } @@ -1560,7 +1560,7 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyAsync(c,shared.emptybulk); + addReply(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1574,7 +1574,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyVerbatimAsync(c, buf, len, "txt"); + addReplyVerbatim(c, buf, len, "txt"); return REDISMODULE_OK; } @@ -1587,7 +1587,7 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyNullAsync(c); + addReplyNull(c); return REDISMODULE_OK; } @@ -1604,7 +1604,7 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { std::unique_lock lock(c->lock); locker.arm(c); sds proto = sdsnewlen(reply->proto, reply->protolen); - addReplySdsAsync(c,proto); + addReplySds(c,proto); return REDISMODULE_OK; } @@ -1620,7 +1620,7 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyDoubleAsync(c,d); + addReplyDouble(c,d); return REDISMODULE_OK; } @@ -1638,7 +1638,7 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { AeLocker locker; std::unique_lock lock(c->lock); locker.arm(c); - addReplyHumanLongDoubleAsync(c, ld); + addReplyHumanLongDouble(c, ld); return REDISMODULE_OK; } diff --git a/src/networking.cpp b/src/networking.cpp index f5698282b..1c3e528c5 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -40,8 +40,6 @@ #include "aelocker.h" static void setProtocolError(const char *errstr, client *c); -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); -void addReplyBulkCStringCore(client *c, const char *s, bool fAsync); /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -251,10 +249,10 @@ void clientInstallAsyncWriteHandler(client *c) { * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns C_ERR no * data should be appended to the output buffers. */ -int prepareClientToWrite(client *c, bool fAsync) { - fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread - serverAssert(FCorrectThread(c) || fAsync); - if (FCorrectThread(c)) { +int prepareClientToWrite(client *c) { + bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread + + if (!fAsync) { serverAssert(c->conn == nullptr || c->lock.fOwnLock()); } else { serverAssert(GlobalLocksAcquired()); @@ -290,10 +288,10 @@ int prepareClientToWrite(client *c, bool fAsync) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ -int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { +int _addReplyToBuffer(client *c, const char *s, size_t len) { if (c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_AFTER_REPLY) return C_OK; - fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread + bool fAsync = !FCorrectThread(c); if (fAsync) { serverAssert(GlobalLocksAcquired()); @@ -377,11 +375,12 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) { * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. * -------------------------------------------------------------------------- */ -void addReplyCore(client *c, robj_roptr obj, bool fAsync) { - if (prepareClientToWrite(c, fAsync) != C_OK) return; +/* Add the object 'obj' string representation to the client output buffer. */ +void addReply(client *c, robj_roptr obj) { + if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { - if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)),fAsync) != C_OK) + if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))) != C_OK) _addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))); } else if (obj->encoding == OBJ_ENCODING_INT) { /* For integer encoded strings we just convert it into a string @@ -389,44 +388,26 @@ void addReplyCore(client *c, robj_roptr obj, bool fAsync) { * to the output buffer. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj)); - if (_addReplyToBuffer(c,buf,len,fAsync) != C_OK) + if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyProtoToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } } -/* Add the object 'obj' string representation to the client output buffer. */ -void addReply(client *c, robj_roptr obj) -{ - addReplyCore(c, obj, false); -} -void addReplyAsync(client *c, robj_roptr obj) -{ - addReplyCore(c, obj, true); -} - /* Add the SDS 's' string to the client output buffer, as a side effect * the SDS string is freed. */ -void addReplySdsCore(client *c, sds s, bool fAsync) { - if (prepareClientToWrite(c, fAsync) != C_OK) { +void addReplySds(client *c, sds s) { + if (prepareClientToWrite(c) != C_OK) { /* The caller expects the sds to be free'd. */ sdsfree(s); return; } - if (_addReplyToBuffer(c,s,sdslen(s), fAsync) != C_OK) + if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) _addReplyProtoToList(c,s,sdslen(s)); sdsfree(s); } -void addReplySds(client *c, sds s) { - addReplySdsCore(c, s, false); -} - -void addReplySdsAsync(client *c, sds s) { - addReplySdsCore(c, s, true); -} - /* This low level function just adds whatever protocol you send it to the * client buffer, trying the static buffer initially, and using the string * of objects if not possible. @@ -435,18 +416,10 @@ void addReplySdsAsync(client *c, sds s) { * if not needed. The object will only be created by calling * _addReplyProtoToList() if we fail to extend the existing tail object * in the list of objects. */ -void addReplyProtoCore(client *c, const char *s, size_t len, bool fAsync) { - if (prepareClientToWrite(c, fAsync) != C_OK) return; - if (_addReplyToBuffer(c,s,len,fAsync) != C_OK) - _addReplyProtoToList(c,s,len); -} - void addReplyProto(client *c, const char *s, size_t len) { - addReplyProtoCore(c, s, len, false); -} - -void addReplyProtoAsync(client *c, const char *s, size_t len) { - addReplyProtoCore(c, s, len, true); + if (prepareClientToWrite(c) != C_OK) return; + if (_addReplyToBuffer(c,s,len) != C_OK) + _addReplyProtoToList(c,s,len); } std::string escapeString(sds str) @@ -486,12 +459,12 @@ std::string escapeString(sds str) * code provided is used, otherwise the string "-ERR " for the generic * error code is automatically added. * Note that 's' must NOT end with \r\n. */ -void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) { +void addReplyErrorLength(client *c, const char *s, size_t len) { /* If the string already starts with "-..." then the error code * is provided by the caller. Otherwise we use "-ERR". */ - if (!len || s[0] != '-') addReplyProtoCore(c,"-ERR ",5,fAsync); - addReplyProtoCore(c,s,len,fAsync); - addReplyProtoCore(c,"\r\n",2,fAsync); + if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); + addReplyProto(c,s,len); + addReplyProto(c,"\r\n",2); } /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ @@ -535,11 +508,6 @@ void afterErrorReply(client *c, const char *s, size_t len) { } } -void addReplyErrorLength(client *c, const char *s, size_t len) -{ - addReplyErrorLengthCore(c, s, len, false); -} - /* The 'err' object is expected to start with -ERRORCODE and end with \r\n. * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */ void addReplyErrorObject(client *c, robj *err) { @@ -547,13 +515,8 @@ void addReplyErrorObject(client *c, robj *err) { afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */ } -/* See addReplyErrorLength for expectations from the input string. */ void addReplyError(client *c, const char *err) { - addReplyErrorLengthCore(c,err,strlen(err), false); -} - -void addReplyErrorAsync(client *c, const char *err) { - addReplyErrorLengthCore(c, err, strlen(err), true); + addReplyErrorLength(c, err, strlen(err)); afterErrorReply(c,err,strlen(err)); } @@ -629,19 +592,19 @@ void trimReplyUnusedTailSpace(client *c) { /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ -void *addReplyDeferredLen(client *c) { +void *addReplyDeferredLenCore(client *c) { /* Note that we install the write event here even if the object is not * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ - if (prepareClientToWrite(c, false) != C_OK) return NULL; + if (prepareClientToWrite(c) != C_OK) return NULL; trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } -void *addReplyDeferredLenAsync(client *c) { +void *addReplyDeferredLen(client *c) { if (FCorrectThread(c)) - return addReplyDeferredLen(c); + return addReplyDeferredLenCore(c); return (void*)((ssize_t)(c->replyAsync ? c->replyAsync->used : 0)); } @@ -718,11 +681,10 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi } void setDeferredArrayLen(client *c, void *node, long length) { - setDeferredAggregateLen(c,node,length,'*'); -} - -void setDeferredArrayLenAsync(client *c, void *node, long length) { - setDeferredAggregateLenAsync(c, node, length, '*'); + if (FCorrectThread(c)) + setDeferredAggregateLen(c,node,length,'*'); + else + setDeferredAggregateLenAsync(c, node, length, '*'); } void setDeferredMapLen(client *c, void *node, long length) { @@ -748,15 +710,15 @@ void setDeferredPushLen(client *c, void *node, long length) { } /* Add a double as a bulk reply */ -void addReplyDoubleCore(client *c, double d, bool fAsync) { +void addReplyDouble(client *c, double d) { if (std::isinf(d)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a * different way, so better to handle it in an explicit way. */ if (c->resp == 2) { - addReplyBulkCStringCore(c, d > 0 ? "inf" : "-inf", fAsync); + addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); } else { - addReplyProtoCore(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", - d > 0 ? 6 : 7, fAsync); + addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", + d > 0 ? 6 : 7); } } else { char dbuf[MAX_LONG_DOUBLE_CHARS+3], @@ -765,52 +727,34 @@ void addReplyDoubleCore(client *c, double d, bool fAsync) { if (c->resp == 2) { dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); - addReplyProtoCore(c,sbuf,slen,fAsync); + addReplyProto(c,sbuf,slen); } else { dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d); - addReplyProtoCore(c,dbuf,dlen,fAsync); + addReplyProto(c,dbuf,dlen); } } } -void addReplyDouble(client *c, double d) { - addReplyDoubleCore(c, d, false); -} - -void addReplyDoubleAsync(client *c, double d) { - addReplyDoubleCore(c, d, true); -} - -void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync); - /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ -void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) { +void addReplyHumanLongDouble(client *c, long double d) { if (c->resp == 2) { robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulkCore(c,o,fAsync); + addReplyBulk(c,o); decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); - addReplyProtoCore(c,",",1,fAsync); - addReplyProtoCore(c,buf,len,fAsync); - addReplyProtoCore(c,"\r\n",2,fAsync); + addReplyProto(c,",",1); + addReplyProto(c,buf,len); + addReplyProto(c,"\r\n",2); } } -void addReplyHumanLongDouble(client *c, long double d) { - addReplyHumanLongDoubleCore(c, d, false); -} - -void addReplyHumanLongDoubleAsync(client *c, long double d) { - addReplyHumanLongDoubleCore(c, d, true); -} - /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { +void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { char buf[128]; int len; @@ -818,10 +762,10 @@ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool f * so we have a few shared objects to use if the integer is small * like it is most of the times. */ if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReplyCore(c,shared.mbulkhdr[ll], fAsync); + addReply(c,shared.mbulkhdr[ll]); return; } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReplyCore(c,shared.bulkhdr[ll], fAsync); + addReply(c,shared.bulkhdr[ll]); return; } @@ -829,65 +773,33 @@ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool f len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplyProtoCore(c,buf,len+3, fAsync); -} - -void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { - addReplyLongLongWithPrefixCore(c, ll, prefix, false); -} - -void addReplyLongLongCore(client *c, long long ll, bool fAsync) { - if (ll == 0) - addReplyCore(c,shared.czero, fAsync); - else if (ll == 1) - addReplyCore(c,shared.cone, fAsync); - else - addReplyLongLongWithPrefixCore(c,ll,':', fAsync); + addReplyProto(c,buf,len+3); } void addReplyLongLong(client *c, long long ll) { - addReplyLongLongCore(c, ll, false); -} - -void addReplyLongLongAsync(client *c, long long ll) { - addReplyLongLongCore(c, ll, true); -} - -void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) { - if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN) - addReplyCore(c,shared.mbulkhdr[length], fAsync); + if (ll == 0) + addReply(c,shared.czero); + else if (ll == 1) + addReply(c,shared.cone); else - addReplyLongLongWithPrefixCore(c,length,prefix, fAsync); + addReplyLongLongWithPrefix(c,ll,':'); } void addReplyAggregateLen(client *c, long length, int prefix) { - addReplyAggregateLenCore(c, length, prefix, false); -} - -void addReplyArrayLenCore(client *c, long length, bool fAsync) { - addReplyAggregateLenCore(c,length,'*', fAsync); + if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN) + addReply(c,shared.mbulkhdr[length]); + else + addReplyLongLongWithPrefix(c,length,prefix); } void addReplyArrayLen(client *c, long length) { - addReplyArrayLenCore(c, length, false); -} - -void addReplyArrayLenAsync(client *c, long length) { - addReplyArrayLenCore(c, length, true); -} - -void addReplyMapLenCore(client *c, long length, bool fAsync) { - int prefix = c->resp == 2 ? '*' : '%'; - if (c->resp == 2) length *= 2; - addReplyAggregateLenCore(c,length,prefix,fAsync); + addReplyAggregateLen(c,length,'*'); } void addReplyMapLen(client *c, long length) { - addReplyMapLenCore(c, length, false); -} - -void addReplyMapLenAsync(client *c, long length) { - addReplyMapLenCore(c, length, true); + int prefix = c->resp == 2 ? '*' : '%'; + if (c->resp == 2) length *= 2; + addReplyAggregateLen(c,length,prefix); } void addReplySetLen(client *c, long length) { @@ -901,38 +813,19 @@ void addReplyAttributeLen(client *c, long length) { addReplyAggregateLen(c,length,prefix); } -void addReplyPushLenCore(client *c, long length, bool fAsync) { - int prefix = c->resp == 2 ? '*' : '>'; - addReplyAggregateLenCore(c,length,prefix, fAsync); -} - void addReplyPushLen(client *c, long length) { - addReplyPushLenCore(c, length, false); + int prefix = c->resp == 2 ? '*' : '>'; + addReplyAggregateLen(c,length,prefix); } -void addReplyPushLenAsync(client *c, long length) { - addReplyPushLenCore(c, length, true); -} - -void addReplyNullCore(client *c, bool fAsync) { +void addReplyNull(client *c) { if (c->resp == 2) { - addReplyProtoCore(c,"$-1\r\n",5,fAsync); + addReplyProto(c,"$-1\r\n",5); } else { - addReplyProtoCore(c,"_\r\n",3,fAsync); + addReplyProto(c,"_\r\n",3); } } -void addReplyNull(client *c, robj_roptr objOldProtocol) { - if (c->resp < 3 && objOldProtocol != nullptr) - addReply(c, objOldProtocol); - else - addReplyNullCore(c, false); -} - -void addReplyNullAsync(client *c) { - addReplyNullCore(c, true); -} - void addReplyBool(client *c, int b) { if (c->resp == 2) { addReply(c, b ? shared.cone : shared.czero); @@ -945,107 +838,58 @@ void addReplyBool(client *c, int b) { * RESP2 had it, so API-wise we have this call, that will emit the correct * RESP2 protocol, however for RESP3 the reply will always be just the * Null type "_\r\n". */ -void addReplyNullArrayCore(client *c, bool fAsync) +void addReplyNullArray(client *c) { if (c->resp == 2) { - addReplyProtoCore(c,"*-1\r\n",5,fAsync); + addReplyProto(c,"*-1\r\n",5); } else { - addReplyProtoCore(c,"_\r\n",3,fAsync); + addReplyProto(c,"_\r\n",3); } } -void addReplyNullArray(client *c) -{ - addReplyNullArrayCore(c, false); -} - -void addReplyNullArrayAsync(client *c) -{ - addReplyNullArrayCore(c, true); -} - /* Create the length prefix of a bulk reply, example: $2234 */ -void addReplyBulkLenCore(client *c, robj_roptr obj, bool fAsync) { +void addReplyBulkLen(client *c, robj_roptr obj) { size_t len = stringObjectLen(obj); if (len < OBJ_SHARED_BULKHDR_LEN) - addReplyCore(c,shared.bulkhdr[len], fAsync); + addReply(c,shared.bulkhdr[len]); else - addReplyLongLongWithPrefixCore(c,len,'$', fAsync); -} - -void addReplyBulkLen(client *c, robj *obj) -{ - addReplyBulkLenCore(c, obj, false); + addReplyLongLongWithPrefix(c,len,'$'); } /* Add a Redis Object as a bulk reply */ -void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync) { - addReplyBulkLenCore(c,obj,fAsync); - addReplyCore(c,obj,fAsync); - addReplyCore(c,shared.crlf,fAsync); -} - -void addReplyBulk(client *c, robj_roptr obj) -{ - addReplyBulkCore(c, obj, false); -} - -void addReplyBulkAsync(client *c, robj_roptr obj) -{ - addReplyBulkCore(c, obj, true); +void addReplyBulk(client *c, robj_roptr obj) { + addReplyBulkLen(c,obj); + addReply(c,obj); + addReply(c,shared.crlf); } /* Add a C buffer as bulk reply */ -void addReplyBulkCBufferCore(client *c, const void *p, size_t len, bool fAsync) { - addReplyLongLongWithPrefixCore(c,len,'$',fAsync); - addReplyProtoCore(c,(const char*)p,len,fAsync); - addReplyCore(c,shared.crlf,fAsync); -} - void addReplyBulkCBuffer(client *c, const void *p, size_t len) { - addReplyBulkCBufferCore(c, p, len, false); -} - -void addReplyBulkCBufferAsync(client *c, const void *p, size_t len) { - addReplyBulkCBufferCore(c, p, len, true); + addReplyLongLongWithPrefix(c,len,'$'); + addReplyProto(c,(const char*)p,len); + addReply(c,shared.crlf); } /* Add sds to reply (takes ownership of sds and frees it) */ -void addReplyBulkSdsCore(client *c, sds s, bool fAsync) { - addReplyLongLongWithPrefixCore(c,sdslen(s),'$', fAsync); - addReplySdsCore(c,s,fAsync); - addReplyCore(c,shared.crlf,fAsync); -} - -void addReplyBulkSds(client *c, sds s) { - addReplyBulkSdsCore(c, s, false); -} - -void addReplyBulkSdsAsync(client *c, sds s) { - addReplyBulkSdsCore(c, s, true); +void addReplyBulkSds(client *c, sds s) { + addReplyLongLongWithPrefix(c,sdslen(s),'$'); + addReplySds(c,s); + addReply(c,shared.crlf); } /* Add a C null term string as bulk reply */ -void addReplyBulkCStringCore(client *c, const char *s, bool fAsync) { +void addReplyBulkCString(client *c, const char *s) { if (s == NULL) { if (c->resp < 3) - addReplyCore(c,shared.nullbulk, fAsync); + addReply(c,shared.nullbulk); else - addReplyNullCore(c,fAsync); + addReplyNull(c); } else { - addReplyBulkCBufferCore(c,s,strlen(s),fAsync); + addReplyBulkCBuffer(c,s,strlen(s)); } } -void addReplyBulkCString(client *c, const char *s) { - addReplyBulkCStringCore(c, s, false); -} - -void addReplyBulkCStringAsync(client *c, const char *s) { - addReplyBulkCStringCore(c, s, true); -} - /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -1064,9 +908,9 @@ void addReplyBulkLongLong(client *c, long long ll) { * three first characters of the extension are used, and if the * provided one is shorter than that, the remaining is filled with * spaces. */ -void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) { +void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { if (c->resp == 2) { - addReplyBulkCBufferCore(c,s,len,fAsync); + addReplyBulkCBuffer(c,s,len); } else { char buf[32]; size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); @@ -1078,20 +922,12 @@ void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, p[i] = *ext++; } } - addReplyProtoCore(c,buf,preflen,fAsync); - addReplyProtoCore(c,s,len,fAsync); - addReplyProtoCore(c,"\r\n",2,fAsync); + addReplyProto(c,buf,preflen); + addReplyProto(c,s,len); + addReplyProto(c,"\r\n",2); } } -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { - addReplyVerbatimCore(c, s, len, ext, false); -} - -void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) { - addReplyVerbatimCore(c, s, len, ext, true); -} - /* Add an array of C strings as status replies with a heading. * This function is typically invoked by from commands that support * subcommands in response to the 'help' subcommand. The help array @@ -1127,7 +963,7 @@ void addReplySubcommandSyntaxError(client *c) { /* Append 'src' client output buffers into 'dst' client output buffers. * This function clears the output buffers of 'src' */ void AddReplyFromClient(client *dst, client *src) { - if (prepareClientToWrite(dst, false) != C_OK) + if (prepareClientToWrite(dst) != C_OK) return; addReplyProto(dst,src->buf, src->bufpos); if (listLength(src->reply)) @@ -1907,7 +1743,7 @@ void ProcessPendingAsyncWrites() if (FCorrectThread(c)) { - prepareClientToWrite(c, false); // queue an event + prepareClientToWrite(c); // queue an event } else { @@ -2898,7 +2734,7 @@ NULL if (target && target->flags & CLIENT_BLOCKED) { std::unique_lock ul(target->lock); if (unblock_error) - addReplyErrorAsync(target, + addReplyError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else replyToBlockedClientTimedOut(target); diff --git a/src/object.cpp b/src/object.cpp index 4b32c5a4d..4988fa0bf 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -433,7 +433,7 @@ robj *resetRefCount(robj *obj) { int checkType(client *c, robj_roptr o, int type) { if (o->type != type) { - addReplyAsync(c,shared.wrongtypeerr); + addReply(c,shared.wrongtypeerr); return 1; } return 0; diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 3ccbb6a66..176a8271e 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -43,12 +43,12 @@ int clientSubscriptionsCount(client *c); * addReply*() API family. */ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { if (c->resp == 2) - addReplyAsync(c,shared.mbulkhdr[3]); + addReply(c,shared.mbulkhdr[3]); else - addReplyPushLenAsync(c,3); - addReplyAsync(c,shared.messagebulk); - addReplyBulkAsync(c,channel); - if (msg) addReplyBulkAsync(c,msg); + addReplyPushLen(c,3); + addReply(c,shared.messagebulk); + addReplyBulk(c,channel); + if (msg) addReplyBulk(c,msg); } /* Send a pubsub message of type "pmessage" to the client. The difference @@ -56,13 +56,13 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { * this message format also includes the pattern that matched the message. */ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { if (c->resp == 2) - addReplyAsync(c,shared.mbulkhdr[4]); + addReply(c,shared.mbulkhdr[4]); else - addReplyPushLenAsync(c,4); - addReplyAsync(c,shared.pmessagebulk); - addReplyBulkAsync(c,pat); - addReplyBulkAsync(c,channel); - addReplyBulkAsync(c,msg); + addReplyPushLen(c,4); + addReply(c,shared.pmessagebulk); + addReplyBulk(c,pat); + addReplyBulk(c,channel); + addReplyBulk(c,msg); } /* Send the pubsub subscription notification to the client. */ diff --git a/src/replication.cpp b/src/replication.cpp index cf63c6e81..b9de0bce5 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -315,7 +315,7 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves */ - addReplyAsync(replica,selectcmd); + addReply(replica,selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); @@ -329,18 +329,18 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo if (fSendRaw) { /* Add the multi bulk length. */ - addReplyArrayLenAsync(replica,argc); + addReplyArrayLen(replica,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (int j = 0; j < argc; j++) - addReplyBulkAsync(replica,argv[j]); + addReplyBulk(replica,argv[j]); } else { struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); - addReplyProtoAsync(replica, buf, sdslen(buf)); + addReplyProto(replica, buf, sdslen(buf)); sdsfree(buf); } } @@ -516,21 +516,21 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * or are already in sync with the master. */ if (!fSendRaw) - addReplyProtoAsync(replica, proto, cchProto); + addReplyProto(replica, proto, cchProto); - addReplyProtoAsync(replica,fake->buf,fake->bufpos); + addReplyProto(replica,fake->buf,fake->bufpos); listRewind(fake->reply, &liReply); while ((lnReply = listNext(&liReply))) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - addReplyProtoAsync(replica, reply->buf(), reply->used); + addReplyProto(replica, reply->buf(), reply->used); } if (!fSendRaw) { - addReplyAsync(replica,shared.crlf); - addReplyProtoAsync(replica, szDbNum, cchDbNum); - addReplyProtoAsync(replica, szMvcc, cchMvcc); + addReply(replica,shared.crlf); + addReplyProto(replica, szDbNum, cchDbNum); + addReplyProto(replica, szMvcc, cchMvcc); } } @@ -605,7 +605,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReplyProtoAsync(replica,buf,buflen); + addReplyProto(replica,buf,buflen); } if (listLength(slaves)) @@ -651,7 +651,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() if (FCorrectThread(c)) lock.lock(); - addReplyAsync(monitor,cmdobj); + addReply(monitor,cmdobj); } decrRefCount(cmdobj); } @@ -3267,7 +3267,7 @@ void replicaofCommand(client *c) { miNew->masterhost, miNew->masterport, client); sdsfree(client); } - addReplyAsync(c,shared.ok); + addReply(c,shared.ok); } /* ROLE command: provide information about the role of the instance @@ -3747,7 +3747,7 @@ void processClientsWaitingReplicas(void) { last_numreplicas > c->bpop.numreplicas) { unblockClient(c); - addReplyLongLongAsync(c,last_numreplicas); + addReplyLongLong(c,last_numreplicas); } else { int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); @@ -3755,7 +3755,7 @@ void processClientsWaitingReplicas(void) { last_offset = c->bpop.reploffset; last_numreplicas = numreplicas; unblockClient(c); - addReplyLongLongAsync(c,numreplicas); + addReplyLongLong(c,numreplicas); } } fastlock_unlock(&c->lock); diff --git a/src/server.h b/src/server.h index 198633f90..9f7d8f22c 100644 --- a/src/server.h +++ b/src/server.h @@ -2004,17 +2004,14 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(connection *conn); -void addReplyNull(client *c, robj_roptr objOldProtocol = nullptr); +void addReplyNull(client *c); void addReplyNullArray(client *c); -void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); -void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); -void addReplyBulkCStringAsync(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj_roptr obj); @@ -2026,10 +2023,9 @@ void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); -void addReplyHumanLongDoubleAsync(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); +void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix); #endif void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); @@ -2074,23 +2070,6 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); -// Special Thread-safe addReply() commands for posting messages to clients from a different thread -void addReplyAsync(client *c, robj_roptr obj); -void addReplyArrayLenAsync(client *c, long length); -void addReplyProtoAsync(client *c, const char *s, size_t len); -void addReplyBulkAsync(client *c, robj_roptr obj); -void addReplyBulkCBufferAsync(client *c, const void *p, size_t len); -void addReplyErrorAsync(client *c, const char *err); -void addReplyMapLenAsync(client *c, long length); -void addReplyNullAsync(client *c); -void addReplyDoubleAsync(client *c, double d); -void *addReplyDeferredLenAsync(client *c); -void setDeferredArrayLenAsync(client *c, void *node, long length); -void addReplySdsAsync(client *c, sds s); -void addReplyBulkSdsAsync(client *c, sds s); -void addReplyPushLenAsync(client *c, long length); -void addReplyLongLongAsync(client *c, long long ll); - void ProcessPendingAsyncWrites(void); client *lookupClientByID(uint64_t id); diff --git a/src/t_list.cpp b/src/t_list.cpp index 74634f91d..e81d94f1a 100644 --- a/src/t_list.cpp +++ b/src/t_list.cpp @@ -677,7 +677,7 @@ static void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *val listTypePush(dstobj,value,LIST_HEAD); notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id); /* Always send the pushed value to the client. */ - addReplyBulkAsync(c,value); + addReplyBulk(c,value); } void rpoplpushCommand(client *c) { @@ -758,9 +758,9 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); /* BRPOP/BLPOP */ - addReplyArrayLenAsync(receiver,2); - addReplyBulkAsync(receiver,key); - addReplyBulkAsync(receiver,value); + addReplyArrayLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,value); /* Notify event. */ const char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; diff --git a/src/t_stream.cpp b/src/t_stream.cpp index fd7a1329b..66157dbb1 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -818,7 +818,7 @@ static void addReplyStreamID(client *c, streamID *id) { static void addReplyStreamIDAsync(client *c, streamID *id) { sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); - addReplyBulkSdsAsync(c,replyid); + addReplyBulkSds(c,replyid); } /* Similar to the above function, but just creates an object, usually useful @@ -968,7 +968,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end } if (!(flags & STREAM_RWR_RAWENTRIES)) - arraylen_ptr = addReplyDeferredLenAsync(c); + arraylen_ptr = addReplyDeferredLen(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ @@ -982,18 +982,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ - addReplyArrayLenAsync(c,2); + addReplyArrayLen(c,2); addReplyStreamIDAsync(c,&id); - addReplyArrayLenAsync(c,numfields*2); + addReplyArrayLen(c,numfields*2); /* Emit the field-value pairs. */ while(numfields--) { unsigned char *key, *value; int64_t key_len, value_len; streamIteratorGetField(&si,&key,&value,&key_len,&value_len); - addReplyBulkCBufferAsync(c,key,key_len); - addReplyBulkCBufferAsync(c,value,value_len); + addReplyBulkCBuffer(c,key,key_len); + addReplyBulkCBuffer(c,value,value_len); } /* If a group is passed, we need to create an entry in the @@ -1052,7 +1052,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamPropagateGroupID(c,spi->keyname,group,spi->groupname); streamIteratorStop(&si); - if (arraylen_ptr) setDeferredArrayLenAsync(c,arraylen_ptr,arraylen); + if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); return arraylen; } diff --git a/src/t_zset.cpp b/src/t_zset.cpp index a2118b348..973e22ce6 100644 --- a/src/t_zset.cpp +++ b/src/t_zset.cpp @@ -3165,11 +3165,11 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey return; } - void *arraylen_ptr = addReplyDeferredLenAsync(c); + void *arraylen_ptr = addReplyDeferredLen(c); long arraylen = 0; /* We emit the key only for the blocking variant. */ - if (emitkey) addReplyBulkAsync(c,key); + if (emitkey) addReplyBulk(c,key); /* Remove the element. */ do { @@ -3219,8 +3219,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey signalModifiedKey(c,c->db,key); } - addReplyBulkCBufferAsync(c,ele,sdslen(ele)); - addReplyDoubleAsync(c,score); + addReplyBulkCBuffer(c,ele,sdslen(ele)); + addReplyDouble(c,score); sdsfree(ele); arraylen += 2; @@ -3232,7 +3232,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey } } while(--count); - setDeferredArrayLenAsync(c,arraylen_ptr,arraylen + (emitkey != 0)); + setDeferredArrayLen(c,arraylen_ptr,arraylen + (emitkey != 0)); } /* ZPOPMIN key [] */ diff --git a/src/timeout.cpp b/src/timeout.cpp index d59bc44e6..18a553211 100644 --- a/src/timeout.cpp +++ b/src/timeout.cpp @@ -179,7 +179,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int } if (tval < 0) { - addReplyErrorAsync(c,"timeout is negative"); + addReplyError(c,"timeout is negative"); return C_ERR; } diff --git a/src/tracking.cpp b/src/tracking.cpp index ad10c3a57..58c675096 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -215,9 +215,9 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { * are unable to send invalidation messages to the redirected * connection, because the client no longer exist. */ if (c->resp > 2) { - addReplyPushLenAsync(c,3); - addReplyBulkCBufferAsync(c,"tracking-redir-broken",21); - addReplyLongLongAsync(c,c->client_tracking_redirection); + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); } return; } @@ -232,8 +232,8 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { * in Pub/Sub mode, we can support the feature with RESP 2 as well, * by sending Pub/Sub messages in the __redis__:invalidate channel. */ if (c->resp > 2) { - addReplyPushLenAsync(c,2); - addReplyBulkCBufferAsync(c,"invalidate",10); + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ @@ -248,10 +248,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { /* Send the "value" part, which is the array of keys. */ if (proto) { - addReplyProtoAsync(c,keyname,keylen); + addReplyProto(c,keyname,keylen); } else { - addReplyArrayLenAsync(c,1); - addReplyBulkCBufferAsync(c,keyname,keylen); + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); } }