diff --git a/src/cluster.cpp b/src/cluster.cpp index aa366c6ef..d4e8567e3 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -971,7 +971,7 @@ int clusterAddNode(clusterNode *node) { return (retval == DICT_OK) ? C_OK : C_ERR; } -/* Remove a node from the cluster. The functio performs the high level +/* Remove a node from the cluster. The function performs the high level * cleanup, calling freeClusterNode() for the low level cleanup. * Here we do the following: * diff --git a/src/debug.cpp b/src/debug.cpp index 0ed278477..4f8383b3b 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -504,7 +504,7 @@ NULL "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", (void*)val, static_cast(val->getrefcount(std::memory_order_relaxed)), - strenc, rdbSavedObjectLen(val), + strenc, rdbSavedObjectLen(val, c->argv[2]), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { dictEntry *de; @@ -1081,6 +1081,61 @@ void logRegisters(ucontext_t *uc) { (unsigned long) uc->uc_mcontext.gregs[18] ); logStackContent((void**)uc->uc_mcontext.gregs[15]); + #elif defined(__aarch64__) /* Linux AArch64 */ + serverLog(LL_WARNING, + "\n" + "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n" + "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n" + "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n" + "X30:%016lx\n" + "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n", + (unsigned long) uc->uc_mcontext.regs[18], + (unsigned long) uc->uc_mcontext.regs[19], + (unsigned long) uc->uc_mcontext.regs[20], + (unsigned long) uc->uc_mcontext.regs[21], + (unsigned long) uc->uc_mcontext.regs[22], + (unsigned long) uc->uc_mcontext.regs[23], + (unsigned long) uc->uc_mcontext.regs[24], + (unsigned long) uc->uc_mcontext.regs[25], + (unsigned long) uc->uc_mcontext.regs[26], + (unsigned long) uc->uc_mcontext.regs[27], + (unsigned long) uc->uc_mcontext.regs[28], + (unsigned long) uc->uc_mcontext.regs[29], + (unsigned long) uc->uc_mcontext.regs[30], + (unsigned long) uc->uc_mcontext.pc, + (unsigned long) uc->uc_mcontext.sp, + (unsigned long) uc->uc_mcontext.pstate, + (unsigned long) uc->uc_mcontext.fault_address + ); + logStackContent((void**)uc->uc_mcontext.sp); + #elif defined(__arm__) /* Linux ARM */ + serverLog(LL_WARNING, + "\n" + "R10:%016lx R9 :%016lx\nR8 :%016lx R7 :%016lx\n" + "R6 :%016lx R5 :%016lx\nR4 :%016lx R3 :%016lx\n" + "R2 :%016lx R1 :%016lx\nR0 :%016lx EC :%016lx\n" + "fp: %016lx ip:%016lx\n", + "pc:%016lx sp:%016lx\ncpsr:%016lx fault_address:%016lx\n", + (unsigned long) uc->uc_mcontext.arm_r10, + (unsigned long) uc->uc_mcontext.arm_r9, + (unsigned long) uc->uc_mcontext.arm_r8, + (unsigned long) uc->uc_mcontext.arm_r7, + (unsigned long) uc->uc_mcontext.arm_r6, + (unsigned long) uc->uc_mcontext.arm_r5, + (unsigned long) uc->uc_mcontext.arm_r4, + (unsigned long) uc->uc_mcontext.arm_r3, + (unsigned long) uc->uc_mcontext.arm_r2, + (unsigned long) uc->uc_mcontext.arm_r1, + (unsigned long) uc->uc_mcontext.arm_r0, + (unsigned long) uc->uc_mcontext.error_code, + (unsigned long) uc->uc_mcontext.arm_fp, + (unsigned long) uc->uc_mcontext.arm_ip, + (unsigned long) uc->uc_mcontext.arm_pc, + (unsigned long) uc->uc_mcontext.arm_sp, + (unsigned long) uc->uc_mcontext.arm_cpsr, + (unsigned long) uc->uc_mcontext.fault_address + ); + logStackContent((void**)uc->uc_mcontext.arm_sp); #endif #elif defined(__FreeBSD__) #if defined(__x86_64__) @@ -1221,33 +1276,6 @@ void logRegisters(ucontext_t *uc) { (unsigned long) uc->uc_mcontext.mc_cs ); logStackContent((void**)uc->uc_mcontext.mc_rsp); -#elif defined(__aarch64__) /* Linux AArch64 */ - serverLog(LL_WARNING, - "\n" - "X18:%016lx X19:%016lx\nX20:%016lx X21:%016lx\n" - "X22:%016lx X23:%016lx\nX24:%016lx X25:%016lx\n" - "X26:%016lx X27:%016lx\nX28:%016lx X29:%016lx\n" - "X30:%016lx\n" - "pc:%016lx sp:%016lx\npstate:%016lx fault_address:%016lx\n", - (unsigned long) uc->uc_mcontext.regs[18], - (unsigned long) uc->uc_mcontext.regs[19], - (unsigned long) uc->uc_mcontext.regs[20], - (unsigned long) uc->uc_mcontext.regs[21], - (unsigned long) uc->uc_mcontext.regs[22], - (unsigned long) uc->uc_mcontext.regs[23], - (unsigned long) uc->uc_mcontext.regs[24], - (unsigned long) uc->uc_mcontext.regs[25], - (unsigned long) uc->uc_mcontext.regs[26], - (unsigned long) uc->uc_mcontext.regs[27], - (unsigned long) uc->uc_mcontext.regs[28], - (unsigned long) uc->uc_mcontext.regs[29], - (unsigned long) uc->uc_mcontext.regs[30], - (unsigned long) uc->uc_mcontext.pc, - (unsigned long) uc->uc_mcontext.sp, - (unsigned long) uc->uc_mcontext.pstate, - (unsigned long) uc->uc_mcontext.fault_address - ); - logStackContent((void**)uc->uc_mcontext.sp); #else serverLog(LL_WARNING, " Dumping of registers not supported for this OS/arch"); diff --git a/src/expire.cpp b/src/expire.cpp index ae9c75e8a..0d435af40 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -704,6 +704,7 @@ void persistCommand(client *c) { if (lookupKeyWrite(c->db,c->argv[1])) { if (c->argc == 2) { if (removeExpire(c->db,c->argv[1])) { + notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id); addReply(c,shared.cone); g_pserver->dirty++; } else { @@ -711,6 +712,7 @@ void persistCommand(client *c) { } } else if (c->argc == 3) { if (removeSubkeyExpire(c->db, c->argv[1], c->argv[2])) { + notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id); addReply(c,shared.cone); g_pserver->dirty++; } else { diff --git a/src/networking.cpp b/src/networking.cpp index dd2928879..f43dea60b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2901,6 +2901,25 @@ NULL return; } + if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT) + { + addReplyError(c, + "You can't specify both OPTIN mode and OPTOUT mode"); + zfree(prefix); + return; + } + + if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) || + (options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN)) + { + addReplyError(c, + "You can't switch OPTIN/OPTOUT mode before disabling " + "tracking for this client, and then re-enabling it with " + "a different mode."); + zfree(prefix); + return; + } + enableTracking(c,redir,options,prefix,numprefix); } else if (!strcasecmp(szFromObj(c->argv[2]),"off")) { disableTracking(c); diff --git a/src/rdb.cpp b/src/rdb.cpp index 3d9221f37..10c22e31f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1047,8 +1047,8 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, const char *key, long long val) { * the rdbSaveObject() function. Currently we use a trick to get * this length with very little changes to the code. In the future * we could switch to a faster solution. */ -size_t rdbSavedObjectLen(robj *o) { - ssize_t len = rdbSaveObject(NULL,o,NULL); +size_t rdbSavedObjectLen(robj *o, robj *key) { + ssize_t len = rdbSaveObject(NULL,o,key); serverAssertWithInfo(NULL,o,len != -1); return len; } diff --git a/src/rdb.h b/src/rdb.h index 925a51903..7e5406b18 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -151,7 +151,7 @@ int rdbSaveFp(FILE *pf, rdbSaveInfo *rsi); int rdbSaveS3(char *path, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi, int rdbflags); ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key); -size_t rdbSavedObjectLen(robj *o); +size_t rdbSavedObjectLen(robj *o, robj *key); robj *rdbLoadObject(int type, rio *rdb, robj *key, uint64_t mvcc_tstamp); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 6f40b5449..ab06f733c 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -1928,11 +1928,21 @@ void xackCommand(client *c) { return; } + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully acknowledged some messages, so it should be + * executed in a "all or nothing" fashion. */ + for (int j = 3; j < c->argc; j++) { + streamID id; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + } + int acknowledged = 0; for (int j = 3; j < c->argc; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) + serverPanic("StreamID invalid after check. Should not be possible."); streamEncodeID(buf,&id); /* Lookup the ID in the group PEL: it will have a reference to the diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a27e1f582..04661707b 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -93,6 +93,18 @@ start_server { assert {[r XACK mystream mygroup $id1 $id2] eq 1} } + test {XACK should fail if got at least one invalid ID} { + r del mystream + r xgroup create s g $ MKSTREAM + r xadd s * f1 v1 + set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]] + assert {$c == 1} + set pending [r xpending s g - + 10 c] + set id1 [lindex $pending 0 0] + assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id} + assert {[r xack s g $id1] eq 1} + } + test {PEL NACK reassignment after XGROUP SETID event} { r del events r xadd events * f1 v1