Replace client flags to bitfield (#614)

This commit is contained in:
skyfirelee 2024-07-01 02:33:10 +08:00 committed by GitHub
parent 7415a576a8
commit e4c1f6d45a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 549 additions and 542 deletions

View File

@ -506,11 +506,11 @@ void ACLFreeUserAndKillClients(user *u) {
* more defensive to set the default user and put
* it in non authenticated mode. */
c->user = DefaultUser;
c->flags &= ~CLIENT_AUTHENTICATED;
c->flag.authenticated = 0;
/* We will write replies to this client later, so we can't
* close it directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
c->flag.close_after_command = 1;
} else {
freeClientAsync(c);
}
@ -1494,13 +1494,13 @@ void addAuthErrReply(client *c, robj *err) {
* The return value is AUTH_OK on success (valid username / password pair) & AUTH_ERR otherwise. */
int checkPasswordBasedAuth(client *c, robj *username, robj *password) {
if (ACLCheckUserCredentials(username, password) == C_OK) {
c->flags |= CLIENT_AUTHENTICATED;
c->flag.authenticated = 1;
c->user = ACLGetUserByName(username->ptr, sdslen(username->ptr));
moduleNotifyUserChanged(c);
return AUTH_OK;
} else {
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0,
username->ptr, NULL);
addACLLogEntry(c, ACL_DENIED_AUTH, (c->flag.multi) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, 0, username->ptr,
NULL);
return AUTH_ERR;
}
}

View File

@ -1364,7 +1364,8 @@ struct client *createAOFClient(void) {
* background processing there is a chance that the
* command execution order will be violated.
*/
c->flags = CLIENT_DENY_BLOCKING;
c->raw_flag = 0;
c->flag.deny_blocking = 1;
/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
@ -1536,7 +1537,7 @@ int loadSingleAppendOnlyFile(char *filename) {
/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;
if (fakeClient->flags & CLIENT_MULTI && fakeClient->cmd->proc != execCommand) {
if (fakeClient->flag.multi && fakeClient->cmd->proc != execCommand) {
/* Note: we don't have to attempt calling evalGetCommandFlags,
* since this is AOF, the checks in processCommand are not made
* anyway.*/
@ -1549,7 +1550,7 @@ int loadSingleAppendOnlyFile(char *filename) {
serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
/* The fake client should never get blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
serverAssert(fakeClient->flag.blocked == 0);
/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
@ -1562,7 +1563,7 @@ int loadSingleAppendOnlyFile(char *filename) {
* If the client is in the middle of a MULTI/EXEC, handle it as it was
* a short read, even if technically the protocol is correct: we want
* to remove the unprocessed tail and continue. */
if (fakeClient->flags & CLIENT_MULTI) {
if (fakeClient->flag.multi) {
serverLog(LL_WARNING, "Revert incomplete MULTI/EXEC transaction in AOF file %s", filename);
valid_up_to = valid_before_multi;
goto uxeof;

View File

@ -87,11 +87,11 @@ void initClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Primary client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flag.primary && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
c->flags |= CLIENT_BLOCKED;
c->flag.blocked = 1;
c->bstate.btype = btype;
if (!(c->flags & CLIENT_MODULE))
if (!c->flag.module)
server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
@ -130,10 +130,10 @@ void processUnblockedClients(void) {
serverAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients, ln);
c->flags &= ~CLIENT_UNBLOCKED;
c->flag.unblocked = 0;
if (c->flags & CLIENT_MODULE) {
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->flag.module) {
if (!c->flag.blocked) {
moduleCallCommandUnblockedHandler(c);
}
continue;
@ -143,7 +143,7 @@ void processUnblockedClients(void) {
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (!c->flag.blocked) {
/* If we have a queued command, execute it now. */
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
@ -172,8 +172,8 @@ void processUnblockedClients(void) {
void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
if (!c->flag.unblocked) {
c->flag.unblocked = 1;
listAddNodeTail(server.unblocked_clients, c);
}
}
@ -199,7 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
@ -210,11 +210,11 @@ void unblockClient(client *c, int queue_for_reprocessing) {
}
/* We count blocked client stats on regular clients and not on module clients */
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--;
if (!c->flag.module) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
c->flags &= ~CLIENT_BLOCKED;
c->flag.blocked = 0;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
@ -256,7 +256,7 @@ void replyToClientsBlockedOnShutdown(void) {
listRewind(server.clients, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
if (c->flag.blocked && c->bstate.btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
}
@ -278,7 +278,7 @@ void disconnectAllBlockedClients(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
if (c->flag.blocked) {
/* POSTPONEd clients are an exception, when they'll be unblocked, the
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
@ -287,7 +287,7 @@ void disconnectAllBlockedClients(void) {
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
c->flag.close_after_reply = 1;
}
}
}
@ -368,7 +368,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
if (!c->flag.reprocessing_command) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
@ -411,7 +411,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
/* Currently we assume key blocking will require reprocessing the command.
* However in case of modules, they have a different way to handle the reprocessing
* which does not require setting the pending command flag */
if (btype != BLOCKED_MODULE) c->flags |= CLIENT_PENDING_COMMAND;
if (btype != BLOCKED_MODULE) c->flag.pending_command = 1;
blockClient(c, btype);
}
@ -605,7 +605,7 @@ void blockPostponeClient(client *c) {
listAddNodeTail(server.postponed_clients, c);
c->postponed_list_node = listLast(server.postponed_clients);
/* Mark this client to execute its command */
c->flags |= CLIENT_PENDING_COMMAND;
c->flag.pending_command = 1;
}
/* Block client due to shutdown command */
@ -633,8 +633,8 @@ static void unblockClientOnKey(client *c, robj *key) {
unblockClient(c, 0);
/* In case this client was blocked on keys during command
* we need to re process the command again */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) {
c->flag.pending_command = 0;
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
* to run atomically, this is why we must enter the execution unit here before
* running the command, and exit the execution unit after calling the unblock handler (if exists).
@ -644,8 +644,8 @@ static void unblockClientOnKey(client *c, robj *key) {
server.current_client = c;
enterExecutionUnit(1, 0);
processCommandAndResetClient(c);
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->flags & CLIENT_MODULE) {
if (!c->flag.blocked) {
if (c->flag.module) {
moduleCallCommandUnblockedHandler(c);
} else {
queueClientForReprocessing(c);
@ -690,7 +690,7 @@ void unblockClientOnTimeout(client *c) {
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) c->flag.pending_command = 0;
unblockClient(c, 1);
}
@ -699,7 +699,7 @@ void unblockClientOnTimeout(client *c) {
void unblockClientOnError(client *c, const char *err_str) {
if (err_str) addReplyError(c, err_str);
updateStatsOnUnblock(c, 0, 0, 1);
if (c->flags & CLIENT_PENDING_COMMAND) c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) c->flag.pending_command = 0;
unblockClient(c, 1);
}

View File

@ -985,7 +985,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & CLIENT_MULTI)) return myself;
if (!c->flag.multi) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
@ -1048,7 +1048,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsPrimary(myself) || c->flags & CLIENT_READONLY) {
if (clusterNodeIsPrimary(myself) || c->flag.readonly) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
@ -1143,7 +1143,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot && (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING)) {
if (importing_slot && (c->flag.asking || cmd_flags & CMD_ASKING)) {
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
@ -1157,7 +1157,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
if ((c->flag.readonly || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
}
@ -1213,8 +1213,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flags & CLIENT_BLOCKED && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
dictEntry *de;
dictIterator *di;
@ -1240,7 +1240,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
/* if the client is read-only and attempting to access key that our
* replica can handle, allow it. */
if ((c->flags & CLIENT_READONLY) && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsReplica(myself) &&
if (c->flag.readonly && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == node) {
node = myself;
}
@ -1443,7 +1443,7 @@ void askingCommand(client *c) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_ASKING;
c->flag.asking = 1;
addReply(c, shared.ok);
}
@ -1451,12 +1451,12 @@ void askingCommand(client *c) {
* In this mode replica will not redirect clients as long as clients access
* with read-only commands to keys that are served by the replica's primary. */
void readonlyCommand(client *c) {
c->flags |= CLIENT_READONLY;
c->flag.readonly = 1;
addReply(c, shared.ok);
}
/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
c->flags &= ~CLIENT_READONLY;
c->flag.readonly = 0;
addReply(c, shared.ok);
}

View File

@ -5904,7 +5904,7 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
int optarg_pos = 0;
/* Allow primaries to replicate "CLUSTER SETSLOT" */
if (!(c->flags & CLIENT_PRIMARY) && nodeIsReplica(myself)) {
if (!c->flag.primary && nodeIsReplica(myself)) {
addReplyError(c, "Please use SETSLOT only with masters.");
return 0;
}
@ -6028,7 +6028,7 @@ void clusterCommandSetSlot(client *c) {
* This ensures that all replicas have the latest topology information, enabling
* a reliable slot ownership transfer even if the primary node went down during
* the process. */
if (nodeIsPrimary(myself) && myself->num_replicas != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) {
if (nodeIsPrimary(myself) && myself->num_replicas != 0 && !c->flag.replication_done) {
/* Iterate through the list of replicas to check if there are any running
* a version older than 8.0.0. Replicas with versions older than 8.0.0 do
* not support the CLUSTER SETSLOT command on replicas. If such a replica
@ -6058,7 +6058,7 @@ void clusterCommandSetSlot(client *c) {
* ack the repl offset at the command boundary. */
blockClientForReplicaAck(c, timeout_ms, server.primary_repl_offset + 1, myself->num_replicas, 0);
/* Mark client as pending command for execution after replication to replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
c->flag.pending_command = 1;
replicationRequestAckFromReplicas();
return;
}

View File

@ -118,7 +118,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH &&
if (server.current_client && server.current_client->flag.no_touch &&
server.current_client->cmd->proc != touchCommand)
flags |= LOOKUP_NOTOUCH;
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
@ -231,8 +231,7 @@ int getKeySlot(sds key) {
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 &&
server.current_client->flags & CLIENT_EXECUTING_COMMAND) {
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
@ -822,7 +821,7 @@ void keysCommand(client *c) {
numkeys++;
}
}
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
if (kvs_di) kvstoreReleaseDictIterator(kvs_di);
if (kvs_it) kvstoreIteratorRelease(kvs_it);
@ -1238,7 +1237,7 @@ void shutdownCommand(client *c) {
return;
}
if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
if (!(flags & SHUTDOWN_NOW) && c->flag.deny_blocking) {
addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
return;
}
@ -1667,7 +1666,7 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {
}
int writable_replica = server.primary_host && server.repl_replica_ro == 0;
if (c && writable_replica && !(c->flags & CLIENT_PRIMARY)) rememberReplicaKeyWithExpire(db, key);
if (c && writable_replica && !c->flag.primary) rememberReplicaKeyWithExpire(db, key);
}
/* Return the expire time of the specified key, or -1 if no expire
@ -1796,7 +1795,7 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
* When replicating commands from the primary, keys are never considered
* expired. */
if (server.primary_host != NULL) {
if (server.current_client && (server.current_client->flags & CLIENT_PRIMARY)) return KEY_VALID;
if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}

View File

@ -804,12 +804,12 @@ void debugCommand(client *c) {
addReplyError(c, "RESP2 is not supported by this command");
return;
}
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
addReplyPushLen(c, 2);
addReplyBulkCString(c, "server-cpu-usage");
addReplyLongLong(c, 42);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
/* Push replies are not synchronous replies, so we emit also a
* normal reply in order for blocking clients just discarding the
* push reply, to actually consume the reply and continue. */
@ -1026,7 +1026,7 @@ void _serverAssertPrintClientInfo(const client *c) {
bugReportStart();
serverLog(LL_WARNING, "=== ASSERTION FAILED CLIENT CONTEXT ===");
serverLog(LL_WARNING, "client->flags = %llu", (unsigned long long)c->flags);
serverLog(LL_WARNING, "client->flags = %llu", (unsigned long long)c->raw_flag);
serverLog(LL_WARNING, "client->conn = %s", connGetInfo(c->conn, conninfo, sizeof(conninfo)));
serverLog(LL_WARNING, "client->argc = %d", c->argc);
for (j = 0; j < c->argc; j++) {

View File

@ -258,10 +258,10 @@ void scriptingInit(int setup) {
* by scriptingReset(). */
if (lctx.lua_client == NULL) {
lctx.lua_client = createClient(NULL);
lctx.lua_client->flags |= CLIENT_SCRIPT;
lctx.lua_client->flag.script = 1;
/* We do not want to allow blocking commands inside Lua */
lctx.lua_client->flags |= CLIENT_DENY_BLOCKING;
lctx.lua_client->flag.deny_blocking = 1;
}
/* Lock the global table from any changes */
@ -630,7 +630,7 @@ void evalCommand(client *c) {
/* Explicitly feed monitor here so that lua commands appear after their
* script command. */
replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc);
if (!(c->flags & CLIENT_LUA_DEBUG))
if (!c->flag.lua_debug)
evalGenericCommand(c, 0);
else
evalGenericCommandWithDebugging(c, 0);
@ -652,7 +652,7 @@ void evalShaCommand(client *c) {
addReplyErrorObject(c, shared.noscripterr);
return;
}
if (!(c->flags & CLIENT_LUA_DEBUG))
if (!c->flag.lua_debug)
evalGenericCommand(c, 1);
else {
addReplyError(c, "Please use EVAL instead of EVALSHA for debugging");
@ -732,7 +732,7 @@ NULL
} else if (!strcasecmp(c->argv[2]->ptr, "sync")) {
ldbEnable(c);
addReply(c, shared.ok);
c->flags |= CLIENT_LUA_DEBUG_SYNC;
c->flag.lua_debug_sync = 1;
} else {
addReplyError(c, "Use SCRIPT DEBUG YES/SYNC/NO");
return;
@ -794,7 +794,7 @@ int ldbIsEnabled(void) {
/* Enable debug mode of Lua scripts for this client. */
void ldbEnable(client *c) {
c->flags |= CLIENT_LUA_DEBUG;
c->flag.lua_debug = 1;
ldbFlushLog(ldb.logs);
ldb.conn = c->conn;
ldb.step = 1;
@ -810,7 +810,8 @@ void ldbEnable(client *c) {
* to properly shut down a client debugging session, see ldbEndSession()
* for more information. */
void ldbDisable(client *c) {
c->flags &= ~(CLIENT_LUA_DEBUG | CLIENT_LUA_DEBUG_SYNC);
c->flag.lua_debug = 0;
c->flag.lua_debug_sync = 0;
}
/* Append a log entry to the specified LDB log. */
@ -871,7 +872,7 @@ void ldbSendLogs(void) {
* The caller should call ldbEndSession() only if ldbStartSession()
* returned 1. */
int ldbStartSession(client *c) {
ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0;
ldb.forked = !c->flag.lua_debug_sync;
if (ldb.forked) {
pid_t cp = serverFork(CHILD_TYPE_LDB);
if (cp == -1) {
@ -940,7 +941,7 @@ void ldbEndSession(client *c) {
/* Close the client connection after sending the final EVAL reply
* in order to signal the end of the debugging session. */
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
c->flag.close_after_reply = 1;
/* Cleanup. */
sdsfreesplitres(ldb.src, ldb.lines);

View File

@ -406,7 +406,8 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) {
}
client *c = createClient(NULL);
c->flags |= (CLIENT_DENY_BLOCKING | CLIENT_SCRIPT);
c->flag.deny_blocking = 1;
c->flag.script = 1;
engineInfo *ei = zmalloc(sizeof(*ei));
*ei = (engineInfo){
.name = engine_name_sds,

View File

@ -78,7 +78,7 @@ static int reqresShouldLog(client *c) {
if (!server.req_res_logfile) return 0;
/* Ignore client with streaming non-standard response */
if (c->flags & (CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_REPLICA)) return 0;
if (c->flag.pubsub || c->flag.monitor || c->flag.replica) return 0;
/* We only work on primaries (didn't implement reqresAppendResponse to work on shared replica buffers) */
if (getClientType(c) == CLIENT_TYPE_PRIMARY) return 0;

View File

@ -654,7 +654,7 @@ client *moduleAllocTempClient(void) {
if (moduleTempClientCount < moduleTempClientMinCount) moduleTempClientMinCount = moduleTempClientCount;
} else {
c = createClient(NULL);
c->flags |= CLIENT_MODULE;
c->flag.module = 1;
c->user = NULL; /* Root user */
}
return c;
@ -681,7 +681,8 @@ void moduleReleaseTempClient(client *c) {
c->duration = 0;
resetClient(c);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->raw_flag = 0;
c->flag.module = 1;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
@ -3638,11 +3639,11 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
ValkeyModuleClientInfoV1 *ci1 = ci;
memset(ci1, 0, sizeof(*ci1));
ci1->version = structver;
if (client->flags & CLIENT_MULTI) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_MULTI;
if (client->flags & CLIENT_PUBSUB) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_PUBSUB;
if (client->flags & CLIENT_UNIX_SOCKET) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_UNIXSOCKET;
if (client->flags & CLIENT_TRACKING) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_TRACKING;
if (client->flags & CLIENT_BLOCKED) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_BLOCKED;
if (client->flag.multi) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_MULTI;
if (client->flag.pubsub) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_PUBSUB;
if (client->flag.unix_socket) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_UNIXSOCKET;
if (client->flag.tracking) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_TRACKING;
if (client->flag.blocked) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_BLOCKED;
if (client->conn->type == connectionTypeTls()) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_SSL;
int port;
@ -3853,9 +3854,9 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
/* Client specific flags */
if (ctx) {
if (ctx->client) {
if (ctx->client->flags & CLIENT_DENY_BLOCKING) flags |= VALKEYMODULE_CTX_FLAGS_DENY_BLOCKING;
if (ctx->client->flag.deny_blocking) flags |= VALKEYMODULE_CTX_FLAGS_DENY_BLOCKING;
/* Module command received from PRIMARY, is replicated. */
if (ctx->client->flags & CLIENT_PRIMARY) flags |= VALKEYMODULE_CTX_FLAGS_REPLICATED;
if (ctx->client->flag.primary) flags |= VALKEYMODULE_CTX_FLAGS_REPLICATED;
if (ctx->client->resp == 3) {
flags |= VALKEYMODULE_CTX_FLAGS_RESP3;
}
@ -3863,7 +3864,7 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
/* For DIRTY flags, we need the blocked client if used */
client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client;
if (c && (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC))) {
if (c && (c->flag.dirty_cas || c->flag.dirty_exec)) {
flags |= VALKEYMODULE_CTX_FLAGS_MULTI_DIRTY;
}
}
@ -5955,8 +5956,7 @@ int VM_CallReplyPromiseAbort(ValkeyModuleCallReply *reply, void **private_data)
ValkeyModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
if (!promise->c)
return VALKEYMODULE_ERR; /* Promise can not be aborted, either already aborted or already finished. */
if (!(promise->c->flags & CLIENT_BLOCKED))
return VALKEYMODULE_ERR; /* Client is not blocked anymore, can not abort it. */
if (!(promise->c->flag.blocked)) return VALKEYMODULE_ERR; /* Client is not blocked anymore, can not abort it. */
/* Client is still blocked, remove it from any blocking state and release it. */
if (private_data) *private_data = promise->private_data;
@ -6227,7 +6227,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
if (!(flags & VALKEYMODULE_ARGV_ALLOW_BLOCK)) {
/* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING;
c->flag.deny_blocking = 1;
}
c->db = ctx->client->db;
c->argv = argv;
@ -6324,7 +6324,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
}
} else {
/* if we aren't OOM checking in VM_Call, we want further executions from this client to also not fail on OOM */
c->flags |= CLIENT_ALLOW_OOM;
c->flag.allow_oom = 1;
}
if (flags & VALKEYMODULE_ARGV_NO_WRITES) {
@ -6422,8 +6422,8 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
if (server.cluster_enabled && !mustObeyClient(ctx->client)) {
int error_code;
/* Duplicate relevant flags in the module client. */
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
c->flags |= ctx->client->flags & (CLIENT_READONLY | CLIENT_ASKING);
c->flag.readonly = ctx->client->flag.readonly;
c->flag.asking = ctx->client->flag.asking;
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, &error_code) != getMyClusterNode()) {
sds msg = NULL;
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
@ -6474,7 +6474,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
call(c, call_flags);
server.replication_allowed = prev_replication_allowed;
if (c->flags & CLIENT_BLOCKED) {
if (c->flag.blocked) {
serverAssert(flags & VALKEYMODULE_ARGV_ALLOW_BLOCK);
serverAssert(ctx->module);
ValkeyModuleAsyncRMCallPromise *promise = zmalloc(sizeof(ValkeyModuleAsyncRMCallPromise));
@ -6492,11 +6492,11 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const
c->bstate.async_rm_call_handle = promise;
if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) {
/* No need for AOF propagation, set the relevant flags of the client */
c->flags |= CLIENT_MODULE_PREVENT_AOF_PROP;
c->flag.module_prevent_aof_prop = 1;
}
if (!(call_flags & CMD_CALL_PROPAGATE_REPL)) {
/* No need for replication propagation, set the relevant flags of the client */
c->flags |= CLIENT_MODULE_PREVENT_REPL_PROP;
c->flag.module_prevent_repl_prop = 1;
}
c = NULL; /* Make sure not to free the client */
} else {
@ -7847,7 +7847,7 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) {
continue;
}
/* Remove the module auth complete flag before we attempt the next cb. */
c->flags &= ~CLIENT_MODULE_AUTH_HAS_RESULT;
c->flag.module_auth_has_result = 0;
ValkeyModuleCtx ctx;
moduleCreateContext(&ctx, cur_auth_ctx->module, VALKEYMODULE_CTX_NONE);
ctx.client = c;
@ -7905,19 +7905,20 @@ int checkModuleAuthentication(client *c, robj *username, robj *password, robj **
if (result == VALKEYMODULE_AUTH_NOT_HANDLED) {
result = attemptNextAuthCb(c, username, password, err);
}
if (c->flags & CLIENT_BLOCKED) {
if (c->flag.blocked) {
/* Modules are expected to return VALKEYMODULE_AUTH_HANDLED when blocking clients. */
serverAssert(result == VALKEYMODULE_AUTH_HANDLED);
return AUTH_BLOCKED;
}
c->module_auth_ctx = NULL;
if (result == VALKEYMODULE_AUTH_NOT_HANDLED) {
c->flags &= ~CLIENT_MODULE_AUTH_HAS_RESULT;
c->flag.module_auth_has_result = 0;
return AUTH_NOT_HANDLED;
}
if (c->flags & CLIENT_MODULE_AUTH_HAS_RESULT) {
c->flags &= ~CLIENT_MODULE_AUTH_HAS_RESULT;
if (c->flags & CLIENT_AUTHENTICATED) return AUTH_OK;
if (c->flag.module_auth_has_result) {
c->flag.module_auth_has_result = 0;
if (c->flag.authenticated) return AUTH_OK;
}
return AUTH_ERR;
}
@ -8010,8 +8011,8 @@ ValkeyModuleBlockedClient *VM_BlockClientOnAuth(ValkeyModuleCtx *ctx,
}
ValkeyModuleBlockedClient *bc =
moduleBlockClient(ctx, NULL, reply_callback, NULL, free_privdata, 0, NULL, 0, NULL, 0);
if (ctx->client->flags & CLIENT_BLOCKED) {
ctx->client->flags |= CLIENT_PENDING_COMMAND;
if (ctx->client->flag.blocked) {
ctx->client->flag.pending_command = 1;
}
return bc;
}
@ -8298,9 +8299,8 @@ void moduleHandleBlockedClients(void) {
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE) &&
c->conn) {
c->flags |= CLIENT_PENDING_WRITE;
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) && !c->flag.pending_write && c->conn) {
c->flag.pending_write = 1;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}
@ -9465,11 +9465,11 @@ void revokeClientAuthentication(client *c) {
moduleNotifyUserChanged(c);
c->user = DefaultUser;
c->flags &= ~CLIENT_AUTHENTICATED;
c->flag.authenticated = 0;
/* We will write replies to this client later, so we can't close it
* directly even if async. */
if (c == server.current_client) {
c->flags |= CLIENT_CLOSE_AFTER_COMMAND;
c->flag.close_after_command = 1;
} else {
freeClientAsync(c);
}
@ -9780,17 +9780,17 @@ static int authenticateClientWithUser(ValkeyModuleCtx *ctx,
}
/* Avoid settings which are meaningless and will be lost */
if (!ctx->client || (ctx->client->flags & CLIENT_MODULE)) {
if (!ctx->client || (ctx->client->flag.module)) {
return VALKEYMODULE_ERR;
}
moduleNotifyUserChanged(ctx->client);
ctx->client->user = user;
ctx->client->flags |= CLIENT_AUTHENTICATED;
ctx->client->flag.authenticated = 1;
if (clientHasModuleAuthInProgress(ctx->client)) {
ctx->client->flags |= CLIENT_MODULE_AUTH_HAS_RESULT;
ctx->client->flag.module_auth_has_result = 1;
}
if (callback) {

View File

@ -63,7 +63,7 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
* this is useful in case client sends these in a pipeline, or doesn't
* bother to read previous responses and didn't notice the multi was already
* aborted. */
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) return;
if (c->flag.dirty_cas || c->flag.dirty_exec) return;
if (c->mstate.count == 0) {
/* If a client is using multi/exec, assuming it is used to execute at least
* two commands. Hence, creating by default size of 2. */
@ -96,28 +96,30 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
void discardTransaction(client *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(CLIENT_MULTI | CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC);
c->flag.multi = 0;
c->flag.dirty_cas = 0;
c->flag.dirty_exec = 0;
unwatchAllKeys(c);
}
/* Flag the transaction as DIRTY_EXEC so that EXEC will fail.
* Should be called every time there is an error while queueing a command. */
void flagTransaction(client *c) {
if (c->flags & CLIENT_MULTI) c->flags |= CLIENT_DIRTY_EXEC;
if (c->flag.multi) c->flag.dirty_exec = 1;
}
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
if (c->flag.multi) {
addReplyError(c, "MULTI calls can not be nested");
return;
}
c->flags |= CLIENT_MULTI;
c->flag.multi = 1;
addReply(c, shared.ok);
}
void discardCommand(client *c) {
if (!(c->flags & CLIENT_MULTI)) {
if (!c->flag.multi) {
addReplyError(c, "DISCARD without MULTI");
return;
}
@ -148,14 +150,14 @@ void execCommand(client *c) {
int orig_argc, orig_argv_len;
struct serverCommand *orig_cmd;
if (!(c->flags & CLIENT_MULTI)) {
if (!c->flag.multi) {
addReplyError(c, "EXEC without MULTI");
return;
}
/* EXEC with expired watched key is disallowed*/
if (isWatchedKeyExpired(c)) {
c->flags |= (CLIENT_DIRTY_CAS);
c->flag.dirty_cas = 1;
}
/* Check if we need to abort the EXEC because:
@ -164,8 +166,8 @@ void execCommand(client *c) {
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
if (c->flags & CLIENT_DIRTY_EXEC) {
if (c->flag.dirty_cas || c->flag.dirty_exec) {
if (c->flag.dirty_exec) {
addReplyErrorObject(c, shared.execaborterr);
} else {
addReply(c, shared.nullarray[c->resp]);
@ -175,10 +177,10 @@ void execCommand(client *c) {
return;
}
uint64_t old_flags = c->flags;
struct ClientFlags old_flags = c->flag;
/* we do not want to allow blocking commands inside multi */
c->flags |= CLIENT_DENY_BLOCKING;
c->flag.deny_blocking = 1;
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
@ -224,7 +226,7 @@ void execCommand(client *c) {
else
call(c, CMD_CALL_FULL);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
serverAssert(c->flag.blocked == 0);
}
/* Commands may alter argc/argv, restore mstate. */
@ -235,7 +237,7 @@ void execCommand(client *c) {
}
// restore old DENY_BLOCKING value
if (!(old_flags & CLIENT_DENY_BLOCKING)) c->flags &= ~CLIENT_DENY_BLOCKING;
if (!(old_flags.deny_blocking)) c->flag.deny_blocking = 0;
c->argv = orig_argv;
c->argv_len = orig_argv_len;
@ -393,7 +395,7 @@ void touchWatchedKey(serverDb *db, robj *key) {
break;
}
c->flags |= CLIENT_DIRTY_CAS;
c->flag.dirty_cas = 1;
/* As the client is marked as dirty, there is no point in getting here
* again in case that key (or others) are modified again (or keep the
* memory overhead till EXEC). */
@ -444,7 +446,7 @@ void touchAllWatchedKeysInDb(serverDb *emptied, serverDb *replaced_with) {
continue;
}
client *c = wk->client;
c->flags |= CLIENT_DIRTY_CAS;
c->flag.dirty_cas = 1;
/* Note - we could potentially call unwatchAllKeys for this specific client in order to reduce
* the total number of iterations. BUT this could also free the current next entry pointer
* held by the iterator and can lead to use-after-free. */
@ -457,12 +459,12 @@ void touchAllWatchedKeysInDb(serverDb *emptied, serverDb *replaced_with) {
void watchCommand(client *c) {
int j;
if (c->flags & CLIENT_MULTI) {
if (c->flag.multi) {
addReplyError(c, "WATCH inside MULTI is not allowed");
return;
}
/* No point in watching if the client is already dirty. */
if (c->flags & CLIENT_DIRTY_CAS) {
if (c->flag.dirty_cas) {
addReply(c, shared.ok);
return;
}
@ -472,7 +474,7 @@ void watchCommand(client *c) {
void unwatchCommand(client *c) {
unwatchAllKeys(c);
c->flags &= (~CLIENT_DIRTY_CAS);
c->flag.dirty_cas = 0;
addReply(c, shared.ok);
}

View File

@ -105,18 +105,14 @@ static void clientSetDefaultAuth(client *c) {
/* If the default user does not require authentication, the user is
* directly authenticated. */
c->user = DefaultUser;
if ((c->user->flags & USER_FLAG_NOPASS) && !(c->user->flags & USER_FLAG_DISABLED)) {
c->flags |= CLIENT_AUTHENTICATED;
} else {
c->flags &= ~CLIENT_AUTHENTICATED;
}
c->flag.authenticated = (c->user->flags & USER_FLAG_NOPASS) && !(c->user->flags & USER_FLAG_DISABLED);
}
int authRequired(client *c) {
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || (DefaultUser->flags & USER_FLAG_DISABLED)) &&
!(c->flags & CLIENT_AUTHENTICATED);
!c->flag.authenticated;
return auth_required;
}
@ -167,7 +163,7 @@ client *createClient(connection *conn) {
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->raw_flag = 0;
c->capa = 0;
c->slot = -1;
c->ctime = c->last_interaction = server.unixtime;
@ -250,16 +246,15 @@ void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for replicas, if the replica can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->repl_state == REPL_STATE_NONE ||
(c->repl_state == REPLICA_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) {
if (!c->flag.pending_write && (c->repl_state == REPL_STATE_NONE ||
(c->repl_state == REPLICA_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack))) {
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
c->flag.pending_write = 1;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}
@ -289,18 +284,18 @@ void putClientInPendingWriteQueue(client *c) {
int prepareClientToWrite(client *c) {
/* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */
if (c->flags & (CLIENT_SCRIPT | CLIENT_MODULE)) return C_OK;
if (c->flag.script || c->flag.module) return C_OK;
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
if (c->flag.close_asap) return C_ERR;
/* CLIENT REPLY OFF / SKIP handling: don't send replies.
* CLIENT_PUSHING handling: disables the reply silencing flags. */
if ((c->flags & (CLIENT_REPLY_OFF | CLIENT_REPLY_SKIP)) && !(c->flags & CLIENT_PUSHING)) return C_ERR;
if ((c->flag.reply_off || c->flag.reply_skip) && !c->flag.pushing) return C_ERR;
/* Primaries don't receive replies, unless CLIENT_PRIMARY_FORCE_REPLY flag
* is set. */
if ((c->flags & CLIENT_PRIMARY) && !(c->flags & CLIENT_PRIMARY_FORCE_REPLY)) return C_ERR;
if (c->flag.primary && !c->flag.primary_force_reply) return C_ERR;
if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
@ -430,7 +425,7 @@ int cmdHasPushAsReply(struct serverCommand *cmd) {
}
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
if (c->flag.close_after_reply) return;
/* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
* replication link that caused a reply to be generated we'll simply disconnect it.
@ -453,7 +448,7 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
* The check for executing_client also avoids affecting push messages that are part of eviction.
* Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
if ((c->flags & CLIENT_PUSHING) && c == server.current_client && server.executing_client &&
if (c->flag.pushing && c == server.current_client && server.executing_client &&
!cmdHasPushAsReply(server.executing_client->cmd)) {
_addReplyProtoToList(c, server.pending_push_messages, s, len);
return;
@ -536,7 +531,7 @@ void afterErrorReply(client *c, const char *s, size_t len, int flags) {
* Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
* Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread
* later. */
if (c->flags & CLIENT_MODULE) {
if (c->flag.module) {
if (!c->deferred_reply_errors) {
c->deferred_reply_errors = listCreate();
listSetFreeMethod(c->deferred_reply_errors, (void (*)(void *))sdsfree);
@ -1034,7 +1029,7 @@ void addReplyAttributeLen(client *c, long length) {
void addReplyPushLen(client *c, long length) {
serverAssert(c->resp >= 3);
serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
serverAssertWithInfo(c, NULL, c->flag.pushing);
addReplyAggregateLen(c, length, '>');
}
@ -1213,7 +1208,7 @@ void AddReplyFromClient(client *dst, client *src) {
* reply. We don't wanna run the risk of copying partial response in case
* for some reason the output limits don't reach the same decision (maybe
* they changed) */
if (src->flags & CLIENT_CLOSE_ASAP) {
if (src->flag.close_asap) {
sds client = catClientInfoString(sdsempty(), dst);
freeClientAsync(dst);
serverLog(LL_WARNING, "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
@ -1230,7 +1225,7 @@ void AddReplyFromClient(client *dst, client *src) {
/* We're bypassing _addReplyProtoToList, so we need to add the pre/post
* checks in it. */
if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;
if (dst->flag.close_after_reply) return;
/* Concatenate the reply list into the dest */
if (listLength(src->reply)) listJoin(dst->reply, src->reply);
@ -1340,7 +1335,7 @@ void clientAcceptHandler(connection *conn) {
moduleFireServerEvent(VALKEYMODULE_EVENT_CLIENT_CHANGE, VALKEYMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED, c);
}
void acceptCommonHandler(connection *conn, int flags, char *ip) {
void acceptCommonHandler(connection *conn, struct ClientFlags flags, char *ip) {
client *c;
UNUSED(ip);
@ -1392,7 +1387,7 @@ void acceptCommonHandler(connection *conn, int flags, char *ip) {
}
/* Last chance to keep flags */
c->flags |= flags;
if (flags.unix_socket) c->flag.unix_socket = 1;
/* Initiate accept.
*
@ -1484,7 +1479,7 @@ void unlinkClient(client *c) {
/* Check if this is a replica waiting for diskless replication (rdb pipe),
* in which case it needs to be cleaned from that list */
if (c->flags & CLIENT_REPLICA && c->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && server.rdb_pipe_conns) {
if (c->flag.replica && c->repl_state == REPLICA_STATE_WAIT_BGSAVE_END && server.rdb_pipe_conns) {
int i;
for (i = 0; i < server.rdb_pipe_numconns; i++) {
if (server.rdb_pipe_conns[i] == c->conn) {
@ -1501,10 +1496,10 @@ void unlinkClient(client *c) {
}
/* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) {
if (c->flag.pending_write) {
serverAssert(&c->clients_pending_write_node.next != NULL || &c->clients_pending_write_node.prev != NULL);
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flags &= ~CLIENT_PENDING_WRITE;
c->flag.pending_write = 0;
}
/* Remove from the list of pending reads if needed. */
@ -1517,15 +1512,15 @@ void unlinkClient(client *c) {
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
if (c->flag.unblocked) {
ln = listSearchKey(server.unblocked_clients, c);
serverAssert(ln != NULL);
listDelNode(server.unblocked_clients, ln);
c->flags &= ~CLIENT_UNBLOCKED;
c->flag.unblocked = 0;
}
/* Clear the tracking status. */
if (c->flags & CLIENT_TRACKING) disableTracking(c);
if (c->flag.tracking) disableTracking(c);
}
/* Clear the client state to resemble a newly connected client. */
@ -1535,17 +1530,18 @@ void clearClientConnectionState(client *c) {
/* MONITOR clients are also marked with CLIENT_REPLICA, we need to
* distinguish between the two.
*/
if (c->flags & CLIENT_MONITOR) {
if (c->flag.monitor) {
ln = listSearchKey(server.monitors, c);
serverAssert(ln != NULL);
listDelNode(server.monitors, ln);
c->flags &= ~(CLIENT_MONITOR | CLIENT_REPLICA);
c->flag.monitor = 0;
c->flag.replica = 0;
}
serverAssert(!(c->flags & (CLIENT_REPLICA | CLIENT_PRIMARY)));
serverAssert(!(c->flag.replica || c->flag.primary));
if (c->flags & CLIENT_TRACKING) disableTracking(c);
if (c->flag.tracking) disableTracking(c);
selectDb(c, 0);
#ifdef LOG_REQ_RES
c->resp = server.client_default_resp;
@ -1571,8 +1567,12 @@ void clearClientConnectionState(client *c) {
* represent the client library behind the connection. */
/* Selectively clear state flags not covered above */
c->flags &= ~(CLIENT_ASKING | CLIENT_READONLY | CLIENT_REPLY_OFF | CLIENT_REPLY_SKIP_NEXT | CLIENT_NO_TOUCH |
CLIENT_NO_EVICT);
c->flag.asking = 0;
c->flag.readonly = 0;
c->flag.reply_off = 0;
c->flag.reply_skip_next = 0;
c->flag.no_touch = 0;
c->flag.no_evict = 0;
}
void freeClient(client *c) {
@ -1580,7 +1580,7 @@ void freeClient(client *c) {
/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED) {
if (c->flag.protected) {
freeClientAsync(c);
return;
}
@ -1600,7 +1600,7 @@ void freeClient(client *c) {
* from the queue. Note that we need to do this here, because later
* we may call replicationCachePrimary() and the client should already
* be removed from the list of clients to free. */
if (c->flags & CLIENT_CLOSE_ASAP) {
if (c->flag.close_asap) {
ln = listSearchKey(server.clients_to_close, c);
serverAssert(ln != NULL);
listDelNode(server.clients_to_close, ln);
@ -1611,10 +1611,11 @@ void freeClient(client *c) {
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.primary && c->flags & CLIENT_PRIMARY) {
if (server.primary && c->flag.primary) {
serverLog(LL_NOTICE, "Connection with primary lost.");
if (!(c->flags & (CLIENT_PROTOCOL_ERROR | CLIENT_BLOCKED))) {
c->flags &= ~(CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY);
if (!(c->flag.protocol_error || c->flag.blocked)) {
c->flag.close_asap = 0;
c->flag.close_after_reply = 0;
replicationCachePrimary(c);
return;
}
@ -1636,7 +1637,7 @@ void freeClient(client *c) {
/* Deallocate structures used to block on blocking ops. */
/* If there is any in-flight command, we don't record their duration. */
c->duration = 0;
if (c->flags & CLIENT_BLOCKED) unblockClient(c, 1);
if (c->flag.blocked) unblockClient(c, 1);
dictRelease(c->bstate.keys);
/* UNWATCH all the keys */
@ -1674,7 +1675,7 @@ void freeClient(client *c) {
/* Primary/replica cleanup Case 1:
* we lost the connection with a replica. */
if (c->flags & CLIENT_REPLICA) {
if (c->flag.replica) {
/* If there is no any other replica waiting dumping RDB finished, the
* current child process need not continue to dump RDB, then we kill it.
* So child process won't use more memory, and we also can fork a new
@ -1691,7 +1692,7 @@ void freeClient(client *c) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.replicas;
list *l = (c->flag.monitor) ? server.monitors : server.replicas;
ln = listSearchKey(l, c);
serverAssert(ln != NULL);
listDelNode(l, ln);
@ -1709,7 +1710,7 @@ void freeClient(client *c) {
/* Primary/replica cleanup Case 2:
* we lost the connection with the primary. */
if (c->flags & CLIENT_PRIMARY) replicationHandlePrimaryDisconnection();
if (c->flag.primary) replicationHandlePrimaryDisconnection();
/* Remove client from memory usage buckets */
if (c->mem_usage_bucket) {
@ -1739,8 +1740,8 @@ void freeClientAsync(client *c) {
* may access the list while the server uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are
* idle. */
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_SCRIPT) return;
c->flags |= CLIENT_CLOSE_ASAP;
if (c->flag.close_asap || c->flag.script) return;
c->flag.close_asap = 1;
if (server.io_threads_num == 1) {
/* no need to bother with locking if there's just one thread (the main thread) */
listAddNodeTail(server.clients_to_close, c);
@ -1788,7 +1789,7 @@ int beforeNextClient(client *c) {
* cases where we want an async free of a client other than myself. For example
* in ACL modifications we disconnect clients authenticated to non-existent
* users (see ACL LOAD). */
if (c && (c->flags & CLIENT_CLOSE_ASAP)) {
if (c && (c->flag.close_asap)) {
freeClient(c);
return C_ERR;
}
@ -1806,9 +1807,9 @@ int freeClientsInAsyncFreeQueue(void) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_PROTECTED) continue;
if (c->flag.protected) continue;
c->flags &= ~CLIENT_CLOSE_ASAP;
c->flag.close_asap = 0;
freeClient(c);
listDelNode(server.clients_to_close, ln);
freed++;
@ -1986,7 +1987,7 @@ int writeToClient(client *c, int handler_installed) {
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_REPLICA))
(server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory) && !c->flag.replica)
break;
}
@ -2009,7 +2010,7 @@ int writeToClient(client *c, int handler_installed) {
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_PRIMARY)) c->last_interaction = server.unixtime;
if (!c->flag.primary) c->last_interaction = server.unixtime;
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
@ -2023,7 +2024,7 @@ int writeToClient(client *c, int handler_installed) {
}
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
if (c->flag.close_after_reply) {
freeClientAsync(c);
return C_ERR;
}
@ -2053,15 +2054,15 @@ int handleClientsWithPendingWrites(void) {
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
if (c->flag.protected) continue;
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
if (c->flag.close_asap) continue;
/* Try to write buffers to the client socket. */
if (writeToClient(c, 0) == C_ERR) continue;
@ -2085,7 +2086,8 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE);
c->flag.executing_command = 0;
c->flag.replication_done = 0;
/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
@ -2098,20 +2100,20 @@ void resetClient(client *c) {
/* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) c->flags &= ~CLIENT_ASKING;
if (!c->flag.multi && prevcmd != askingCommand) c->flag.asking = 0;
/* We do the same for the CACHING command as well. It also affects
* the next command or transaction executed, in a way very similar
* to ASKING. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand) c->flags &= ~CLIENT_TRACKING_CACHING;
if (!c->flag.multi && prevcmd != clientCommand) c->flag.tracking_caching = 0;
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
* to the next command will be sent, but set the flag if the command
* we just processed was "CLIENT REPLY SKIP". */
c->flags &= ~CLIENT_REPLY_SKIP;
if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
c->flags |= CLIENT_REPLY_SKIP;
c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
c->flag.reply_skip = 0;
if (c->flag.reply_skip_next) {
c->flag.reply_skip = 1;
c->flag.reply_skip_next = 0;
}
}
@ -2171,7 +2173,7 @@ void trimClientQueryBuffer(client *c) {
* 2) Moreover it makes sure that if the client is freed in a different code
* path, it is not really released, but only marked for later release. */
void protectClient(client *c) {
c->flags |= CLIENT_PROTECTED;
c->flag.protected = 1;
if (c->conn) {
connSetReadHandler(c->conn, NULL);
connSetWriteHandler(c->conn, NULL);
@ -2180,8 +2182,8 @@ void protectClient(client *c) {
/* This will undo the client protection done by protectClient() */
void unprotectClient(client *c) {
if (c->flags & CLIENT_PROTECTED) {
c->flags &= ~CLIENT_PROTECTED;
if (c->flag.protected) {
c->flag.protected = 0;
if (c->conn) {
connSetReadHandler(c->conn, readQueryFromClient);
if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
@ -2240,7 +2242,7 @@ int processInlineBuffer(client *c) {
*
* However there is an exception: primaries may send us just a newline
* to keep the connection active. */
if (querylen != 0 && c->flags & CLIENT_PRIMARY) {
if (querylen != 0 && c->flag.primary) {
sdsfreesplitres(argv, argc);
serverLog(LL_WARNING, "WARNING: Receiving inline protocol from primary, primary stream corruption? Closing the "
"primary connection and discarding the cached primary.");
@ -2274,7 +2276,7 @@ int processInlineBuffer(client *c) {
* CLIENT_PROTOCOL_ERROR. */
#define PROTO_DUMP_LEN 128
static void setProtocolError(const char *errstr, client *c) {
if (server.verbosity <= LL_VERBOSE || c->flags & CLIENT_PRIMARY) {
if (server.verbosity <= LL_VERBOSE || c->flag.primary) {
sds client = catClientInfoString(sdsempty(), c);
/* Sample some protocol to given an idea about what was inside. */
@ -2295,11 +2297,12 @@ static void setProtocolError(const char *errstr, client *c) {
}
/* Log all the client and protocol info. */
int loglevel = (c->flags & CLIENT_PRIMARY) ? LL_WARNING : LL_VERBOSE;
int loglevel = (c->flag.primary) ? LL_WARNING : LL_VERBOSE;
serverLog(loglevel, "Protocol error (%s) from client: %s. %s", errstr, client, buf);
sdsfree(client);
}
c->flags |= (CLIENT_CLOSE_AFTER_REPLY | CLIENT_PROTOCOL_ERROR);
c->flag.close_after_reply = 1;
c->flag.protocol_error = 1;
}
/* Process the query buffer for client 'c', setting up the client argument
@ -2386,7 +2389,7 @@ int processMultibulkBuffer(client *c) {
}
ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
if (!ok || ll < 0 || (!(c->flags & CLIENT_PRIMARY) && ll > server.proto_max_bulk_len)) {
if (!ok || ll < 0 || (!c->flag.primary && ll > server.proto_max_bulk_len)) {
addReplyError(c, "Protocol error: invalid bulk length");
setProtocolError("invalid bulk length", c);
return C_ERR;
@ -2397,7 +2400,7 @@ int processMultibulkBuffer(client *c) {
}
c->qb_pos = newline - c->querybuf + 2;
if (!(c->flags & CLIENT_PRIMARY) && ll >= PROTO_MBULK_BIG_ARG) {
if (!c->flag.primary && ll >= PROTO_MBULK_BIG_ARG) {
/* When the client is not a primary client (because primary
* client's querybuf can only be trimmed after data applied
* and sent to replicas).
@ -2443,7 +2446,7 @@ int processMultibulkBuffer(client *c) {
/* Optimization: if a non-primary client's buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
if (!(c->flags & CLIENT_PRIMARY) && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG &&
if (!c->flag.primary && c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen + 2)) {
c->argv[c->argc++] = createObject(OBJ_STRING, c->querybuf);
c->argv_len_sum += c->bulklen;
@ -2482,13 +2485,13 @@ void commandProcessed(client *c) {
* The client will be reset in unblockClient().
* 2. Don't update replication offset or propagate commands to replicas,
* since we have not applied the command. */
if (c->flags & CLIENT_BLOCKED) return;
if (c->flag.blocked) return;
reqresAppendResponse(c);
resetClient(c);
long long prev_offset = c->reploff;
if (c->flags & CLIENT_PRIMARY && !(c->flags & CLIENT_MULTI)) {
if (c->flag.primary && !c->flag.multi) {
/* Update the applied replication offset of our primary. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
@ -2499,7 +2502,7 @@ void commandProcessed(client *c) {
* applied to the primary state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
if (c->flags & CLIENT_PRIMARY) {
if (c->flag.primary) {
long long applied = c->reploff - prev_offset;
if (applied) {
replicationFeedStreamFromPrimaryStream(c->querybuf + c->repl_applied, applied);
@ -2551,8 +2554,8 @@ int processPendingCommandAndInputBuffer(client *c) {
* But in case of a module blocked client (see RM_Call 'K' flag) we do not reach this code path.
* So whenever we change the code here we need to consider if we need this change on module
* blocked client as well */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (c->flag.pending_command) {
c->flag.pending_command = 0;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
@ -2578,24 +2581,24 @@ int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
if (c->flag.blocked) break;
/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
if (c->flags & CLIENT_PENDING_COMMAND) break;
if (c->flag.pending_command) break;
/* Don't process input from the primary while there is a busy script
* condition on the replica. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (isInsideYieldingLongCommand() && c->flags & CLIENT_PRIMARY) break;
if (isInsideYieldingLongCommand() && c->flag.primary) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY | CLIENT_CLOSE_ASAP)) break;
if (c->flag.close_after_reply || c->flag.close_asap) break;
/* Determine request type when unknown. */
if (!c->reqtype) {
@ -2623,7 +2626,7 @@ int processInputBuffer(client *c) {
* as one that needs to process the command. */
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
c->flag.pending_command = 1;
break;
}
@ -2644,7 +2647,7 @@ int processInputBuffer(client *c) {
}
}
if (c->flags & CLIENT_PRIMARY) {
if (c->flag.primary) {
/* If the client is a primary, trim the querybuf to repl_applied,
* since primary client is very special, its querybuf not only
* used to parse command, but also proxy to sub-replicas.
@ -2705,7 +2708,7 @@ void readQueryFromClient(connection *conn) {
/* Primary client needs expand the readlen when meet BIG_ARG(see #9100),
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_PRIMARY && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN;
if (c->flag.primary && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN;
}
if (c->querybuf == NULL) {
@ -2714,7 +2717,7 @@ void readQueryFromClient(connection *conn) {
qblen = sdslen(c->querybuf);
}
if (!(c->flags & CLIENT_PRIMARY) && // primary client's querybuf can grow greedy.
if (!c->flag.primary && // primary client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
@ -2755,7 +2758,7 @@ void readQueryFromClient(connection *conn) {
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->last_interaction = server.unixtime;
if (c->flags & CLIENT_PRIMARY) {
if (c->flag.primary) {
c->read_reploff += nread;
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread, memory_order_relaxed);
} else {
@ -2763,7 +2766,7 @@ void readQueryFromClient(connection *conn) {
}
c->net_input_bytes += nread;
if (!(c->flags & CLIENT_PRIMARY) &&
if (!c->flag.primary &&
/* The commands cached in the MULTI/EXEC queue have not been executed yet,
* so they are also considered a part of the query buffer in a broader sense.
*
@ -2806,7 +2809,7 @@ done:
* you want to relax error checking or need to display something anyway (see
* anetFdToString implementation for more info). */
void genClientAddrString(client *client, char *addr, size_t addr_len, int remote) {
if (client->flags & CLIENT_UNIX_SOCKET) {
if (client->flag.unix_socket) {
/* Unix socket client. */
snprintf(addr, addr_len, "%s:0", server.unixsocket);
} else {
@ -2849,27 +2852,29 @@ sds catClientInfoString(sds s, client *client) {
char flags[17], events[3], conninfo[CONN_INFO_LEN], *p;
p = flags;
if (client->flags & CLIENT_REPLICA) {
if (client->flags & CLIENT_MONITOR)
if (client->flag.replica) {
if (client->flag.monitor)
*p++ = 'O';
else
*p++ = 'S';
}
if (client->flags & CLIENT_PRIMARY) *p++ = 'M';
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
if (client->flags & CLIENT_MULTI) *p++ = 'x';
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
if (client->flags & CLIENT_TRACKING) *p++ = 't';
if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
if (client->flags & CLIENT_READONLY) *p++ = 'r';
if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
if (client->flags & CLIENT_NO_TOUCH) *p++ = 'T';
/* clang-format off */
if (client->flag.primary) *p++ = 'M';
if (client->flag.pubsub) *p++ = 'P';
if (client->flag.multi) *p++ = 'x';
if (client->flag.blocked) *p++ = 'b';
if (client->flag.tracking) *p++ = 't';
if (client->flag.tracking_broken_redir) *p++ = 'R';
if (client->flag.tracking_bcast) *p++ = 'B';
if (client->flag.dirty_cas) *p++ = 'd';
if (client->flag.close_after_reply) *p++ = 'c';
if (client->flag.unblocked) *p++ = 'u';
if (client->flag.close_asap) *p++ = 'A';
if (client->flag.unix_socket) *p++ = 'U';
if (client->flag.readonly) *p++ = 'r';
if (client->flag.no_evict) *p++ = 'e';
if (client->flag.no_touch) *p++ = 'T';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@ -2904,7 +2909,7 @@ sds catClientInfoString(sds s, client *client) {
" sub=%i", (int) dictSize(client->pubsub_channels),
" psub=%i", (int) dictSize(client->pubsub_patterns),
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" multi=%i", (client->flag.multi) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
@ -2919,7 +2924,7 @@ sds catClientInfoString(sds s, client *client) {
" events=%s", events,
" cmd=%s", client->lastcmd ? client->lastcmd->fullname : "NULL",
" user=%s", client->user ? client->user->name : "(superuser)",
" redir=%I", (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
" redir=%I", (client->flag.tracking) ? (long long) client->client_tracking_redirection : -1,
" resp=%i", client->resp,
" lib-name=%s", client->lib_name ? (char*)client->lib_name->ptr : "",
" lib-ver=%s", client->lib_ver ? (char*)client->lib_ver->ptr : "",
@ -3044,10 +3049,13 @@ void resetCommand(client *c) {
/* MONITOR clients are also marked with CLIENT_REPLICA, we need to
* distinguish between the two.
*/
uint64_t flags = c->flags;
if (flags & CLIENT_MONITOR) flags &= ~(CLIENT_MONITOR | CLIENT_REPLICA);
struct ClientFlags flags = c->flag;
if (flags.monitor) {
flags.monitor = 0;
flags.replica = 0;
}
if (flags & (CLIENT_REPLICA | CLIENT_PRIMARY | CLIENT_MODULE)) {
if (flags.replica || flags.primary || flags.module) {
addReplyError(c, "can only reset normal client connections");
return;
}
@ -3059,7 +3067,7 @@ void resetCommand(client *c) {
/* Disconnect the current client */
void quitCommand(client *c) {
addReply(c, shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
c->flag.close_after_reply = 1;
}
void clientCommand(client *c) {
@ -3173,12 +3181,13 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr, "reply") && c->argc == 3) {
/* CLIENT REPLY ON|OFF|SKIP */
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flags &= ~(CLIENT_REPLY_SKIP | CLIENT_REPLY_OFF);
c->flag.reply_skip = 0;
c->flag.reply_off = 0;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flags |= CLIENT_REPLY_OFF;
c->flag.reply_off = 1;
} else if (!strcasecmp(c->argv[2]->ptr, "skip")) {
if (!(c->flags & CLIENT_REPLY_OFF)) c->flags |= CLIENT_REPLY_SKIP_NEXT;
if (!c->flag.reply_off) c->flag.reply_skip_next = 1;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
@ -3186,11 +3195,11 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr, "no-evict") && c->argc == 3) {
/* CLIENT NO-EVICT ON|OFF */
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flags |= CLIENT_NO_EVICT;
c->flag.no_evict = 1;
removeClientFromMemUsageBucket(c, 0);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flags &= ~CLIENT_NO_EVICT;
c->flag.no_evict = 0;
updateClientMemUsageAndBucket(c);
addReply(c, shared.ok);
} else {
@ -3308,7 +3317,7 @@ NULL
/* If this client has to be closed, flag it as CLOSE_AFTER_REPLY
* only after we queued the reply to its output buffers. */
if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY;
if (close_this_client) c->flag.close_after_reply = 1;
} else if (!strcasecmp(c->argv[1]->ptr, "unblock") && (c->argc == 3 || c->argc == 4)) {
/* CLIENT UNBLOCK <id> [timeout|error] */
long long id;
@ -3330,7 +3339,7 @@ NULL
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
* The reason is that we assume that if a command doesn't expect to be timedout,
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (target && target->flag.blocked && moduleBlockedClientMayTimeout(target)) {
if (unblock_error)
unblockClientOnError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
else
@ -3373,7 +3382,7 @@ NULL
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
* [PREFIX second] [OPTIN] [OPTOUT] [NOLOOP]... */
long long redir = 0;
uint64_t options = 0;
struct ClientFlags options = {0};
robj **prefix = NULL;
size_t numprefix = 0;
@ -3404,13 +3413,13 @@ NULL
return;
}
} else if (!strcasecmp(c->argv[j]->ptr, "bcast")) {
options |= CLIENT_TRACKING_BCAST;
options.tracking_bcast = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "optin")) {
options |= CLIENT_TRACKING_OPTIN;
options.tracking_optin = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "optout")) {
options |= CLIENT_TRACKING_OPTOUT;
options.tracking_optout = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "noloop")) {
options |= CLIENT_TRACKING_NOLOOP;
options.tracking_noloop = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "prefix") && moreargs) {
j++;
prefix = zrealloc(prefix, sizeof(robj *) * (numprefix + 1));
@ -3426,15 +3435,15 @@ NULL
if (!strcasecmp(c->argv[2]->ptr, "on")) {
/* Before enabling tracking, make sure options are compatible
* among each other and with the current state of the client. */
if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
if (!(options.tracking_bcast) && numprefix) {
addReplyError(c, "PREFIX option requires BCAST mode to be enabled");
zfree(prefix);
return;
}
if (c->flags & CLIENT_TRACKING) {
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
int newbcast = !!(options & CLIENT_TRACKING_BCAST);
if (c->flag.tracking) {
int oldbcast = !!c->flag.tracking_bcast;
int newbcast = !!(options.tracking_bcast);
if (oldbcast != newbcast) {
addReplyError(c, "You can't switch BCAST mode on/off before disabling "
"tracking for this client, and then re-enabling it with "
@ -3444,20 +3453,20 @@ NULL
}
}
if (options & CLIENT_TRACKING_BCAST && options & (CLIENT_TRACKING_OPTIN | CLIENT_TRACKING_OPTOUT)) {
if (options.tracking_bcast && (options.tracking_optin || options.tracking_optout)) {
addReplyError(c, "OPTIN and OPTOUT are not compatible with BCAST");
zfree(prefix);
return;
}
if (options & CLIENT_TRACKING_OPTIN && options & CLIENT_TRACKING_OPTOUT) {
if (options.tracking_optin && options.tracking_optout) {
addReplyError(c, "You can't specify both OPTIN mode and OPTOUT mode");
zfree(prefix);
return;
}
if ((options & CLIENT_TRACKING_OPTIN && c->flags & CLIENT_TRACKING_OPTOUT) ||
(options & CLIENT_TRACKING_OPTOUT && c->flags & CLIENT_TRACKING_OPTIN)) {
if ((options.tracking_optin && c->flag.tracking_optout) ||
(options.tracking_optout && c->flag.tracking_optin)) {
addReplyError(c, "You can't switch OPTIN/OPTOUT mode before disabling "
"tracking for this client, and then re-enabling it with "
"a different mode.");
@ -3465,7 +3474,7 @@ NULL
return;
}
if (options & CLIENT_TRACKING_BCAST) {
if (options.tracking_bcast) {
if (!checkPrefixCollisionsOrReply(c, prefix, numprefix)) {
zfree(prefix);
return;
@ -3483,7 +3492,7 @@ NULL
zfree(prefix);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "caching") && c->argc >= 3) {
if (!(c->flags & CLIENT_TRACKING)) {
if (!c->flag.tracking) {
addReplyError(c, "CLIENT CACHING can be called only when the "
"client is in tracking mode with OPTIN or "
"OPTOUT mode enabled");
@ -3492,15 +3501,15 @@ NULL
char *opt = c->argv[2]->ptr;
if (!strcasecmp(opt, "yes")) {
if (c->flags & CLIENT_TRACKING_OPTIN) {
c->flags |= CLIENT_TRACKING_CACHING;
if (c->flag.tracking_optin) {
c->flag.tracking_caching = 1;
} else {
addReplyError(c, "CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
return;
}
} else if (!strcasecmp(opt, "no")) {
if (c->flags & CLIENT_TRACKING_OPTOUT) {
c->flags |= CLIENT_TRACKING_CACHING;
if (c->flag.tracking_optout) {
c->flag.tracking_caching = 1;
} else {
addReplyError(c, "CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
return;
@ -3514,7 +3523,7 @@ NULL
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "getredir") && c->argc == 2) {
/* CLIENT GETREDIR */
if (c->flags & CLIENT_TRACKING) {
if (c->flag.tracking) {
addReplyLongLong(c, c->client_tracking_redirection);
} else {
addReplyLongLong(c, -1);
@ -3526,33 +3535,33 @@ NULL
addReplyBulkCString(c, "flags");
void *arraylen_ptr = addReplyDeferredLen(c);
int numflags = 0;
addReplyBulkCString(c, c->flags & CLIENT_TRACKING ? "on" : "off");
addReplyBulkCString(c, c->flag.tracking ? "on" : "off");
numflags++;
if (c->flags & CLIENT_TRACKING_BCAST) {
if (c->flag.tracking_bcast) {
addReplyBulkCString(c, "bcast");
numflags++;
}
if (c->flags & CLIENT_TRACKING_OPTIN) {
if (c->flag.tracking_optin) {
addReplyBulkCString(c, "optin");
numflags++;
if (c->flags & CLIENT_TRACKING_CACHING) {
if (c->flag.tracking_caching) {
addReplyBulkCString(c, "caching-yes");
numflags++;
}
}
if (c->flags & CLIENT_TRACKING_OPTOUT) {
if (c->flag.tracking_optout) {
addReplyBulkCString(c, "optout");
numflags++;
if (c->flags & CLIENT_TRACKING_CACHING) {
if (c->flag.tracking_caching) {
addReplyBulkCString(c, "caching-no");
numflags++;
}
}
if (c->flags & CLIENT_TRACKING_NOLOOP) {
if (c->flag.tracking_noloop) {
addReplyBulkCString(c, "noloop");
numflags++;
}
if (c->flags & CLIENT_TRACKING_BROKEN_REDIR) {
if (c->flag.tracking_broken_redir) {
addReplyBulkCString(c, "broken_redirect");
numflags++;
}
@ -3560,7 +3569,7 @@ NULL
/* Redirect */
addReplyBulkCString(c, "redirect");
if (c->flags & CLIENT_TRACKING) {
if (c->flag.tracking) {
addReplyLongLong(c, c->client_tracking_redirection);
} else {
addReplyLongLong(c, -1);
@ -3583,10 +3592,10 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr, "no-touch")) {
/* CLIENT NO-TOUCH ON|OFF */
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flags |= CLIENT_NO_TOUCH;
c->flag.no_touch = 1;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flags &= ~CLIENT_NO_TOUCH;
c->flag.no_touch = 0;
addReply(c, shared.ok);
} else {
addReplyErrorObject(c, shared.syntaxerr);
@ -3661,7 +3670,7 @@ void helloCommand(client *c) {
}
/* At this point we need to be authenticated to continue. */
if (!(c->flags & CLIENT_AUTHENTICATED)) {
if (!c->flag.authenticated) {
addReplyError(c, "-NOAUTH HELLO must be called with the client already "
"authenticated, otherwise the HELLO <proto> AUTH <user> <pass> "
"option can be used to authenticate the client and "
@ -3900,11 +3909,11 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
* CLIENT_TYPE_PRIMARY -> The client representing our replication primary.
*/
int getClientType(client *c) {
if (c->flags & CLIENT_PRIMARY) return CLIENT_TYPE_PRIMARY;
if (c->flag.primary) return CLIENT_TYPE_PRIMARY;
/* Even though MONITOR clients are marked as replicas, we
* want the expose them as normal clients. */
if ((c->flags & CLIENT_REPLICA) && !(c->flags & CLIENT_MONITOR)) return CLIENT_TYPE_REPLICA;
if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
if (c->flag.replica && !c->flag.monitor) return CLIENT_TYPE_REPLICA;
if (c->flag.pubsub) return CLIENT_TYPE_PUBSUB;
return CLIENT_TYPE_NORMAL;
}
@ -3999,7 +4008,7 @@ int closeClientOnOutputBufferLimitReached(client *c, int async) {
serverAssert(c->reply_bytes < SIZE_MAX - (1024 * 64));
/* Note that c->reply_bytes is irrelevant for replica clients
* (they use the global repl buffers). */
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || c->flags & CLIENT_CLOSE_ASAP) return 0;
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_REPLICA) || c->flag.close_asap) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(), c);
@ -4029,7 +4038,7 @@ void flushReplicasOutputBuffers(void) {
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
int can_receive_writes = connHasWriteHandler(replica->conn) || (replica->flags & CLIENT_PENDING_WRITE);
int can_receive_writes = connHasWriteHandler(replica->conn) || (replica->flag.pending_write);
/* We don't want to send the pending data to the replica in a few
* cases:
@ -4045,8 +4054,8 @@ void flushReplicasOutputBuffers(void) {
*
* 3. Obviously if the replica is not ONLINE.
*/
if (replica->repl_state == REPLICA_STATE_ONLINE && !(replica->flags & CLIENT_CLOSE_ASAP) &&
can_receive_writes && !replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) {
if (replica->repl_state == REPLICA_STATE_ONLINE && !(replica->flag.close_asap) && can_receive_writes &&
!replica->repl_start_cmd_stream_on_ack && clientHasPendingReplies(replica)) {
writeToClient(replica, 0);
}
}
@ -4410,11 +4419,11 @@ int handleClientsWithPendingWritesUsingThreads(void) {
int item_id = 0;
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
c->flag.pending_write = 0;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
if (c->flag.close_asap) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
@ -4489,7 +4498,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_PRIMARY | CLIENT_REPLICA | CLIENT_BLOCKED)) && io_threads_op == IO_THREADS_OP_IDLE) {
!(c->flag.primary || c->flag.replica || c->flag.blocked) && io_threads_op == IO_THREADS_OP_IDLE) {
listAddNodeHead(server.clients_pending_read, c);
c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
@ -4559,7 +4568,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
listDelNode(server.clients_pending_read, ln);
c->pending_read_list_node = NULL;
serverAssert(!(c->flags & CLIENT_BLOCKED));
serverAssert(!c->flag.blocked);
if (beforeNextClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
@ -4581,7 +4590,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not put the client in pending write queue (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
if (!c->flag.pending_write && clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
}
/* Update processed count on server */

View File

@ -605,7 +605,7 @@ void trimStringObjectIfNeeded(robj *o, int trim_small_values) {
* 3. When calling from RM_TrimStringAllocation (trim_small_values is true). */
size_t len = sdslen(o->ptr);
if (len >= PROTO_MBULK_BIG_ARG || trim_small_values ||
(server.executing_client && server.executing_client->flags & CLIENT_SCRIPT && len < LUA_CMD_OBJCACHE_MAX_LEN)) {
(server.executing_client && server.executing_client->flag.script && len < LUA_CMD_OBJCACHE_MAX_LEN)) {
if (sdsavail(o->ptr) > len / 10) {
o->ptr = sdsRemoveFreeSpace(o->ptr, 0);
}

View File

@ -105,8 +105,8 @@ pubsubtype pubSubShardType = {
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[3]);
else
@ -114,15 +114,15 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu
addReply(c, message_bulk);
addReplyBulk(c, channel);
if (msg) addReplyBulk(c, msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[4]);
else
@ -131,13 +131,13 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
addReplyBulk(c, pat);
addReplyBulk(c, channel);
addReplyBulk(c, msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[3]);
else
@ -145,7 +145,7 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
addReply(c, *type.subscribeMsg);
addReplyBulk(c, channel);
addReplyLongLong(c, type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* Send the pubsub unsubscription notification to the client.
@ -153,8 +153,8 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[3]);
else
@ -165,13 +165,13 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
else
addReplyNull(c);
addReplyLongLong(c, type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* Send the pubsub pattern subscription notification to the client. */
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[3]);
else
@ -179,7 +179,7 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
addReply(c, shared.psubscribebulk);
addReplyBulk(c, pattern);
addReplyLongLong(c, clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* Send the pubsub pattern unsubscription notification to the client.
@ -187,8 +187,8 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
* punsubscribe command but there are no pattern to unsubscribe from: we
* still send a notification. */
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
if (c->resp == 2)
addReply(c, shared.mbulkhdr[3]);
else
@ -199,7 +199,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
else
addReplyNull(c);
addReplyLongLong(c, clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/*-----------------------------------------------------------------------------
@ -241,15 +241,15 @@ int clientTotalPubSubSubscriptionCount(client *c) {
}
void markClientAsPubSub(client *c) {
if (!(c->flags & CLIENT_PUBSUB)) {
c->flags |= CLIENT_PUBSUB;
if (!c->flag.pubsub) {
c->flag.pubsub = 1;
server.pubsub_clients++;
}
}
void unmarkClientAsPubSub(client *c) {
if (c->flags & CLIENT_PUBSUB) {
c->flags &= ~CLIENT_PUBSUB;
if (c->flag.pubsub) {
c->flag.pubsub = 0;
server.pubsub_clients--;
}
}
@ -539,7 +539,7 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
/* SUBSCRIBE channel [channel ...] */
void subscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
if (c->flag.deny_blocking && !c->flag.multi) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
@ -571,7 +571,7 @@ void unsubscribeCommand(client *c) {
/* PSUBSCRIBE pattern [pattern ...] */
void psubscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
if (c->flag.deny_blocking && !c->flag.multi) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
@ -709,7 +709,7 @@ void spublishCommand(client *c) {
/* SSUBSCRIBE shardchannel [shardchannel ...] */
void ssubscribeCommand(client *c) {
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
/* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe. */
addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");

View File

@ -1859,7 +1859,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
if (server.sanitize_dump_payload == SANITIZE_DUMP_CLIENTS) {
/* Skip sanitization when loading (an RDB), or getting a RESTORE command
* from either the primary or a client using an ACL user with the skip-sanitize-payload flag. */
int skip = server.loading || (server.current_client && (server.current_client->flags & CLIENT_PRIMARY));
int skip = server.loading || (server.current_client && (server.current_client->flag.primary));
if (!skip && server.current_client && server.current_client->user)
skip = !!(server.current_client->user->flags & USER_FLAG_SANITIZE_PAYLOAD_SKIP);
deep_integrity_validation = !skip;

View File

@ -198,7 +198,7 @@ void resetReplicationBuffer(void) {
int canFeedReplicaReplBuffer(client *replica) {
/* Don't feed replicas that only want the RDB. */
if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
if (replica->flag.repl_rdbonly) return 0;
/* Don't feed replicas that are still waiting for BGSAVE to start. */
if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) return 0;
@ -568,9 +568,9 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
gettimeofday(&tv, NULL);
cmdrepr = sdscatprintf(cmdrepr, "%ld.%06ld ", (long)tv.tv_sec, (long)tv.tv_usec);
if (c->flags & CLIENT_SCRIPT) {
if (c->flag.script) {
cmdrepr = sdscatprintf(cmdrepr, "[%d lua] ", dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
} else if (c->flag.unix_socket) {
cmdrepr = sdscatprintf(cmdrepr, "[%d unix:%s] ", dictid, server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr, "[%d %s] ", dictid, getClientPeerId(c));
@ -699,7 +699,7 @@ int replicationSetupReplicaForFullResync(client *replica, long long offset) {
/* Don't send this reply to replicas that approached us with
* the old SYNC command. */
if (!(replica->flags & CLIENT_PRE_PSYNC)) {
if (!(replica->flag.pre_psync)) {
buflen = snprintf(buf, sizeof(buf), "+FULLRESYNC %s %lld\r\n", server.replid, offset);
if (connWrite(replica->conn, buf, buflen) != buflen) {
freeClientAsync(replica);
@ -768,7 +768,7 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) {
* 1) Set client state to make it a replica.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the replica. */
c->flags |= CLIENT_REPLICA;
c->flag.replica = 1;
c->repl_state = REPLICA_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_start_cmd_stream_on_ack = 0;
@ -877,10 +877,10 @@ int startBgsaveForReplication(int mincapa, int req) {
if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
replica->repl_state = REPL_STATE_NONE;
replica->flags &= ~CLIENT_REPLICA;
replica->flag.replica = 0;
listDelNode(server.replicas, ln);
addReplyError(replica, "BGSAVE failed, replication can't continue");
replica->flags |= CLIENT_CLOSE_AFTER_REPLY;
replica->flag.close_after_reply = 1;
}
}
return retval;
@ -907,7 +907,7 @@ int startBgsaveForReplication(int mincapa, int req) {
/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
/* ignore SYNC if already replica or in monitor mode */
if (c->flags & CLIENT_REPLICA) return;
if (c->flag.replica) return;
/* Check if this is a failover request to a replica with the same replid and
* become a primary if so. */
@ -998,7 +998,7 @@ void syncCommand(client *c) {
/* If a replica uses SYNC, we are dealing with an old implementation
* of the replication protocol (like valkey-cli --replica). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
c->flag.pre_psync = 1;
}
/* Full resynchronization. */
@ -1009,7 +1009,7 @@ void syncCommand(client *c) {
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_REPLICA;
c->flag.replica = 1;
listAddNodeTail(server.replicas, c);
/* Create the replication backlog if needed. */
@ -1041,7 +1041,7 @@ void syncCommand(client *c) {
/* If the client needs a buffer of commands, we can't use
* a replica without replication buffer. */
if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END &&
(!(replica->flags & CLIENT_REPL_RDBONLY) || (c->flags & CLIENT_REPL_RDBONLY)))
(!(replica->flag.repl_rdbonly) || (c->flag.repl_rdbonly)))
break;
}
/* To attach this replica, we check that it has at least all the
@ -1052,7 +1052,7 @@ void syncCommand(client *c) {
/* Perfect, the server is already registering differences for
* another replica. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
if (!(c->flags & CLIENT_REPL_RDBONLY)) copyReplicaOutputBuffer(c, replica);
if (!c->flag.repl_rdbonly) copyReplicaOutputBuffer(c, replica);
replicationSetupReplicaForFullResync(c, replica->psync_initial_offset);
serverLog(LL_NOTICE, "Waiting for end of BGSAVE for SYNC");
} else {
@ -1168,7 +1168,7 @@ void replconfCommand(client *c) {
* internal only command that normal clients should never use. */
long long offset;
if (!(c->flags & CLIENT_REPLICA)) return;
if (!c->flag.replica) return;
if ((getLongLongFromObject(c->argv[j + 1], &offset) != C_OK)) return;
if (offset > c->repl_ack_off) c->repl_ack_off = offset;
if (c->argc > j + 3 && !strcasecmp(c->argv[j + 2]->ptr, "fack")) {
@ -1200,9 +1200,9 @@ void replconfCommand(client *c) {
long rdb_only = 0;
if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_only, NULL) != C_OK) return;
if (rdb_only == 1)
c->flags |= CLIENT_REPL_RDBONLY;
c->flag.repl_rdbonly = 1;
else
c->flags &= ~CLIENT_REPL_RDBONLY;
c->flag.repl_rdbonly = 0;
} else if (!strcasecmp(c->argv[j]->ptr, "rdb-filter-only")) {
/* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters
* for the RDB snapshot. Currently we only support a single
@ -1258,7 +1258,7 @@ void replconfCommand(client *c) {
* the return value indicates that the replica should be disconnected.
* */
int replicaPutOnline(client *replica) {
if (replica->flags & CLIENT_REPL_RDBONLY) {
if (replica->flag.repl_rdbonly) {
replica->repl_state = REPLICA_STATE_RDB_TRANSMITTED;
/* The client asked for RDB only so we should close it ASAP */
serverLog(LL_NOTICE, "RDB transfer completed, rdb only replica (%s) should be disconnected asap",
@ -1288,7 +1288,7 @@ int replicaPutOnline(client *replica) {
* accumulate output buffer data without sending it to the replica so it
* won't get mixed with the RDB stream. */
void replicaStartCommandStream(client *replica) {
serverAssert(!(replica->flags & CLIENT_REPL_RDBONLY));
serverAssert(!(replica->flag.repl_rdbonly));
replica->repl_start_cmd_stream_on_ack = 0;
putClientInPendingWriteQueue(replica);
@ -1721,7 +1721,8 @@ void replicationCreatePrimaryClient(connection *conn, int dbid) {
* to pass the execution to a background thread and unblock after the
* execution is done. This is the reason why we allow blocking the replication
* connection. */
server.primary->flags |= (CLIENT_PRIMARY | CLIENT_AUTHENTICATED);
server.primary->flag.primary = 1;
server.primary->flag.authenticated = 1;
/* Allocate a private query buffer for the primary client instead of using the shared query buffer.
* This is done because the primary's query buffer data needs to be preserved for my sub-replicas to use. */
@ -1732,7 +1733,7 @@ void replicationCreatePrimaryClient(connection *conn, int dbid) {
memcpy(server.primary->replid, server.primary_replid, sizeof(server.primary_replid));
/* If primary offset is set to -1, this primary is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.primary->reploff == -1) server.primary->flags |= CLIENT_PRE_PSYNC;
if (server.primary->reploff == -1) server.primary->flag.pre_psync = 1;
if (dbid != -1) selectDb(server.primary, dbid);
}
@ -3073,7 +3074,7 @@ void replicaofCommand(client *c) {
} else {
long port;
if (c->flags & CLIENT_REPLICA) {
if (c->flag.replica) {
/* If a client is already a replica they cannot run this command,
* because it involves flushing all replicas (including this
* client) */
@ -3171,7 +3172,7 @@ void replicationSendAck(void) {
if (c != NULL) {
int send_fack = server.fsynced_reploff != -1;
c->flags |= CLIENT_PRIMARY_FORCE_REPLY;
c->flag.primary_force_reply = 1;
addReplyArrayLen(c, send_fack ? 5 : 3);
addReplyBulkCString(c, "REPLCONF");
addReplyBulkCString(c, "ACK");
@ -3180,7 +3181,7 @@ void replicationSendAck(void) {
addReplyBulkCString(c, "FACK");
addReplyBulkLongLong(c, server.fsynced_reploff);
}
c->flags &= ~CLIENT_PRIMARY_FORCE_REPLY;
c->flag.primary_force_reply = 0;
}
}
@ -3219,7 +3220,7 @@ void replicationCachePrimary(client *c) {
server.primary->qb_pos = 0;
server.primary->repl_applied = 0;
server.primary->read_reploff = server.primary->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
if (c->flag.multi) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
@ -3286,7 +3287,7 @@ void replicationDiscardCachedPrimary(void) {
if (server.cached_primary == NULL) return;
serverLog(LL_NOTICE, "Discarding previously cached primary state.");
server.cached_primary->flags &= ~CLIENT_PRIMARY;
server.cached_primary->flag.primary = 0;
freeClient(server.cached_primary);
server.cached_primary = NULL;
}
@ -3302,8 +3303,9 @@ void replicationResurrectCachedPrimary(connection *conn) {
server.cached_primary = NULL;
server.primary->conn = conn;
connSetPrivateData(server.primary->conn, server.primary);
server.primary->flags &= ~(CLIENT_CLOSE_AFTER_REPLY | CLIENT_CLOSE_ASAP);
server.primary->flags |= CLIENT_AUTHENTICATED;
server.primary->flag.close_after_reply = 0;
server.primary->flag.close_asap = 0;
server.primary->flag.authenticated = 1;
server.primary->last_interaction = server.unixtime;
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
@ -3448,7 +3450,7 @@ void waitCommand(client *c) {
/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
if (ackreplicas >= numreplicas || c->flags & CLIENT_DENY_BLOCKING) {
if (ackreplicas >= numreplicas || c->flag.deny_blocking) {
addReplyLongLong(c, ackreplicas);
return;
}
@ -3486,7 +3488,7 @@ void waitaofCommand(client *c) {
/* First try without blocking at all. */
ackreplicas = replicationCountAOFAcksByOffset(c->woff);
acklocal = server.fsynced_reploff >= c->woff;
if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flags & CLIENT_DENY_BLOCKING) {
if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, acklocal);
addReplyLongLong(c, ackreplicas);
@ -3577,8 +3579,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags |= CLIENT_REPLICATION_DONE;
} else if (c->flag.pending_command) {
c->flag.replication_done = 1;
} else {
addReplyLongLong(c, numreplicas);
}
@ -3648,7 +3650,7 @@ void replicationCron(void) {
/* Send ACK to primary from time to time.
* Note that we do not send periodic acks to primary that don't
* support PSYNC and replication offsets. */
if (server.primary_host && server.primary && !(server.primary->flags & CLIENT_PRE_PSYNC)) replicationSendAck();
if (server.primary_host && server.primary && !(server.primary->flag.pre_psync)) replicationSendAck();
/* If we have attached replicas, PING them from time to time.
* So replicas can implement an explicit timeout to primaries, and will
@ -3711,7 +3713,7 @@ void replicationCron(void) {
client *replica = ln->value;
if (replica->repl_state == REPLICA_STATE_ONLINE) {
if (replica->flags & CLIENT_PRE_PSYNC) continue;
if (replica->flag.pre_psync) continue;
if ((server.unixtime - replica->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
replicationGetReplicaName(replica));

View File

@ -132,7 +132,7 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx,
uint64_t script_flags,
int ro) {
serverAssert(!curr_run_ctx);
int client_allow_oom = !!(caller->flags & CLIENT_ALLOW_OOM);
int client_allow_oom = !!(caller->flag.allow_oom);
int running_stale =
server.primary_host && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0;
@ -224,8 +224,8 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx,
script_client->resp = 2; /* Default is RESP2, scripts can change it. */
/* If we are in MULTI context, flag Lua client as CLIENT_MULTI. */
if (curr_client->flags & CLIENT_MULTI) {
script_client->flags |= CLIENT_MULTI;
if (curr_client->flag.multi) {
script_client->flag.multi = 1;
}
run_ctx->start_time = getMonotonicUs();
@ -260,7 +260,7 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
serverAssert(curr_run_ctx);
/* After the script done, remove the MULTI state. */
run_ctx->c->flags &= ~CLIENT_MULTI;
run_ctx->c->flag.multi = 0;
if (scriptIsTimedout()) {
exitScriptTimedoutMode(run_ctx);
@ -426,8 +426,8 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or
* received from our primary or when loading the AOF back in memory. */
int error_code;
/* Duplicate relevant flags in the script client. */
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
c->flag.readonly = original_c->flag.readonly;
c->flag.asking = original_c->flag.asking;
int hashslot = -1;
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != getMyClusterNode()) {
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
@ -582,7 +582,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c, call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
serverAssert(c->flag.blocked == 0);
return;
error:

View File

@ -707,7 +707,7 @@ int clientsCronResizeQueryBuffer(client *c) {
if (idletime > 2) {
/* 1) Query is idle for a long time. */
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_PRIMARY) && !remaining) {
if (!c->flag.primary && !remaining) {
/* If the client is not a primary and no data is pending,
* The client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
@ -858,7 +858,7 @@ void updateClientMemoryUsage(client *c) {
}
int clientEvictionAllowed(client *c) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) {
if (server.maxmemory_clients == 0 || c->flag.no_evict || !c->conn) {
return 0;
}
int type = getClientType(c);
@ -3144,7 +3144,7 @@ struct serverCommand *lookupCommandOrOriginal(robj **argv, int argc) {
/* Commands arriving from the primary client or AOF client, should never be rejected. */
int mustObeyClient(client *c) {
    /* The AOF-loading fake client and the primary replication link are
     * trusted sources whose commands are replayed verbatim, so they must
     * never be rejected by permission/pause/etc. checks. */
    return c->id == CLIENT_ID_AOF || c->flag.primary;
}
static int shouldPropagate(int target) {
@ -3216,25 +3216,25 @@ void alsoPropagate(int dbid, robj **argv, int argc, int target) {
* specific command execution into AOF / Replication. */
/* Force the propagation of the currently executing command to the AOF
 * and/or the replication link, regardless of whether it modified the
 * dataset.
 *
 * 'flags' is a bitmask of PROPAGATE_AOF / PROPAGATE_REPL. The per-client
 * force_aof / force_repl bits set here are consumed and cleared by call(). */
void forceCommandPropagation(client *c, int flags) {
    /* Only commands declared as writing or possibly-replicating may request
     * forced propagation. */
    serverAssert(c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE));
    if (flags & PROPAGATE_REPL) c->flag.force_repl = 1;
    if (flags & PROPAGATE_AOF) c->flag.force_aof = 1;
}
/* Avoid that the executed command is propagated at all. This way we
 * are free to just propagate what we want using the alsoPropagate()
 * API. */
void preventCommandPropagation(client *c) {
    /* Suppresses both AOF and replica propagation of the current command;
     * the bit is cleared again by call() after propagation is decided. */
    c->flag.prevent_prop = 1;
}
/* AOF specific version of preventCommandPropagation(): suppress only the
 * AOF propagation of the currently executing command. */
void preventCommandAOF(client *c) {
    c->flag.prevent_aof_prop = 1;
}
/* Replication specific version of preventCommandPropagation(): suppress
 * only the propagation of the currently executing command to replicas. */
void preventCommandReplication(client *c) {
    c->flag.prevent_repl_prop = 1;
}
/* Log the last command a client executed into the slowlog. */
@ -3395,7 +3395,8 @@ int incrCommandStatsOnError(struct serverCommand *cmd, int flags) {
*/
void call(client *c, int flags) {
long long dirty;
uint64_t client_old_flags = c->flags;
struct ClientFlags client_old_flags = c->flag;
struct serverCommand *real_cmd = c->realcmd;
client *prev_client = server.executing_client;
server.executing_client = c;
@ -3412,7 +3413,9 @@ void call(client *c, int flags) {
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
c->flags &= ~(CLIENT_FORCE_AOF | CLIENT_FORCE_REPL | CLIENT_PREVENT_PROP);
c->flag.force_aof = 0;
c->flag.force_repl = 0;
c->flag.prevent_prop = 0;
/* The server core is in charge of propagation when the first entry point
* of call() is processCommand().
@ -3433,12 +3436,12 @@ void call(client *c, int flags) {
* sending client side caching message in the middle of a command reply.
* In case of blocking commands, the flag will be un-set only after successfully
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;
c->flag.executing_command = 1;
/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
* processing of the command proc, the client is aware that it is being
* re-processed. */
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
if (reprocessing_command) c->flag.reprocessing_command = 1;
monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs();
@ -3446,13 +3449,13 @@ void call(client *c, int flags) {
c->cmd->proc(c);
/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
if (reprocessing_command) c->flag.reprocessing_command = 0;
exitExecutionUnit();
/* In case client is blocked after trying to execute the command,
* it means the execution is not yet completed and we MIGHT reprocess the command in the future. */
if (!(c->flags & CLIENT_BLOCKED)) c->flags &= ~(CLIENT_EXECUTING_COMMAND);
if (!c->flag.blocked) c->flag.executing_command = 0;
/* In order to avoid performance implication due to querying the clock using a system call 3 times,
* we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */
@ -3478,9 +3481,9 @@ void call(client *c, int flags) {
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
if (c->flag.close_after_command) {
c->flag.close_after_command = 0;
c->flag.close_after_reply = 1;
}
/* Note: the code below uses the real command that was executed
@ -3498,7 +3501,7 @@ void call(client *c, int flags) {
/* Log the command into the Slow log if needed.
* If the client is blocked we will handle slowlog when it is unblocked. */
if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) slowlogPushCurrentCommand(c, real_cmd, c->duration);
if (update_command_stats && !c->flag.blocked) slowlogPushCurrentCommand(c, real_cmd, c->duration);
/* Send the command to clients in MONITOR mode if applicable,
* since some administrative commands are considered too dangerous to be shown.
@ -3512,20 +3515,20 @@ void call(client *c, int flags) {
/* Clear the original argv.
* If the client is blocked we will handle slowlog when it is unblocked. */
if (!(c->flags & CLIENT_BLOCKED)) freeClientOriginalArgv(c);
if (!c->flag.blocked) freeClientOriginalArgv(c);
/* populate the per-command statistics that we show in INFO commandstats.
* If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) {
if (update_command_stats && !c->flag.blocked) {
real_cmd->calls++;
real_cmd->microseconds += c->duration;
if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED))
if (server.latency_tracking_enabled && !c->flag.blocked)
updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration * 1000);
}
/* The duration needs to be reset after each call except for a blocked command,
* which is expected to record and reset the duration after unblocking. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (!c->flag.blocked) {
c->duration = 0;
}
@ -3533,8 +3536,8 @@ void call(client *c, int flags) {
* We never propagate EXEC explicitly, it will be implicitly
* propagated if needed (see propagatePendingCommands).
* Also, module commands take care of themselves */
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP &&
c->cmd->proc != execCommand && !(c->cmd->flags & CMD_MODULE)) {
if (flags & CMD_CALL_PROPAGATE && !c->flag.prevent_prop && c->cmd->proc != execCommand &&
!(c->cmd->flags & CMD_MODULE)) {
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
@ -3543,17 +3546,15 @@ void call(client *c, int flags) {
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
if (c->flag.force_repl) propagate_flags |= PROPAGATE_REPL;
if (c->flag.force_aof) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP || c->flags & CLIENT_MODULE_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
if (c->flag.prevent_repl_prop || c->flag.module_prevent_repl_prop || !(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP || c->flags & CLIENT_MODULE_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
if (c->flag.prevent_aof_prop || c->flag.module_prevent_aof_prop || !(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call alsoPropagate() only if at least one of AOF / replication
@ -3563,8 +3564,9 @@ void call(client *c, int flags) {
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF | CLIENT_FORCE_REPL | CLIENT_PREVENT_PROP);
c->flags |= client_old_flags & (CLIENT_FORCE_AOF | CLIENT_FORCE_REPL | CLIENT_PREVENT_PROP);
c->flag.force_aof = client_old_flags.force_aof;
c->flag.force_repl = client_old_flags.force_repl;
c->flag.prevent_prop = client_old_flags.prevent_prop;
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. For read-only
@ -3574,13 +3576,13 @@ void call(client *c, int flags) {
/* We use the tracking flag of the original external client that
* triggered the command, but we take the keys from the actual command
* being executed. */
if (server.current_client && (server.current_client->flags & CLIENT_TRACKING) &&
!(server.current_client->flags & CLIENT_TRACKING_BCAST)) {
if (server.current_client && (server.current_client->flag.tracking) &&
!(server.current_client->flag.tracking_bcast)) {
trackingRememberKeys(server.current_client, c);
}
}
if (!(c->flags & CLIENT_BLOCKED)) {
if (!c->flag.blocked) {
/* Modules may call commands in cron, in which case server.current_client
* is not set. */
if (server.current_client) {
@ -3828,7 +3830,7 @@ int processCommand(client *c) {
}
}
if (c->flags & CLIENT_MULTI && c->cmd->flags & CMD_NO_MULTI) {
if (c->flag.multi && c->cmd->flags & CMD_NO_MULTI) {
rejectCommandFormat(c, "Command not allowed inside a transaction");
return C_OK;
}
@ -3838,8 +3840,8 @@ int processCommand(client *c) {
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c, &acl_errpos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c, acl_retval, (c->flags & CLIENT_MULTI) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, acl_errpos,
NULL, NULL);
addACLLogEntry(c, acl_retval, (c->flag.multi) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, acl_errpos, NULL,
NULL);
sds msg = getAclErrorMessage(acl_retval, c->user, c->cmd, c->argv[acl_errpos]->ptr, 0);
rejectCommandFormat(c, "-NOPERM %s", msg);
sdsfree(msg);
@ -3868,7 +3870,7 @@ int processCommand(client *c) {
}
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
(is_write_command || (is_read_command && !(c->flags & CLIENT_READONLY)))) {
(is_write_command || (is_read_command && !c->flag.readonly))) {
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
return C_OK;
}
@ -3960,7 +3962,7 @@ int processCommand(client *c) {
/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand &&
if ((c->flag.pubsub && c->resp == 2) && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand &&
c->cmd->proc != ssubscribeCommand && c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != sunsubscribeCommand && c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand) {
@ -4016,21 +4018,21 @@ int processCommand(client *c) {
/* Prevent a replica from sending commands that access the keyspace.
* The main objective here is to prevent abuse of client pause check
* from which replicas are exempt. */
if ((c->flags & CLIENT_REPLICA) && (is_may_replicate_command || is_write_command || is_read_command)) {
if (c->flag.replica && (is_may_replicate_command || is_write_command || is_read_command)) {
rejectCommandFormat(c, "Replica can't interact with the keyspace");
return C_OK;
}
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
if (!(c->flags & CLIENT_REPLICA) && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) {
if (!c->flag.replica && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) {
blockPostponeClient(c);
return C_OK;
}
/* Exec the command */
if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
if (c->flag.multi && c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
queueMultiCommand(c, cmd_flags);
@ -4410,7 +4412,7 @@ void pingCommand(client *c) {
return;
}
if (c->flags & CLIENT_PUBSUB && c->resp == 2) {
if (c->flag.pubsub && c->resp == 2) {
addReply(c, shared.mbulkhdr[2]);
addReplyBulkCBuffer(c, "pong", 4);
if (c->argc == 1)
@ -5949,7 +5951,7 @@ void infoCommand(client *c) {
}
void monitorCommand(client *c) {
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expects a reply per command and so can't execute MONITOR. */
@ -5958,9 +5960,10 @@ void monitorCommand(client *c) {
}
/* ignore MONITOR if already replica or in monitor mode */
if (c->flags & CLIENT_REPLICA) return;
if (c->flag.replica) return;
c->flags |= (CLIENT_REPLICA | CLIENT_MONITOR);
c->flag.replica = 1;
c->flag.monitor = 1;
listAddNodeTail(server.monitors, c);
addReply(c, shared.ok);
}

View File

@ -343,92 +343,6 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CMD_DOC_DEPRECATED (1 << 0) /* Command is deprecated */
#define CMD_DOC_SYSCMD (1 << 1) /* System (internal) command */
/* Client flags */
#define CLIENT_REPLICA (1 << 0) /* This client is a replica */
#define CLIENT_PRIMARY (1 << 1) /* This client is a primary */
#define CLIENT_MONITOR (1 << 2) /* This client is a replica monitor, see MONITOR */
#define CLIENT_MULTI (1 << 3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1 << 4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1 << 5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_CLOSE_AFTER_REPLY (1 << 6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED \
(1 << 7) /* This client was unblocked and is stored in \
server.unblocked_clients */
#define CLIENT_SCRIPT (1 << 8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1 << 9) /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1 << 10) /* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1 << 11) /* Client connected via Unix domain socket */
#define CLIENT_DIRTY_EXEC (1 << 12) /* EXEC will fail for errors while queueing */
#define CLIENT_PRIMARY_FORCE_REPLY (1 << 13) /* Queue replies even if is primary */
#define CLIENT_FORCE_AOF (1 << 14) /* Force AOF propagation of current cmd. */
#define CLIENT_FORCE_REPL (1 << 15) /* Force replication of current cmd. */
#define CLIENT_PRE_PSYNC (1 << 16) /* Instance don't understand PSYNC. */
#define CLIENT_READONLY (1 << 17) /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1 << 18) /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_AOF_PROP (1 << 19) /* Don't propagate to AOF. */
#define CLIENT_PREVENT_REPL_PROP (1 << 20) /* Don't propagate to replicas. */
#define CLIENT_PREVENT_PROP (CLIENT_PREVENT_AOF_PROP | CLIENT_PREVENT_REPL_PROP)
#define CLIENT_PENDING_WRITE \
(1 << 21) /* Client has output to send but a write \
handler is yet not installed. */
#define CLIENT_REPLY_OFF (1 << 22) /* Don't send replies to client. */
#define CLIENT_REPLY_SKIP_NEXT (1 << 23) /* Set CLIENT_REPLY_SKIP for next cmd */
#define CLIENT_REPLY_SKIP (1 << 24) /* Don't send just this reply. */
#define CLIENT_LUA_DEBUG (1 << 25) /* Run EVAL in debug mode. */
#define CLIENT_LUA_DEBUG_SYNC (1 << 26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1 << 27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1 << 28) /* Client should not be freed for now. */
#define CLIENT_EXECUTING_COMMAND \
(1 << 29) /* Indicates that the client is currently in the process of handling \
a command. usually this will be marked only during call() \
however, blocked clients might have this flag kept until they \
will try to reprocess the command. */
#define CLIENT_PENDING_COMMAND \
(1 << 30) /* Indicates the client has a fully \
* parsed command ready for execution. */
#define CLIENT_TRACKING \
(1ULL << 31) /* Client enabled keys tracking in order to \
perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL << 32) /* Target client is invalid. */
#define CLIENT_TRACKING_BCAST (1ULL << 33) /* Tracking in BCAST mode. */
#define CLIENT_TRACKING_OPTIN (1ULL << 34) /* Tracking in opt-in mode. */
#define CLIENT_TRACKING_OPTOUT (1ULL << 35) /* Tracking in opt-out mode. */
#define CLIENT_TRACKING_CACHING \
(1ULL << 36) /* CACHING yes/no was given, \
depending on optin/optout mode. */
#define CLIENT_TRACKING_NOLOOP \
(1ULL << 37) /* Don't send invalidation messages \
about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL << 38) /* This client is in the timeout table. */
#define CLIENT_PROTOCOL_ERROR (1ULL << 39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND \
(1ULL << 40) /* Close after executing commands \
* and writing entire reply. */
#define CLIENT_DENY_BLOCKING \
(1ULL << 41) /* Indicate that the client should not be blocked. \
currently, turned on inside MULTI, Lua, RM_Call, \
and AOF client */
#define CLIENT_REPL_RDBONLY \
(1ULL << 42) /* This client is a replica that only wants \
RDB without replication buffer. */
#define CLIENT_NO_EVICT \
(1ULL << 43) /* This client is protected against client \
memory eviction. */
#define CLIENT_ALLOW_OOM \
(1ULL << 44) /* Client used by RM_Call is allowed to fully execute \
scripts even when in OOM */
#define CLIENT_NO_TOUCH (1ULL << 45) /* This client will not touch LFU/LRU stats. */
#define CLIENT_PUSHING (1ULL << 46) /* This client is pushing notifications. */
#define CLIENT_MODULE_AUTH_HAS_RESULT \
(1ULL << 47) /* Indicates a client in the middle of module based \
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */
#define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */
/* Client capabilities */
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
@ -1203,9 +1117,70 @@ typedef struct {
} clientReqResInfo;
#endif
/* Per-client boolean state, one bit per flag. Replaces the old CLIENT_*
 * bitmask macros previously stored in the client's 'flags' field. The
 * struct is placed in a union with a single 'uint64_t raw_flag' in the
 * client struct, so the total bit count (named flags + 'reserved' padding)
 * must be exactly 64. */
typedef struct ClientFlags {
    uint64_t primary : 1;                  /* This client is a primary */
    uint64_t replica : 1;                  /* This client is a replica */
    uint64_t monitor : 1;                  /* This client is a replica monitor, see MONITOR */
    uint64_t multi : 1;                    /* This client is in a MULTI context */
    uint64_t blocked : 1;                  /* The client is waiting in a blocking operation */
    uint64_t dirty_cas : 1;                /* Watched keys modified. EXEC will fail. */
    uint64_t close_after_reply : 1;        /* Close after writing entire reply. */
    uint64_t unblocked : 1;                /* This client was unblocked and is stored in
                                            * server.unblocked_clients */
    uint64_t script : 1;                   /* This is a non connected client used by Lua */
    uint64_t asking : 1;                   /* Client issued the ASKING command */
    uint64_t close_asap : 1;               /* Close this client ASAP */
    uint64_t unix_socket : 1;              /* Client connected via Unix domain socket */
    uint64_t dirty_exec : 1;               /* EXEC will fail for errors while queueing */
    uint64_t primary_force_reply : 1;      /* Queue replies even if is primary */
    uint64_t force_aof : 1;                /* Force AOF propagation of current cmd. */
    uint64_t force_repl : 1;               /* Force replication of current cmd. */
    uint64_t pre_psync : 1;                /* Instance don't understand PSYNC. */
    uint64_t readonly : 1;                 /* Cluster client is in read-only state. */
    uint64_t pubsub : 1;                   /* Client is in Pub/Sub mode. */
    uint64_t prevent_aof_prop : 1;         /* Don't propagate to AOF. */
    uint64_t prevent_repl_prop : 1;        /* Don't propagate to replicas. */
    uint64_t prevent_prop : 1;             /* Don't propagate at all (neither AOF nor replicas). */
    uint64_t pending_write : 1;            /* Client has output to send but a write handler
                                            * is yet not installed. */
    uint64_t reply_off : 1;                /* Don't send replies to client. */
    uint64_t reply_skip_next : 1;          /* Set reply_skip for next cmd */
    uint64_t reply_skip : 1;               /* Don't send just this reply. */
    uint64_t lua_debug : 1;                /* Run EVAL in debug mode. */
    uint64_t lua_debug_sync : 1;           /* EVAL debugging without fork() */
    uint64_t module : 1;                   /* Non connected client used by some module. */
    uint64_t protected : 1;                /* Client should not be freed for now. */
    uint64_t executing_command : 1;        /* Client is currently handling a command; usually
                                            * set only during call(), but blocked clients may
                                            * keep it until the command is re-processed. */
    uint64_t pending_command : 1;          /* Client has a fully parsed command ready for execution. */
    uint64_t tracking : 1;                 /* Client enabled keys tracking in order to perform
                                            * client side caching. */
    uint64_t tracking_broken_redir : 1;    /* Target client is invalid. */
    uint64_t tracking_bcast : 1;           /* Tracking in BCAST mode. */
    uint64_t tracking_optin : 1;           /* Tracking in opt-in mode. */
    uint64_t tracking_optout : 1;          /* Tracking in opt-out mode. */
    uint64_t tracking_caching : 1;         /* CACHING yes/no was given, depending on optin/optout mode. */
    uint64_t tracking_noloop : 1;          /* Don't send invalidation messages about writes
                                            * performed by myself. */
    uint64_t in_to_table : 1;              /* This client is in the timeout table. */
    uint64_t protocol_error : 1;           /* Protocol error chatting with it. */
    uint64_t close_after_command : 1;      /* Close after executing commands and writing entire reply. */
    uint64_t deny_blocking : 1;            /* Client should not be blocked; turned on inside
                                            * MULTI, Lua, RM_Call, and the AOF client. */
    uint64_t repl_rdbonly : 1;             /* Replica that only wants RDB without replication buffer. */
    uint64_t no_evict : 1;                 /* Protected against client memory eviction. */
    uint64_t allow_oom : 1;                /* Client used by RM_Call is allowed to fully execute
                                            * scripts even when in OOM. */
    uint64_t no_touch : 1;                 /* This client will not touch LFU/LRU stats. */
    uint64_t pushing : 1;                  /* This client is pushing notifications. */
    uint64_t module_auth_has_result : 1;   /* Client in the middle of module based auth was
                                            * authenticated from the Module. */
    uint64_t module_prevent_aof_prop : 1;  /* Module client do not want to propagate to AOF */
    uint64_t module_prevent_repl_prop : 1; /* Module client do not want to propagate to replica */
    uint64_t reprocessing_command : 1;     /* The client is re-processing the command. */
    uint64_t replication_done : 1;         /* Replication has been done on the client. */
    uint64_t authenticated : 1;            /* Client has successfully authenticated. */
    /* 54 named flags above. Pad to exactly 64 bits so the union with the
     * single uint64_t raw_flag covers every flag. (Was 11, which made the
     * struct 65 bits wide: sizeof grew to 16 bytes and the trailing bits
     * fell outside raw_flag.) */
    uint64_t reserved : 10;
} ClientFlags;

/* Guard the invariant relied upon by the raw_flag union in struct client. */
_Static_assert(sizeof(ClientFlags) == sizeof(uint64_t), "ClientFlags must fit in a single uint64_t");
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
uint64_t flags; /* Client flags: CLIENT_* macros. */
uint64_t id; /* Client incremental unique ID. */
union {
uint64_t raw_flag;
struct ClientFlags flag;
};
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
uint32_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
@ -2642,7 +2617,7 @@ void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
int processInputBuffer(client *c);
void acceptCommonHandler(connection *conn, int flags, char *ip);
void acceptCommonHandler(connection *conn, struct ClientFlags flags, char *ip);
void readQueryFromClient(connection *conn);
int prepareClientToWrite(client *c);
void addReplyNull(client *c);
@ -2758,7 +2733,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
#endif
/* Client side caching (tracking mode) */
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void enableTracking(client *c, uint64_t redirect_to, struct ClientFlags options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *tracking, client *executing);
void trackingInvalidateKey(client *c, robj *keyobj, int bcast);

View File

@ -305,6 +305,7 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = server.max_new_conns_per_cycle;
struct ClientFlags flags = {0};
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
@ -317,7 +318,7 @@ static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int
return;
}
serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), 0, cip);
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), flags, cip);
}
}

View File

@ -308,7 +308,7 @@ void sortCommandGeneric(client *c, int readonly) {
* The other types (list, sorted set) will retain their native order
* even if no sort order is requested, so they remain stable across
* scripting and replication. */
if (dontsort && sortval->type == OBJ_SET && (storekey || c->flags & CLIENT_SCRIPT)) {
if (dontsort && sortval->type == OBJ_SET && (storekey || c->flag.script)) {
/* Force ALPHA sorting */
dontsort = 0;
alpha = 1;

View File

@ -940,7 +940,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, key, sdslen(key));
if (withvalues) addReplyBulkCBuffer(c, value, sdslen(value));
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
} else if (hash->encoding == OBJ_ENCODING_LISTPACK) {
listpackEntry *keys, *vals = NULL;
@ -954,7 +954,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
count -= sample_count;
lpRandomPairs(hash->ptr, sample_count, keys, vals);
hrandfieldReplyWithListpack(c, sample_count, keys, vals);
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
zfree(keys);
zfree(vals);

View File

@ -1213,7 +1213,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
addReplyNullArray(c);
return;
}
@ -1237,7 +1237,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
if (checkType(c, key, OBJ_LIST)) return;
if (key == NULL) {
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
/* Blocking against an empty list when blocking is not allowed
* returns immediately. */
addReplyNull(c);

View File

@ -1045,7 +1045,7 @@ void srandmemberWithCountCommand(client *c) {
else
addReplyBulkLongLong(c, entries[i].lval);
}
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
zfree(entries);
return;
@ -1058,7 +1058,7 @@ void srandmemberWithCountCommand(client *c) {
} else {
addReplyBulkCBuffer(c, str, len);
}
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
return;
}

View File

@ -2388,7 +2388,7 @@ void xreadCommand(client *c) {
if (timeout != -1) {
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
addReplyNullArray(c);
goto cleanup;
}

View File

@ -4038,7 +4038,7 @@ void blockingGenericZpopCommand(client *c,
/* If we are not allowed to block the client and the zset is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_DENY_BLOCKING) {
if (c->flag.deny_blocking) {
addReplyNullArray(c);
return;
}
@ -4123,7 +4123,7 @@ void zrandmemberWithCountCommand(client *c, long l, int withscores) {
if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, key, sdslen(key));
if (withscores) addReplyDouble(c, *(double *)dictGetVal(de));
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
} else if (zsetobj->encoding == OBJ_ENCODING_LISTPACK) {
listpackEntry *keys, *vals = NULL;
@ -4136,7 +4136,7 @@ void zrandmemberWithCountCommand(client *c, long l, int withscores) {
count -= sample_count;
lpRandomPairs(zsetobj->ptr, sample_count, keys, vals);
zrandmemberReplyWithListpack(c, sample_count, keys, vals);
if (c->flags & CLIENT_CLOSE_ASAP) break;
if (c->flag.close_asap) break;
}
zfree(keys);
zfree(vals);

View File

@ -37,7 +37,7 @@
* not blocked right now). If so send a reply, unblock it, and return 1.
* Otherwise 0 is returned and no operation is performed. */
int checkBlockedClientTimeout(client *c, mstime_t now) {
if (c->flags & CLIENT_BLOCKED && c->bstate.timeout != 0 && c->bstate.timeout < now) {
if (c->flag.blocked && c->bstate.timeout != 0 && c->bstate.timeout < now) {
/* Handle blocking operation specific timeout. */
unblockClientOnTimeout(c);
return 1;
@ -55,15 +55,15 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
if (server.maxidletime &&
/* This handles the idle clients connection timeout if set. */
!(c->flags & CLIENT_REPLICA) && /* No timeout for replicas and monitors */
!mustObeyClient(c) && /* No timeout for primaries and AOF */
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
!c->flag.replica && /* No timeout for replicas and monitors */
!mustObeyClient(c) && /* No timeout for primaries and AOF */
!c->flag.blocked && /* No timeout for BLPOP */
!c->flag.pubsub && /* No timeout for Pub/Sub clients */
(now - c->last_interaction > server.maxidletime)) {
serverLog(LL_VERBOSE, "Closing idle client");
freeClient(c);
return 1;
} else if (c->flags & CLIENT_BLOCKED) {
} else if (c->flag.blocked) {
/* Cluster: handle unblock & redirect of clients blocked
* into keys no longer served by this server. */
if (server.cluster_enabled) {
@ -112,14 +112,14 @@ void addClientToTimeoutTable(client *c) {
uint64_t timeout = c->bstate.timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf, timeout, c);
if (raxTryInsert(server.clients_timeout_table, buf, sizeof(buf), NULL, NULL)) c->flags |= CLIENT_IN_TO_TABLE;
if (raxTryInsert(server.clients_timeout_table, buf, sizeof(buf), NULL, NULL)) c->flag.in_to_table = 1;
}
/* Remove the client from the table when it is unblocked for reasons
* different than timing out. */
void removeClientFromTimeoutTable(client *c) {
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
c->flags &= ~CLIENT_IN_TO_TABLE;
if (!c->flag.in_to_table) return;
c->flag.in_to_table = 0;
uint64_t timeout = c->bstate.timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf, timeout, c);
@ -140,7 +140,7 @@ void handleBlockedClientsTimeout(void) {
client *c;
decodeTimeoutKey(ri.key, &timeout, &c);
if (timeout >= now) break; /* All the timeouts are in the future. */
c->flags &= ~CLIENT_IN_TO_TABLE;
c->flag.in_to_table = 0;
checkBlockedClientTimeout(c, now);
raxRemove(server.clients_timeout_table, ri.key, ri.key_len, NULL);
raxSeek(&ri, "^", NULL, 0);

View File

@ -745,6 +745,7 @@ static void tlsAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask)
int cport, cfd;
int max = server.max_new_tls_conns_per_cycle;
char cip[NET_IP_STR_LEN];
struct ClientFlags flags = {0};
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
@ -756,7 +757,7 @@ static void tlsAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask)
return;
}
serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedTLS(cfd, &server.tls_auth_clients), 0, cip);
acceptCommonHandler(connCreateAcceptedTLS(cfd, &server.tls_auth_clients), flags, cip);
}
}

View File

@ -67,7 +67,7 @@ typedef struct bcastState {
void disableTracking(client *c) {
/* If this client is in broadcasting mode, we need to unsubscribe it
* from all the prefixes it is registered to. */
if (c->flags & CLIENT_TRACKING_BCAST) {
if (c->flag.tracking_bcast) {
raxIterator ri;
raxStart(&ri, c->client_tracking_prefixes);
raxSeek(&ri, "^", NULL, 0);
@ -92,10 +92,15 @@ void disableTracking(client *c) {
}
/* Clear flags and adjust the count. */
if (c->flags & CLIENT_TRACKING) {
if (c->flag.tracking) {
server.tracking_clients--;
c->flags &= ~(CLIENT_TRACKING | CLIENT_TRACKING_BROKEN_REDIR | CLIENT_TRACKING_BCAST | CLIENT_TRACKING_OPTIN |
CLIENT_TRACKING_OPTOUT | CLIENT_TRACKING_CACHING | CLIENT_TRACKING_NOLOOP);
c->flag.tracking = 0;
c->flag.tracking_broken_redir = 0;
c->flag.tracking_bcast = 0;
c->flag.tracking_optin = 0;
c->flag.tracking_optout = 0;
c->flag.tracking_caching = 0;
c->flag.tracking_noloop = 0;
}
}
@ -173,11 +178,14 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
* eventually get freed, we'll send a message to the original client to
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR | CLIENT_TRACKING_BCAST | CLIENT_TRACKING_OPTIN |
CLIENT_TRACKING_OPTOUT | CLIENT_TRACKING_NOLOOP);
void enableTracking(client *c, uint64_t redirect_to, struct ClientFlags options, robj **prefix, size_t numprefix) {
if (!c->flag.tracking) server.tracking_clients++;
c->flag.tracking = 1;
c->flag.tracking_broken_redir = 0;
c->flag.tracking_bcast = 0;
c->flag.tracking_optin = 0;
c->flag.tracking_optout = 0;
c->flag.tracking_noloop = 0;
c->client_tracking_redirection = redirect_to;
/* This may be the first client we ever enable. Create the tracking
@ -189,8 +197,8 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr
}
/* For broadcasting, set the list of prefixes in the client. */
if (options & CLIENT_TRACKING_BCAST) {
c->flags |= CLIENT_TRACKING_BCAST;
if (options.tracking_bcast) {
c->flag.tracking_bcast = 1;
if (numprefix == 0) enableBcastTrackingForPrefix(c, "", 0);
for (size_t j = 0; j < numprefix; j++) {
sds sdsprefix = prefix[j]->ptr;
@ -199,7 +207,9 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr
}
/* Set the remaining flags that don't need any special handling. */
c->flags |= options & (CLIENT_TRACKING_OPTIN | CLIENT_TRACKING_OPTOUT | CLIENT_TRACKING_NOLOOP);
c->flag.tracking_optin = options.tracking_optin;
c->flag.tracking_optout = options.tracking_optout;
c->flag.tracking_noloop = options.tracking_noloop;
}
/* This function is called after the execution of a readonly command in the
@ -211,9 +221,9 @@ void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **pr
void trackingRememberKeys(client *tracking, client *executing) {
/* Return if we are in optin/out mode and the right CACHING command
* was/wasn't given in order to modify the default behavior. */
uint64_t optin = tracking->flags & CLIENT_TRACKING_OPTIN;
uint64_t optout = tracking->flags & CLIENT_TRACKING_OPTOUT;
uint64_t caching_given = tracking->flags & CLIENT_TRACKING_CACHING;
uint64_t optin = tracking->flag.tracking_optin;
uint64_t optout = tracking->flag.tracking_optout;
uint64_t caching_given = tracking->flag.tracking_caching;
if ((optin && !caching_given) || (optout && caching_given)) return;
getKeysResult result;
@ -263,14 +273,14 @@ void trackingRememberKeys(client *tracking, client *executing) {
* - Following a flush command, to send a single RESP NULL to indicate
* that all keys are now invalid. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
struct ClientFlags old_flags = c->flag;
c->flag.pushing = 1;
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
c->flag.tracking_broken_redir = 1;
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */
@ -279,14 +289,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c, "tracking-redir-broken", 21);
addReplyLongLong(c, c->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
return;
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
c = redir;
using_redirection = 1;
old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
old_flags = c->flag;
c->flag.pushing = 1;
}
/* Only send such info for clients in RESP version 3 or more. However
@ -296,7 +306,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
if (c->resp > 2) {
addReplyPushLen(c, 2);
addReplyBulkCBuffer(c, "invalidate", 10);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
} else if (using_redirection && c->flag.pubsub) {
/* We use a static object to speedup things, however we assume
* that addReplyPubsubMessage() will not take a reference. */
addReplyPubsubMessage(c, TrackingChannelName, NULL, shared.messagebulk);
@ -305,7 +315,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
return;
}
@ -317,7 +327,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c, keyname, keylen);
}
updateClientMemUsageAndBucket(c);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
if (!old_flags.pushing) c->flag.pushing = 0;
}
/* This function is called when a key is modified in the server and in the case
@ -383,20 +393,20 @@ void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
* previously the client was not in BCAST mode. This can happen if
* TRACKING is enabled normally, and then the client switches to
* BCAST mode. */
if (target == NULL || !(target->flags & CLIENT_TRACKING) || target->flags & CLIENT_TRACKING_BCAST) {
if (target == NULL || !(target->flag.tracking) || target->flag.tracking_bcast) {
continue;
}
/* If the client enabled the NOLOOP mode, don't send notifications
* about keys changed by the client itself. */
if (target->flags & CLIENT_TRACKING_NOLOOP && target == server.current_client) {
if (target->flag.tracking_noloop && target == server.current_client) {
continue;
}
/* If target is current client and it's executing a command, we need schedule key invalidation.
* As the invalidation messages may be interleaved with command
* response and should after command response. */
if (target == server.current_client && (server.current_client->flags & CLIENT_EXECUTING_COMMAND)) {
if (target == server.current_client && (server.current_client->flag.executing_command)) {
incrRefCount(keyobj);
listAddNodeTail(server.tracking_pending_keys, keyobj);
} else {
@ -463,7 +473,7 @@ void trackingInvalidateKeysOnFlush(int async) {
listRewind(server.clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
if (c->flag.tracking) {
if (c == server.current_client) {
/* We use a special NULL to indicate that we should send null */
listAddNodeTail(server.tracking_pending_keys, NULL);
@ -610,7 +620,7 @@ void trackingBroadcastInvalidationMessages(void) {
while (raxNext(&ri2)) {
client *c;
memcpy(&c, ri2.key, sizeof(c));
if (c->flags & CLIENT_TRACKING_NOLOOP) {
if (c->flag.tracking_noloop) {
/* This client may have certain keys excluded. */
sds adhoc = trackingBuildBroadcastReply(c, bs->keys);
if (adhoc) {

View File

@ -92,6 +92,8 @@ static connection *connCreateAcceptedUnix(int fd, void *priv) {
static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd;
int max = server.max_new_conns_per_cycle;
struct ClientFlags flags = {0};
flags.unix_socket = 1;
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
@ -103,7 +105,7 @@ static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int m
return;
}
serverLog(LL_VERBOSE, "Accepted connection to %s", server.unixsocket);
acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL), CLIENT_UNIX_SOCKET, NULL);
acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL), flags, NULL);
}
}