Don't send replies to slaves after we've queued them to be closed

Former-commit-id: a52cd974b90cdf00b7f10525e754755ca5428dbb
This commit is contained in:
John Sully 2019-03-06 15:21:05 -05:00
parent 9a615a4850
commit 809cbfd495
2 changed files with 6 additions and 4 deletions

View File

@ -1059,7 +1059,7 @@ int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
* *
* If the HyperLogLog is sparse and is found to be invalid, C_ERR * If the HyperLogLog is sparse and is found to be invalid, C_ERR
* is returned, otherwise the function always succeeds. */ * is returned, otherwise the function always succeeds. */
int hllMerge(uint8_t *max, robj *hll) { int hllMerge(uint8_t *max, size_t cmax, robj *hll) {
struct hllhdr *hdr = ptrFromObj(hll); struct hllhdr *hdr = ptrFromObj(hll);
int i; int i;
@ -1089,6 +1089,8 @@ int hllMerge(uint8_t *max, robj *hll) {
runlen = HLL_SPARSE_VAL_LEN(p); runlen = HLL_SPARSE_VAL_LEN(p);
regval = HLL_SPARSE_VAL_VALUE(p); regval = HLL_SPARSE_VAL_VALUE(p);
while(runlen--) { while(runlen--) {
if (i < 0 || (size_t)i >= cmax)
return C_ERR;
if (regval > max[i]) max[i] = regval; if (regval > max[i]) max[i] = regval;
i++; i++;
} }
@ -1237,7 +1239,7 @@ void pfcountCommand(client *c) {
/* Merge with this HLL with our 'max' HHL by setting max[i] /* Merge with this HLL with our 'max' HHL by setting max[i]
* to MAX(max[i],hll[i]). */ * to MAX(max[i],hll[i]). */
if (hllMerge(registers,o) == C_ERR) { if (hllMerge(registers,sizeof(max),o) == C_ERR) {
addReplySds(c,sdsnew(invalid_hll_err)); addReplySds(c,sdsnew(invalid_hll_err));
return; return;
} }
@ -1324,7 +1326,7 @@ void pfmergeCommand(client *c) {
/* Merge with this HLL with our 'max' HHL by setting max[i] /* Merge with this HLL with our 'max' HHL by setting max[i]
* to MAX(max[i],hll[i]). */ * to MAX(max[i],hll[i]). */
if (hllMerge(max,o) == C_ERR) { if (hllMerge(max,sizeof(max),o) == C_ERR) {
addReplySds(c,sdsnew(invalid_hll_err)); addReplySds(c,sdsnew(invalid_hll_err));
return; return;
} }

View File

@ -1021,7 +1021,7 @@ void copyClientOutputBuffer(client *dst, client *src) {
/* Return true if the specified client has pending reply buffers to write to /* Return true if the specified client has pending reply buffers to write to
* the socket. */ * the socket. */
int clientHasPendingReplies(client *c) { int clientHasPendingReplies(client *c) {
return c->bufpos || listLength(c->reply); return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
} }
#define MAX_ACCEPTS_PER_CALL 1000 #define MAX_ACCEPTS_PER_CALL 1000