feat: init csv module
DuanKuanJun committed May 21, 2024
1 parent e754cc6 commit 958b2bb
Showing 6 changed files with 353 additions and 1 deletion.
44 changes: 44 additions & 0 deletions case/exportCsv.json
@@ -0,0 +1,44 @@
{
"filetype": "csv",
"csvDir": "/root/csv/",
"databases": [
{
"dbinfo": {
"name": "exportCsv"
},
"super_tables": [
{
"name": "meters",
"childtable_count": 100,
"insert_rows": 100000,
"childtable_prefix": "d",
"timestamp_step": 10,
"start_timestamp":1600000000000,
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi"},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 32},
{ "type": "nchar", "name": "nch", "len": 64}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}
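
Assuming the usual taosBenchmark workflow, a case file like this is passed with the -f option (for example: taosBenchmark -f case/exportCsv.json). With the parser changes in src/benchJsonOpt.c below, the csvfile filetype selects the new CSVFILE_TEST mode, and the generator in src/benchCsv.c writes one file per super table named <csvPath>/<dbName>-<stbName>.csv.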
2 changes: 2 additions & 0 deletions inc/bench.h
@@ -470,6 +470,7 @@ enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
CSVFILE_TEST // 3
};

enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT };
@@ -965,6 +966,7 @@ typedef struct SArguments_S {
bool mistMode;
bool escape_character;
bool pre_load_tb_meta;
char csvPath[MAX_FILE_NAME_LEN];
} SArguments;

typedef struct SBenchConn {
35 changes: 35 additions & 0 deletions inc/benchCsv.h
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the MIT license as published by the Free Software
* Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef INC_BENCHCSV_H_
#define INC_BENCHCSV_H_

#include <bench.h>

// entry point of the csv generation mode
int csvTestProcess();

// generate csv files for one super table under outDir
int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir);

void *csvWriteThread(void* param);

char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k);

char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k);

int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binaryPrefix, char* ncharPrefix, int64_t *k);

void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir);

int32_t writeCsvFile(FILE* f, char * buf, int32_t len);

int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);

int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);

#endif // INC_BENCHCSV_H_
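
The header only declares the entry points; the dispatch that routes the new CSVFILE_TEST mode (added to enum TEST_MODE in inc/bench.h above) to csvTestProcess() is not visible in the loaded portion of this diff. A minimal sketch of how that wiring might look, assuming the existing insertTestProcess/queryTestProcess/subscribeTestProcess entry points and a hypothetical runTest() helper:

/* Sketch only, not part of this commit: route the test mode selected by
 * the JSON config to the matching *TestProcess() entry point. The names
 * of the other entry points are assumptions about the surrounding code. */
#include <bench.h>
#include <benchCsv.h>

static int runTest() {
    switch (g_arguments->test_mode) {
        case INSERT_TEST:
            return insertTestProcess();
        case QUERY_TEST:
            return queryTestProcess();
        case SUBSCRIBE_TEST:
            return subscribeTestProcess();
        case CSVFILE_TEST:
            // new in this commit: generate csv files instead of inserting
            return csvTestProcess();
        default:
            return -1;
    }
}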
250 changes: 250 additions & 0 deletions src/benchCsv.c
@@ -0,0 +1,250 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the MIT license as published by the Free Software
* Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*/

#include <bench.h>
#include <benchDataMix.h>
#include <benchCsv.h>


//
// main entry
//
int csvTestProcess() {
    pthread_t handle;
    int ret = pthread_create(&handle, NULL, csvWriteThread, NULL);
    if (ret != 0) {
        errorPrint("pthread_create failed. error code =%d", ret);
        return -1;
    }
    pthread_join(handle, NULL);
    return 0;
}

void *csvWriteThread(void* param) {
    (void)param;
    // write thread: walk every database / super table and dump it to csv
    for (int i = 0; i < g_arguments->databases->size; i++) {
        // database
        SDataBase * db = benchArrayGet(g_arguments->databases, i);
        for (int j = 0; j < db->superTbls->size; j++) {
            // super table
            SSuperTable* stb = benchArrayGet(db->superTbls, j);
            // generate csv
            int ret = genWithSTable(db, stb, g_arguments->csvPath);
            if (ret != 0) {
                errorPrint("failed to generate csv. db=%s stb=%s error code=%d",
                           db->dbName, stb->stbName, ret);
                return NULL;
            }
        }
    }
    return NULL;
}

int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir) {
    // filename
    int ret = 0;
    char outFile[MAX_FILE_NAME_LEN] = {0};
    obtainCsvFile(outFile, db, stb, outDir);
    FILE * fs = fopen(outFile, "w");
    if (fs == NULL) {
        errorPrint("failed to create csv file. file=%s, last errno=%d strerror=%s",
                   outFile, errno, strerror(errno));
        return -1;
    }

    int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
    int bufLen = rowLen * g_arguments->reqPerReq;
    char* buf  = benchCalloc(1, bufLen, true);

    if (stb->interlaceRows > 0) {
        // interlace mode
        ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
    } else {
        // batch mode
        ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
    }

    tmfree(buf);
    fclose(fs);

    return ret;
}


void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) {
    sprintf(outFile, "%s/%s-%s.csv", outDir, db->dbName, stb->stbName);
}



int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
    size_t size = fwrite(buf, 1, len, f);
    if (size != (size_t)len) {
        errorPrint("failed to write csv file. expect write length:%d real write length:%zu", len, size);
        return -1;
    }
    return 0;
}

int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
    int ret = 0;
    int pos = 0;
    int64_t tk = 0; // tag data generation cursor

    int tagDataLen = stb->lenOfTags + stb->tags->size + 256;
    char * tagData = (char *) benchCalloc(1, tagDataLen, true);
    int colDataLen = stb->lenOfCols + stb->cols->size + 256;
    char * colData = (char *) benchCalloc(1, colDataLen, true);

    // generate child tables one after another
    for (int64_t i = 0; i < stb->childTblCount; i++) {
        int64_t ts = stb->startTimestamp;
        int64_t ck = 0; // column data generation cursor
        // tags
        genTagData(tagData, stb, i, &tk);
        // column data for this child table
        for (int64_t j = 0; j < stb->insertRows; j++) {
            genColumnData(colData, stb, ts, db->precision, &ck);
            // combine
            pos += sprintf(buf + pos, "%s,%s\n", tagData, colData);
            if (bufLen - pos < minRemain) {
                // flush
                ret = writeCsvFile(fs, buf, pos);
                if (ret != 0) {
                    goto END;
                }
                pos = 0;
            }

            // move ts to next row
            ts += stb->timestamp_step;
        }
    }

    // flush the remaining rows
    if (pos > 0) {
        ret = writeCsvFile(fs, buf, pos);
        pos = 0;
    }

END:
    // free
    tmfree(tagData);
    tmfree(colData);
    return ret;
}

int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
    int ret = 0;
    int pos = 0;
    int64_t n = 0;  // already inserted rows for one child table
    int64_t tk = 0; // tag data generation cursor

    char **tagDatas = (char **) benchCalloc(stb->childTblCount, sizeof(char *), true);
    int colDataLen = stb->lenOfCols + stb->cols->size + 256;
    char * colData = (char *) benchCalloc(1, colDataLen, true);

    while (n < stb->insertRows) {
        for (int64_t i = 0; i < stb->childTblCount; i++) {
            // continue this child table from the rows already written
            int64_t ts = stb->startTimestamp + n * stb->timestamp_step;
            int64_t ck = 0; // column data generation cursor
            // tags (generated once per child table)
            if (tagDatas[i] == NULL) {
                tagDatas[i] = genTagData(NULL, stb, i, &tk);
            }
            // calculate how many rows to insert in this chunk
            int64_t needInserts = stb->interlaceRows;
            if (needInserts > stb->insertRows - n) {
                needInserts = stb->insertRows - n;
            }

            for (int64_t j = 0; j < needInserts; j++) {
                genColumnData(colData, stb, ts, db->precision, &ck);
                // combine tags and columns
                pos += sprintf(buf + pos, "%s,%s\n", tagDatas[i], colData);
                if (bufLen - pos < minRemain) {
                    // flush
                    ret = writeCsvFile(fs, buf, pos);
                    if (ret != 0) {
                        goto END;
                    }
                    pos = 0;
                }

                // move ts to next row
                ts += stb->timestamp_step;
            }

            if (i == 0) {
                n += needInserts;
            }
        }
    }

    // flush the remaining rows
    if (pos > 0) {
        ret = writeCsvFile(fs, buf, pos);
        pos = 0;
    }

END:
    // free
    for (int64_t m = 0; m < stb->childTblCount; m++) {
        if (tagDatas[m]) {
            tmfree(tagDatas[m]);
        }
    }
    tmfree(tagDatas);
    tmfree(colData);
    return ret;
}

// generate tag data, starting with the child table name
char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) {
    // malloc if the caller did not pass a buffer
    char* tagData;
    if (buf == NULL) {
        int tagDataLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size + 32;
        tagData = benchCalloc(1, tagDataLen, true);
    } else {
        tagData = buf;
    }

    int pos = 0;
    // tbname
    pos += sprintf(tagData + pos, "%s%" PRId64, stb->childTblPrefix, i);
    // tags
    pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k);

    return tagData;
}

// generate column data, starting with the formatted timestamp
char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
    // timestamp first
    toolsFormatTimestamp(colData, ts, precision);
    int pos = strlen(colData);

    // columns
    genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k);
    return colData;
}


int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binaryPrefix, char* ncharPrefix, int64_t *k) {
    // other cols data
    int32_t pos = 0;
    for (uint16_t i = 0; i < fieldCnt; i++) {
        Field* fd = benchArrayGet(fields, i);
        char* prefix = "";
        if (fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) {
            if (binaryPrefix) {
                prefix = binaryPrefix;
            }
        } else if (fd->type == TSDB_DATA_TYPE_NCHAR) {
            if (ncharPrefix) {
                prefix = ncharPrefix;
            }
        }

        pos += dataGenByField(fd, buf + pos, prefix, k);
    }

    return pos;
}
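
For reference, each emitted line has a fixed layout: genTagData() writes the child table name (childtable_prefix plus the table index) followed by the tag values, genColumnData() writes the timestamp formatted by toolsFormatTimestamp() followed by the column values, and batchWriteCsv()/interlaceWriteCsv() join the two buffers with a comma. The per-field value text itself is delegated to dataGenByField() from benchDataMix, which is not shown in this diff.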
17 changes: 16 additions & 1 deletion src/benchJsonOpt.c
@@ -1425,6 +1425,19 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
}
}

    g_arguments->csvPath[0] = 0;
    tools_cJSON *csv = tools_cJSON_GetObjectItem(json, "csvPath");
    if (csv && (csv->type == tools_cJSON_String)
            && (csv->valuestring != NULL)) {
        tstrncpy(g_arguments->csvPath, csv->valuestring, MAX_FILE_NAME_LEN);
    }

    if (g_arguments->csvPath[0] == 0) {
        // default to an output folder under the current path
        strcpy(g_arguments->csvPath, "./output");
        mkdir(g_arguments->csvPath, 0775);
    }

code = 0;
return code;
}
@@ -2230,6 +2243,8 @@ int getInfoFromJsonFile() {
g_arguments->test_mode = QUERY_TEST;
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_arguments->test_mode = SUBSCRIBE_TEST;
} else if (0 == strcasecmp("csvfile", filetype->valuestring)) {
g_arguments->test_mode = CSVFILE_TEST;
} else {
errorPrint("%s",
"failed to read json, filetype not support\n");
@@ -2241,7 +2256,7 @@

// read common item
code = getMetaFromCommonJsonFile(root);
if (INSERT_TEST == g_arguments->test_mode) {
if (INSERT_TEST == g_arguments->test_mode || CSVFILE_TEST == g_arguments->test_mode) {
code = getMetaFromInsertJsonFile(root);
#ifdef TD_VER_COMPATIBLE_3_0_0_0
} else if (QUERY_TEST == g_arguments->test_mode) {
