From 10d2793bb9daa5f78aac1e6d3c9754cffc9dd6cb Mon Sep 17 00:00:00 2001 From: Roshan Khatri <117414976+roshkhatri@users.noreply.github.com> Date: Mon, 3 Mar 2025 15:19:36 -0800 Subject: [PATCH] Adds JSON.MSET command (#39) #### JSON.MSET Set JSON values for multiple keys. The operation is atomic. Either all values are set or none is set. #### Syntax ``` JSON.MSET <key> <path> <value> [<key> <path> <value> ...] ``` * key - required, JSON key * path - required, a JSON path * value - required, JSON value #### Return * Simple String 'OK' on success * Error on failure --------- Signed-off-by: Roshan Khatri --- src/json/dom.cc | 16 +++ src/json/dom.h | 14 +++ src/json/json.cc | 166 ++++++++++++++++++++++++++++- src/json/util.cc | 2 + src/json/util.h | 2 + tst/integration/test_json_basic.py | 150 +++++++++++++++++++++++++- tst/unit/dom_test.cc | 18 ++++ 7 files changed, 362 insertions(+), 6 deletions(-) diff --git a/src/json/dom.cc b/src/json/dom.cc index a731d79..4ea8fd9 100644 --- a/src/json/dom.cc +++ b/src/json/dom.cc @@ -169,6 +169,22 @@ void dom_serialize_value(const JValue &val, const PrintFormat *format, rapidjson serialize_value(val, 0, format, oss); } +JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json, + size_t new_val_size) { + Selector selector; + JsonUtilCode rc = selector.prepareSetValues(doc->GetJValue(), json_path); + if (rc != JSONUTIL_SUCCESS) return rc; + + JParser new_val; + if (new_val.Parse(new_val_json, new_val_size).HasParseError()) { + return new_val.GetParseErrorCode(); + } + + CHECK_DOCUMENT_PATH_LIMIT(ctx, selector, new_val); + CHECK_DOCUMENT_SIZE_LIMIT(ctx, doc->size, new_val.GetJValueSize()); + return JSONUTIL_SUCCESS; +} + JsonUtilCode dom_set_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json, size_t new_val_size, const bool is_create_only, const bool is_update_only) { if (is_create_only && is_update_only) return JSONUTIL_NX_XX_SHOULD_BE_MUTUALLY_EXCLUSIVE; diff --git 
a/src/json/dom.h b/src/json/dom.h index 76c73d7..20dc411 100644 --- a/src/json/dom.h +++ b/src/json/dom.h @@ -253,6 +253,20 @@ void dom_serialize_value(const JValue &val, const PrintFormat *format, rapidjson */ JValue& dom_get_value(JDocument &doc); +/* Verify that a value can be set at the path (parses the value and checks the document path and size limits). + * @param json_path: path that is compliant to the JSON Path syntax. + * @param new_val_json - serialized JSON text of the candidate value. + * @param new_val_len - length of new_val_json in bytes. + * @return JSONUTIL_SUCCESS for success, other code for failure. + */ +JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json, + size_t new_val_len); + + +inline JsonUtilCode dom_verify_value(ValkeyModuleCtx *ctx, JDocument *doc, const char *json_path, const char *new_val_json) { + return dom_verify_value(ctx, doc, json_path, new_val_json, strlen(new_val_json)); +} + /* Set value at the path. * @param json_path: path that is compliant to the JSON Path syntax. * @param is_create_only - indicates to create a new value. 
diff --git a/src/json/json.cc b/src/json/json.cc index a0f3a7c..81db080 100644 --- a/src/json/json.cc +++ b/src/json/json.cc @@ -262,6 +262,98 @@ STATIC JsonUtilCode parseSetCmdArgs(ValkeyModuleString **argv, const int argc, S return JSONUTIL_SUCCESS; } +typedef struct { + ValkeyModuleString *key_str; // Required + ValkeyModuleKey *key; + const char *path; // Required + const char *json; // Required + size_t json_len; + bool is_root_path; +} MSetCmdArgs; +// Parse the (key, path, json) triplets of JSON.MSET and validate each one; returns the first failure code. +STATIC JsonUtilCode parseAndValidateMSetCmdArgs(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, const int argc, MSetCmdArgs **args_list, size_t *num_keys) { + // Validate that the number of arguments is correct for MSET (3 arguments per key) + if ((argc - 1) % 3 != 0) { + return JSONUTIL_WRONG_NUM_ARGS; + } + + *num_keys = (argc - 1) / 3; + // Allocate memory for args_list + *args_list = reinterpret_cast<MSetCmdArgs *>(ValkeyModule_Alloc((*num_keys) * sizeof(MSetCmdArgs))); + if (!(*args_list)) { + return JSONUTIL_ALLOCATION_FAILURE; + } + + memset(*args_list, 0, (*num_keys) * sizeof(MSetCmdArgs)); + + JsonUtilCode rc; + // Parse and validate arguments for each key + size_t i; + for (i = 0; i < *num_keys; ++i) { + MSetCmdArgs &current_arg = (*args_list)[i]; + + current_arg.key_str = argv[i * 3 + 1]; + current_arg.key = static_cast<ValkeyModuleKey *>( + ValkeyModule_OpenKey(ctx, current_arg.key_str, VALKEYMODULE_READ | VALKEYMODULE_WRITE)); + + // Handle key open failure + if (!current_arg.key) { + rc = JSONUTIL_KEY_OPEN_ERROR; + return rc; + } + + current_arg.path = ValkeyModule_StringPtrLen(argv[i * 3 + 2], nullptr); + current_arg.json = ValkeyModule_StringPtrLen(argv[i * 3 + 3], &current_arg.json_len); + + // Validate key type + int type = ValkeyModule_KeyType(current_arg.key); + if (type != VALKEYMODULE_KEYTYPE_EMPTY && + ValkeyModule_ModuleTypeGetType(current_arg.key) != DocumentType) { + rc = JSONUTIL_NOT_A_DOCUMENT_KEY; + return rc; + } + + // Root path validation + bool is_new_valkey_key = (type == VALKEYMODULE_KEYTYPE_EMPTY); + 
current_arg.is_root_path = jsonutil_is_root_path(current_arg.path); + if (is_new_valkey_key && !current_arg.is_root_path) { + rc = JSONUTIL_COMMAND_SYNTAX_ERROR; + return rc; + } + + // Validate JSON structure + if (current_arg.is_root_path) { + JDocument *doc = nullptr; + rc = dom_parse(ctx, current_arg.json, current_arg.json_len, &doc); + if (rc != JSONUTIL_SUCCESS) { + if (doc) dom_free_doc(doc); + return rc; + } + if (json_is_instrument_enabled_insert() || json_is_instrument_enabled_update()) { + size_t len; + const char* key_cstr = ValkeyModule_StringPtrLen(current_arg.key_str, &len); + std::size_t key_hash = std::hash<std::string_view>{}(std::string_view(key_cstr, len)); + ValkeyModule_Log(ctx, "warning", + "Dump document structure before setting JSON key (hashed) %zu whole doc %p:", + key_hash, static_cast<void *>(doc)); + DumpRedactedJValue(doc->GetJValue(), nullptr, "warning"); + } + dom_free_doc(doc); + } else { + JDocument *doc = static_cast<JDocument *>(ValkeyModule_ModuleTypeGetValue(current_arg.key)); + if (!doc) { + rc = JSONUTIL_DOCUMENT_KEY_NOT_FOUND; + return rc; + } + rc = dom_verify_value(ctx, doc, current_arg.path, current_arg.json, current_arg.json_len); + if (rc != JSONUTIL_SUCCESS) { + return rc; + } + } + } + return JSONUTIL_SUCCESS; +} + STATIC JsonUtilCode parseGetCmdArgs(ValkeyModuleString **argv, const int argc, ValkeyModuleString **key, PrintFormat *format, ValkeyModuleString ***paths, int *num_paths) { *key = nullptr; @@ -629,6 +721,63 @@ int Command_JsonSet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { return ValkeyModule_ReplyWithSimpleString(ctx, "OK"); } +int Command_JsonMSet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + ValkeyModule_AutoMemory(ctx); + + MSetCmdArgs *args_list = nullptr; + size_t num_keys; + JsonUtilCode rc = parseAndValidateMSetCmdArgs(ctx, argv, argc, &args_list, &num_keys); + if (rc != JSONUTIL_SUCCESS) { + ValkeyModule_Free(args_list); + if (rc == JSONUTIL_WRONG_NUM_ARGS) + return ValkeyModule_WrongArity(ctx); + else + return 
ValkeyModule_ReplyWithError(ctx, jsonutil_code_to_message(rc)); + } + + // Apply changes + size_t i; + for (i = 0; i < num_keys; i++) { + // begin tracking memory + int64_t begin_val = jsonstats_begin_track_mem(); + + if (args_list[i].is_root_path) { // Root document + // parse incoming JSON string + JDocument *doc = nullptr; + rc = dom_parse(ctx, args_list[i].json, args_list[i].json_len, &doc); + ValkeyModule_Assert(rc == JSONUTIL_SUCCESS); + + int64_t delta = jsonstats_end_track_mem(begin_val); + size_t doc_size = dom_get_doc_size(doc) + delta; + dom_set_doc_size(doc, doc_size); + + // Set Valkey key + ValkeyModule_ModuleTypeSetValue(args_list[i].key, DocumentType, doc); + // update stats + jsonstats_update_stats_on_insert(doc, true, 0, doc_size, doc_size); + } else { // Update existing document + JDocument *doc = static_cast<JDocument *>(ValkeyModule_ModuleTypeGetValue(args_list[i].key)); + size_t orig_doc_size = dom_get_doc_size(doc); + + rc = dom_set_value(ctx, doc, args_list[i].path, args_list[i].json, args_list[i].json_len, false, false); + ValkeyModule_Assert(rc == JSONUTIL_SUCCESS); + int64_t delta = jsonstats_end_track_mem(begin_val); + size_t new_doc_size = dom_get_doc_size(doc) + delta; + dom_set_doc_size(doc, new_doc_size); + + // update stats + jsonstats_update_stats_on_update(doc, orig_doc_size, new_doc_size, args_list[i].json_len); + } + + ValkeyModule_NotifyKeyspaceEvent(ctx, VALKEYMODULE_NOTIFY_GENERIC, "json.mset", args_list[i].key_str); + } + + // replicate the entire command + ValkeyModule_ReplicateVerbatim(ctx); + ValkeyModule_Free(args_list); + return ValkeyModule_ReplyWithSimpleString(ctx, "OK"); +} + int Command_JsonGet(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { ValkeyModule_AutoMemory(ctx); @@ -2216,10 +2365,6 @@ void Module_Info(ValkeyModuleInfoCtx *ctx, int for_crash_report) { } \ } - - // - // User visible metrics - // beginSection("core_metrics") addULongLong("total_memory_bytes", jsonstats_get_used_mem() + keyTable->getStats().bytes); 
addULongLong("num_documents", jsonstats_get_num_doc_keys()); @@ -2512,6 +2657,16 @@ extern "C" int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx) { return VALKEYMODULE_ERR; } + if (ValkeyModule_CreateCommand(ctx, "JSON.MSET", Command_JsonMSet, cmdflg_slow_write_deny, 1, -3, 3) + == VALKEYMODULE_ERR) { + ValkeyModule_Log(ctx, "warning", "Failed to create command JSON.MSET."); + return VALKEYMODULE_ERR; + } + if (ValkeyModule_SetCommandACLCategories(ValkeyModule_GetCommand(ctx,"JSON.MSET"), cat_slow_write_deny) == VALKEYMODULE_ERR) { + ValkeyModule_Log(ctx, "warning", "Failed to set command category for JSON.MSET."); + return VALKEYMODULE_ERR; + } + if (ValkeyModule_CreateCommand(ctx, "JSON.GET", Command_JsonGet, cmdflg_readonly, 1, 1, 1) == VALKEYMODULE_ERR) { ValkeyModule_Log(ctx, "warning", "Failed to create command JSON.GET."); return VALKEYMODULE_ERR; @@ -2768,6 +2923,9 @@ extern "C" int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx) { if (!set_command_info(ctx, "JSON.SET", -4, ks_read_write_update, 1, std::make_tuple(0, 1, 0))) { return VALKEYMODULE_ERR; } + if (!set_command_info(ctx, "JSON.MSET", -4, ks_read_write_update, 1, std::make_tuple(-3, 3, 0))) { + return VALKEYMODULE_ERR; + } // Commands under RW + Insert if (!set_command_info(ctx, "JSON.ARRAPPEND", -4, ks_read_write_insert, 1, std::make_tuple(0, 1, 0))) { return VALKEYMODULE_ERR; diff --git a/src/json/util.cc b/src/json/util.cc index ee7fc1d..ffbe55e 100644 --- a/src/json/util.cc +++ b/src/json/util.cc @@ -62,6 +62,8 @@ const char *jsonutil_code_to_message(JsonUtilCode code) { case JSONUTIL_INVALID_RDB_FORMAT: return "ERROR Invalid value in RDB format"; case JSONUTIL_DOLLAR_CANNOT_APPLY_TO_NON_ROOT: return "SYNTAXERR Dollar sign cannot apply to non-root element"; + case JSONUTIL_ALLOCATION_FAILURE: return "ERROR Memory Allocation failed "; + case JSONUTIL_KEY_OPEN_ERROR: return "ERROR Unable to open valkeymodule key"; default: ValkeyModule_Assert(false); } return ""; diff --git a/src/json/util.h 
b/src/json/util.h index 2f7117e..b6719aa 100644 --- a/src/json/util.h +++ b/src/json/util.h @@ -62,6 +62,8 @@ typedef enum { JSONUTIL_CANNOT_INSERT_MEMBER_INTO_NON_OBJECT_VALUE, JSONUTIL_INVALID_RDB_FORMAT, JSONUTIL_DOLLAR_CANNOT_APPLY_TO_NON_ROOT, + JSONUTIL_ALLOCATION_FAILURE, + JSONUTIL_KEY_OPEN_ERROR, JSONUTIL_LAST } JsonUtilCode; diff --git a/tst/integration/test_json_basic.py b/tst/integration/test_json_basic.py index e6416c0..b2bdaea 100644 --- a/tst/integration/test_json_basic.py +++ b/tst/integration/test_json_basic.py @@ -178,8 +178,14 @@ class TestJsonBasic(JsonTestCase): assert b'OK' == client.execute_command( 'JSON.SET', k2, '.', '[1,2,3,4,5]') - assert [b'{"a":"1","b":"2","c":"3"}', b'[1,2,3,4,5]'] == client.execute_command( - 'JSON.MGET', k1, k2, '.') + assert b'OK' == client.execute_command( + 'JSON.MSET', k3, '.', '[1,2,3,4,5]') + assert b'OK' == client.execute_command( + 'JSON.MSET', k2, '.', '[5,4,3,2,1]', k1, '.', '{"a":"1", "b":"2", "c":"3"}') + assert [b'{"a":"1","b":"2","c":"3"}', b'[5,4,3,2,1]', b'[1,2,3,4,5]'] == client.execute_command( + 'JSON.MGET', k1, k2, k3, '.') + assert b'OK' == client.execute_command( + 'JSON.SET', k2, '.', '[1,2,3,4,5]') assert b'{"a":"1","b":"2","c":"3"}' == client.execute_command( 'JSON.GET', k1) for (key, path, exp) in [ @@ -2781,6 +2787,146 @@ class TestJsonBasic(JsonTestCase): 'JSON.SET', k, '.', json_with_size(64*MB)) assert self.error_class.is_limit_exceeded_error(str(e.value)) + def test_json_mset_command_supports_all_datatypes(self): + client = self.server.get_new_client() + for (path, value) in [('.address.city', '"Boston"'), # string + # number + ('.age', '30'), + ('.foo', '[1,2,3]'), # array + # array element access + ('.phoneNumbers[0].number', '"1234567"'), + # object + ('.foo', '{"a":"b"}'), + ('.lastName', 'null'), # null + ('.isAlive', 'false')]: # boolean + assert b'OK' == client.execute_command( + 'JSON.MSET', wikipedia, path, value) + assert value.encode() == client.execute_command( + 
'JSON.GET', wikipedia, path) + + def test_json_mset_command_root_document(self): + client = self.server.get_new_client() + # path to the root document is represented as '.' + assert b'OK' == client.execute_command( + "JSON.MSET", + k1, ".", '"Boston"', + k2, ".", '123', + k3, ".", '["Seattle","Boston"]', + k4, ".", '[1,2,3]', + k5, ".", '{"a":"b"}', + k6, ".", '{}', + k7, ".", 'null', + k8, ".", 'false') + for (key, value) in [(k1, '"Boston"'), # string + (k2, '123'), # number + (k3, '["Seattle","Boston"]'), # array + (k4, '[1,2,3]'), # array + (k5, '{"a":"b"}'), # object + (k6, '{}'), # empty object + (k7, 'null'), # null + (k8, 'false')]: # boolean + assert value.encode() == client.execute_command('JSON.GET', key) + + def test_json_mset_command_with_error_conditions(self): + client = self.server.get_new_client() + # A new Valkey key's path must be root + with pytest.raises(ResponseError) as e: + assert None == client.execute_command( + 'JSON.MSET', foo, '.bar', '"bar"') + assert self.error_class.is_syntax_error(str(e.value)) + + def test_json_mset_command_mixed_path_document(self): + client = self.server.get_new_client() + # path to the root document is represented as '.' 
+ assert b'OK' == client.execute_command( + "JSON.MSET", + k1, ".", '"Boston"', + wikipedia, '.address.city', '"Boston"', + k2, ".", '123', + wikipedia, '.age', '30', + k3, ".", '["Seattle","Boston"]', + k4, ".", '[1,2,3]', + wikipedia, '.phoneNumbers[0].number', '"1234567"', + k5, ".", '{"a":"b"}', + wikipedia, '.foo', '{"a":"b"}', + k6, ".", '{}', + wikipedia, '.lastName', 'null', + k7, ".", 'null', + wikipedia, '.isAlive', 'false', + k8, ".", 'false') + for (key, path, value) in [(k1, ".", '"Boston"'), # string + (k2, ".", '123'), # number + (k3, ".", '["Seattle","Boston"]'), # array + (k4, ".", '[1,2,3]'), # array + (k5, ".", '{"a":"b"}'), # object + (k6, ".", '{}'), # empty object + (k7, ".", 'null'), # null + (k8, ".", 'false'), # boolean + (wikipedia, '.address.city', '"Boston"'), + (wikipedia, '.age', '30'), + (wikipedia, '.phoneNumbers[0].number', '"1234567"'), + (wikipedia, '.foo', '{"a":"b"}'), + (wikipedia, '.lastName', 'null'), + (wikipedia, '.isAlive', 'false')]: + assert value.encode() == client.execute_command( + 'JSON.GET', key, path) + + def test_json_mset_command_atomicity(self): + client = self.server.get_new_client() + # path to the root document is represented as '.' 
+ assert b'OK' == client.execute_command( + "JSON.MSET", + k1, ".", '"Boston"', + wikipedia, '.address.city', '"Boston"', + k2, ".", '123', + wikipedia, '.age', '30', + k3, ".", '["Seattle","Boston"]', + k4, ".", '[1,2,3]', + wikipedia, '.phoneNumbers[0].number', '"1234567"', + k5, ".", '{"a":"b"}', + wikipedia, '.foo', '{"a":"b"}', + k6, ".", '{}', + wikipedia, '.lastName', 'null', + k7, ".", 'null', + wikipedia, '.isAlive', 'false', + k8, ".", 'false') + + with pytest.raises(ResponseError) as e: + assert None == client.execute_command( + "JSON.MSET", + k1, ".", '"Seattle"', + wikipedia, '.address.city', '"Seattle"', + k2, ".", '456', + wikipedia, '.age', '40', + k3, ".", '["Seattle","Chicago"]', + k4, ".", '[4,5,6]', + wikipedia, '.phoneNumbers[0].number', '"56789"', + k5, ".", '{"c":"d"}', + wikipedia, '.foo', '{"c":"d"}', + k6, ".", '{}', + wikipedia, '.lastName', 'null', + k7, ".", 'null', + foo, '.bar', '"bar"', + k8, ".", 'true') + assert self.error_class.is_syntax_error(str(e.value)) + + for (key, path, value) in [(k1, ".", '"Boston"'), # string + (k2, ".", '123'), # number + (k3, ".", '["Seattle","Boston"]'), # array + (k4, ".", '[1,2,3]'), # array + (k5, ".", '{"a":"b"}'), # object + (k6, ".", '{}'), # empty object + (k7, ".", 'null'), # null + (k8, ".", 'false'), # boolean + (wikipedia, '.address.city', '"Boston"'), + (wikipedia, '.age', '30'), + (wikipedia, '.phoneNumbers[0].number', '"1234567"'), + (wikipedia, '.foo', '{"a":"b"}'), + (wikipedia, '.lastName', 'null'), + (wikipedia, '.isAlive', 'false')]: + assert value.encode() == client.execute_command( + 'JSON.GET', key, path) + def test_multi_exec(self): client = self.server.get_new_client() client.execute_command('MULTI') diff --git a/tst/unit/dom_test.cc b/tst/unit/dom_test.cc index ed70b58..6d66a56 100644 --- a/tst/unit/dom_test.cc +++ b/tst/unit/dom_test.cc @@ -321,6 +321,12 @@ TEST_F(DomTest, testSerialize_CustomFormat) { EXPECT_STREQ(oss.GetString(), exp_json); } +TEST_F(DomTest, 
testVerifyString) { + const char *new_val = "\"Boston\""; + JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".address.city", new_val); + EXPECT_EQ(rc, JSONUTIL_SUCCESS); +} + TEST_F(DomTest, testSetString) { const char *new_val = "\"Boston\""; JsonUtilCode rc = dom_set_value(nullptr, doc1, ".address.city", new_val, false, false); @@ -332,6 +338,12 @@ TEST_F(DomTest, testSetString) { EXPECT_STREQ(GetString(&oss), new_val); } +TEST_F(DomTest, testVerifyNumber) { + const char *new_val = "37"; + JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".age", new_val); + EXPECT_EQ(rc, JSONUTIL_SUCCESS); +} + TEST_F(DomTest, testSetNumber) { const char *new_val = "37"; JsonUtilCode rc = dom_set_value(nullptr, doc1, ".age", new_val, false, false); @@ -343,6 +355,12 @@ TEST_F(DomTest, testSetNumber) { EXPECT_STREQ(GetString(&oss), new_val); } +TEST_F(DomTest, testVerifyNull) { + const char *new_val = "null"; + JsonUtilCode rc = dom_verify_value(nullptr, doc1, ".address.street", new_val); + EXPECT_EQ(rc, JSONUTIL_SUCCESS); +} + TEST_F(DomTest, testSetNull) { const char *new_val = "null"; JsonUtilCode rc = dom_set_value(nullptr, doc1, ".address.street", new_val, false, false);