Cherry picking keydb changes from keydbpro to main (#203)

* Audit Logging for KeyProxy and KeyDB (#144)

* Audit Log: log cert fingerprint (#151)

* Add more flash storage stats to info command.

* Remove unneeded libs when not building FLASH

* Fix mem leak

* Allow the reservation of localhost connections to ensure health checks always succeed even at maxclients (#181)

* Enable a force option for commands (#183)

* Fix missing newline and excessive logging in the CLI

* Support NO ONE for "CLUSTER REPLICATE" command.

Co-authored-by: Jacob Bohac <jbohac@snapchat.com>
Co-authored-by: Sergey Kolosov <skolosov@snapchat.com>
Co-authored-by: John Sully <jsully@snapchat.com>
Co-authored-by: John Sully <john@csquare.ca>
This commit is contained in:
Malavan Sotheeswaran 2023-06-27 16:23:20 -04:00 committed by GitHub Enterprise
parent f53e0337ef
commit c17b9f47ac
19 changed files with 382 additions and 69 deletions

View File

@ -14,6 +14,7 @@ public:
virtual class IStorage *createMetadataDb() = 0; virtual class IStorage *createMetadataDb() = 0;
virtual const char *name() const = 0; virtual const char *name() const = 0;
virtual size_t totalDiskspaceUsed() const = 0; virtual size_t totalDiskspaceUsed() const = 0;
virtual sdsstring getInfo() const = 0;
virtual bool FSlow() const = 0; virtual bool FSlow() const = 0;
virtual size_t filedsRequired() const { return 0; } virtual size_t filedsRequired() const { return 0; }
}; };

View File

@ -142,7 +142,7 @@ DEBUG=-g -ggdb
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS) FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS)
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CXXFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS) FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CXXFLAGS) $(KEYDB_CFLAGS) $(REDIS_CFLAGS)
FINAL_LDFLAGS=$(LDFLAGS) $(KEYDB_LDFLAGS) $(DEBUG) FINAL_LDFLAGS=$(LDFLAGS) $(KEYDB_LDFLAGS) $(DEBUG)
FINAL_LIBS+=-lm -lz -lcrypto -lbz2 -lzstd -llz4 -lsnappy FINAL_LIBS+=-lm -lz -lcrypto
ifneq ($(uname_S),Darwin) ifneq ($(uname_S),Darwin)
ifneq ($(uname_S),FreeBSD) ifneq ($(uname_S),FreeBSD)

View File

@ -4516,8 +4516,8 @@ void clusterCommand(client *c) {
"NODES", "NODES",
" Return cluster configuration seen by node. Output format:", " Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...", " <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"REPLICATE <node-id>", "REPLICATE (<node-id>|NO ONE)",
" Configure current node as replica to <node-id>.", " Configure current node as replica to <node-id> or turn it into empty primary.",
"RESET [HARD|SOFT]", "RESET [HARD|SOFT]",
" Reset current node (default: soft).", " Reset current node (default: soft).",
"SET-CONFIG-EPOCH <epoch>", "SET-CONFIG-EPOCH <epoch>",
@ -4890,15 +4890,23 @@ NULL
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG); CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"replicate") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"replicate") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER REPLICATE <NODE ID> */ /* CLUSTER REPLICATE (<NODE ID>|NO ONE) */
clusterNode *n = clusterLookupNode(szFromObj(c->argv[2])); clusterNode *n;
if (c->argc == 4) {
if (0 != strcasecmp(szFromObj(c->argv[2]),"NO") || 0 != strcasecmp(szFromObj(c->argv[3]),"ONE")) {
addReplySubcommandSyntaxError(c);
return;
}
n = nullptr;
} else {
/* Lookup the specified node in our table. */ /* Lookup the specified node in our table. */
if (!n) { n = clusterLookupNode(szFromObj(c->argv[2]));
if (n == nullptr) {
addReplyErrorFormat(c,"Unknown node %s", (char*)ptrFromObj(c->argv[2])); addReplyErrorFormat(c,"Unknown node %s", (char*)ptrFromObj(c->argv[2]));
return; return;
} }
}
/* I can't replicate myself. */ /* I can't replicate myself. */
if (n == myself) { if (n == myself) {
@ -4907,7 +4915,7 @@ NULL
} }
/* Can't replicate a slave. */ /* Can't replicate a slave. */
if (nodeIsSlave(n)) { if (n != nullptr && nodeIsSlave(n)) {
addReplyError(c,"I can only replicate a master, not a replica."); addReplyError(c,"I can only replicate a master, not a replica.");
return; return;
} }
@ -4923,8 +4931,26 @@ NULL
return; return;
} }
if (n == nullptr) {
if (nodeIsMaster(myself)) {
addReply(c,shared.ok);
return;
}
serverLog(LL_NOTICE,"Stop replication and turning myself into empty primary.");
clusterSetNodeAsMaster(myself);
if (listLength(g_pserver->masters) > 0)
{
serverAssert(listLength(g_pserver->masters) == 1);
replicationUnsetMaster((redisMaster*)listFirst(g_pserver->masters)->value);
}
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
emptyDb(-1,empty_db_flags, nullptr);
/* Reset manual failover state. */
resetManualFailover();
} else {
/* Set the master. */ /* Set the master. */
clusterSetMaster(n); clusterSetMaster(n);
}
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok); addReply(c,shared.ok);
} else if ((!strcasecmp(szFromObj(c->argv[1]),"slaves") || } else if ((!strcasecmp(szFromObj(c->argv[1]),"slaves") ||

View File

@ -770,6 +770,15 @@ void loadServerConfigFromString(char *config) {
} }
for (int i = 1; i < argc; i++) for (int i = 1; i < argc; i++)
g_pserver->tls_allowlist.emplace(argv[i], strlen(argv[i])); g_pserver->tls_allowlist.emplace(argv[i], strlen(argv[i]));
} else if (!strcasecmp(argv[0], "tls-auditlog-blocklist")) {
if (argc < 2) {
err = "must supply at least one element in the block list"; goto loaderr;
}
if (!g_pserver->tls_auditlog_blocklist.empty()) {
err = "tls-auditlog-blocklist may only be set once"; goto loaderr;
}
for (int i = 1; i < argc; i++)
g_pserver->tls_auditlog_blocklist.emplace(argv[i], strlen(argv[i]));
} else if (!strcasecmp(argv[0], "version-override") && argc == 2) { } else if (!strcasecmp(argv[0], "version-override") && argc == 2) {
KEYDB_SET_VERSION = zstrdup(argv[1]); KEYDB_SET_VERSION = zstrdup(argv[1]);
serverLog(LL_WARNING, "Warning version is overriden to: %s\n", KEYDB_SET_VERSION); serverLog(LL_WARNING, "Warning version is overriden to: %s\n", KEYDB_SET_VERSION);
@ -2112,7 +2121,10 @@ static void sdsConfigGet(client *c, typeData data) {
} }
static void sdsConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { static void sdsConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
rewriteConfigSdsOption(state, name, *(data.sds.config), data.sds.default_value ? sdsnew(data.sds.default_value) : NULL); sds sdsDefault = data.sds.default_value ? sdsnew(data.sds.default_value) : NULL;
rewriteConfigSdsOption(state, name, *(data.sds.config), sdsDefault);
if (sdsDefault)
sdsfree(sdsDefault);
} }
@ -2907,6 +2919,7 @@ standardConfig configs[] = {
/* Unsigned int configs */ /* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 8192, MEMORY_CONFIG, NULL, NULL), createUIntConfig("loading-process-events-interval-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->loading_process_events_interval_keys, 8192, MEMORY_CONFIG, NULL, NULL),
createUIntConfig("maxclients-reserved", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->maxclientsReserved, 0, INTEGER_CONFIG, NULL, NULL),
/* Unsigned Long configs */ /* Unsigned Long configs */
createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, cserver.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */ createULongConfig("active-defrag-max-scan-fields", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, cserver.active_defrag_max_scan_fields, 1000, INTEGER_CONFIG, NULL, NULL), /* Default: keys with more than 1000 fields will be processed separately */

View File

@ -161,6 +161,11 @@ static void connSocketClose(connection *conn) {
return; return;
} }
if (conn->fprint) {
zfree(conn->fprint);
conn->fprint = NULL;
}
zfree(conn); zfree(conn);
} }

View File

@ -51,6 +51,7 @@ typedef enum {
#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */ #define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */
#define CONN_FLAG_READ_THREADSAFE (1<<2) #define CONN_FLAG_READ_THREADSAFE (1<<2)
#define CONN_FLAG_WRITE_THREADSAFE (1<<3) #define CONN_FLAG_WRITE_THREADSAFE (1<<3)
#define CONN_FLAG_AUDIT_LOGGING_REQUIRED (1<<4)
#define CONN_TYPE_SOCKET 1 #define CONN_TYPE_SOCKET 1
#define CONN_TYPE_TLS 2 #define CONN_TYPE_TLS 2
@ -86,6 +87,7 @@ struct connection {
ConnectionCallbackFunc write_handler; ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler; ConnectionCallbackFunc read_handler;
int fd; int fd;
char* fprint;
}; };
/* The connection module does not deal with listening and accepting sockets, /* The connection module does not deal with listening and accepting sockets,

View File

@ -1177,6 +1177,11 @@ int chooseBestThreadForAccept()
void clientAcceptHandler(connection *conn) { void clientAcceptHandler(connection *conn) {
client *c = (client*)connGetPrivateData(conn); client *c = (client*)connGetPrivateData(conn);
if (conn->flags & CONN_FLAG_AUDIT_LOGGING_REQUIRED) {
c->flags |= CLIENT_AUDIT_LOGGING;
c->fprint = conn->fprint;
}
if (connGetState(conn) != CONN_STATE_CONNECTED) { if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Error accepting a client connection: %s", "Error accepting a client connection: %s",
@ -1240,6 +1245,7 @@ void clientAcceptHandler(connection *conn) {
#define MAX_ACCEPTS_PER_CALL 1000 #define MAX_ACCEPTS_PER_CALL 1000
#define MAX_ACCEPTS_PER_CALL_TLS 100 #define MAX_ACCEPTS_PER_CALL_TLS 100
static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) { static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) {
client *c; client *c;
char conninfo[100]; char conninfo[100];
@ -1276,8 +1282,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
* called, because we don't want to even start transport-level negotiation * called, because we don't want to even start transport-level negotiation
* if rejected. */ * if rejected. */
if (listLength(g_pserver->clients) + getClusterConnectionsCount() if (listLength(g_pserver->clients) + getClusterConnectionsCount()
>= g_pserver->maxclients) >= (g_pserver->maxclients - g_pserver->maxclientsReserved))
{ {
// Allow the connection if it comes from localhost and we're within the maxclientReserved buffer range
if ((listLength(g_pserver->clients) + getClusterConnectionsCount()) >= g_pserver->maxclients || strcmp("127.0.0.1", ip)) {
const char *err; const char *err;
if (g_pserver->cluster_enabled) if (g_pserver->cluster_enabled)
err = "-ERR max number of clients + cluster " err = "-ERR max number of clients + cluster "
@ -1295,6 +1303,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
connClose(conn); connClose(conn);
return; return;
} }
}
/* Create connection and client */ /* Create connection and client */
if ((c = createClient(conn, iel)) == NULL) { if ((c = createClient(conn, iel)) == NULL) {

View File

@ -445,7 +445,7 @@ extern "C" void clusterManagerWaitForClusterJoin(void) {
int counter = 0, int counter = 0,
check_after = CLUSTER_JOIN_CHECK_AFTER + check_after = CLUSTER_JOIN_CHECK_AFTER +
(int)(listLength(cluster_manager.nodes) * 0.15f); (int)(listLength(cluster_manager.nodes) * 0.15f);
while(!clusterManagerIsConfigConsistent()) { while(!clusterManagerIsConfigConsistent(0 /*fLog*/)) {
printf("."); printf(".");
fflush(stdout); fflush(stdout);
sleep(1); sleep(1);
@ -588,7 +588,7 @@ extern "C" int clusterManagerCheckCluster(int quiet) {
int do_fix = config.cluster_manager_command.flags & int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX; CLUSTER_MANAGER_CMD_FLAG_FIX;
if (!quiet) clusterManagerShowNodes(); if (!quiet) clusterManagerShowNodes();
consistent = clusterManagerIsConfigConsistent(); consistent = clusterManagerIsConfigConsistent(1 /*fLog*/);
if (!consistent) { if (!consistent) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!"); sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
clusterManagerOnError(err); clusterManagerOnError(err);

View File

@ -1618,6 +1618,8 @@ static int parseOptions(int argc, char **argv) {
fprintf(stderr, "Unknown --show-pushes value '%s' " fprintf(stderr, "Unknown --show-pushes value '%s' "
"(valid: '[y]es', '[n]o')\n", argval); "(valid: '[y]es', '[n]o')\n", argval);
} }
} else if (!strcmp(argv[i],"--force")) {
config.force_mode = 1;
} else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') { } else if (CLUSTER_MANAGER_MODE() && argv[i][0] != '-') {
if (config.cluster_manager_command.argc == 0) { if (config.cluster_manager_command.argc == 0) {
int j = i + 1; int j = i + 1;
@ -1793,6 +1795,7 @@ static void usage(void) {
" --verbose Verbose mode.\n" " --verbose Verbose mode.\n"
" --no-auth-warning Don't show warning message when using password on command\n" " --no-auth-warning Don't show warning message when using password on command\n"
" line interface.\n" " line interface.\n"
" --force Ignore validation and safety checks\n"
" --help Output this help and exit.\n" " --help Output this help and exit.\n"
" --version Output version and exit.\n" " --version Output version and exit.\n"
"\n"); "\n");
@ -3993,12 +3996,13 @@ cleanup:
return signature; return signature;
} }
int clusterManagerIsConfigConsistent(void) { int clusterManagerIsConfigConsistent(int fLog) {
if (cluster_manager.nodes == NULL) return 0; if (cluster_manager.nodes == NULL) return 0;
int consistent = (listLength(cluster_manager.nodes) <= 1); int consistent = (listLength(cluster_manager.nodes) <= 1);
// If the Cluster has only one node, it's always consistent // If the Cluster has only one node, it's always consistent
if (consistent) return 1; if (consistent) return 1;
sds first_cfg = NULL; sds first_cfg = NULL;
const char *firstNode = NULL;
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(cluster_manager.nodes, &li); listRewind(cluster_manager.nodes, &li);
@ -4009,10 +4013,14 @@ int clusterManagerIsConfigConsistent(void) {
consistent = 0; consistent = 0;
break; break;
} }
if (first_cfg == NULL) first_cfg = cfg; if (first_cfg == NULL) {
else { first_cfg = cfg;
firstNode = node->name;
} else {
consistent = !sdscmp(first_cfg, cfg); consistent = !sdscmp(first_cfg, cfg);
sdsfree(cfg); sdsfree(cfg);
if (fLog && !consistent)
clusterManagerLogInfo("\tNode %s (%s:%d) is inconsistent with %s\n", node->name, node->ip, node->port, firstNode);
if (!consistent) break; if (!consistent) break;
} }
} }
@ -5161,7 +5169,7 @@ static int clusterManagerCommandReshard(int argc, char **argv) {
clusterManagerNode *node = clusterManagerNewNode(ip, port); clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0; if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
clusterManagerCheckCluster(0); clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { if (cluster_manager.errors && listLength(cluster_manager.errors) > 0 && !config.force_mode) {
fflush(stdout); fflush(stdout);
fprintf(stderr, fprintf(stderr,
"*** Please fix your cluster problems before resharding\n"); "*** Please fix your cluster problems before resharding\n");
@ -5394,7 +5402,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
if (weightedNodes == NULL) goto cleanup; if (weightedNodes == NULL) goto cleanup;
/* Check cluster, only proceed if it looks sane. */ /* Check cluster, only proceed if it looks sane. */
clusterManagerCheckCluster(1); clusterManagerCheckCluster(1);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) { if (cluster_manager.errors && listLength(cluster_manager.errors) > 0 && !config.force_mode) {
clusterManagerLogErr("*** Please fix your cluster problems " clusterManagerLogErr("*** Please fix your cluster problems "
"before rebalancing\n"); "before rebalancing\n");
result = 0; result = 0;
@ -7185,6 +7193,7 @@ int main(int argc, char **argv) {
config.set_errcode = 0; config.set_errcode = 0;
config.no_auth_warning = 0; config.no_auth_warning = 0;
config.in_multi = 0; config.in_multi = 0;
config.force_mode = 0;
config.cluster_manager_command.name = NULL; config.cluster_manager_command.name = NULL;
config.cluster_manager_command.argc = 0; config.cluster_manager_command.argc = 0;
config.cluster_manager_command.argv = NULL; config.cluster_manager_command.argv = NULL;

View File

@ -195,6 +195,7 @@ extern struct config {
int in_multi; int in_multi;
int pre_multi_dbnum; int pre_multi_dbnum;
int quoted_input; /* Force input args to be treated as quoted strings */ int quoted_input; /* Force input args to be treated as quoted strings */
int force_mode;
} config; } config;
struct clusterManager { struct clusterManager {
@ -282,7 +283,7 @@ int clusterManagerFixOpenSlot(int slot);
void clusterManagerPrintSlotsList(list *slots); void clusterManagerPrintSlotsList(list *slots);
int clusterManagerGetCoveredSlots(char *all_slots); int clusterManagerGetCoveredSlots(char *all_slots);
void clusterManagerOnError(sds err); void clusterManagerOnError(sds err);
int clusterManagerIsConfigConsistent(void); int clusterManagerIsConfigConsistent(int fLog);
void freeClusterManagerNode(clusterManagerNode *node); void freeClusterManagerNode(clusterManagerNode *node);
void clusterManagerLog(int level, const char* fmt, ...); void clusterManagerLog(int level, const char* fmt, ...);
int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr, int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,

View File

@ -410,7 +410,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) { if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
// This is an emergency overflow, we better resize to fit // This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize); serverLog(LL_WARNING, "Replication backlog is too small, resizing from %lld to %lld bytes", g_pserver->repl_backlog_size, newsize);
resizeReplicationBacklog(newsize); resizeReplicationBacklog(newsize);
} else if (!listening_replicas) { } else if (!listening_replicas) {
// We need to update a few variables or later asserts will notice we dropped data // We need to update a few variables or later asserts will notice we dropped data

View File

@ -5082,10 +5082,29 @@ int processCommand(client *c, int callFlags) {
} else { } else {
/* If the command was replication or admin related we *must* flush our buffers first. This is in case /* If the command was replication or admin related we *must* flush our buffers first. This is in case
something happens which would modify what we would send to replicas */ something happens which would modify what we would send to replicas */
if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN)) if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN))
flushReplBacklogToClients(); flushReplBacklogToClients();
if (c->flags & CLIENT_AUDIT_LOGGING){
getKeysResult result = GETKEYS_RESULT_INIT;
int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, &result);
int *keyindex = result.keys;
sds str = sdsempty();
for (int j = 0; j < numkeys; j++) {
sdscatsds(str, (sds)ptrFromObj(c->argv[keyindex[j]]));
sdscat(str, " ");
}
if (numkeys > 0)
{
serverLog(LL_NOTICE, "Audit Log: %s, cmd %s, keys: %s", c->fprint, c->cmd->name, str);
} else {
serverLog(LL_NOTICE, "Audit Log: %s, cmd %s", c->fprint, c->cmd->name);
}
sdsfree(str);
}
call(c,callFlags); call(c,callFlags);
c->woff = g_pserver->master_repl_offset; c->woff = g_pserver->master_repl_offset;
@ -5818,15 +5837,6 @@ sds genRedisInfoString(const char *section) {
g_pserver->m_pstorageFactory ? g_pserver->m_pstorageFactory->name() : "none" g_pserver->m_pstorageFactory ? g_pserver->m_pstorageFactory->name() : "none"
); );
freeMemoryOverheadData(mh); freeMemoryOverheadData(mh);
if (g_pserver->m_pstorageFactory)
{
info = sdscatprintf(info,
"%s_memory:%zu\r\n",
g_pserver->m_pstorageFactory->name(),
g_pserver->m_pstorageFactory->totalDiskspaceUsed()
);
}
} }
/* Persistence */ /* Persistence */
@ -5950,6 +5960,10 @@ sds genRedisInfoString(const char *section) {
(intmax_t)eta (intmax_t)eta
); );
} }
if (g_pserver->m_pstorageFactory)
{
info = sdscat(info, g_pserver->m_pstorageFactory->getInfo().get());
}
} }
/* Stats */ /* Stats */

View File

@ -545,6 +545,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants #define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants
RDB without replication buffer. */ RDB without replication buffer. */
#define CLIENT_FORCE_REPLY (1ULL<<44) /* Should addReply be forced to write the text? */ #define CLIENT_FORCE_REPLY (1ULL<<44) /* Should addReply be forced to write the text? */
#define CLIENT_AUDIT_LOGGING (1ULL<<45) /* Client commands required audit logging */
/* Client block type (btype field in client structure) /* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */ * if CLIENT_BLOCKED flag is set. */
@ -1713,6 +1714,7 @@ struct client {
size_t argv_len_sum() const; size_t argv_len_sum() const;
bool asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, const std::vector<robj_sharedptr> &)> &&mainFn, bool asyncCommand(std::function<void(const redisDbPersistentDataSnapshot *, const std::vector<robj_sharedptr> &)> &&mainFn,
std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn = nullptr); std::function<void(const redisDbPersistentDataSnapshot *)> &&postFn = nullptr);
char* fprint;
}; };
struct saveparam { struct saveparam {
@ -2568,6 +2570,7 @@ struct redisServer {
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */ /* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */ unsigned int maxclients; /* Max number of simultaneous clients */
unsigned int maxclientsReserved; /* Reserved amount for health checks (localhost conns) */
unsigned long long maxmemory; /* Max number of memory bytes to use */ unsigned long long maxmemory; /* Max number of memory bytes to use */
unsigned long long maxstorage; /* Max number of bytes to use in a storage provider */ unsigned long long maxstorage; /* Max number of bytes to use in a storage provider */
int maxmemory_policy; /* Policy for key eviction */ int maxmemory_policy; /* Policy for key eviction */
@ -2707,6 +2710,7 @@ struct redisServer {
int tls_auth_clients; int tls_auth_clients;
int tls_rotation; int tls_rotation;
std::set<sdsstring> tls_auditlog_blocklist; /* Certificates that can be excluded from audit logging */
std::set<sdsstring> tls_allowlist; std::set<sdsstring> tls_allowlist;
redisTLSContextConfig tls_ctx_config; redisTLSContextConfig tls_ctx_config;

View File

@ -18,6 +18,7 @@ public:
virtual const char *name() const override; virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override; virtual size_t totalDiskspaceUsed() const override;
virtual sdsstring getInfo() const override;
virtual bool FSlow() const override { return true; } virtual bool FSlow() const override { return true; }

View File

@ -9,6 +9,7 @@
#include "rocksdbfactor_internal.h" #include "rocksdbfactor_internal.h"
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/statvfs.h>
rocksdb::Options DefaultRocksDBOptions() { rocksdb::Options DefaultRocksDBOptions() {
rocksdb::Options options; rocksdb::Options options;
@ -203,3 +204,21 @@ size_t RocksDBStorageFactory::totalDiskspaceUsed() const
{ {
return m_pfilemanager->GetTotalSize(); return m_pfilemanager->GetTotalSize();
} }
sdsstring RocksDBStorageFactory::getInfo() const
{
struct statvfs fiData;
int status = statvfs(m_path.c_str(), &fiData);
if ( status == 0 ) {
return sdsstring(sdscatprintf(sdsempty(),
"storage_flash_used_bytes:%zu\r\n"
"storage_flash_total_bytes:%zu\r\n"
"storage_flash_rocksdb_bytes:%zu\r\n",
fiData.f_bfree * fiData.f_frsize,
fiData.f_blocks * fiData.f_frsize,
totalDiskspaceUsed()));
} else {
fprintf(stderr, "Failed to gather FLASH statistics with status: %d\r\n", status);
return sdsstring(sdsempty());
}
}

View File

@ -8,6 +8,7 @@ class TestStorageFactory : public IStorageFactory
virtual class IStorage *createMetadataDb() override; virtual class IStorage *createMetadataDb() override;
virtual const char *name() const override; virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override { return 0; } virtual size_t totalDiskspaceUsed() const override { return 0; }
virtual sdsstring getInfo() const override { return sdsstring(sdsempty()); }
virtual bool FSlow() const override { return false; } virtual bool FSlow() const override { return false; }
}; };

View File

@ -517,19 +517,38 @@ typedef struct tls_connection {
aeEventLoop *el; aeEventLoop *el;
} tls_connection; } tls_connection;
/* Check to see if a given client name matches against our allowlist. /* Check to see if a given client name is contained in the provided set (allowlist/blocklist)
* Return true if it does */ * Return true if it does */
bool tlsCheckAgainstAllowlist(const char * client){ bool tlsCheckAgainstAllowlist(const char * client, std::set<sdsstring> set){
/* Because of wildcard matching, we need to iterate over the entire set. /* Because of wildcard matching, we need to iterate over the entire set.
* If we were doing simply straight matching, we could just directly * If we were doing simply straight matching, we could just directly
* check to see if the client name is in the set in O(1) time */ * check to see if the client name is in the set in O(1) time */
for (auto &client_pattern: g_pserver->tls_allowlist){ for (auto &client_pattern: set){
if (stringmatchlen(client_pattern.get(), client_pattern.size(), client, strlen(client), 1)) if (stringmatchlen(client_pattern.get(), client_pattern.size(), client, strlen(client), 1))
return true; return true;
} }
return false; return false;
} }
/* Sets the sha256 certificate fingerprint on the connection
* Based on the example here https://fm4dd.com/openssl/certfprint.shtm */
void tlsSetCertificateFingerprint(tls_connection* conn, X509 * cert) {
unsigned int fprint_size;
unsigned char fprint[EVP_MAX_MD_SIZE];
const EVP_MD *fprint_type = EVP_sha256();
X509_digest(cert, fprint_type, fprint, &fprint_size);
if (conn->c.fprint) zfree(conn->c.fprint);
conn->c.fprint = (char*)zcalloc(fprint_size*2+1);
/* Format fingerprint as hex string */
char tmp[3];
for (unsigned int i = 0; i < fprint_size; i++) {
snprintf(tmp, 2, "%02x", (unsigned int)fprint[i]);
strncat(conn->c.fprint, tmp, 2);
}
}
/* ASN1_STRING_get0_data was introduced in OPENSSL 1.1.1 /* ASN1_STRING_get0_data was introduced in OPENSSL 1.1.1
* use ASN1_STRING_data for older versions where it is not available */ * use ASN1_STRING_data for older versions where it is not available */
#if OPENSSL_VERSION_NUMBER < 0x10100000L #if OPENSSL_VERSION_NUMBER < 0x10100000L
@ -549,19 +568,24 @@ public:
} }
}; };
bool tlsValidateCertificateName(tls_connection* conn){ bool tlsCheckCertificateAgainstAllowlist(tls_connection* conn, std::set<sdsstring> allowlist, const char** commonName){
if (g_pserver->tls_allowlist.empty()) if (allowlist.empty()){
return true; // Empty list implies acceptance of all // An empty list implies acceptance of all
return true;
}
X509 * cert = SSL_get_peer_certificate(conn->ssl); X509 * cert = SSL_get_peer_certificate(conn->ssl);
TCleanup certClen([cert]{X509_free(cert);}); TCleanup certClen([cert]{X509_free(cert);});
/* Check the common name (CN) of the certificate first */ /* Check the common name (CN) of the certificate first */
X509_NAME_ENTRY * ne = X509_NAME_get_entry(X509_get_subject_name(cert), X509_NAME_get_index_by_NID(X509_get_subject_name(cert), NID_commonName, -1)); X509_NAME_ENTRY * ne = X509_NAME_get_entry(X509_get_subject_name(cert), X509_NAME_get_index_by_NID(X509_get_subject_name(cert), NID_commonName, -1));
const char * commonName = reinterpret_cast<const char*>(ASN1_STRING_get0_data(X509_NAME_ENTRY_get_data(ne))); *commonName = reinterpret_cast<const char*>(ASN1_STRING_get0_data(X509_NAME_ENTRY_get_data(ne)));
if (tlsCheckAgainstAllowlist(commonName)) tlsSetCertificateFingerprint(conn, cert);
if (tlsCheckAgainstAllowlist(*commonName, allowlist)) {
return true; return true;
}
/* If that fails, check through the subject alternative names (SANs) as well */ /* If that fails, check through the subject alternative names (SANs) as well */
GENERAL_NAMES* subjectAltNames = (GENERAL_NAMES*)X509_get_ext_d2i(cert, NID_subject_alt_name, NULL, NULL); GENERAL_NAMES* subjectAltNames = (GENERAL_NAMES*)X509_get_ext_d2i(cert, NID_subject_alt_name, NULL, NULL);
@ -574,19 +598,19 @@ bool tlsValidateCertificateName(tls_connection* conn){
switch (generalName->type) switch (generalName->type)
{ {
case GEN_EMAIL: case GEN_EMAIL:
if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.rfc822Name)))){ if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.rfc822Name)), allowlist)){
sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free); sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free);
return true; return true;
} }
break; break;
case GEN_DNS: case GEN_DNS:
if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.dNSName)))){ if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.dNSName)), allowlist)){
sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free); sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free);
return true; return true;
} }
break; break;
case GEN_URI: case GEN_URI:
if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.uniformResourceIdentifier)))){ if (tlsCheckAgainstAllowlist(reinterpret_cast<const char*>(ASN1_STRING_get0_data(generalName->d.uniformResourceIdentifier)), allowlist)){
sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free); sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free);
return true; return true;
} }
@ -597,7 +621,7 @@ bool tlsValidateCertificateName(tls_connection* conn){
if (ipLen == 4){ //IPv4 case if (ipLen == 4){ //IPv4 case
char addr[INET_ADDRSTRLEN]; char addr[INET_ADDRSTRLEN];
inet_ntop(AF_INET, ASN1_STRING_get0_data(generalName->d.iPAddress), addr, INET_ADDRSTRLEN); inet_ntop(AF_INET, ASN1_STRING_get0_data(generalName->d.iPAddress), addr, INET_ADDRSTRLEN);
if (tlsCheckAgainstAllowlist(addr)){ if (tlsCheckAgainstAllowlist(addr, allowlist)){
sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free); sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free);
return true; return true;
} }
@ -613,13 +637,34 @@ bool tlsValidateCertificateName(tls_connection* conn){
sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free); sk_GENERAL_NAME_pop_free(subjectAltNames, GENERAL_NAME_free);
} }
return false;
}
bool tlsCertificateRequiresAuditLogging(tls_connection* conn){
const char* cn = "";
if (tlsCheckCertificateAgainstAllowlist(conn, g_pserver->tls_auditlog_blocklist, &cn)) {
// Certificate is in exclusion list, no need to audit log
serverLog(LL_NOTICE, "Audit Log: disabled for %s", conn->c.fprint);
return false;
} else {
serverLog(LL_NOTICE, "Audit Log: enabled for %s", conn->c.fprint);
return true;
}
}
bool tlsValidateCertificateName(tls_connection* conn){
const char* cn = "";
if (tlsCheckCertificateAgainstAllowlist(conn, g_pserver->tls_allowlist, &cn)) {
return true;
} else {
/* If neither the CN nor the SANs match, update the SSL error and return false */ /* If neither the CN nor the SANs match, update the SSL error and return false */
conn->c.last_errno = 0; conn->c.last_errno = 0;
if (conn->ssl_error) zfree(conn->ssl_error); if (conn->ssl_error) zfree(conn->ssl_error);
size_t bufsize = 512; size_t bufsize = 512;
conn->ssl_error = (char*)zmalloc(bufsize); conn->ssl_error = (char*)zmalloc(bufsize);
snprintf(conn->ssl_error, bufsize, "Client CN (%s) and SANs not found in allowlist.", commonName); snprintf(conn->ssl_error, bufsize, "Client CN (%s) and SANs not found in allowlist.", cn);
return false; return false;
}
} }
static connection *createTLSConnection(int client_side) { static connection *createTLSConnection(int client_side) {
@ -844,6 +889,9 @@ void tlsHandleEvent(tls_connection *conn, int mask) {
conn->c.state = CONN_STATE_ERROR; conn->c.state = CONN_STATE_ERROR;
} else { } else {
conn->c.state = CONN_STATE_CONNECTED; conn->c.state = CONN_STATE_CONNECTED;
if (tlsCertificateRequiresAuditLogging(conn)){
conn->c.flags |= CONN_FLAG_AUDIT_LOGGING_REQUIRED;
}
} }
} }

View File

@ -81,6 +81,7 @@ set ::all_tests {
unit/pendingquerybuf unit/pendingquerybuf
unit/tls unit/tls
unit/tls-name-validation unit/tls-name-validation
unit/tls-auditlog
unit/tracking unit/tracking
unit/oom-score-adj unit/oom-score-adj
unit/shutdown unit/shutdown

159
tests/unit/tls-auditlog.tcl Normal file
View File

@ -0,0 +1,159 @@
# only run this test if tls is enabled
if {$::tls} {
package require tls
test {TLS Audit Log: Able to connect with no exclustion list} {
start_server {tags {"tls"}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with exclusion list '*'} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist *}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching CN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist client.keydb.dev}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching SAN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist san1.keydb.dev}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching CN with wildcard} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist client*.dev}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching SAN with wildcard} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist san*.dev}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect while with CN having a comprehensive list} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.keydb.dev client.keydb.dev other.keydb.dev}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS: Able to connect while with SAN having a comprehensive list} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.keydb.dev san2.keydb.dev other.keydb.dev}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect while with CN having a comprehensive list with wildcards} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.* client*.dev other.*}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit LogTLS: Able to connect while with SAN having a comprehensive list with wildcards} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.* san*.dev other.*}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Not matching CN or SAN accepted} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {client.keydb.dev}}} {
catch {r PING}
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to match against DNS SAN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {san1.keydb.dev}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to match against email SAN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {someone@keydb.dev}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to match against IPv4 SAN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {192.168.0.1}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to match against IPv4 with a wildcard} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {192.*}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to match against URI SAN} {
start_server {tags {"tls"} overrides {tls-allowlist {https://keydb.dev}}} {
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching CN} {
start_server {tags {"tls"} overrides {tls-auditlog-blocklist test.dev}} {
r set testkey foo
wait_for_condition 50 1000 {
[log_file_matches [srv 0 stdout] "*Audit Log: *, cmd set, keys: testkey*"]
} else {
fail "Missing expected Audit Log entry"
}
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with matching TLS allowlist and Audit Log blocklist} {
start_server {tags {"tls"} overrides {tls-allowlist client.keydb.dev tls-auditlog-blocklist client.keydb.dev}} {
r set testkey foo
if {[log_file_matches [srv 0 stdout] "*Audit Log: *, cmd set, keys: testkey*"]} {
fail "Unexpected Audit Log entry"
}
catch {r PING} e
assert_match {PONG} $e
}
}
test {TLS Audit Log: Able to connect with different TLS allowlist and Audit Log blocklist} {
start_server {tags {"tls"} overrides {tls-allowlist client.keydb.dev tls-auditlog-blocklist test.dev}} {
r set testkey foo
wait_for_condition 50 1000 {
[log_file_matches [srv 0 stdout] "*Audit Log: *, cmd set, keys: testkey*"]
} else {
fail "Missing expected Audit Log entry"
}
catch {r PING} e
assert_match {PONG} $e
}
}
} else {
start_server {} {
# just a dummy server so that the test doesn't panic if tls is disabled
# otherwise the test will try to bind to a server that just isn't there
}
}