Adds JSON.MSET command (#39)

#### JSON.MSET
Set JSON values for multiple keys. The operation is atomic. Either all values are set or none is set.

#### Syntax
```
JSON.MSET <key> <path> <value> [key <path> <value> ...]
```

* key - required, JSON key
* path - required, a JSON path
* value - required, JSON value
#### Return
* Simple String 'OK' on success
* Error on failure
---------

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
This commit is contained in:
Roshan Khatri 2025-03-03 15:19:36 -08:00 committed by GitHub
parent 87de05e8be
commit 10d2793bb9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 362 additions and 6 deletions

View File

@ -169,6 +169,22 @@ void dom_serialize_value(const JValue &val, const PrintFormat *format, rapidjson
serialize_value(val, 0, format, oss);
}
JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json,
size_t new_val_size) {
Selector selector;
JsonUtilCode rc = selector.prepareSetValues(doc->GetJValue(), json_path);
if (rc != JSONUTIL_SUCCESS) return rc;
JParser new_val;
if (new_val.Parse(new_val_json, new_val_size).HasParseError()) {
return new_val.GetParseErrorCode();
}
CHECK_DOCUMENT_PATH_LIMIT(ctx, selector, new_val);
CHECK_DOCUMENT_SIZE_LIMIT(ctx, doc->size, new_val.GetJValueSize());
return JSONUTIL_SUCCESS;
}
JsonUtilCode dom_set_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json,
size_t new_val_size, const bool is_create_only, const bool is_update_only) {
if (is_create_only && is_update_only) return JSONUTIL_NX_XX_SHOULD_BE_MUTUALLY_EXCLUSIVE;

View File

@ -253,6 +253,20 @@ void dom_serialize_value(const JValue &val, const PrintFormat *format, rapidjson
*/
JValue& dom_get_value(JDocument &doc);
/* Verify value at the path.
* @param json_path: path that is compliant to the JSON Path syntax.
* @param is_create_only - indicates to create a new value.
* @param is_update_only - indicates to update an existing value.
* @return JSONUTIL_SUCCESS for success, other code for failure.
*/
JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json,
size_t new_val_len);
inline JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json) {
return dom_verify_value(ctx, doc, json_path, new_val_json, strlen(new_val_json));
}
/* Set value at the path.
* @param json_path: path that is compliant to the JSON Path syntax.
* @param is_create_only - indicates to create a new value.

View File

@ -262,6 +262,98 @@ STATIC JsonUtilCode parseSetCmdArgs(ValkeyModuleString **argv, const int argc, S
return JSONUTIL_SUCCESS;
}
typedef struct {
ValkeyModuleString *key_str; // Required
ValkeyModuleKey *key;
const char *path; // Required
const char *json; // Required
size_t json_len;
bool is_root_path;
} MSetCmdArgs;
STATIC JsonUtilCode parseAndValidateMSetCmdArgs(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, const int argc, MSetCmdArgs **args_list, size_t *num_keys) {
// Validate that the number of arguments is correct for MSET (3 arguments per key)
if ((argc - 1) % 3 != 0) {
return JSONUTIL_WRONG_NUM_ARGS;
}
*num_keys = (argc - 1) / 3;
// Allocate memory for args_list
*args_list = reinterpret_cast<MSetCmdArgs *>(ValkeyModule_Alloc((*num_keys) * sizeof(MSetCmdArgs)));
memset(*args_list, 0, (*num_keys) * sizeof(MSetCmdArgs));
if (!(*args_list)) {
return JSONUTIL_ALLOCATION_FAILURE;
}
JsonUtilCode rc;
// Parse and validate arguments for each key
size_t i;
for (i = 0; i < *num_keys; ++i) {
MSetCmdArgs &current_arg = (*args_list)[i];
current_arg.key_str = argv[i * 3 + 1];
current_arg.key = static_cast<ValkeyModuleKey*>(
ValkeyModule_OpenKey(ctx, current_arg.key_str, VALKEYMODULE_READ | VALKEYMODULE_WRITE));
// Handle key allocation failure
if (!current_arg.key) {
rc = JSONUTIL_KEY_OPEN_ERROR;
return rc;
}
current_arg.path = ValkeyModule_StringPtrLen(argv[i * 3 + 2], nullptr);
current_arg.json = ValkeyModule_StringPtrLen(argv[i * 3 + 3], &current_arg.json_len);
// Validate key type
int type = ValkeyModule_KeyType(current_arg.key);
if (type != VALKEYMODULE_KEYTYPE_EMPTY &&
ValkeyModule_ModuleTypeGetType(current_arg.key) != DocumentType) {
rc = JSONUTIL_NOT_A_DOCUMENT_KEY;
return rc;
}
// Root path validation
bool is_new_valkey_key = (type == VALKEYMODULE_KEYTYPE_EMPTY);
current_arg.is_root_path = jsonutil_is_root_path(current_arg.path);
if (is_new_valkey_key && !current_arg.is_root_path) {
rc = JSONUTIL_COMMAND_SYNTAX_ERROR;
return rc;
}
// Validate JSON structure
if (current_arg.is_root_path) {
JDocument *doc = nullptr;
rc = dom_parse(ctx, current_arg.json, current_arg.json_len, &doc);
if (rc != JSONUTIL_SUCCESS) {
if (doc) dom_free_doc(doc);
return rc;
}
if (json_is_instrument_enabled_insert() || json_is_instrument_enabled_update()) {
size_t len;
const char* key_cstr = ValkeyModule_StringPtrLen(current_arg.key_str, &len);
std::size_t key_hash = std::hash<std::string_view>{}(std::string_view(key_cstr, len));
ValkeyModule_Log(ctx, "warning",
"Dump document structure before setting JSON key (hashed) %zu whole doc %p:",
key_hash, static_cast<void*>(doc));
DumpRedactedJValue(doc->GetJValue(), nullptr, "warning");
}
dom_free_doc(doc);
} else {
JDocument *doc = static_cast<JDocument*>(ValkeyModule_ModuleTypeGetValue(current_arg.key));
if (!doc) {
rc = JSONUTIL_DOCUMENT_KEY_NOT_FOUND;
return rc;
}
rc = dom_verify_value(ctx, doc, current_arg.path, current_arg.json);
if (rc != JSONUTIL_SUCCESS) {
return rc;
}
}
}
return JSONUTIL_SUCCESS;
}
STATIC JsonUtilCode parseGetCmdArgs(ValkeyModuleString **argv, const int argc, ValkeyModuleString **key,
PrintFormat *format, ValkeyModuleString ***paths, int *num_paths) {
*key = nullptr;
@ -629,6 +721,63 @@ int Command_JsonSet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
int Command_JsonMSet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
ValkeyModule_AutoMemory(ctx);
MSetCmdArgs *args_list = nullptr;
size_t num_keys;
JsonUtilCode rc = parseAndValidateMSetCmdArgs(ctx, argv, argc, &args_list, &num_keys);
if (rc != JSONUTIL_SUCCESS) {
ValkeyModule_Free(args_list);
if (rc == JSONUTIL_WRONG_NUM_ARGS)
return ValkeyModule_WrongArity(ctx);
else
return ValkeyModule_ReplyWithError(ctx, jsonutil_code_to_message(rc));
}
// Apply changes
size_t i;
for (i = 0; i < num_keys; i++) {
// begin tracking memory
int64_t begin_val = jsonstats_begin_track_mem();
if (args_list[i].is_root_path) { // Root document
// parse incoming JSON string
JDocument *doc;
rc = dom_parse(ctx, args_list[i].json, args_list[i].json_len, &doc);
ValkeyModule_Assert(rc == JSONUTIL_SUCCESS);
int64_t delta = jsonstats_end_track_mem(begin_val);
size_t doc_size = dom_get_doc_size(doc) + delta;
dom_set_doc_size(doc, doc_size);
// Set Valkey key
ValkeyModule_ModuleTypeSetValue(args_list[i].key, DocumentType, doc);
// update stats
jsonstats_update_stats_on_insert(doc, true, 0, doc_size, doc_size);
} else { // Update existing document
JDocument *doc = static_cast<JDocument*>(ValkeyModule_ModuleTypeGetValue(args_list[i].key));
size_t orig_doc_size = dom_get_doc_size(doc);
rc = dom_set_value(ctx, doc, args_list[i].path, args_list[i].json, args_list[i].json_len, false, false);
ValkeyModule_Assert(rc == JSONUTIL_SUCCESS);
int64_t delta = jsonstats_end_track_mem(begin_val);
size_t new_doc_size = dom_get_doc_size(doc) + delta;
dom_set_doc_size(doc, new_doc_size);
// update stats
jsonstats_update_stats_on_update(doc, orig_doc_size, new_doc_size, args_list[i].json_len);
}
ValkeyModule_NotifyKeyspaceEvent(ctx, VALKEYMODULE_NOTIFY_GENERIC, "json.mset", args_list[i].key_str);
}
// replicate the entire command
ValkeyModule_ReplicateVerbatim(ctx);
ValkeyModule_Free(args_list);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
int Command_JsonGet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
ValkeyModule_AutoMemory(ctx);
@ -2216,10 +2365,6 @@ void Module_Info(ValkeyModuleInfoCtx *ctx, int for_crash_report) {
} \
}
//
// User visible metrics
//
beginSection("core_metrics")
addULongLong("total_memory_bytes", jsonstats_get_used_mem() + keyTable->getStats().bytes);
addULongLong("num_documents", jsonstats_get_num_doc_keys());
@ -2512,6 +2657,16 @@ extern "C" int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "JSON.MSET", Command_JsonMSet, cmdflg_slow_write_deny, 1, -3, 3)
== VALKEYMODULE_ERR) {
ValkeyModule_Log(ctx, "warning", "Failed to create command JSON.MSET.");
return VALKEYMODULE_ERR;
}
if (ValkeyModule_SetCommandACLCategories(ValkeyModule_GetCommand(ctx,"JSON.MSET"), cat_slow_write_deny) == VALKEYMODULE_ERR) {
ValkeyModule_Log(ctx, "warning", "Failed to mset command category for JSON.MSET.");
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "JSON.GET", Command_JsonGet, cmdflg_readonly, 1, 1, 1) == VALKEYMODULE_ERR) {
ValkeyModule_Log(ctx, "warning", "Failed to create command JSON.GET.");
return VALKEYMODULE_ERR;
@ -2768,6 +2923,9 @@ extern "C" int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx) {
if (!set_command_info(ctx, "JSON.SET", -4, ks_read_write_update, 1, std::make_tuple(0, 1, 0))) {
return VALKEYMODULE_ERR;
}
if (!set_command_info(ctx, "JSON.MSET", -4, ks_read_write_update, 1, std::make_tuple(-3, 3, 0))) {
return VALKEYMODULE_ERR;
}
// Commands under RW + Insert
if (!set_command_info(ctx, "JSON.ARRAPPEND", -4, ks_read_write_insert, 1, std::make_tuple(0, 1, 0))) {
return VALKEYMODULE_ERR;

View File

@ -62,6 +62,8 @@ const char *jsonutil_code_to_message(JsonUtilCode code) {
case JSONUTIL_INVALID_RDB_FORMAT:
return "ERROR Invalid value in RDB format";
case JSONUTIL_DOLLAR_CANNOT_APPLY_TO_NON_ROOT: return "SYNTAXERR Dollar sign cannot apply to non-root element";
case JSONUTIL_ALLOCATION_FAILURE: return "ERROR Memory Allocation failed ";
case JSONUTIL_KEY_OPEN_ERROR: return "ERROR Unable to open valkeymodule key";
default: ValkeyModule_Assert(false);
}
return "";

View File

@ -62,6 +62,8 @@ typedef enum {
JSONUTIL_CANNOT_INSERT_MEMBER_INTO_NON_OBJECT_VALUE,
JSONUTIL_INVALID_RDB_FORMAT,
JSONUTIL_DOLLAR_CANNOT_APPLY_TO_NON_ROOT,
JSONUTIL_ALLOCATION_FAILURE,
JSONUTIL_KEY_OPEN_ERROR,
JSONUTIL_LAST
} JsonUtilCode;

View File

@ -178,8 +178,14 @@ class TestJsonBasic(JsonTestCase):
assert b'OK' == client.execute_command(
'JSON.SET', k2, '.', '[1,2,3,4,5]')
assert [b'{"a":"1","b":"2","c":"3"}', b'[1,2,3,4,5]'] == client.execute_command(
'JSON.MGET', k1, k2, '.')
assert b'OK' == client.execute_command(
'JSON.MSET', k3, '.', '[1,2,3,4,5]')
assert b'OK' == client.execute_command(
'JSON.MSET', k2, '.', '[5,4,3,2,1]', k1, '.', '{"a":"1", "b":"2", "c":"3"}')
assert [b'{"a":"1","b":"2","c":"3"}', b'[5,4,3,2,1]', b'[1,2,3,4,5]'] == client.execute_command(
'JSON.MGET', k1, k2, k3, '.')
assert b'OK' == client.execute_command(
'JSON.SET', k2, '.', '[1,2,3,4,5]')
assert b'{"a":"1","b":"2","c":"3"}' == client.execute_command(
'JSON.GET', k1)
for (key, path, exp) in [
@ -2781,6 +2787,146 @@ class TestJsonBasic(JsonTestCase):
'JSON.SET', k, '.', json_with_size(64*MB))
assert self.error_class.is_limit_exceeded_error(str(e.value))
def test_json_mset_command_supports_all_datatypes(self):
client = self.server.get_new_client()
for (path, value) in [('.address.city', '"Boston"'), # string
# number
('.age', '30'),
('.foo', '[1,2,3]'), # array
# array element access
('.phoneNumbers[0].number', '"1234567"'),
# object
('.foo', '{"a":"b"}'),
('.lastName', 'null'), # null
('.isAlive', 'false')]: # boolean
assert b'OK' == client.execute_command(
'JSON.MSET', wikipedia, path, value)
assert value.encode() == client.execute_command(
'JSON.GET', wikipedia, path)
def test_json_mset_command_root_document(self):
client = self.server.get_new_client()
# path to the root document is represented as '.'
assert b'OK' == client.execute_command(
"JSON.MSET",
k1, ".", '"Boston"',
k2, ".", '123',
k3, ".", '["Seattle","Boston"]',
k4, ".", '[1,2,3]',
k5, ".", '{"a":"b"}',
k6, ".", '{}',
k7, ".", 'null',
k8, ".", 'false')
for (key, value) in [(k1, '"Boston"'), # string
(k2, '123'), # number
(k3, '["Seattle","Boston"]'), # array
(k4, '[1,2,3]'), # array
(k5, '{"a":"b"}'), # object
(k6, '{}'), # empty object
(k7, 'null'), # null
(k8, 'false')]: # boolean
assert value.encode() == client.execute_command('JSON.GET', key)
def test_json_mset_command_with_error_conditions(self):
client = self.server.get_new_client()
# A new Valkey key's path must be root
with pytest.raises(ResponseError) as e:
assert None == client.execute_command(
'JSON.MSET', foo, '.bar', '"bar"')
assert self.error_class.is_syntax_error(str(e.value))
def test_json_mset_command_mixed_path_document(self):
client = self.server.get_new_client()
# path to the root document is represented as '.'
assert b'OK' == client.execute_command(
"JSON.MSET",
k1, ".", '"Boston"',
wikipedia, '.address.city', '"Boston"',
k2, ".", '123',
wikipedia, '.age', '30',
k3, ".", '["Seattle","Boston"]',
k4, ".", '[1,2,3]',
wikipedia, '.phoneNumbers[0].number', '"1234567"',
k5, ".", '{"a":"b"}',
wikipedia, '.foo', '{"a":"b"}',
k6, ".", '{}',
wikipedia, '.lastName', 'null',
k7, ".", 'null',
wikipedia, '.isAlive', 'false',
k8, ".", 'false')
for (key, path, value) in [(k1, ".", '"Boston"'), # string
(k2, ".", '123'), # number
(k3, ".", '["Seattle","Boston"]'), # array
(k4, ".", '[1,2,3]'), # array
(k5, ".", '{"a":"b"}'), # object
(k6, ".", '{}'), # empty object
(k7, ".", 'null'), # null
(k8, ".", 'false'), # boolean
(wikipedia, '.address.city', '"Boston"'),
(wikipedia, '.age', '30'),
(wikipedia, '.phoneNumbers[0].number', '"1234567"'),
(wikipedia, '.foo', '{"a":"b"}'),
(wikipedia, '.lastName', 'null'),
(wikipedia, '.isAlive', 'false')]:
assert value.encode() == client.execute_command(
'JSON.GET', key, path)
def test_json_mset_command_atomicity(self):
client = self.server.get_new_client()
# path to the root document is represented as '.'
assert b'OK' == client.execute_command(
"JSON.MSET",
k1, ".", '"Boston"',
wikipedia, '.address.city', '"Boston"',
k2, ".", '123',
wikipedia, '.age', '30',
k3, ".", '["Seattle","Boston"]',
k4, ".", '[1,2,3]',
wikipedia, '.phoneNumbers[0].number', '"1234567"',
k5, ".", '{"a":"b"}',
wikipedia, '.foo', '{"a":"b"}',
k6, ".", '{}',
wikipedia, '.lastName', 'null',
k7, ".", 'null',
wikipedia, '.isAlive', 'false',
k8, ".", 'false')
with pytest.raises(ResponseError) as e:
assert None == client.execute_command(
"JSON.MSET",
k1, ".", '"Seattle"',
wikipedia, '.address.city', '"Seattle"',
k2, ".", '456',
wikipedia, '.age', '40',
k3, ".", '["Seattle","Chicago"]',
k4, ".", '[4,5,6]',
wikipedia, '.phoneNumbers[0].number', '"56789"',
k5, ".", '{"c":"d"}',
wikipedia, '.foo', '{"c":"d"}',
k6, ".", '{}',
wikipedia, '.lastName', 'null',
k7, ".", 'null',
foo, '.bar', '"bar"',
k8, ".", 'true')
assert self.error_class.is_syntax_error(str(e.value))
for (key, path, value) in [(k1, ".", '"Boston"'), # string
(k2, ".", '123'), # number
(k3, ".", '["Seattle","Boston"]'), # array
(k4, ".", '[1,2,3]'), # array
(k5, ".", '{"a":"b"}'), # object
(k6, ".", '{}'), # empty object
(k7, ".", 'null'), # null
(k8, ".", 'false'), # boolean
(wikipedia, '.address.city', '"Boston"'),
(wikipedia, '.age', '30'),
(wikipedia, '.phoneNumbers[0].number', '"1234567"'),
(wikipedia, '.foo', '{"a":"b"}'),
(wikipedia, '.lastName', 'null'),
(wikipedia, '.isAlive', 'false')]:
assert value.encode() == client.execute_command(
'JSON.GET', key, path)
def test_multi_exec(self):
client = self.server.get_new_client()
client.execute_command('MULTI')

View File

@ -321,6 +321,12 @@ TEST_F(DomTest, testSerialize_CustomFormat) {
EXPECT_STREQ(oss.GetString(), exp_json);
}
TEST_F(DomTest, testVerifyString) {
const char *new_val = "\"Boston\"";
JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".address.city", new_val);
EXPECT_EQ(rc, JSONUTIL_SUCCESS);
}
TEST_F(DomTest, testSetString) {
const char *new_val = "\"Boston\"";
JsonUtilCode rc = dom_set_value(nullptr, doc1, ".address.city", new_val, false, false);
@ -332,6 +338,12 @@ TEST_F(DomTest, testSetString) {
EXPECT_STREQ(GetString(&oss), new_val);
}
TEST_F(DomTest, testVerifyNumber) {
const char *new_val = "37";
JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".age", new_val);
EXPECT_EQ(rc, JSONUTIL_SUCCESS);
}
TEST_F(DomTest, testSetNumber) {
const char *new_val = "37";
JsonUtilCode rc = dom_set_value(nullptr, doc1, ".age", new_val, false, false);
@ -343,6 +355,12 @@ TEST_F(DomTest, testSetNumber) {
EXPECT_STREQ(GetString(&oss), new_val);
}
TEST_F(DomTest, testVerifyNull) {
const char *new_val = "null";
JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".address.street", new_val);
EXPECT_EQ(rc, JSONUTIL_SUCCESS);
}
TEST_F(DomTest, testSetNull) {
const char *new_val = "null";
JsonUtilCode rc = dom_set_value(nullptr, doc1, ".address.street", new_val, false, false);