add export csv feature

This commit is contained in:
Kenneth Cheng 2020-05-03 22:32:41 +08:00
parent 7e8fc08b04
commit 9539e62a22
2 changed files with 105 additions and 49 deletions

View File

@ -1,6 +1,6 @@
# Redis Module for maintaining hash by simple SQL (Also provide csv import function) # Redis Module for maintaining hash by simple SQL (Support csv import/export)
This module aims to provide simple DML to manipulate the hashes in REDIS for SQL users. It works as simple as you expected. It translates the input statement to a set of pure REDIS commands. It does not need nor generate any intermediate stuffs which occupied your storages. The target data is your hashes only. It also provides the CSV import and export (under construction) function. This module aims to provide simple DML to manipulate the hashes in REDIS for SQL users. It works as simple as you expected. It translates the input statement to a set of pure REDIS commands. It does not need nor generate any intermediate stuffs which occupied your storages. The target data is your hashes only. It also provides the CSV import and export function.
## Usage ## Usage
```sql ```sql
@ -120,7 +120,7 @@ The above is nearly like REDIS keys command
Each record is exactly a hash, you could use raw REDIS commands ``hget, hmget or hgetall`` to retrieve the same content Each record is exactly a hash, you could use raw REDIS commands ``hget, hmget or hgetall`` to retrieve the same content
#### Where clause in select statement #### Where clause
Your could specify =, >, <, >=, <=, <>, != or like conditions in where clause. Now the module only support "and" to join multiple conditions. Your could specify =, >, <, >=, <=, <>, != or like conditions in where clause. Now the module only support "and" to join multiple conditions.
```sql ```sql
127.0.0.1:6379> dbx select tel from phonebook where name like Son 127.0.0.1:6379> dbx select tel from phonebook where name like Son
@ -133,7 +133,7 @@ Your could specify =, >, <, >=, <=, <>, != or like conditions in where clause. N
2) "1-888-3333-1412" 2) "1-888-3333-1412"
``` ```
#### Order clause in select statement #### Order clause
Ordering can be ascending or descending. All sortings are alpha-sort. Ordering can be ascending or descending. All sortings are alpha-sort.
```sql ```sql
127.0.0.1:6379> dbx select name, pos from phonebook order by pos asc 127.0.0.1:6379> dbx select name, pos from phonebook order by pos asc
@ -164,7 +164,7 @@ Ordering can be ascending or descending. All sortings are alpha-sort.
2) "Betty Joan" 2) "Betty Joan"
``` ```
#### Top clause in select statement #### Top clause
```sql ```sql
127.0.0.1:6379> dbx select top 3 name, tel from phonebook order by pos desc 127.0.0.1:6379> dbx select top 3 name, tel from phonebook order by pos desc
1) 1) name 1) 1) name
@ -183,8 +183,8 @@ Ordering can be ascending or descending. All sortings are alpha-sort.
(empty list or set) (empty list or set)
``` ```
#### Into clause in select statement #### Into clause for copy hash table
You could create another table by into clause. You could create another hash table by into clause.
```sql ```sql
127.0.0.1:6379> dbx select * into testbook from phonebook 127.0.0.1:6379> dbx select * into testbook from phonebook
1) testbook:1588325407-1751904058 1) testbook:1588325407-1751904058
@ -239,16 +239,15 @@ You could create another table by into clause.
10) "F" 10) "F"
``` ```
#### Into csv clause in select statement #### Into csv clause for exporting records in csv format
(The feature is under construction)
```sql ```sql
127.0.0.1:6379> dbx select * into csv "/tmp/testbook.csv" from phonebook where pos > 2 127.0.0.1:6379> dbx select * into csv "/tmp/testbook.csv" from phonebook where pos > 2
(integer) 2 1) Kevin Louis,111-2123-1233,2009-12-31,6,F
2) Kenneth Cheng,123-12134-123,2000-12-31,5,M
127.0.0.1:6379> quit 127.0.0.1:6379> quit
$ cat /tmp/testbook.csv $ cat /tmp/testbook.csv
name,tel,birth,pos,gender Kevin Louis,111-2123-1233,2009-12-31,6,F
"Kenneth Cheng","123-12134-123","2000-12-31","5","M" Kenneth Cheng,123-12134-123,2000-12-31,5,M
"Kevin Louis","111-2123-1233","2009-12-31","6","F"
$ $
``` ```
@ -290,7 +289,7 @@ Note that Redis requires at least one space after the single and double quoted a
127.0.0.1:6379> dbx "insert into phonebook (name,tel,birth,pos,gender) values ('Peter Nelson','1-456-1246-3421','2019-10-01',3, 'M')" 127.0.0.1:6379> dbx "insert into phonebook (name,tel,birth,pos,gender) values ('Peter Nelson','1-456-1246-3421','2019-10-01',3, 'M')"
``` ```
#### From clause for importing CSV file in insert statement #### From clause for importing CSV file
The module provides simple import function by specifying from clause in Insert statement. It only support comma deliminated. Please make sure that the specified import file can be accessed by Redis server. The module provides simple import function by specifying from clause in Insert statement. It only support comma deliminated. Please make sure that the specified import file can be accessed by Redis server.
```bash ```bash
$ cat > /tmp/test.csv << EOF $ cat > /tmp/test.csv << EOF

125
src/dbx.c
View File

@ -26,6 +26,12 @@ int regexCompile(RedisModuleCtx *ctx, regex_t *r, const char *t) {
return 0; return 0;
} }
char* trim(char* s, char t) {
char* p = s;
if (p[strlen(s)-1] == t) p[strlen(s)-1] = 0;
if (p[0] == t) p++;
return p;
}
char* toLower(char* s) { char* toLower(char* s) {
for(char *p = s; *p; p++) for(char *p = s; *p; p++)
*p = tolower(*p); *p = tolower(*p);
@ -37,8 +43,20 @@ char* toUpper(char* s) {
return s; return s;
} }
const char* RedisModule_StringToChar(RedisModuleString *s) {
size_t l;
return RedisModule_StringPtrLen(s, &l);
}
char* VectorGetString(Vector *v, size_t i) {
char *value;
Vector_Get(v, i, &value);
return value;
}
int whereRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vWhere) { int whereRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vWhere) {
char *field, *w; //char *field;
char *w;
int condition; int condition;
int match = 1; int match = 1;
@ -47,21 +65,20 @@ int whereRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vWhere) {
if (n == 0) return 1; if (n == 0) return 1;
if (n % 3 != 0) return 0; if (n % 3 != 0) return 0;
for (size_t i = 0; i < n; i += 3) { for (size_t i = 0; i < n; i += 3) {
Vector_Get(vWhere, i, &field); // Vector_Get(vWhere, i, &field);
Vector_Get(vWhere, i+1, &condition); Vector_Get(vWhere, i+1, &condition);
Vector_Get(vWhere, i+2, &w); Vector_Get(vWhere, i+2, &w);
if (condition == 7) if (condition == 7)
toLower(w); toLower(w);
if (strlen(w) == 0) return 0; if (strlen(w) == 0) return 0;
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field); RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, VectorGetString(vWhere, i));
if (RedisModule_CallReplyLength(tags) == 0) { if (RedisModule_CallReplyLength(tags) == 0) {
RedisModule_FreeCallReply(tags); RedisModule_FreeCallReply(tags);
return 0; return 0;
} }
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags); RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
size_t l; const char *s = RedisModule_StringToChar(rms);
const char *s = RedisModule_StringPtrLen(rms, &l);
switch(condition) { switch(condition) {
case 0: case 0:
match = strcmp(s, w) >= 0? 1: 0; match = strcmp(s, w) >= 0? 1: 0;
@ -179,6 +196,47 @@ void intoRecord(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vSelect, ch
RedisModule_ReplyWithSimpleString(ctx, newkey); RedisModule_ReplyWithSimpleString(ctx, newkey);
} }
void intoCSV(RedisModuleCtx *ctx, RedisModuleString *key, Vector *vSelect, char *filename) {
char* field;
size_t nSelected = Vector_Size(vSelect);
char line[1024];
FILE *fp = fopen(filename, "a");
strcpy(line, "");
for(size_t i = 0; i < nSelected; i++) {
Vector_Get(vSelect, i, &field);
// If '*' is specified in selected hash list, display all hashes then
if (strcmp(field, "*") == 0) {
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGETALL", "s", key);
size_t tf = RedisModule_CallReplyLength(tags);
if (tf > 0) {
for(size_t j=0; j<tf; j+=2) {
// RedisModuleString *rms1 = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(tags, j));
RedisModuleString *rms2 = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(tags, j+1));
if (strlen(line) > 0) strcat(line, ",");
strcat(line, RedisModule_StringToChar(rms2));
// RedisModule_FreeString(ctx, rms1);
RedisModule_FreeString(ctx, rms2);
}
}
RedisModule_FreeCallReply(tags);
}
else {
if (strlen(line) > 0) strcat(line, ",");
RedisModuleCallReply *tags = RedisModule_Call(ctx, "HGET", "sc", key, field);
if (RedisModule_CallReplyLength(tags) > 0) {
RedisModuleString *rms = RedisModule_CreateStringFromCallReply(tags);
strcat(line, RedisModule_StringToChar(rms));
RedisModule_FreeString(ctx, rms);
}
RedisModule_FreeCallReply(tags);
}
}
RedisModule_ReplyWithSimpleString(ctx, line);
fprintf(fp, "%s\n", line);
fclose(fp);
}
/* Split the string by specified delimilator */ /* Split the string by specified delimilator */
Vector* splitStringByChar(char *s, char* d) { Vector* splitStringByChar(char *s, char* d) {
size_t cap; size_t cap;
@ -220,16 +278,17 @@ Vector* splitWhereString(char *s) {
return v; return v;
} }
size_t processRecords(RedisModuleCtx *ctx, RedisModuleCallReply *keys, regex_t *r, Vector *vSelect, Vector *vWhere, char *intoKey, long *top) { size_t processRecords(RedisModuleCtx *ctx, RedisModuleCallReply *keys, regex_t *r, Vector *vSelect, Vector *vWhere, long *top, char *intoKey, char *csvFile) {
size_t nKeys = RedisModule_CallReplyLength(keys); size_t nKeys = RedisModule_CallReplyLength(keys);
size_t affected = 0; size_t affected = 0;
for (size_t i = 0; i < nKeys; i++) { for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i)); RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l; const char *s = RedisModule_StringToChar(key);
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0)) { if (!regexec(r, s, 1, NULL, 0)) {
if (vWhere == NULL || whereRecord(ctx, key, vWhere)) { if (vWhere == NULL || whereRecord(ctx, key, vWhere)) {
if (strlen(intoKey) > 0) if (strlen(csvFile) > 0)
intoCSV(ctx, key, vSelect, csvFile);
else if (strlen(intoKey) > 0)
intoRecord(ctx, key, vSelect, intoKey); intoRecord(ctx, key, vSelect, intoKey);
else else
showRecord(ctx, key, vSelect); showRecord(ctx, key, vSelect);
@ -261,8 +320,7 @@ size_t buildSetByPattern(RedisModuleCtx *ctx, regex_t *r, char *setName, Vector
size_t nKeys = RedisModule_CallReplyLength(keys); size_t nKeys = RedisModule_CallReplyLength(keys);
for (size_t i = 0; i < nKeys; i++) { for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i)); RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l; const char *s = RedisModule_StringToChar(key);
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(r, s, 1, NULL, 0)) { if (!regexec(r, s, 1, NULL, 0)) {
if (vWhere == NULL || whereRecord(ctx, key, vWhere)) { if (vWhere == NULL || whereRecord(ctx, key, vWhere)) {
RedisModule_Call(ctx, "SADD", "cs", setName, key); RedisModule_Call(ctx, "SADD", "cs", setName, key);
@ -280,6 +338,8 @@ size_t buildSetByPattern(RedisModuleCtx *ctx, regex_t *r, char *setName, Vector
} }
int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc < 2) if (argc < 2)
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
@ -287,6 +347,7 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
long top = -1; long top = -1;
RedisModuleString *fromKeys; RedisModuleString *fromKeys;
char intoKey[32] = ""; char intoKey[32] = "";
char csvFile[128] = "";
// Process the arguments // Process the arguments
size_t plen; size_t plen;
@ -362,8 +423,16 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
break; break;
case -4: case -4:
// parse into clause, assume time+rand always is new key // parse into clause, assume time+rand always is new key
if (strcmp("csv", token) == 0)
step = -45;
else {
strcpy(intoKey, token); strcpy(intoKey, token);
step = -5; step = -5;
}
break;
case -45:
strcpy(csvFile, token);
step = -5;
break; break;
case -5: case -5:
if (strcmp("from", token) == 0) if (strcmp("from", token) == 0)
@ -437,10 +506,8 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
Vector *vWhere = splitWhereString(stmWhere); Vector *vWhere = splitWhereString(stmWhere);
Vector *vOrder = splitStringByChar(stmOrder, ","); Vector *vOrder = splitStringByChar(stmOrder, ",");
RedisModule_AutoMemory(ctx);
/* Convert key to regex */ /* Convert key to regex */
const char *pat = RedisModule_StringPtrLen(fromKeys, &plen); const char *pat = RedisModule_StringToChar(fromKeys);
regex_t regex; regex_t regex;
if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR; if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR;
@ -479,7 +546,7 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
for(int i = 0; i < cap; i++) for(int i = 0; i < cap; i++)
RedisModule_FreeString(ctx, param[i]); RedisModule_FreeString(ctx, param[i]);
size_t n = processRecords(ctx, rep, &regex, vSelect, NULL, intoKey, &top); size_t n = processRecords(ctx, rep, &regex, vSelect, NULL, &top, intoKey, csvFile);
RedisModule_FreeCallReply(rep); RedisModule_FreeCallReply(rep);
RedisModule_ReplySetArrayLength(ctx, n); RedisModule_ReplySetArrayLength(ctx, n);
@ -503,7 +570,7 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* Filter by pattern matching. */ /* Filter by pattern matching. */
RedisModuleCallReply *rkeys = RedisModule_CallReplyArrayElement(rep, 1); RedisModuleCallReply *rkeys = RedisModule_CallReplyArrayElement(rep, 1);
n += processRecords(ctx, rkeys, &regex, vSelect, vWhere, intoKey, &top); n += processRecords(ctx, rkeys, &regex, vSelect, vWhere, &top, intoKey, csvFile);
RedisModule_FreeCallReply(rkeys); RedisModule_FreeCallReply(rkeys);
RedisModule_FreeCallReply(rep); RedisModule_FreeCallReply(rep);
@ -522,14 +589,9 @@ int SelectCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK; return REDISMODULE_OK;
} }
char* trim(char* s, char t) {
char* p = s;
if (p[strlen(s)-1] == t) p[strlen(s)-1] = 0;
if (p[0] == t) p++;
return p;
}
int InsertCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int InsertCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc < 2) if (argc < 2)
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
@ -663,11 +725,8 @@ int InsertCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
Vector *vField = splitStringByChar(stmField, ","); Vector *vField = splitStringByChar(stmField, ",");
Vector *vValue = splitStringByChar(stmValue, ","); Vector *vValue = splitStringByChar(stmValue, ",");
RedisModule_AutoMemory(ctx);
if (fromCSV != NULL) { if (fromCSV != NULL) {
size_t len; const char *filename = RedisModule_StringToChar(fromCSV);
const char *filename = RedisModule_StringPtrLen(fromCSV, &len);
FILE *fp = fopen(filename, "r"); FILE *fp = fopen(filename, "r");
if (fp == NULL) { if (fp == NULL) {
RedisModule_ReplyWithError(ctx, "File does not exist"); RedisModule_ReplyWithError(ctx, "File does not exist");
@ -741,6 +800,8 @@ int InsertCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
} }
int DeleteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int DeleteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc < 2) if (argc < 2)
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
@ -830,10 +891,8 @@ int DeleteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
Vector *vWhere = splitWhereString(stmWhere); Vector *vWhere = splitWhereString(stmWhere);
RedisModule_AutoMemory(ctx);
/* Convert key to regex */ /* Convert key to regex */
const char *pat = RedisModule_StringPtrLen(fromKeys, &plen); const char *pat = RedisModule_StringToChar(fromKeys);
regex_t regex; regex_t regex;
if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR; if (regexCompile(ctx, &regex, pat)) return REDISMODULE_ERR;
@ -852,8 +911,7 @@ int DeleteCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
size_t nKeys = RedisModule_CallReplyLength(keys); size_t nKeys = RedisModule_CallReplyLength(keys);
for (size_t i = 0; i < nKeys; i++) { for (size_t i = 0; i < nKeys; i++) {
RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i)); RedisModuleString *key = RedisModule_CreateStringFromCallReply(RedisModule_CallReplyArrayElement(keys, i));
size_t l; const char *s = RedisModule_StringToChar(key);
const char *s = RedisModule_StringPtrLen(key, &l);
if (!regexec(&regex, s, 1, NULL, 0)) { if (!regexec(&regex, s, 1, NULL, 0)) {
if (vWhere == NULL || whereRecord(ctx, key, vWhere)) { if (vWhere == NULL || whereRecord(ctx, key, vWhere)) {
RedisModule_Call(ctx, "DEL", "s", key); RedisModule_Call(ctx, "DEL", "s", key);
@ -879,8 +937,7 @@ int ExecCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 2) if (argc < 2)
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
size_t plen; const char *arg = RedisModule_StringToChar(argv[1]);
const char *arg = RedisModule_StringPtrLen(argv[1], &plen);
if (strncmp(arg, "select", 6) == 0) if (strncmp(arg, "select", 6) == 0)
return SelectCommand(ctx, argv, argc); return SelectCommand(ctx, argv, argc);