Skip to content

Commit

Permalink
feat : basic function finished
Browse files Browse the repository at this point in the history
  • Loading branch information
DuanKuanJun committed May 21, 2024
1 parent feac5b1 commit 589991f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 51 deletions.
4 changes: 2 additions & 2 deletions case/exportCsv.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"filetype": "csv",
"csvDir": "/root/csv/",
"filetype": "csvfile",
"csvPath": "/root/csv/",
"databases": [
{
"dbinfo": {
Expand Down
7 changes: 4 additions & 3 deletions inc/benchCsv.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ int csvTestProcess();

int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir);

void csvWriteThread(void* param);

char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k);

char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k);

int32_t genRowByField(char* buf, int32_t pos1, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k);
int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k);

void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir);

int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);

#endif // INC_BENCHCSV_H_
104 changes: 58 additions & 46 deletions src/benchCsv.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,35 @@
//
// main etry
//
int csvTestProcess() {
pthread_t handle = NULL;
int ret = pthread_create(handle, NULL, csvWriteThread, NULL);
if (ret != 0) {
errorPrint("pthread_create failed. error code =%d", ret);
return -1;
}
pthread_join(handle);
return 0;
}

void csvWriteThread(void* param) {
static void *csvWriteThread(void *param) {
// write thread
for (int i = 0; i < g_arguments->databases->size; i++) {
// database
SDataBase * db = benchArrayGet(g_arguments->databases, i);
for (int j=0; i< db->superTbls->size; j++) {
for (int j=0; j < db->superTbls->size; j++) {
// stb
SSuperTable* stb = benchArrayGet(database->superTbls, j);
SSuperTable* stb = benchArrayGet(db->superTbls, j);
// gen csv
int ret = genWithSTable(db, stb, g_arguments->csvPath);
if(ret != 0) {
errorPrint("failed generate to csv. db=%s stb=%s error code=%d", db->dbName, stb->stbName, ret);
return ret;
return NULL;
}
}
}
return NULL;
}

int csvTestProcess() {
pthread_t handle;
int ret = pthread_create(&handle, NULL, csvWriteThread, NULL);
if (ret != 0) {
errorPrint("pthread_create failed. error code =%d", ret);
return -1;
}
pthread_join(handle, NULL);
return 0;
}

int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir) {
Expand Down Expand Up @@ -85,18 +87,16 @@ void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir

int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
size_t size = fwrite(buf, 1, len, f);
if(size ! = len) {
errorPrint("failed to write csv file. expect write length:%d real write length:%d", len, size);
if(size != len) {
errorPrint("failed to write csv file. expect write length:%d real write length:%d", len, (int32_t)size);
return -1;
}
return 0;
}

int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
int ret = 0;
int cnt = 0;
int pos = 0;
int64_t n = 0; // already inserted rows for one child table
int64_t tk = 0;

int tagDataLen = stb->lenOfTags + stb->tags->size + 256;
Expand All @@ -112,10 +112,10 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL
genTagData(tagData, stb, i, &tk);
// insert child column data
for(int64_t j = 0; j < stb->insertRows; j++) {
genColumnData(colData, ts, db->precision, &ck);
genColumnData(colData, stb, ts, db->precision, &ck);
// combine
pos += sprintf(buf + pos, "%s,%s\n", tagData, colData);
if (bufLen - pos < minRemain ) {
if (bufLen - pos < minRemain) {
// submit
ret = writeCsvFile(fs, buf, pos);
if (ret != 0) {
Expand All @@ -130,6 +130,11 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL
}
}

if (pos > 0) {
ret = writeCsvFile(fs, buf, pos);
pos = 0;
}

END:
// free
tmfree(tagData);
Expand All @@ -139,37 +144,40 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL

int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
int ret = 0;
int cnt = 0;
int pos = 0;
int64_t n = 0; // already inserted rows for one child table
int64_t tk = 0;

char **tagDatas = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true);
int colDataLen = stb->lenOfCols + stb->cols->size + 256;
char * colData = (char *) benchCalloc(1, colDataLen, true);
int64_t last_ts = stb->startTimestamp;

while (n < stb->insertRows ) {
for (int64_t i = 0; i < stb->childTblCount; i++) {
// start one table
int64_t ts = stb->startTimestamp;
int64_t ts = last_ts;
int64_t ck = 0;
// tags
tagDatas[i] = genTagData(NULL, stb, i, &tk);
if (tagDatas[i] == NULL) {
tagDatas[i] = genTagData(NULL, stb, i, &tk);
}

// calc need insert rows
int64_t needInserts = stb->interlaceRows;
if(needInserts > stb->insertRows - n) {
needInserts = stb->insertRows - n;
}

for (int64_t j = 0; j < needInserts; j++) {
genColumnData(colData, ts, db->precision, &ck);
genColumnData(colData, stb, ts, db->precision, &ck);
// combine tags,cols
pos += sprintf(buf + pos, "%s,%s\n", tagDatas[i], colData);
if (bufLen - pos < ) {
if (bufLen - pos < minRemain) {
// submit
ret = writeToFile(fs, buf, pos);
ret = writeCsvFile(fs, buf, pos);
if (ret != 0) {
goto END:
goto END;
}
pos = 0;
}
Expand All @@ -178,18 +186,23 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int
ts += stb->timestamp_step;
}

if (i == 0) {
// if last child table
if (i + 1 == stb->childTblCount ) {
n += needInserts;
last_ts = ts;
}
}
}

if (pos > 0) {
ret = writeCsvFile(fs, buf, pos);
pos = 0;
}

END:
// free
for(int64_t m = 0 ; m < stb->childTblCount; m ++) {
if (tagDatas[m]) {
for(int64_t m = 0 ; m < stb->childTblCount; m ++) {
tmfree(tagDatas[m]);
}
}
tmfree(colData);
return ret;
Expand All @@ -208,7 +221,7 @@ char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) {

int pos = 0;
// tbname
pos += sprintf(tagData, "%s%"PRId64, stb->childTblPrefix, i);
pos += sprintf(tagData, "\'%s%"PRId64"\'", stb->childTblPrefix, i);
// tags
pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k);

Expand All @@ -217,36 +230,35 @@ char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) {

// gen column data
char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
int pos = 0;
char str[128] = "";
toolsFormatTimestamp(colData, ts, precision);
pos = strlen(colData);
char szTime[128] = {0};
toolsFormatTimestamp(szTime, ts, precision);
int pos = sprintf(colData, "\'%s\'", szTime);

// columns
genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, precision, k);
genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k);
return colData;
}


int32_t genRowByField(char* buf, int32_t pos1, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) {
int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) {

// other cols data
int32_t pos2 = 0;
int32_t pos1 = 0;
for(uint16_t i = 0; i < fieldCnt; i++) {
Field* fd = benchArrayGet(fields, GET_IDX(i));
Field* fd = benchArrayGet(fields, i);
char* prefix = "";
if(fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) {
if(stb->binaryPrefex) {
prefix = stb->binaryPrefex;
if(binanryPrefix) {
prefix = binanryPrefix;
}
} else if(fd->type == TSDB_DATA_TYPE_NCHAR) {
if(stb->ncharPrefex) {
prefix = stb->ncharPrefex;
if(ncharPrefix) {
prefix = ncharPrefix;
}
}

pos2 += dataGenByField(fd, buf + pos1 + pos2, prefix, k);
pos1 += dataGenByField(fd, buf, pos1, prefix, k);
}

return pos2;
return pos1;
}

0 comments on commit 589991f

Please sign in to comment.