diff --git a/src/Makefile b/src/Makefile index 9db3e6934..0822e82f2 100644 --- a/src/Makefile +++ b/src/Makefile @@ -276,7 +276,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/networking.cpp b/src/networking.cpp index d843e8d99..465376eeb 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -424,6 +424,10 @@ void addReplyProto(client *c, const char *s, size_t len) { _addReplyProtoToList(c,s,len); } +void addReplyProtoCString(client *c, const char *s) { + addReplyProto(c, s, strlen(s)); +} + std::string escapeString(sds str) { std::string newstr; diff --git a/src/object.cpp b/src/object.cpp index 4988fa0bf..35b0ae398 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cron.h" +#include "t_nhash.h" #include #include #include @@ -395,6 +396,7 @@ void decrRefCount(robj_roptr o) { case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; case OBJ_CRON: freeCronObject(o); break; + case OBJ_NESTEDHASH: freeNestedHashObject(o); break; default: serverPanic("Unknown object type"); break; } if (g_pserver->fActiveReplica) { diff --git a/src/sds.h b/src/sds.h index 3315ed9b1..d316a9ee8 100644 --- a/src/sds.h +++ b/src/sds.h @@ -390,6 +390,10 @@ public: : sdsview(sdsdup(other.m_str)) {} + sdsstring(const char *rgch, size_t cch) + : sdsview(sdsnewlen(rgch, cch)) + {} + sdsstring(sdsstring &&other) : sdsview(other.m_str) { @@ -410,6 +414,12 @@ public: return *this; } + sds release() { + sds sdsT = m_str; + m_str = nullptr; + return sdsT; + } + ~sdsstring() { sdsfree(m_str); diff --git a/src/server.cpp b/src/server.cpp index a162a8c1c..9d7dc2ef3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -63,6 +63,7 @@ #include #include "aelocker.h" #include "motd.h" +#include "t_nhash.h" #ifdef __linux__ #include #endif @@ -1060,7 +1061,15 @@ struct redisCommand redisCommandTable[] = { {"stralgo",stralgoCommand,-2, "read-only @string", - 0,lcsGetKeys,0,0,0,0,0,0} + 0,lcsGetKeys,0,0,0,0,0,0}, + + {"keydb.nhget",nhgetCommand,-2, + "read-only fast @hash", + 0,NULL,1,1,1,0,0,0}, + + {"keydb.nhset",nhsetCommand,-3, + "read-only fast @hash", + 0,NULL,1,1,1,0,0,0}, }; /*============================ Utility functions ============================ */ @@ -3558,7 +3567,15 @@ void call(client *c, int flags) { updateCachedTime(0); incrementMvccTstamp(); start = g_pserver->ustime; - c->cmd->proc(c); + try { + c->cmd->proc(c); + } catch (robj_roptr o) { + addReply(c, o); + } catch (robj *o) { + addReply(c, o); + } catch (const char *sz) { + addReplyError(c, sz); + } serverTL->commandsExecuted++; duration = ustime()-start; dirty = g_pserver->dirty-dirty; diff --git a/src/server.h b/src/server.h index 9f7d8f22c..d8285e5f4 100644 --- a/src/server.h +++ b/src/server.h @@ -654,11 +654,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ -#define OBJ_STRING 0 /* String object. */ -#define OBJ_LIST 1 /* List object. */ -#define OBJ_SET 2 /* Set object. */ -#define OBJ_ZSET 3 /* Sorted set object. */ -#define OBJ_HASH 4 /* Hash object. */ +#define OBJ_STRING 0 /* String object. */ +#define OBJ_LIST 1 /* List object. */ +#define OBJ_SET 2 /* Set object. */ +#define OBJ_ZSET 3 /* Sorted set object. */ +#define OBJ_HASH 4 /* Hash object. */ /* The "module" object type is a special one that signals that the object * is one directly managed by a Redis module. In this case the value points @@ -671,10 +671,10 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; * by a 64 bit module type ID, which has a 54 bits module-specific signature * in order to dispatch the loading to the right module, plus a 10 bits * encoding version. */ -#define OBJ_MODULE 5 /* Module object. */ -#define OBJ_STREAM 6 /* Stream object. */ -#define OBJ_CRON 7 /* CRON job */ - +#define OBJ_MODULE 5 /* Module object. */ +#define OBJ_STREAM 6 /* Stream object. */ +#define OBJ_CRON 7 /* CRON job */ +#define OBJ_NESTEDHASH 8 /* Nested Hash Object */ /* Extract encver / signature from a module type ID. */ #define REDISMODULE_TYPE_ENCVER_BITS 10 @@ -2009,6 +2009,7 @@ void addReplyNullArray(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); +void addReplyProtoCString(client *c, const char *s); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); diff --git a/src/t_nhash.cpp b/src/t_nhash.cpp new file mode 100644 index 000000000..e9e17b1f7 --- /dev/null +++ b/src/t_nhash.cpp @@ -0,0 +1,353 @@ +/* + * Copyright (c) 2020, EQ Alpha Technology Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include + +void dictObjectDestructor(void *privdata, void *val); +dictType nestedHashDictType { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictObjectDestructor, /* val destructor */ +}; + +robj *createNestHashBucket() { + dict *d = dictCreate(&nestedHashDictType, nullptr); + return createObject(OBJ_NESTEDHASH, d); +} + +void freeNestedHashObject(robj_roptr o) { + dictRelease((dict*)ptrFromObj(o)); +} + +robj *fetchFromKey(redisDb *db, robj_roptr key) { + const char *pchCur = szFromObj(key); + const char *pchStart = pchCur; + const char *pchMax = pchCur + sdslen(pchCur); + robj *o = nullptr; + + while (pchCur <= pchMax) { + if (pchCur == pchMax || *pchCur == '.') { + // WARNING: Don't deref pchCur as it may be pchMax + + // New word + if ((pchCur - pchStart) < 1) { + throw shared.syntaxerr; // malformed + } + + dict *d = nullptr; + if (o == nullptr) + d = db->pdict; + else + d = (dict*)ptrFromObj(o); + + sdsstring str(pchStart, pchCur - pchStart); + dictEntry *de = dictFind(d, str.get()); + o = (de != nullptr) ? (robj*)dictGetVal(de) : nullptr; + + if (o == nullptr) throw shared.nokeyerr; // Not Found + serverAssert(o->type == OBJ_NESTEDHASH || o->type == OBJ_STRING || o->type == OBJ_LIST); + if (o->type == OBJ_STRING && pchCur != pchMax) + throw shared.nokeyerr; // Past the end + + pchStart = pchCur + 1; + } + ++pchCur; + } + + return o; +} + +// Returns one if we overwrote a value +bool setWithKey(redisDb *db, robj_roptr key, robj *val, bool fCreateBuckets) { + const char *pchCur = szFromObj(key); + const char *pchStart = pchCur; + const char *pchMax = pchCur + sdslen(pchCur); + robj *o = nullptr; + + while (pchCur <= pchMax) { + if (pchCur == pchMax || *pchCur == '.') { + // WARNING: Don't deref pchCur as it may be pchMax + + // New word + if ((pchCur - pchStart) < 1) { + throw shared.syntaxerr; // malformed + } + + dict *d = nullptr; + if (o == nullptr) + d = db->pdict; + else + d = (dict*)ptrFromObj(o); + + sdsstring str(pchStart, pchCur - pchStart); + dictEntry *de = dictFind(d, str.get()); + + if (pchCur == pchMax) { + val->addref(); + if (de != nullptr) { + decrRefCount((robj*)dictGetVal(de)); + dictSetVal(d, de, val); + return true; + } else { + dictAdd(d, str.release(), val); + return false; + } + } else { + o = (de != nullptr) ? (robj*)dictGetVal(de) : nullptr; + + if (o == nullptr) { + if (!fCreateBuckets) + throw shared.nokeyerr; // Not Found + o = createNestHashBucket(); + serverAssert(dictAdd(d, str.release(), o) == DICT_OK); + } else if (o->type != OBJ_NESTEDHASH) { + decrRefCount(o); + o = createNestHashBucket(); + de->v.val = o; + } + } + + pchStart = pchCur + 1; + } + ++pchCur; + } + throw "Internal Error"; +} + +void writeNestedHashToClient(client *c, robj_roptr o) { + if (o == nullptr) { + addReply(c, shared.null[c->resp]); + } else if (o->type == OBJ_STRING) { + addReplyBulk(c, o); + } else if (o->type == OBJ_LIST) { + unsigned char *zl = (unsigned char*)ptrFromObj(o); + addReplyArrayLen(c, ziplistLen(zl)); + unsigned char *p = ziplistIndex(zl, ZIPLIST_HEAD); + while (p != nullptr) { + unsigned char *str; + unsigned int len; + long long lval; + if (ziplistGet(p, &str, &len, &lval)) { + char rgT[128]; + if (str == nullptr) { + len = ll2string(rgT, 128, lval); + str = (unsigned char*)rgT; + } + addReplyBulkCBuffer(c, (const char*)str, len); + } + p = ziplistNext(zl, p); + } + } else { + serverAssert(o->type == OBJ_NESTEDHASH ); + dict *d = (dict*)ptrFromObj(o); + + if (dictSize(d) > 1) + addReplyArrayLen(c, dictSize(d)); + + dictIterator *di = dictGetIterator(d); + dictEntry *de; + while ((de = dictNext(di))) { + robj_roptr oT = (robj*)dictGetVal(de); + addReplyArrayLen(c, 2); + addReplyBulkCBuffer(c, (sds)dictGetKey(de), sdslen((sds)dictGetKey(de))); + if (oT->type == OBJ_STRING) { + addReplyBulk(c, oT); + } else { + writeNestedHashToClient(c, oT); + } + } + dictReleaseIterator(di); + } +} + +inline bool FSimpleJsonEscapeCh(char ch) { + return (ch == '"' || ch == '\\'); +} +inline bool FExtendedJsonEscapeCh(char ch) { + return ch <= 0x1F; +} + +sds writeJsonValue(sds output, const char *valIn, size_t cchIn) { + const char *val = valIn; + size_t cch = cchIn; + int cchEscapeExtra = 0; + + // First scan for escaped chars + for (size_t ich = 0; ich < cchIn; ++ich) { + if (FSimpleJsonEscapeCh(valIn[ich])) { + ++cchEscapeExtra; + } else if (FExtendedJsonEscapeCh(valIn[ich])) { + cchEscapeExtra += 5; + } + } + + if (cchEscapeExtra > 0) { + size_t ichDst = 0; + sds dst = sdsnewlen(SDS_NOINIT, cchIn+cchEscapeExtra); + for (size_t ich = 0; ich < cchIn; ++ich) { + switch (valIn[ich]) { + case '"': + dst[ichDst++] = '\\'; dst[ichDst++] = '"'; + break; + case '\\': + dst[ichDst++] = '\\'; dst[ichDst++] = '\\'; + break; + + default: + serverAssert(!FSimpleJsonEscapeCh(valIn[ich])); + if (FExtendedJsonEscapeCh(valIn[ich])) { + dst[ichDst++] = '\\'; dst[ichDst++] = 'u'; + sprintf(dst + ichDst, "%4x", valIn[ich]); + ichDst += 4; + } else { + dst[ichDst++] = valIn[ich]; + } + break; + } + } + val = (const char*)dst; + serverAssert(ichDst == (cchIn+cchEscapeExtra)); + cch = ichDst; + } + + output = sdscat(output, "\""); + output = sdscatlen(output, val, cch); + output = sdscat(output, "\""); + + if (val != valIn) + sdsfree(val); + + return output; +} +sds writeJsonValue(sds output, sds val) { + return writeJsonValue(output, (const char*)val, sdslen(val)); +} + +sds writeNestedHashAsJson(sds output, robj_roptr o) { + if (o->type == OBJ_STRING) { + output = writeJsonValue(output, (sds)szFromObj(o)); + } else if (o->type == OBJ_LIST) { + unsigned char *zl = (unsigned char*)ptrFromObj(o); + output = sdscat(output, "["); + unsigned char *p = ziplistIndex(zl, ZIPLIST_HEAD); + bool fFirst = true; + while (p != nullptr) { + unsigned char *str; + unsigned int len; + long long lval; + if (ziplistGet(p, &str, &len, &lval)) { + char rgT[128]; + if (str == nullptr) { + len = ll2string(rgT, 128, lval); + str = (unsigned char*)rgT; + } + if (!fFirst) + output = sdscat(output, ","); + fFirst = false; + output = writeJsonValue(output, (const char*)str, len); + } + p = ziplistNext(zl, p); + } + output = sdscat(output, "]"); + } else { + output = sdscat(output, "{"); + dictIterator *di = dictGetIterator((dict*)ptrFromObj(o)); + dictEntry *de; + bool fFirst = true; + while ((de = dictNext(di))) { + robj_roptr oT = (robj*)dictGetVal(de); + if (!fFirst) + output = sdscat(output, ","); + fFirst = false; + output = writeJsonValue(output, (sds)dictGetKey(de)); + output = sdscat(output, " : "); + output = writeNestedHashAsJson(output, oT); + } + dictReleaseIterator(di); + output = sdscat(output, "}"); + } + return output; +} + +void nhsetCommand(client *c) { + if (c->argc < 3) + throw shared.syntaxerr; + + robj *val = c->argv[2]; + if (c->argc > 3) { + // Its a list, we'll store as a ziplist + val = createZiplistObject(); + for (int iarg = 2; iarg < c->argc; ++iarg) { + sds arg = (sds)szFromObj(c->argv[iarg]); + val->m_ptr = ziplistPush((unsigned char*)ptrFromObj(val), (unsigned char*)arg, sdslen(arg), ZIPLIST_TAIL); + } + } + + try { + if (setWithKey(c->db, c->argv[1], val, true)) { + addReplyLongLong(c, 1); // we replaced a value + } else { + addReplyLongLong(c, 0); // we added a new value + } + } catch (...) { + if (val != c->argv[2]) + decrRefCount(val); + throw; + } + if (val != c->argv[2]) + decrRefCount(val); +} + +void nhgetCommand(client *c) { + if (c->argc != 2 && c->argc != 3) + throw shared.syntaxerr; + + bool fJson = false; + int argOffset = 0; + if (c->argc == 3) { + argOffset++; + if (strcasecmp(szFromObj(c->argv[1]), "json") == 0) { + fJson = true; + } else if (strcasecmp(szFromObj(c->argv[1]), "resp") != 0) { + throw shared.syntaxerr; + } + } + + robj *o = fetchFromKey(c->db, c->argv[argOffset + 1]); + if (fJson) { + sds val = writeNestedHashAsJson(sdsnew(nullptr), o); + addReplyBulkSds(c, val); + } else { + writeNestedHashToClient(c, o); + } +} \ No newline at end of file diff --git a/src/t_nhash.h b/src/t_nhash.h new file mode 100644 index 000000000..df71fb0ca --- /dev/null +++ b/src/t_nhash.h @@ -0,0 +1,6 @@ +#pragma once + +void freeNestedHashObject(robj_roptr o); + +void nhsetCommand(client *c); +void nhgetCommand(client *c); diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 88ea79d61..b70d89968 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -28,6 +28,7 @@ set ::all_tests { unit/type/hash unit/type/stream unit/type/stream-cgroups + unit/type/nestedhash unit/sort unit/expire unit/other