diff --git a/.github/workflows/3.0-windows-build.yml b/.github/workflows/3.0-windows-build.yml index 8bc58fad..68a0feeb 100644 --- a/.github/workflows/3.0-windows-build.yml +++ b/.github/workflows/3.0-windows-build.yml @@ -5,12 +5,10 @@ on: - cron: "10 16 * * *" push: branches: - - develop - 3.0 - main pull_request: branches: - - develop - 3.0 - main diff --git a/case/exportCsv.json b/case/exportCsv.json new file mode 100644 index 00000000..f031266c --- /dev/null +++ b/case/exportCsv.json @@ -0,0 +1,77 @@ +{ + "filetype": "csvfile", + "csvPath": "/root/csv/", + "num_of_records_per_req": 10000, + "databases": [ + { + "dbinfo": { + "name": "csvdb" + }, + "super_tables": [ + { + "name": "batchTable", + "childtable_count": 10, + "insert_rows": 50, + "childtable_prefix": "d", + "timestamp_step": 10, + "start_timestamp":1600000000000, + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + }, + { + "name": "interlaceTable", + "childtable_count": 5, + "insert_rows": 100, + "interlace_rows": 10, + "childtable_prefix": "e", + "timestamp_step": 1000, + "start_timestamp":1700000000000, + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min":16}, + { "type": "double", "name": "dc", "min":16}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 32}, + { "type": "nchar", "name": "nch", "len": 64} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/case/insertBindVGroup.json b/case/insertBindVGroup.json new file mode 100644 index 00000000..beaadfb5 --- /dev/null +++ b/case/insertBindVGroup.json @@ -0,0 +1,58 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "num_of_records_per_req": 200, + "thread_count": 20, + "thread_bind_vgroup": "yes", + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "binddb", + "drop": "yes", + "vgroups": 2 + }, + "super_tables": [ + { + "name": "meters", + "child_table_exists": "no", + "childtable_count": 4, + "insert_rows": 100, + "interlace_rows": 10, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 1000, + "start_timestamp":1500000000000, + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "max": 1, "min": 0 }, + { "type": "double", "name": "dc", "max": 1, "min": 0 }, + { "type": "tinyint", "name": "ti", "max": 100, "min": 0 }, + { "type": "smallint", "name": "si", "max": 100, "min": 0 }, + { "type": "int", "name": "ic", "max": 100, "min": 0 }, + { "type": "bigint", "name": "bi", "max": 100, "min": 0 }, + { "type": "utinyint", "name": "uti", "max": 100, "min": 0 }, + { "type": "usmallint", "name": "usi", "max": 100, "min": 0 }, + { "type": "uint", "name": "ui", "max": 100, "min": 0 }, + { "type": "ubigint", "name": "ubi", "max": 100, "min": 0 }, + { "type": "binary", "name": "bin", "len": 32}, + { "type": "nchar", "name": "nch", "len": 64} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/deps/avro/lang/c/examples/quickstop.c b/deps/avro/lang/c/examples/quickstop.c index ff9e9700..a1e5a0f1 100644 --- a/deps/avro/lang/c/examples/quickstop.c +++ b/deps/avro/lang/c/examples/quickstop.c @@ -107,7 +107,7 @@ int print_person(avro_file_reader_t db, avro_schema_t reader_schema) if (rval == 0) { int64_t id; int32_t age; - int32_t *p; + char *p = NULL; size_t size; avro_value_t id_value; avro_value_t first_value; diff --git a/inc/bench.h b/inc/bench.h index 52874d66..248a2e8f 100644 --- a/inc/bench.h +++ b/inc/bench.h @@ -470,6 +470,7 @@ enum TEST_MODE { INSERT_TEST, // 0 QUERY_TEST, // 1 SUBSCRIBE_TEST, // 2 + CSVFILE_TEST // 3 }; enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT }; @@ -646,6 +647,8 @@ typedef struct STSMA { #define SUIT_DATAPOS_MUL_FILE 4 #define SUIT_DATAPOS_MIX 5 +#define VAL_NULL "NULL" + enum CONTINUE_IF_FAIL_MODE { NO_IF_FAILED, // 0 YES_IF_FAILED, // 1 @@ -788,14 +791,12 @@ typedef struct SSTREAM_S { bool drop; } SSTREAM; -#ifdef TD_VER_COMPATIBLE_3_0_0_0 typedef struct SVGroup_S { int32_t vgId; uint64_t tbCountPerVgId; SChildTable **childTblArray; uint64_t tbOffset; // internal use } SVGroup; -#endif // TD_VER_COMPATIBLE_3_0_0_0 // typedef struct SDataBase_S { char * dbName; @@ -923,7 +924,6 @@ typedef struct SArguments_S { uint32_t binwidth; uint32_t intColumnCount; uint32_t nthreads; - bool nthreads_auto; uint32_t table_threads; uint64_t prepared_rand; uint32_t reqPerReq; @@ -965,6 +965,9 @@ typedef struct SArguments_S { bool mistMode; bool escape_character; bool pre_load_tb_meta; + char csvPath[MAX_FILE_NAME_LEN]; + + bool bind_vgroup; } SArguments; typedef struct SBenchConn { @@ -1160,7 +1163,7 @@ int32_t benchGetTotalMemory(int64_t *totalKB); int32_t benchParseArgsNoArgp(int argc, char* argv[]); #endif -int32_t execInsert(threadInfo *pThreadInfo, uint32_t k); +int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t* delay3); // if return true, timestmap must add timestap_step, else timestamp no need changed bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt); @@ -1177,5 +1180,6 @@ uint64_t tmpUint64Impl(Field *field, int32_t angle, int64_t k); float tmpFloatImpl(Field *field, int i, int32_t angle, int32_t k); double tmpDoubleImpl(Field *field, int32_t angle, int32_t k); int tmpStr(char *tmp, int iface, Field *field, int64_t k); +int tmpGeometry(char *tmp, int iface, Field *field, int64_t k); #endif // INC_BENCH_H_ diff --git a/inc/benchCsv.h b/inc/benchCsv.h new file mode 100644 index 00000000..25d0c55e --- /dev/null +++ b/inc/benchCsv.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the MIT license as published by the Free Software + * Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef INC_BENCHCSV_H_ +#define INC_BENCHCSV_H_ + +#include + +int csvTestProcess(); + +int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir); + +char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k); + +char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k); + +int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k); + +void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir); + +int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); +int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); + +#endif // INC_BENCHCSV_H_ diff --git a/inc/benchData.h b/inc/benchData.h index a88271b1..29fed177 100644 --- a/inc/benchData.h +++ b/inc/benchData.h @@ -20,8 +20,10 @@ /***** Global variables ******/ /***** Declare functions *****/ void rand_string(char *str, int size, bool chinese); -int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, - int disorderRange); +void rand_geometry(char *str, int fieldLen, int maxType); +int geoCalcBufferSize(int fieldLen); +int getGeoMaxType(int fieldLen); +int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, int disorderRange); int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf, int64_t bufLen, int lenOfOneRow, BArray * fields, int64_t loop, diff --git a/inc/benchDataMix.h b/inc/benchDataMix.h index 4beeb8ee..fe352015 100644 --- a/inc/benchDataMix.h +++ b/inc/benchDataMix.h @@ -17,7 +17,7 @@ #define __BENCHDATAMIX_H_ -uint32_t dataGenByField(Field* fd, char* pstr, uint32_t len, char* prefix, int64_t *k); +uint32_t dataGenByField(Field* fd, char* pstr, uint32_t len, char* prefix, int64_t *k, char* nullVal); // data generate by calc ts uint32_t dataGenByCalcTs(Field* fd, char* pstr, uint32_t len, int64_t ts); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e8a6da33..401c69c2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -235,9 +235,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin ADD_DEPENDENCIES(taosdump deps-jansson) ADD_DEPENDENCIES(taosdump deps-snappy) IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0") - ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) ELSE() - ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) ENDIF() ELSE () INCLUDE_DIRECTORIES(/usr/local/include) @@ -246,9 +246,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin SET(OS_ID "Darwin") IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0") - ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) ELSE() - ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) ENDIF() ENDIF () @@ -441,9 +441,9 @@ ELSE () SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /utf-8") SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} /utf-8") IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0") - ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c) ELSE () - ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) + ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchCsv.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c) ENDIF () ADD_EXECUTABLE(taosdump taosdump.c toolsSys.c toolstime.c toolsDir.c toolsString.c) diff --git a/src/benchCommandOpt.c b/src/benchCommandOpt.c index bf9d5d24..a2e87f43 100644 --- a/src/benchCommandOpt.c +++ b/src/benchCommandOpt.c @@ -28,8 +28,6 @@ extern char g_configDir[MAX_PATH_LEN]; #define TAOSBENCHMARK_STATUS "unknown" #endif -// libtaos.so -extern char buildinfo[]; char *g_aggreFuncDemo[] = {"*", "count(*)", @@ -49,9 +47,7 @@ void printVersion() { // version printf("taosBenchmark version: %s\ngit: %s\n", taosBenchmark_ver, taosBenchmark_commit); -#ifdef LINUX - printf("build: %s\n", buildinfo); -#endif + printf("build: %s\n", getBuildInfo()); if (strlen(taosBenchmark_status) > 0) { printf("status: %s\n", taosBenchmark_status); } @@ -242,7 +238,6 @@ void initArgument() { g_arguments->performance_print = 0; g_arguments->output_file = DEFAULT_OUTPUT; g_arguments->nthreads = DEFAULT_NTHREADS; - g_arguments->nthreads_auto = true; g_arguments->table_threads = DEFAULT_NTHREADS; g_arguments->prepared_rand = DEFAULT_PREPARED_RAND; g_arguments->reqPerReq = DEFAULT_REQ_PER_REQ; diff --git a/src/benchCsv.c b/src/benchCsv.c new file mode 100644 index 00000000..34e0ba7b --- /dev/null +++ b/src/benchCsv.c @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the MIT license as published by the Free Software + * Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + */ + +#include +#include +#include + + +// +// main etry +// + +#define SHOW_CNT 100000 + +static void *csvWriteThread(void *param) { + // write thread + for (int i = 0; i < g_arguments->databases->size; i++) { + // database + SDataBase * db = benchArrayGet(g_arguments->databases, i); + for (int j=0; j < db->superTbls->size; j++) { + // stb + SSuperTable* stb = benchArrayGet(db->superTbls, j); + // gen csv + int ret = genWithSTable(db, stb, g_arguments->csvPath); + if(ret != 0) { + errorPrint("failed generate to csv. db=%s stb=%s error code=%d \n", db->dbName, stb->stbName, ret); + return NULL; + } + } + } + return NULL; +} + +int csvTestProcess() { + pthread_t handle; + int ret = pthread_create(&handle, NULL, csvWriteThread, NULL); + if (ret != 0) { + errorPrint("pthread_create failed. error code =%d \n", ret); + return -1; + } + + infoPrint("start output to csv %s ...\n", g_arguments->csvPath); + int64_t start = toolsGetTimestampMs(); + pthread_join(handle, NULL); + int64_t delay = toolsGetTimestampMs() - start; + infoPrint("output to csv %s finished. delay:%"PRId64"s \n", g_arguments->csvPath, delay/1000); + + return 0; +} + +int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir) { + // filename + int ret = 0; + char outFile[MAX_FILE_NAME_LEN] = {0}; + obtainCsvFile(outFile, db, stb, outDir); + FILE * fs = fopen(outFile, "w"); + if(fs == NULL) { + errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno)); + return -1; + } + + int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size; + int bufLen = rowLen * g_arguments->reqPerReq; + char* buf = benchCalloc(1, bufLen, true); + + infoPrint("start write csv file: %s \n", outFile); + + if (stb->interlaceRows > 0) { + // interlace mode + ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); + } else { + // batch mode + ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); + } + + tmfree(buf); + fclose(fs); + + succPrint("end write csv file: %s \n", outFile); + return ret; +} + + +void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) { + sprintf(outFile, "%s%s-%s.csv", outDir, db->dbName, stb->stbName); +} + +int32_t writeCsvFile(FILE* f, char * buf, int32_t len) { + size_t size = fwrite(buf, 1, len, f); + if(size != len) { + errorPrint("failed to write csv file. expect write length:%d real write length:%d \n", len, (int32_t)size); + return -1; + } + return 0; +} + +int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { + int ret = 0; + int pos = 0; + int64_t tk = 0; + int64_t show = 0; + + int tagDataLen = stb->lenOfTags + stb->tags->size + 256; + char * tagData = (char *) benchCalloc(1, tagDataLen, true); + int colDataLen = stb->lenOfCols + stb->cols->size + 256; + char * colData = (char *) benchCalloc(1, colDataLen, true); + + // gen child name + for (int64_t i = 0; i < stb->childTblCount; i++) { + int64_t ts = stb->startTimestamp; + int64_t ck = 0; + // tags + genTagData(tagData, stb, i, &tk); + // insert child column data + for(int64_t j = 0; j < stb->insertRows; j++) { + genColumnData(colData, stb, ts, db->precision, &ck); + // combine + pos += sprintf(buf + pos, "%s,%s\n", tagData, colData); + if (bufLen - pos < minRemain) { + // submit + ret = writeCsvFile(fs, buf, pos); + if (ret != 0) { + goto END; + } + + pos = 0; + } + + // ts move next + ts += stb->timestamp_step; + + // check cancel + if(g_arguments->terminate) { + infoPrint("%s", "You are cancel, exiting ...\n"); + ret = -1; + goto END; + } + + // print show + if (++show % SHOW_CNT == 0) { + infoPrint("batch write child table cnt = %"PRId64 " all rows = %" PRId64 "\n", i+1, show); + } + + } + } + + if (pos > 0) { + ret = writeCsvFile(fs, buf, pos); + pos = 0; + } + +END: + // free + tmfree(tagData); + tmfree(colData); + return ret; +} + +int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { + int ret = 0; + int pos = 0; + int64_t n = 0; // already inserted rows for one child table + int64_t tk = 0; + int64_t show = 0; + + char **tagDatas = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true); + int colDataLen = stb->lenOfCols + stb->cols->size + 256; + char * colData = (char *) benchCalloc(1, colDataLen, true); + int64_t last_ts = stb->startTimestamp; + + while (n < stb->insertRows ) { + for (int64_t i = 0; i < stb->childTblCount; i++) { + // start one table + int64_t ts = last_ts; + int64_t ck = 0; + // tags + if (tagDatas[i] == NULL) { + tagDatas[i] = genTagData(NULL, stb, i, &tk); + } + + // calc need insert rows + int64_t needInserts = stb->interlaceRows; + if(needInserts > stb->insertRows - n) { + needInserts = stb->insertRows - n; + } + + for (int64_t j = 0; j < needInserts; j++) { + genColumnData(colData, stb, ts, db->precision, &ck); + // combine tags,cols + pos += sprintf(buf + pos, "%s,%s\n", tagDatas[i], colData); + if (bufLen - pos < minRemain) { + // submit + ret = writeCsvFile(fs, buf, pos); + if (ret != 0) { + goto END; + } + pos = 0; + } + + // ts move next + ts += stb->timestamp_step; + + // check cancel + if(g_arguments->terminate) { + infoPrint("%s", "You are cancel, exiting ... \n"); + ret = -1; + goto END; + } + + // print show + if (++show % SHOW_CNT == 0) { + infoPrint("interlace write child table index = %"PRId64 " all rows = %"PRId64 "\n", i+1, show); + } + } + + // if last child table + if (i + 1 == stb->childTblCount ) { + n += needInserts; + last_ts = ts; + } + } + } + + if (pos > 0) { + ret = writeCsvFile(fs, buf, pos); + pos = 0; + } + +END: + // free + for(int64_t m = 0 ; m < stb->childTblCount; m ++) { + tmfree(tagDatas[m]); + } + tmfree(colData); + return ret; +} + +// gen tag data +char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) { + // malloc + char* tagData; + if (buf == NULL) { + int tagDataLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size + 32; + tagData = benchCalloc(1, tagDataLen, true); + } else { + tagData = buf; + } + + int pos = 0; + // tbname + pos += sprintf(tagData, "\'%s%"PRId64"\'", stb->childTblPrefix, i); + // tags + pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k); + + return tagData; +} + +// gen column data +char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { + char szTime[128] = {0}; + toolsFormatTimestamp(szTime, ts, precision); + int pos = sprintf(colData, "\'%s\'", szTime); + + // columns + genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k); + return colData; +} + + +int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) { + + // other cols data + int32_t pos1 = 0; + for(uint16_t i = 0; i < fieldCnt; i++) { + Field* fd = benchArrayGet(fields, i); + char* prefix = ""; + if(fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) { + if(binanryPrefix) { + prefix = binanryPrefix; + } + } else if(fd->type == TSDB_DATA_TYPE_NCHAR) { + if(ncharPrefix) { + prefix = ncharPrefix; + } + } + + pos1 += dataGenByField(fd, buf, pos1, prefix, k, ""); + } + + return pos1; +} diff --git a/src/benchData.c b/src/benchData.c index 890ba1f5..b3fd211b 100644 --- a/src/benchData.c +++ b/src/benchData.c @@ -397,6 +397,8 @@ uint32_t accumulateRowLen(BArray *fields, int iface) { Field *field = benchArrayGet(fields, i); switch (field->type) { case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_GEOMETRY: case TSDB_DATA_TYPE_NCHAR: len += field->length + 3; break; @@ -453,7 +455,21 @@ uint32_t accumulateRowLen(BArray *fields, int iface) { int tmpStr(char *tmp, int iface, Field *field, int64_t k) { - if (g_arguments->demo_mode) { + if (field->values) { + int arraySize = tools_cJSON_GetArraySize(field->values); + if (arraySize) { + tools_cJSON *buf = tools_cJSON_GetArrayItem( + field->values, + taosRandom() % arraySize); + snprintf(tmp, field->length, + "%s", buf->valuestring); + } else { + errorPrint("%s() cannot read correct value " + "from json file. array size: %d\n", + __func__, arraySize); + return -1; + } + } else if (g_arguments->demo_mode) { unsigned int tmpRand = taosRandom(); if (g_arguments->chinese) { snprintf(tmp, field->length, "%s", @@ -465,7 +481,19 @@ int tmpStr(char *tmp, int iface, Field *field, int64_t k) { snprintf(tmp, field->length, "%s", locations[tmpRand % 10]); } - } else if (field->values) { + } else { + if(field->gen == GEN_ORDER) { + snprintf(tmp, field->length, "%"PRId64, k); + } else { + rand_string(tmp, taosRandom() % field->length, g_arguments->chinese); + } + } + return 0; +} + +int tmpGeometry(char *tmp, int iface, Field *field, int64_t k) { + // values + if (field->values) { int arraySize = tools_cJSON_GetArraySize(field->values); if (arraySize) { tools_cJSON *buf = tools_cJSON_GetArrayItem( @@ -479,13 +507,26 @@ int tmpStr(char *tmp, int iface, Field *field, int64_t k) { __func__, arraySize); return -1; } - } else { - if(field->gen == GEN_ORDER) { - snprintf(tmp, field->length, "%"PRId64, k); - } else { - rand_string(tmp, field->length, g_arguments->chinese); - } + return 0; + } + + // gen point count + int32_t cnt = field->length / 24; + if(cnt == 0) { + snprintf(tmp, field->length, "POINT(%d %d)", tmpUint16(field), tmpUint16(field)); + return 0; + } + + int32_t pos = snprintf(tmp, field->length, "LINESTRING("); + char * format = "%d %d,"; + for(int32_t i = 0; i < cnt; i++) { + if (i == cnt - 1) { + format = "%d %d"; + } + pos += snprintf(tmp + pos, field->length - pos, format, tmpUint16(field), tmpUint16(field)); } + strcat(tmp, ")"); + return 0; } @@ -793,6 +834,7 @@ static int generateRandDataSQL(SSuperTable *stbInfo, char *sampleDataBuf, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *tmp = benchCalloc(1, field->length + 1, false); if (0 != tmpStr(tmp, stbInfo->iface, field, k)) { @@ -804,6 +846,17 @@ static int generateRandDataSQL(SSuperTable *stbInfo, char *sampleDataBuf, tmfree(tmp); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + int bufferSize = field->length + 1; + char *tmp = benchCalloc(1, bufferSize, false); + if (0 != tmpGeometry(tmp, stbInfo->iface, field, i)) { + free(tmp); + return -1; + } + n = snprintf(sampleDataBuf + pos, bufLen - pos, "'%s',", tmp); + tmfree(tmp); + break; + } case TSDB_DATA_TYPE_JSON: { pos += tmpJson(sampleDataBuf, bufLen, pos, fieldsSize, field); @@ -972,6 +1025,7 @@ static int fillStmt( break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *tmp = benchCalloc(1, field->length + 1, false); if (0 != tmpStr(tmp, stbInfo->iface, field, k)) { @@ -994,6 +1048,28 @@ static int fillStmt( tmfree(tmp); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *tmp = benchCalloc(1, field->length + 1, false); + if (0 != tmpGeometry(tmp, stbInfo->iface, field, k)) { + tmfree(tmp); + return -1; + } + if (childCol) { + snprintf((char *)childCol->stmtData.data + + k * field->length, + field->length, + "%s", tmp); + } else { + snprintf((char *)field->stmtData.data + + k * field->length, + field->length, + "%s", tmp); + } + n = snprintf(sampleDataBuf + pos, bufLen - pos, + "'%s',", tmp); + tmfree(tmp); + break; + } case TSDB_DATA_TYPE_JSON: { pos += tmpJson(sampleDataBuf, bufLen, pos, fieldsSize, field); @@ -1220,13 +1296,14 @@ static int generateRandDataSmlTelnet(SSuperTable *stbInfo, char *sampleDataBuf, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *tmp = benchCalloc(1, field->length + 1, false); if (0 != tmpStr(tmp, stbInfo->iface, field, k)) { - free(tmp); + tmfree(tmp); return -1; } - if (field->type == TSDB_DATA_TYPE_BINARY) { + if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) { if (tag) { n = snprintf(sampleDataBuf + pos, bufLen - pos, "%s=L\"%s\" ", @@ -1264,6 +1341,31 @@ static int generateRandDataSmlTelnet(SSuperTable *stbInfo, char *sampleDataBuf, tmfree(tmp); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *tmp = benchCalloc(1, field->length + 1, false); + if (0 != tmpGeometry(tmp, stbInfo->iface, field, k)) { + tmfree(tmp); + return -1; + } + if (tag) { + n = snprintf(sampleDataBuf + pos, bufLen - pos, + "%s=L\"%s\" ", + field->name, tmp); + } else { + n = snprintf(sampleDataBuf + pos, bufLen - pos, + "\"%s\" ", tmp); + } + if (n < 0 || n >= bufLen - pos) { + errorPrint("%s() LN%d snprintf overflow\n", + __func__, __LINE__); + tmfree(tmp); + return -1; + } else { + pos += n; + } + tmfree(tmp); + break; + } case TSDB_DATA_TYPE_JSON: { pos += tmpJson(sampleDataBuf, bufLen, pos, fieldsSize, field); @@ -1372,6 +1474,7 @@ static int generateRandDataSmlJson(SSuperTable *stbInfo, char *sampleDataBuf, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *tmp = benchCalloc(1, field->length + 1, false); if (0 != tmpStr(tmp, stbInfo->iface, field, k)) { @@ -1506,6 +1609,7 @@ static int generateRandDataSmlLine(SSuperTable *stbInfo, char *sampleDataBuf, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *tmp = benchCalloc(1, field->length + 1, false); if (0 != tmpStr(tmp, stbInfo->iface, field, k)) { @@ -1524,6 +1628,18 @@ static int generateRandDataSmlLine(SSuperTable *stbInfo, char *sampleDataBuf, tmfree(tmp); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *tmp = benchCalloc(1, field->length + 1, false); + if (0 != tmpGeometry(tmp, stbInfo->iface, field, k)) { + tmfree(tmp); + return -1; + } + n = snprintf(sampleDataBuf + pos, bufLen - pos, + "%s=\"%s\",", + field->name, tmp); + tmfree(tmp); + break; + } case TSDB_DATA_TYPE_JSON: { n = tmpJson(sampleDataBuf, bufLen, pos, fieldsSize, field); @@ -1884,14 +2000,17 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo, } *delay2 += toolsGetTimestampUs() - start; - // if msg > 3MB, break - start = toolsGetTimestampUs(); - if (taos_stmt_add_batch(stmt)) { - errorPrint("taos_stmt_add_batch() failed! reason: %s\n", - taos_stmt_errstr(stmt)); - return 0; + if(stbInfo->autoTblCreating) { + start = toolsGetTimestampUs(); + if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) { + errorPrint("taos_stmt_add_batch() failed! reason: %s\n", + taos_stmt_errstr(pThreadInfo->conn->stmt)); + return 0; + } + if(delay3) { + *delay3 += toolsGetTimestampUs() - start; + } } - *delay3 += toolsGetTimestampUs() - start; return batch; } @@ -1925,10 +2044,11 @@ void generateSmlJsonTags(tools_cJSON *tagsList, } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *buf = (char *)benchCalloc(tag->length + 1, 1, false); rand_string(buf, tag->length, g_arguments->chinese); - if (tag->type == TSDB_DATA_TYPE_BINARY) { + if (tag->type == TSDB_DATA_TYPE_BINARY || tag->type == TSDB_DATA_TYPE_VARBINARY) { tools_cJSON_AddStringToObject(tags, tagName, buf); } else { tools_cJSON_AddStringToObject(tags, tagName, buf); @@ -1991,10 +2111,11 @@ void generateSmlTaosJsonTags(tools_cJSON *tagsList, SSuperTable *stbInfo, } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *buf = (char *)benchCalloc(tag->length + 1, 1, false); rand_string(buf, tag->length, g_arguments->chinese); - if (tag->type == TSDB_DATA_TYPE_BINARY) { + if (tag->type == TSDB_DATA_TYPE_BINARY || tag->type == TSDB_DATA_TYPE_VARBINARY) { tools_cJSON_AddStringToObject(tagObj, "value", buf); tools_cJSON_AddStringToObject(tagObj, "type", "binary"); } else { @@ -2050,6 +2171,7 @@ void generateSmlJsonValues( break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *buf = (char *)benchCalloc(col->length + 1, 1, false); rand_string(buf, col->length, g_arguments->chinese); @@ -2059,6 +2181,15 @@ void generateSmlJsonValues( tmfree(buf); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *buf = (char *)benchCalloc(col->length + 1, 1, false); + tmpGeometry(buf, stbInfo->iface, col, 0); + value_buf = benchCalloc(len_key + col->length + 3, 1, true); + snprintf(value_buf, len_key + col->length + 3, + "\"value\":\"%s\",", buf); + tmfree(buf); + break; + } default: { value_buf = benchCalloc(len_key + 20, 1, true); double doubleTmp = tmpDouble(col); @@ -2092,6 +2223,7 @@ void generateSmlJsonCols(tools_cJSON *array, tools_cJSON *tag, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *buf = (char *)benchCalloc(col->length + 1, 1, false); rand_string(buf, col->length, g_arguments->chinese); @@ -2103,6 +2235,13 @@ void generateSmlJsonCols(tools_cJSON *array, tools_cJSON *tag, tmfree(buf); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *buf = (char *)benchCalloc(col->length + 1, 1, false); + tmpGeometry(buf, stbInfo->iface, col, 0); + tools_cJSON_AddStringToObject(record, "value", buf); + tmfree(buf); + break; + } default: { double doubleTmp = tmpDouble(col); tools_cJSON_AddNumberToObject(record, "value", doubleTmp); @@ -2149,10 +2288,11 @@ void generateSmlTaosJsonCols(tools_cJSON *array, tools_cJSON *tag, break; } case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: { char *buf = (char *)benchCalloc(col->length + 1, 1, false); rand_string(buf, col->length, g_arguments->chinese); - if (col->type == TSDB_DATA_TYPE_BINARY) { + if (col->type == TSDB_DATA_TYPE_BINARY || col->type == TSDB_DATA_TYPE_VARBINARY) { tools_cJSON_AddStringToObject(value, "value", buf); tools_cJSON_AddStringToObject(value, "type", "binary"); } else { @@ -2162,6 +2302,13 @@ void generateSmlTaosJsonCols(tools_cJSON *array, tools_cJSON *tag, tmfree(buf); break; } + case TSDB_DATA_TYPE_GEOMETRY: { + char *buf = (char *)benchCalloc(col->length + 1, 1, false); + tmpGeometry(buf, stbInfo->iface, col, 0); + tools_cJSON_AddStringToObject(value, "value", buf); + tools_cJSON_AddStringToObject(value, "type", "geometry"); + tmfree(buf); + } default: { double dblTmp = (double)col->min; if (col->max != col->min) { diff --git a/src/benchDataGeometry.c b/src/benchDataGeometry.c new file mode 100644 index 00000000..a13189a1 --- /dev/null +++ b/src/benchDataGeometry.c @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the MIT license as published by the Free Software + * Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + */ + +#include +#include + +typedef struct { + double x; + double y; +} SGeoCoord2D; + +typedef enum { GEO_SUB_TYPE_POINT = 0, GEO_SUB_TYPE_LINESTRING, GEO_SUB_TYPE_POLYGON, GEO_SUB_TYPE_COUNT } EGeoSubType; + +const int CCoordSize = 16; + +typedef struct { + const char *preifx; + const char *suffix; + int baseLen; + int minCoordNum; + int maxCoordNum; + bool isClosed; // if true, first coord and last coord must be the same +} SGeoSubTypeInfo; + +// should be ordered by (baseLen + minCoordNum * CCoordSize), ASC +const SGeoSubTypeInfo GeoInfo[] = { + {"POINT(", ")", 5, 1, 1, false}, {"LINESTRING(", ")", 9, 2, 4094, false}, {"POLYGON((", "))", 13, 3, 4094, true}}; + +typedef struct { + EGeoSubType type; + BArray *coordArray; // element: SGeoCoord2D +} SGeoObj2D; + +static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray); +static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum); +static void geoObject2DDestory(SGeoObj2D *pObj); + +static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord); +static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray); +static int geoObject2DToStr(char *buffer, SGeoObj2D *object); + +static BArray *randCoordArray(int count); + +/*--- init & destory ---*/ +static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray) { + SGeoObj2D *pObj = (SGeoObj2D *)benchCalloc(1, sizeof(SGeoObj2D), true); + pObj->type = subType; + pObj->coordArray = coordArray; + return pObj; +} + +static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum) { + const SGeoSubTypeInfo *info = &GeoInfo[subType]; + + if (info->isClosed) { + coordNum = coordNum - 1; + } + BArray *array = randCoordArray(coordNum); + if (info->isClosed) { + SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true); + SGeoCoord2D *pFirstCoord= benchArrayGet(array, 0); + memcpy(pCoord, pFirstCoord, sizeof(SGeoCoord2D)); + benchArrayPush(array, pCoord); + } + return geoObject2DInit(subType, array); +} + +static void geoObject2DDestory(SGeoObj2D *pObj) { + if (!pObj) return; + benchArrayDestroy(pObj->coordArray); + tmfree(pObj); +} + +/*--- string formatters ---*/ +static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord) { return sprintf(buffer, "%10.6lf %10.6lf", pCoord->x, pCoord->y); } + +static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray) { + int pos = 0; + bool firstCoord = true; + for (int i = 0; i < coordArray->size; i++) { + int size = 0; + if (firstCoord) { + firstCoord = false; + } else { + size = sprintf(buffer + pos, "%s", ", "); + pos += size; + } + size = geoCoord2DToStr(buffer + pos, benchArrayGet(coordArray, i)); + pos += size; + } + return pos; +} + +static int geoObject2DToStr(char *buffer, SGeoObj2D *object) { + int pos = sprintf(buffer, "%s", GeoInfo[object->type].preifx); + pos += geoCoord2DArrayToStr(buffer + pos, object->coordArray); + pos += sprintf(buffer + pos, "%s", GeoInfo[object->type].suffix); + return pos; +} + +static BArray *randCoordArray(int count) { + BArray *array = benchArrayInit(8, sizeof(SGeoCoord2D)); + int minVal = -1000, maxVal = 1000; + for (int i = 0; i < count; i++) { + double x = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal); + double y = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal); + SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true); + pCoord->x = x; + pCoord->y = y; + benchArrayPush(array, pCoord); + } + return array; +} + +int geoCalcBufferSize(int fieldLen) { + // not accurate, but enough + return fieldLen + 20; +} + +int getGeoMaxType(int fieldLen) { + int maxType = -1; + for (int type = GEO_SUB_TYPE_COUNT - 1; type >= 0; type--) { + const SGeoSubTypeInfo *info = &GeoInfo[type]; + + int minLen = info->baseLen + info->minCoordNum * CCoordSize; + if (fieldLen >= minLen) { + maxType = type; + break; + } + } + return maxType; +} + +void rand_geometry(char *str, int fieldLen, int maxType) { + EGeoSubType type = taosRandom() % (maxType + 1); + const SGeoSubTypeInfo *info = &GeoInfo[type]; + + int maxCoordNum = (fieldLen - info->baseLen) / CCoordSize; + maxCoordNum = min(maxCoordNum, info->maxCoordNum); + + int coordNum = info->minCoordNum; + if (maxCoordNum > info->minCoordNum) { + coordNum = info->minCoordNum + taosRandom() % (maxCoordNum - info->minCoordNum); + } + + SGeoObj2D *pObj = geoObject2DRandInit(type, coordNum); + geoObject2DToStr(str, pObj); + geoObject2DDestory(pObj); +} diff --git a/src/benchDataMix.c b/src/benchDataMix.c index 1521f263..3692e57f 100644 --- a/src/benchDataMix.c +++ b/src/benchDataMix.c @@ -14,8 +14,6 @@ #include "benchDataMix.h" #include -#define VAL_NULL "NULL" - #define VBOOL_CNT 3 int32_t inul = 20; // interval null count @@ -97,21 +95,27 @@ uint32_t genRadomString(char* val, uint32_t len, char* prefix) { // data row generate by randowm -uint32_t dataGenByField(Field* fd, char* pstr, uint32_t len, char* prefix, int64_t *k) { +uint32_t dataGenByField(Field* fd, char* pstr, uint32_t len, char* prefix, int64_t *k, char* nullVal) { uint32_t size = 0; - char val[512] = VAL_NULL; + int64_t nowts= 0; + char val[512] = {0}; if( fd->fillNull && RD(inul) == 0 ) { - size = sprintf(pstr + len, ",%s", VAL_NULL); + size = sprintf(pstr + len, ",%s", nullVal); return size; } + const char * format = ",%s"; + switch (fd->type) { case TSDB_DATA_TYPE_BOOL: sprintf(val, "%d", tmpBool(fd)); break; // timestamp case TSDB_DATA_TYPE_TIMESTAMP: - strcpy(val, "now"); + nowts = toolsGetTimestampMs(); + strcpy(val, "\'"); + toolsFormatTimestamp(val, nowts, TSDB_TIME_PRECISION_MILLI); + strcat(val, "\'"); break; // signed case TSDB_DATA_TYPE_TINYINT: @@ -149,17 +153,18 @@ uint32_t dataGenByField(Field* fd, char* pstr, uint32_t len, char* prefix, int64 // binary nchar case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_BINARY: - if(fd->gen == GEN_ORDER) { - tmpStr(val, 0, fd, *k); - } else { - genRadomString(val, fd->length > sizeof(val) ? sizeof(val) : fd->length, prefix); - } + case TSDB_DATA_TYPE_VARBINARY: + format = ",\'%s\'"; + tmpStr(val, 0, fd, *k); + break; + case TSDB_DATA_TYPE_GEOMETRY: + tmpGeometry(val, 0, fd, 0); break; default: break; } - size += sprintf(pstr + len, ",%s", val); + size += sprintf(pstr + len, format, val); return size; } @@ -202,6 +207,7 @@ uint32_t dataGenByCalcTs(Field* fd, char* pstr, uint32_t len, int64_t ts) { break; // binary nchar case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_NCHAR: sprintf(val, "%" PRId64, ts); break; diff --git a/src/benchInsert.c b/src/benchInsert.c index 32f01a97..002ace31 100644 --- a/src/benchInsert.c +++ b/src/benchInsert.c @@ -246,10 +246,17 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) { Field * col = benchArrayGet(stbInfo->cols, colIndex); int n; if (col->type == TSDB_DATA_TYPE_BINARY || - col->type == TSDB_DATA_TYPE_NCHAR) { + col->type == TSDB_DATA_TYPE_NCHAR || + col->type == TSDB_DATA_TYPE_VARBINARY || + col->type == TSDB_DATA_TYPE_GEOMETRY) { n = snprintf(colsBuf + len, col_buffer_len - len, ",%s %s(%d)", col->name, convertDatatypeToString(col->type), col->length); + if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) { + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, + colIndex); + return -1; + } } else { n = snprintf(colsBuf + len, col_buffer_len - len, ",%s %s", col->name, @@ -311,10 +318,17 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) { for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) { Field *tag = benchArrayGet(stbInfo->tags, tagIndex); if (tag->type == TSDB_DATA_TYPE_BINARY || - tag->type == TSDB_DATA_TYPE_NCHAR) { + tag->type == TSDB_DATA_TYPE_NCHAR || + tag->type == TSDB_DATA_TYPE_VARBINARY || + tag->type == TSDB_DATA_TYPE_GEOMETRY) { n = snprintf(tagsBuf + len, tag_buffer_len - len, "%s %s(%d),", tag->name, convertDatatypeToString(tag->type), tag->length); + if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) { + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, + tagIndex); + return -1; + } } else if (tag->type == TSDB_DATA_TYPE_JSON) { n = snprintf(tagsBuf + len, tag_buffer_len - len, "%s json", tag->name); @@ -428,7 +442,7 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) { return ret; } -#ifdef TD_VER_COMPATIBLE_3_0_0_0 + int32_t getVgroupsOfDb(SBenchConn *conn, SDataBase *database) { int vgroups = 0; char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0"; @@ -491,7 +505,6 @@ int32_t getVgroupsOfDb(SBenchConn *conn, SDataBase *database) { return vgroups; } -#endif // TD_VER_COMPATIBLE_3_0_0_0 int32_t toolsGetDefaultVGroups() { int32_t cores = toolsGetNumberOfCores(); @@ -521,8 +534,7 @@ int32_t toolsGetDefaultVGroups() { int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) { int dataLen = 0; int n; -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (g_arguments->nthreads_auto || (-1 != g_arguments->inputted_vgroups)) { + if (-1 != g_arguments->inputted_vgroups) { n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen, g_arguments->escape_character ? "CREATE DATABASE IF NOT EXISTS `%s` VGROUPS %d" @@ -538,12 +550,7 @@ int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) { : "CREATE DATABASE IF NOT EXISTS %s", database->dbName); } -#else - n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen, - g_arguments->escape_character - ? "CREATE DATABASE IF NOT EXISTS `%s`" - : "CREATE DATABASE IF NOT EXISTS %s", database->dbName); -#endif // TD_VER_COMPATIBLE_3_0_0_0 + if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) { errorPrint("%s() LN%d snprintf overflow\n", __func__, __LINE__); @@ -676,27 +683,29 @@ int32_t getRemainVnodes(SBenchConn *conn) { int createDatabaseTaosc(SDataBase* database) { char command[SHORT_1K_SQL_BUFF_LEN] = "\0"; + // conn SBenchConn* conn = initBenchConn(); if (NULL == conn) { return -1; } - if (g_arguments->taosc_version == 3) { - for (int i = 0; i < g_arguments->streams->size; i++) { - SSTREAM* stream = benchArrayGet(g_arguments->streams, i); - if (stream->drop) { - snprintf(command, SHORT_1K_SQL_BUFF_LEN, - "DROP STREAM IF EXISTS %s;", - stream->stream_name); - if (queryDbExecCall(conn, command)) { - closeBenchConn(conn); - return -1; - } - infoPrint("%s\n", command); - memset(command, 0, SHORT_1K_SQL_BUFF_LEN); + + // drop stream in old database + for (int i = 0; i < g_arguments->streams->size; i++) { + SSTREAM* stream = benchArrayGet(g_arguments->streams, i); + if (stream->drop) { + snprintf(command, SHORT_1K_SQL_BUFF_LEN, + "DROP STREAM IF EXISTS %s;", + stream->stream_name); + if (queryDbExecCall(conn, command)) { + closeBenchConn(conn); + return -1; } + infoPrint("%s\n", command); + memset(command, 0, SHORT_1K_SQL_BUFF_LEN); } } + // drop old database snprintf(command, SHORT_1K_SQL_BUFF_LEN, g_arguments->escape_character ? "DROP DATABASE IF EXISTS `%s`;": @@ -716,9 +725,9 @@ int createDatabaseTaosc(SDataBase* database) { #endif } + // get remain vgroups int remainVnodes = INT_MAX; -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (g_arguments->nthreads_auto) { + if (g_arguments->bind_vgroup) { remainVnodes = getRemainVnodes(conn); if (0 >= remainVnodes) { errorPrint("Remain vnodes %d, failed to create database\n", @@ -726,9 +735,9 @@ int createDatabaseTaosc(SDataBase* database) { return -1; } } -#endif - geneDbCreateCmd(database, command, remainVnodes); + // generate and execute create database sql + geneDbCreateCmd(database, command, remainVnodes); int32_t code = queryDbExecCall(conn, command); int32_t trying = g_arguments->keep_trying; while (code && trying) { @@ -761,19 +770,17 @@ int createDatabaseTaosc(SDataBase* database) { } infoPrint("command to create database: <%s>\n", command); -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (database->superTbls) { - if (g_arguments->nthreads_auto) { - int32_t vgroups = getVgroupsOfDb(conn, database); - if (vgroups <=0) { - closeBenchConn(conn); - errorPrint("Database %s's vgroups is %d\n", - database->dbName, vgroups); - return -1; - } + + // malloc and get vgroup + if (g_arguments->bind_vgroup) { + int32_t vgroups = getVgroupsOfDb(conn, database); + if (vgroups <= 0) { + closeBenchConn(conn); + errorPrint("Database %s's vgroups is %d\n", + database->dbName, vgroups); + return -1; } } -#endif // TD_VER_COMPATIBLE_3_0_0_0 closeBenchConn(conn); return 0; @@ -852,28 +859,30 @@ static int getIntervalOfTblCreating(threadInfo *pThreadInfo, return 0; } +// table create thread static void *createTable(void *sarg) { if (g_arguments->supplementInsert) { return NULL; } - threadInfo * pThreadInfo = (threadInfo *)sarg; - SDataBase * database = pThreadInfo->dbInfo; - SSuperTable *stbInfo = pThreadInfo->stbInfo; + threadInfo *pThreadInfo = (threadInfo *)sarg; + SDataBase *database = pThreadInfo->dbInfo; + SSuperTable *stbInfo = pThreadInfo->stbInfo; + uint64_t lastTotalCreate = 0; + uint64_t lastPrintTime = toolsGetTimestampMs(); + int32_t len = 0; + int32_t batchNum = 0; + char ttl[SMALL_BUFF_LEN] = ""; + #ifdef LINUX prctl(PR_SET_NAME, "createTable"); #endif - uint64_t lastPrintTime = toolsGetTimestampMs(); pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false); - int len = 0; - int batchNum = 0; infoPrint( "thread[%d] start creating table from %" PRIu64 " to %" PRIu64 "\n", pThreadInfo->threadID, pThreadInfo->start_table_from, pThreadInfo->end_table_to); - - char ttl[SMALL_BUFF_LEN] = ""; if (stbInfo->ttl != 0) { snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl); } @@ -885,9 +894,9 @@ static void *createTable(void *sarg) { int w = 0; // record tagData int smallBatchCount = 0; - for (uint64_t i = pThreadInfo->start_table_from + stbInfo->childTblFrom; - (i <= (pThreadInfo->end_table_to + stbInfo->childTblFrom) - && !g_arguments->terminate); i++) { + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to && !g_arguments->terminate; + i++) { if (g_arguments->terminate) { goto create_table_end; } @@ -987,10 +996,11 @@ static void *createTable(void *sarg) { memset(pThreadInfo->buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN); uint64_t currentPrintTime = toolsGetTimestampMs(); if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) { - infoPrint( - "thread[%d] already created %" PRId64 " tables\n", - pThreadInfo->threadID, pThreadInfo->tables_created); - lastPrintTime = currentPrintTime; + float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime); + infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n", + pThreadInfo->threadID, pThreadInfo->tables_created, speed); + lastPrintTime = currentPrintTime; + lastTotalCreate = pThreadInfo->tables_created; } } @@ -1028,35 +1038,34 @@ static void *createTable(void *sarg) { return NULL; } -static int startMultiThreadCreateChildTable( - SDataBase* database, SSuperTable* stbInfo) { - int code = -1; - int threads = g_arguments->table_threads; - int64_t ntables; +static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) { + int32_t code = -1; + int32_t threads = g_arguments->table_threads; + int64_t ntables; if (stbInfo->childTblTo > 0) { ntables = stbInfo->childTblTo - stbInfo->childTblFrom; + } else if(stbInfo->childTblFrom > 0) { + ntables = stbInfo->childTblCount - stbInfo->childTblFrom; } else { ntables = stbInfo->childTblCount; } pthread_t *pids = benchCalloc(1, threads * sizeof(pthread_t), false); threadInfo *infos = benchCalloc(1, threads * sizeof(threadInfo), false); - uint64_t tableFrom = 0; + uint64_t tableFrom = stbInfo->childTblFrom; if (threads < 1) { threads = 1; } - - int64_t a = ntables / threads; - if (a < 1) { - threads = (int)ntables; - a = 1; - } - if (ntables == 0) { - errorPrint("failed to create child table, childTblCount: %"PRId64"\n", - ntables); + errorPrint("failed to create child table, childTblCount: %"PRId64"\n", ntables); goto over; } - int64_t b = ntables % threads; + + int64_t div = ntables / threads; + if (div < 1) { + threads = (int)ntables; + div = 1; + } + int64_t mod = ntables % threads; int threadCnt = 0; for (uint32_t i = 0; (i < threads && !g_arguments->terminate); i++) { @@ -1077,10 +1086,12 @@ static int startMultiThreadCreateChildTable( } } pThreadInfo->start_table_from = tableFrom; - pThreadInfo->ntables = i < b ? a + 1 : a; - pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + pThreadInfo->ntables = i < mod ? div + 1 : div; + pThreadInfo->end_table_to = i < mod ? tableFrom + div : tableFrom + div - 1; tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->tables_created = 0; + debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from, + pThreadInfo->end_table_to, pThreadInfo->ntables); pthread_create(pids + i, NULL, createTable, pThreadInfo); threadCnt ++; } @@ -1119,7 +1130,7 @@ static int createChildTables() { "start creating %" PRId64 " table(s) with %d thread(s)\n", g_arguments->totalChildTables, g_arguments->table_threads); } - double start = (double)toolsGetTimestampMs(); + int64_t start = (double)toolsGetTimestampMs(); for (int i = 0; (i < g_arguments->databases->size && !g_arguments->terminate); i++) { @@ -1150,14 +1161,20 @@ static int createChildTables() { } } - double end = (double)toolsGetTimestampMs(); + int64_t end = toolsGetTimestampMs(); + if(end == start) { + end += 1; + } succPrint( "Spent %.4f seconds to create %" PRId64 - " table(s) with %d thread(s), already exist %" PRId64 + " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64 " table(s), actual %" PRId64 " table(s) pre created, %" PRId64 " table(s) will be auto created\n", - (end - start) / 1000.0, g_arguments->totalChildTables, - g_arguments->table_threads, g_arguments->existedChildTables, + (float)(end - start) / 1000.0, + g_arguments->totalChildTables, + g_arguments->table_threads, + g_arguments->actualChildTables * 1000 / (float)(end - start), + g_arguments->existedChildTables, g_arguments->actualChildTables, g_arguments->autoCreatedChildTables); return 0; @@ -1252,22 +1269,17 @@ void postFreeResource() { tmfree(stbInfo->sqls); } - -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if ((0 == stbInfo->interlaceRows) - && (g_arguments->nthreads_auto)) { + // thread_bind + if (database->vgArray) { for (int32_t v = 0; v < database->vgroups; v++) { SVGroup *vg = benchArrayGet(database->vgArray, v); tmfree(vg->childTblArray); vg->childTblArray = NULL; } + benchArrayDestroy(database->vgArray); + database->vgArray = NULL; } -#endif // TD_VER_COMPATIBLE_3_0_0_0 } -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (database->vgArray) - benchArrayDestroy(database->vgArray); -#endif // TD_VER_COMPATIBLE_3_0_0_0 benchArrayDestroy(database->superTbls); } } @@ -1276,12 +1288,13 @@ void postFreeResource() { tools_cJSON_Delete(root); } -int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { +int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) { SDataBase * database = pThreadInfo->dbInfo; SSuperTable *stbInfo = pThreadInfo->stbInfo; TAOS_RES * res = NULL; int32_t code = 0; uint16_t iface = stbInfo->iface; + int64_t start = 0; int32_t trying = (stbInfo->keep_trying)? stbInfo->keep_trying:g_arguments->keep_trying; @@ -1335,6 +1348,20 @@ int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { break; case STMT_IFACE: + // add batch + if(!stbInfo->autoTblCreating) { + start = toolsGetTimestampUs(); + if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) { + errorPrint("taos_stmt_add_batch() failed! reason: %s\n", + taos_stmt_errstr(pThreadInfo->conn->stmt)); + return -1; + } + if(delay3) { + *delay3 += toolsGetTimestampUs() - start; + } + } + + // execute code = taos_stmt_execute(pThreadInfo->conn->stmt); if (code) { errorPrint( @@ -1555,7 +1582,9 @@ void loadChildTableInfo(threadInfo* pThreadInfo) { pos = 0; } } - infoPrint("end load child tables info. delay=%.2fs\n", (toolsGetTimestampUs() - start)/1E6); + int64_t delay = toolsGetTimestampUs() - start; + infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6); + pThreadInfo->totalDelay += delay; tmfree(buf); } @@ -1611,7 +1640,15 @@ static void *syncWriteInterlace(void *sarg) { goto free_of_interlace; } int64_t pos = pThreadInfo->pos; - SChildTable *childTbl = stbInfo->childTblArray[tableSeq]; + + // get childTable + SChildTable *childTbl; + if (g_arguments->bind_vgroup) { + childTbl = pThreadInfo->vg->childTblArray[tableSeq]; + } else { + childTbl = stbInfo->childTblArray[tableSeq]; + } + char * tableName = childTbl->name; char *sampleDataBuf = childTbl->useOwnSample? childTbl->sampleDataBuf: @@ -1864,7 +1901,7 @@ static void *syncWriteInterlace(void *sarg) { } startTs = toolsGetTimestampUs(); - if (execInsert(pThreadInfo, generated)) { + if (execInsert(pThreadInfo, generated, &delay3)) { g_fail = true; goto free_of_interlace; } @@ -1944,12 +1981,13 @@ static void *syncWriteInterlace(void *sarg) { infoPrint( "thread[%d] has currently inserted rows: %" PRIu64 ", peroid insert rate: %.3f rows/s \n", - pThreadInfo->threadID, pThreadInfo->totalInsertRows, + pThreadInfo->threadID, pThreadInfo->totalInsertRows, (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime)); lastPrintTime = currentPrintTime; lastTotalInsertRows = pThreadInfo->totalInsertRows; } } + free_of_interlace: cleanupAndPrint(pThreadInfo, "interlace"); if(csvFile) { @@ -2408,25 +2446,12 @@ void *syncWriteProgressive(void *sarg) { return NULL; } -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (g_arguments->nthreads_auto) { - if (0 == pThreadInfo->vg->tbCountPerVgId) { - return NULL; - } - } else { - infoPrint( - "thread[%d] start progressive inserting into table from " - "%" PRIu64 " to %" PRIu64 "\n", - pThreadInfo->threadID, pThreadInfo->start_table_from, - pThreadInfo->end_table_to + 1); - } -#else infoPrint( - "thread[%d] start progressive inserting into table from " - "%" PRIu64 " to %" PRIu64 "\n", - pThreadInfo->threadID, pThreadInfo->start_table_from, - pThreadInfo->end_table_to + 1); -#endif + "thread[%d] start progressive inserting into table from " + "%" PRIu64 " to %" PRIu64 "\n", + pThreadInfo->threadID, pThreadInfo->start_table_from, + pThreadInfo->end_table_to + 1); + uint64_t lastPrintTime = toolsGetTimestampMs(); uint64_t lastTotalInsertRows = 0; int64_t startTs = toolsGetTimestampUs(); @@ -2447,21 +2472,14 @@ void *syncWriteProgressive(void *sarg) { tableSeq <= pThreadInfo->end_table_to; tableSeq++) { char *sampleDataBuf; SChildTable *childTbl; -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if (g_arguments->nthreads_auto) { + + if (g_arguments->bind_vgroup) { childTbl = pThreadInfo->vg->childTblArray[tableSeq]; } else { - childTbl = stbInfo->childTblArray[ - stbInfo->childTblExists? - tableSeq: - stbInfo->childTblFrom + tableSeq]; + childTbl = stbInfo->childTblArray[tableSeq]; } -#else - childTbl = stbInfo->childTblArray[ - stbInfo->childTblExists? - tableSeq: - stbInfo->childTblFrom + tableSeq]; -#endif + debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name); + if (childTbl->useOwnSample) { sampleDataBuf = childTbl->sampleDataBuf; } else { @@ -2478,7 +2496,16 @@ void *syncWriteProgressive(void *sarg) { int64_t delay3 = 0; if (stmt) { taos_stmt_close(pThreadInfo->conn->stmt); - pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos); + if(stbInfo->autoTblCreating) { + pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos); + } else { + TAOS_STMT_OPTIONS op; + op.reqId = 0; + op.singleStbInsert = true; + op.singleTableBindOnce = true; + pThreadInfo->conn->stmt = taos_stmt_init_with_options(pThreadInfo->conn->taos, &op); + } + if (NULL == pThreadInfo->conn->stmt) { errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL)); @@ -2557,7 +2584,7 @@ void *syncWriteProgressive(void *sarg) { } // only measure insert startTs = toolsGetTimestampUs(); - int code = execInsert(pThreadInfo, generated); + int code = execInsert(pThreadInfo, generated, &delay3); if (code) { if (NO_IF_FAILED == stbInfo->continueIfFail) { warnPrint("The super table parameter " @@ -2598,7 +2625,7 @@ void *syncWriteProgressive(void *sarg) { w = 0; } - code = execInsert(pThreadInfo, generated); + code = execInsert(pThreadInfo, generated, &delay3); if (code) { g_fail = true; goto free_of_progressive; @@ -2810,6 +2837,8 @@ static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl) { break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_GEOMETRY: { size_t tmpLen = strlen(tmpStr); debugPrint("%s() LN%d, index: %d, " @@ -2902,6 +2931,7 @@ static void initStmtData(char dataType, void **data, uint32_t length) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_GEOMETRY: tmpP = calloc(1, g_arguments->prepared_rand * length); assert(tmpP); tmfree(*data); @@ -3080,9 +3110,9 @@ static int printTotalDelay(SDataBase *database, BArray *total_delay_list, int threads, int64_t totalInsertRows, - int64_t start, int64_t end) { + int64_t spend) { // zero check - if (total_delay_list->size == 0 || (end - start) == 0 || threads == 0) { + if (total_delay_list->size == 0 || spend == 0 || threads == 0) { return -1; } @@ -3096,9 +3126,9 @@ static int printTotalDelay(SDataBase *database, succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64 " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n", - (end - start)/1E6, totalDelay/threads/1E6, totalInsertRows, threads, + spend/1E6, totalDelay/threads/1E6, totalInsertRows, threads, database->dbName, - (double)(totalInsertRows / ((end - start)/1E6)), + (double)(totalInsertRows / (spend/1E6)), (double)(totalInsertRows / (totalDelay/threads/1E6)), subDelay); if (!total_delay_list->size) { return -1; @@ -3242,16 +3272,8 @@ static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) { return true; } -static int startMultiThreadInsertData(SDataBase* database, - SSuperTable* stbInfo) { - if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE) - && !stbInfo->use_metric) { - errorPrint("%s", "schemaless cannot work without stable\n"); - return -1; - } - - preProcessArgument(stbInfo); - +int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) { + // ntable calc int64_t ntables; if (stbInfo->childTblTo > 0) { ntables = stbInfo->childTblTo - stbInfo->childTblFrom; @@ -3260,222 +3282,183 @@ static int startMultiThreadInsertData(SDataBase* database, } else { ntables = stbInfo->childTblCount; } - if (ntables == 0) { + + return ntables; +} + +// assign table to thread with vgroups, return assign thread count +int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) { + SBenchConn* conn = initBenchConn(); + if (NULL == conn) { return 0; } + int32_t threads = 0; - uint64_t tableFrom = 0; - int32_t threads = g_arguments->nthreads; - int64_t a = 0, b = 0; - -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if ((0 == stbInfo->interlaceRows) - && (g_arguments->nthreads_auto)) { - SBenchConn* conn = initBenchConn(); - if (NULL == conn) { + // calc table count per vgroup + for (int64_t i = 0; i < stbInfo->childTblCount; i++) { + int vgId; + int ret = taos_get_table_vgId( + conn->taos, database->dbName, + stbInfo->childTblArray[i]->name, &vgId); + if (ret < 0) { + errorPrint("Failed to get %s db's %s table's vgId\n", + database->dbName, + stbInfo->childTblArray[i]->name); + closeBenchConn(conn); return -1; } - - for (int64_t i = 0; i < stbInfo->childTblCount; i++) { - int vgId; - int ret = taos_get_table_vgId( - conn->taos, database->dbName, - stbInfo->childTblArray[i]->name, &vgId); - if (ret < 0) { - errorPrint("Failed to get %s db's %s table's vgId\n", - database->dbName, - stbInfo->childTblArray[i]->name); - closeBenchConn(conn); - return -1; - } - debugPrint("Db %s\'s table\'s %s vgId is: %d\n", - database->dbName, - stbInfo->childTblArray[i]->name, vgId); - for (int32_t v = 0; v < database->vgroups; v++) { - SVGroup *vg = benchArrayGet(database->vgArray, v); - if (vgId == vg->vgId) { - vg->tbCountPerVgId++; - } - } - } - - threads = 0; - for (int v = 0; v < database->vgroups; v++) { + debugPrint("Db %s\'s table\'s %s vgId is: %d\n", + database->dbName, + stbInfo->childTblArray[i]->name, vgId); + for (int32_t v = 0; v < database->vgroups; v++) { SVGroup *vg = benchArrayGet(database->vgArray, v); - infoPrint("Total %"PRId64" tables on bb %s's vgroup %d (id: %d)\n", - vg->tbCountPerVgId, database->dbName, v, vg->vgId); - if (vg->tbCountPerVgId) { - threads++; - } else { - continue; + if (vgId == vg->vgId) { + vg->tbCountPerVgId++; } - vg->childTblArray = benchCalloc( - vg->tbCountPerVgId, sizeof(SChildTable*), true); - vg->tbOffset = 0; } - for (int64_t i = 0; i < stbInfo->childTblCount; i++) { - int vgId; - int ret = taos_get_table_vgId( - conn->taos, database->dbName, - stbInfo->childTblArray[i]->name, &vgId); - if (ret < 0) { - errorPrint("Failed to get %s db's %s table's vgId\n", - database->dbName, - stbInfo->childTblArray[i]->name); + } - closeBenchConn(conn); - return -1; - } - debugPrint("Db %s\'s table\'s %s vgId is: %d\n", - database->dbName, - stbInfo->childTblArray[i]->name, vgId); - for (int32_t v = 0; v < database->vgroups; v++) { - SVGroup *vg = benchArrayGet(database->vgArray, v); - if (vgId == vg->vgId) { - vg->childTblArray[vg->tbOffset] = - stbInfo->childTblArray[i]; - vg->tbOffset++; - } - } - } - closeBenchConn(conn); - } else { - a = ntables / threads; - if (a < 1) { - threads = (int32_t)ntables; - a = 1; - } - b = 0; - if (threads != 0) { - b = ntables % threads; + // malloc vg->childTblArray memory with table count + for (int v = 0; v < database->vgroups; v++) { + SVGroup *vg = benchArrayGet(database->vgArray, v); + infoPrint("Total %"PRId64" tables on %s's vgroup %d (id: %d)\n", + vg->tbCountPerVgId, database->dbName, v, vg->vgId); + if (vg->tbCountPerVgId) { + threads++; + } else { + continue; } + vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true); + vg->tbOffset = 0; } + + // set vg->childTblArray data + for (int64_t i = 0; i < stbInfo->childTblCount; i++) { + int vgId; + int ret = taos_get_table_vgId( + conn->taos, database->dbName, + stbInfo->childTblArray[i]->name, &vgId); + if (ret < 0) { + errorPrint("Failed to get %s db's %s table's vgId\n", + database->dbName, + stbInfo->childTblArray[i]->name); - // valid check - if(threads <= 0) { - errorPrint("db: %s threads num is invalid. threads=%d\n", + closeBenchConn(conn); + return 0; + } + debugPrint("Db %s\'s table\'s %s vgId is: %d\n", database->dbName, - threads); - return -1; + stbInfo->childTblArray[i]->name, vgId); + for (int32_t v = 0; v < database->vgroups; v++) { + SVGroup *vg = benchArrayGet(database->vgArray, v); + if (vgId == vg->vgId) { + vg->childTblArray[vg->tbOffset] = stbInfo->childTblArray[i]; + vg->tbOffset++; + } + } } + closeBenchConn(conn); - int32_t vgFrom = 0; -#else - a = ntables / threads; - if (a < 1) { - threads = (int32_t)ntables; - a = 1; - } - b = 0; - if (threads != 0) { - b = ntables % threads; - } -#endif // TD_VER_COMPATIBLE_3_0_0_0 + return threads; +} + +// init insert thread +int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) { + int32_t ret = -1; + uint64_t tbNext = stbInfo->childTblFrom; + int32_t vgNext = 0; + FILE* csvFile = NULL; + char* tagData = NULL; + bool stmtN = stbInfo->iface == STMT_IFACE && stbInfo->autoTblCreating == false; + int w = 0; - FILE* csvFile = NULL; - char* tagData = NULL; - bool stmtN = (stbInfo->iface == STMT_IFACE && stbInfo->autoTblCreating == false); - int w = 0; if (stmtN) { csvFile = openTagCsv(stbInfo); tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false); } - - pthread_t *pids = benchCalloc(1, threads * sizeof(pthread_t), true); - threadInfo *infos = benchCalloc(1, threads * sizeof(threadInfo), true); - - for (int32_t i = 0; i < threads; i++) { + + for (int32_t i = 0; i < nthreads; i++) { + // set table threadInfo *pThreadInfo = infos + i; - pThreadInfo->threadID = i; - pThreadInfo->dbInfo = database; - pThreadInfo->stbInfo = stbInfo; + pThreadInfo->threadID = i; + pThreadInfo->dbInfo = database; + pThreadInfo->stbInfo = stbInfo; pThreadInfo->start_time = stbInfo->startTimestamp; - pThreadInfo->pos = 0; + pThreadInfo->pos = 0; + pThreadInfo->samplePos = 0; pThreadInfo->totalInsertRows = 0; - pThreadInfo->samplePos = 0; -#ifdef TD_VER_COMPATIBLE_3_0_0_0 - if ((0 == stbInfo->interlaceRows) - && (g_arguments->nthreads_auto)) { - int32_t j; - for (j = vgFrom; i < database->vgroups; j++) { + + if (g_arguments->bind_vgroup) { + for (int32_t j = vgNext; j < database->vgroups; j++) { SVGroup *vg = benchArrayGet(database->vgArray, j); if (0 == vg->tbCountPerVgId) { continue; } - pThreadInfo->vg = vg; + pThreadInfo->vg = vg; + pThreadInfo->ntables = vg->tbCountPerVgId; pThreadInfo->start_table_from = 0; - pThreadInfo->ntables = vg->tbCountPerVgId; - pThreadInfo->end_table_to = vg->tbCountPerVgId-1; + pThreadInfo->end_table_to = vg->tbCountPerVgId - 1; + vgNext = j + 1; break; - } - vgFrom = j + 1; + } } else { - pThreadInfo->start_table_from = tableFrom; - pThreadInfo->ntables = i < b ? a + 1 : a; - pThreadInfo->end_table_to = (i < b)?(tableFrom+a):(tableFrom+a-1); - tableFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->start_table_from = tbNext; + pThreadInfo->ntables = i < mod ? div + 1 : div; + pThreadInfo->end_table_to = i < mod ? tbNext + div : tbNext + div - 1; + tbNext = pThreadInfo->end_table_to + 1; } -#else - pThreadInfo->start_table_from = tableFrom; - pThreadInfo->ntables = i < b ? a + 1 : a; - pThreadInfo->end_table_to = (i < b)?(tableFrom+a):(tableFrom+a-1); - tableFrom = pThreadInfo->end_table_to + 1; -#endif // TD_VER_COMPATIBLE_3_0_0_0 + + // init conn pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t)); switch (stbInfo->iface) { + // rest case REST_IFACE: { if (stbInfo->interlaceRows > 0) { pThreadInfo->buffer = new_ds(0); } else { - pThreadInfo->buffer = - benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); + pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); } int sockfd = createSockFd(); if (sockfd < 0) { - FREE_PIDS_INFOS_RETURN_MINUS_1(); + goto END; } pThreadInfo->sockfd = sockfd; break; } + // stmt case STMT_IFACE: { pThreadInfo->conn = initBenchConn(); if (NULL == pThreadInfo->conn) { - FREE_PIDS_INFOS_RETURN_MINUS_1(); + goto END; + } + taos_stmt_close(pThreadInfo->conn->stmt); + if(stbInfo->autoTblCreating) { + pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos); + } else { + TAOS_STMT_OPTIONS op; + op.reqId = 0; + op.singleStbInsert = true; + op.singleTableBindOnce = true; + pThreadInfo->conn->stmt = taos_stmt_init_with_options(pThreadInfo->conn->taos, &op); } - pThreadInfo->conn->stmt = - taos_stmt_init(pThreadInfo->conn->taos); if (NULL == pThreadInfo->conn->stmt) { - errorPrint("taos_stmt_init() failed, reason: %s\n", - taos_errstr(NULL)); - FREE_RESOURCE(); - return -1; + errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL)); + goto END; } if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) { - errorPrint("taos select database(%s) failed\n", - database->dbName); - FREE_RESOURCE(); - return -1; + errorPrint("taos select database(%s) failed\n", database->dbName); + goto END; } if (stmtN) { // generator if (w == 0) { if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile)) { - if(csvFile){ - fclose(csvFile); - } - tmfree(tagData); - FREE_RESOURCE(); - return -1; + goto END; } } if (prepareStmt(stbInfo, pThreadInfo->conn->stmt, tagData, w)) { - if(csvFile){ - fclose(csvFile); - } - tmfree(tagData); - FREE_RESOURCE(); - return -1; + goto END; } // move next @@ -3485,18 +3468,14 @@ static int startMultiThreadInsertData(SDataBase* database, } } - pThreadInfo->bind_ts = benchCalloc(1, sizeof(int64_t), true); - pThreadInfo->bind_ts_array = - benchCalloc(1, sizeof(int64_t)*g_arguments->reqPerReq, - true); - pThreadInfo->bindParams = benchCalloc( - 1, sizeof(TAOS_MULTI_BIND)*(stbInfo->cols->size + 1), - true); - pThreadInfo->is_null = benchCalloc(1, g_arguments->reqPerReq, - true); + // malloc bind + pThreadInfo->bind_ts = benchCalloc(1, sizeof(int64_t), true); + pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->reqPerReq, true); + pThreadInfo->bindParams = benchCalloc(1, sizeof(TAOS_MULTI_BIND)*(stbInfo->cols->size + 1), true); + pThreadInfo->is_null = benchCalloc(1, g_arguments->reqPerReq, true); + parseBufferToStmtBatch(stbInfo); - for (int64_t child = 0; - child < stbInfo->childTblCount; child++) { + for (int64_t child = 0; child < stbInfo->childTblCount; child++) { SChildTable *childTbl = stbInfo->childTblArray[child]; if (childTbl->useOwnSample) { parseBufferToStmtBatchChildTbl(stbInfo, childTbl); @@ -3505,44 +3484,34 @@ static int startMultiThreadInsertData(SDataBase* database, break; } + // sml rest case SML_REST_IFACE: { int sockfd = createSockFd(); if (sockfd < 0) { - free(pids); - free(infos); - return -1; + goto END; } pThreadInfo->sockfd = sockfd; } + // sml case SML_IFACE: { pThreadInfo->conn = initBenchConn(); if (pThreadInfo->conn == NULL) { errorPrint("%s() init connection failed\n", __func__); - FREE_RESOURCE(); - return -1; + goto END; } if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) { - errorPrint("taos select database(%s) failed\n", - database->dbName); - FREE_RESOURCE(); - return -1; + errorPrint("taos select database(%s) failed\n", database->dbName); + goto END; } - pThreadInfo->max_sql_len = - stbInfo->lenOfCols + stbInfo->lenOfTags; + pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags; if (stbInfo->iface == SML_REST_IFACE) { - pThreadInfo->buffer = - benchCalloc(1, g_arguments->reqPerReq * - (1 + pThreadInfo->max_sql_len), true); + pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true); } int protocol = stbInfo->lineProtocol; - if (TSDB_SML_JSON_PROTOCOL != protocol - && SML_JSON_TAOS_FORMAT != protocol) { - pThreadInfo->sml_tags = - (char **)benchCalloc(pThreadInfo->ntables, - sizeof(char *), true); + if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) { + pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true); for (int t = 0; t < pThreadInfo->ntables; t++) { - pThreadInfo->sml_tags[t] = - benchCalloc(1, stbInfo->lenOfTags, true); + pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true); } for (int t = 0; t < pThreadInfo->ntables; t++) { @@ -3556,20 +3525,14 @@ static int startMultiThreadInsertData(SDataBase* database, debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t, pThreadInfo->sml_tags[t]); } - pThreadInfo->lines = - benchCalloc(g_arguments->reqPerReq, - sizeof(char *), true); - - for (int j = 0; (j < g_arguments->reqPerReq - && !g_arguments->terminate); j++) { - pThreadInfo->lines[j] = - benchCalloc(1, pThreadInfo->max_sql_len, true); + pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true); + for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) { + pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true); } } else { - pThreadInfo->json_array = tools_cJSON_CreateArray(); - pThreadInfo->sml_json_tags = tools_cJSON_CreateArray(); - pThreadInfo->sml_tags_json_array = (char **)benchCalloc( - pThreadInfo->ntables, sizeof(char *), true); + pThreadInfo->json_array = tools_cJSON_CreateArray(); + pThreadInfo->sml_json_tags = tools_cJSON_CreateArray(); + pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true); for (int t = 0; t < pThreadInfo->ntables; t++) { if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) { generateSmlJsonTags( @@ -3583,48 +3546,33 @@ static int startMultiThreadInsertData(SDataBase* database, pThreadInfo->start_table_from, t); } } - pThreadInfo->lines = (char **)benchCalloc( - 1, sizeof(char *), true); - if ((0 == stbInfo->interlaceRows) - && (TSDB_SML_JSON_PROTOCOL == protocol)) { - pThreadInfo->line_buf_len = - g_arguments->reqPerReq * - accumulateRowLen(pThreadInfo->stbInfo->tags, - pThreadInfo->stbInfo->iface); - debugPrint("%s() LN%d, line_buf_len=%d\n", - __func__, __LINE__, pThreadInfo->line_buf_len); - pThreadInfo->lines[0] = benchCalloc( - 1, pThreadInfo->line_buf_len, true); - pThreadInfo->sml_json_value_array = - (char **)benchCalloc( - pThreadInfo->ntables, sizeof(char *), true); + pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true); + if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) { + pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface); + debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len); + pThreadInfo->lines[0] = benchCalloc(1, pThreadInfo->line_buf_len, true); + pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true); for (int t = 0; t < pThreadInfo->ntables; t++) { - generateSmlJsonValues( - pThreadInfo->sml_json_value_array, stbInfo, t); + generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t); } } } break; } + // taos case TAOSC_IFACE: { pThreadInfo->conn = initBenchConn(); if (pThreadInfo->conn == NULL) { errorPrint("%s() failed to connect\n", __func__); - FREE_RESOURCE(); - return -1; + goto END; } char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false); snprintf(command, SHORT_1K_SQL_BUFF_LEN, - g_arguments->escape_character - ? "USE `%s`" - : "USE %s", + g_arguments->escape_character ? "USE `%s`" : "USE %s", database->dbName); if (queryDbExecCall(pThreadInfo->conn, command)) { - errorPrint("taos select database(%s) failed\n", - database->dbName); - FREE_RESOURCE(); - tmfree(command); - return -1; + errorPrint("taos select database(%s) failed\n", database->dbName); + goto END; } tmfree(command); command = NULL; @@ -3632,11 +3580,9 @@ static int startMultiThreadInsertData(SDataBase* database, if (stbInfo->interlaceRows > 0) { pThreadInfo->buffer = new_ds(0); } else { - pThreadInfo->buffer = - benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); + pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); if (g_arguments->check_sql) { - pThreadInfo->csql = - benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); + pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true); memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN); } } @@ -3648,30 +3594,92 @@ static int startMultiThreadInsertData(SDataBase* database, } } + // success + ret = 0; + +END: if (csvFile) { fclose(csvFile); } tmfree(tagData); + return ret; +} - infoPrint("Estimate memory usage: %.2fMB\n", - (double)g_memoryUsage / 1048576); - prompt(0); +#define EMPTY_SLOT -1 +// run with limit thread +int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) { + infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads); + + // slots save threadInfo array index + int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false); + int32_t t = 0; // thread index + for (int32_t i = 0; i < limitThread; i++) { + slot[i] = EMPTY_SLOT; + } + + while (!g_arguments->terminate) { + int32_t emptySlot = 0; + for (int32_t i = 0; i < limitThread; i++) { + int32_t idx = slot[i]; + // check slot thread end + if(idx != EMPTY_SLOT) { + if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) { + // thread is running + toolsMsleep(2000); + } else { + // thread is end , set slot empty + infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t); + slot[i] = EMPTY_SLOT; + } + } + + if (slot[i] == EMPTY_SLOT && t < nthreads) { + // slot is empty , set new thread to running + threadInfo *pThreadInfo = infos + t; + if (stbInfo->interlaceRows > 0) { + pthread_create(pids + t, NULL, syncWriteInterlace, pThreadInfo); + } else { + pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo); + } + + // save current and move next + slot[i] = t; + t++; + infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]); + } + // check slot empty + if(slot[i] == EMPTY_SLOT) { + emptySlot++; + } + } + + // check all thread end + if(emptySlot == limitThread) { + debugPrint("all threads(%d) is run finished.\n", nthreads); + break; + } else { + debugPrint("current thread index=%d all thread=%d\n", t, nthreads); + } + } + + return 0; +} + +// run +int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) { + infoPrint("run insert thread. real nthread=%d\n", nthreads); // create threads int threadCnt = 0; - for (int i = 0; (i < threads && !g_arguments->terminate); i++) { + for (int i = 0; i < nthreads && !g_arguments->terminate; i++) { threadInfo *pThreadInfo = infos + i; if (stbInfo->interlaceRows > 0) { - pthread_create(pids + i, NULL, - syncWriteInterlace, pThreadInfo); + pthread_create(pids + i, NULL, syncWriteInterlace, pThreadInfo); } else { - pthread_create(pids + i, NULL, - syncWriteProgressive, pThreadInfo); + pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo); } threadCnt ++; - } - - int64_t start = toolsGetTimestampUs(); + } // wait threads for (int i = 0; i < threadCnt; i++) { @@ -3679,7 +3687,12 @@ static int startMultiThreadInsertData(SDataBase* database, pthread_join(pids[i], NULL); } - int64_t end = toolsGetTimestampUs()+1; + return 0; +} + + +// exit and free resource +int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) { if (g_arguments->terminate) toolsMsleep(100); @@ -3691,7 +3704,7 @@ static int startMultiThreadInsertData(SDataBase* database, uint64_t totalInsertRows = 0; // free threads resource - for (int i = 0; i < threads; i++) { + for (int i = 0; i < nthreads; i++) { threadInfo *pThreadInfo = infos + i; // free check sql if (pThreadInfo->csql) { @@ -3699,6 +3712,7 @@ static int startMultiThreadInsertData(SDataBase* database, pThreadInfo->csql = NULL; } + // close conn int protocol = stbInfo->lineProtocol; switch (stbInfo->iface) { case REST_IFACE: @@ -3807,18 +3821,101 @@ static int startMultiThreadInsertData(SDataBase* database, if (g_arguments->terminate) toolsMsleep(100); - free(pids); - free(infos); + tmfree(pids); + tmfree(infos); + // print result int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3, - total_delay_list, threads, totalInsertRows, start, end); + total_delay_list, nthreads, totalInsertRows, spend); benchArrayDestroy(total_delay_list); - if (g_fail || ret) { + if (g_fail || ret != 0) { return -1; } return 0; } +static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) { + if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE) + && !stbInfo->use_metric) { + errorPrint("%s", "schemaless cannot work without stable\n"); + return -1; + } + + // check argument valid + preProcessArgument(stbInfo); + + // ntable + int64_t ntables = obtainTableCount(database, stbInfo); + if (ntables == 0) { + errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName); + return -1; + } + + // assign table to thread + int32_t nthreads = g_arguments->nthreads; + int64_t div = 0; // ntable / nthread division + int64_t mod = 0; // ntable % nthread + int64_t spend = 0; + + if (g_arguments->bind_vgroup) { + nthreads = assignTableToThread(database, stbInfo); + if(nthreads == 0) { + errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName); + return -1; + } + } else { + if(nthreads == 0) { + errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName); + return -1; + } + div = ntables / nthreads; + if (div < 1) { + nthreads = (int32_t)ntables; + div = 1; + } + mod = ntables % nthreads; + } + + + // init each thread information + pthread_t *pids = benchCalloc(1, nthreads * sizeof(pthread_t), true); + threadInfo *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true); + + // init + int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod); + if( ret != 0) { + errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName); + tmfree(pids); + tmfree(infos); + return ret; + } + + infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576); + prompt(0); + + + // run + int64_t start = toolsGetTimestampUs(); + if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) { + // need many batch execute all threads + ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids); + } else { + // only one batch execute all threads + ret = runInsertThread(database, stbInfo, nthreads, infos, pids); + } + + int64_t end = toolsGetTimestampUs(); + if(end == start) { + spend = 1; + } else { + spend = end - start; + } + + // exit + ret = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend); + return ret; +} + static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) { int rows = 0; char command[SHORT_1K_SQL_BUFF_LEN]; @@ -4011,7 +4108,6 @@ int insertTestProcess() { succPrint("created database (%s)\n", database->dbName); } else { // database already exist, get vgroups from server - #ifdef TD_VER_COMPATIBLE_3_0_0_0 if (database->superTbls) { SBenchConn* conn = initBenchConn(); if (conn) { @@ -4025,7 +4121,6 @@ int insertTestProcess() { succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups); } } - #endif // TD_VER_COMPATIBLE_3_0_0_0 } } @@ -4079,7 +4174,7 @@ int insertTestProcess() { } } - // create threads + // tsma if (g_arguments->taosc_version == 3) { for (int i = 0; i < g_arguments->databases->size; i++) { SDataBase* database = benchArrayGet(g_arguments->databases, i); diff --git a/src/benchInsertMix.c b/src/benchInsertMix.c index 3e62143e..a137f90a 100644 --- a/src/benchInsertMix.c +++ b/src/benchInsertMix.c @@ -317,7 +317,7 @@ uint32_t genRowMixAll(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t l } } - size += dataGenByField(fd, pstr, len + size, prefix, k); + size += dataGenByField(fd, pstr, len + size, prefix, k, VAL_NULL); } // end @@ -766,7 +766,6 @@ bool checkCorrect(threadInfo* info, SDataBase* db, SSuperTable* stb, char* tbNam // bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) { int64_t lastPrintTime = 0; - // check interface if (stb->iface != TAOSC_IFACE) { return false; @@ -777,6 +776,8 @@ bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) { return false; } + infoPrint("insert mode is mix. generate_row_rule=%d\n", stb->genRowRule); + FILE* csvFile = NULL; char* tagData = NULL; bool acreate = (stb->genRowRule == RULE_OLD || stb->genRowRule == RULE_MIX_RANDOM) && stb->autoTblCreating; @@ -797,7 +798,14 @@ bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) { // loop insert child tables for (uint64_t tbIdx = info->start_table_from; tbIdx <= info->end_table_to; ++tbIdx) { - char* tbName = stb->childTblArray[tbIdx]->name; + // get child table + SChildTable *childTbl; + if (g_arguments->bind_vgroup) { + childTbl = info->vg->childTblArray[tbIdx]; + } else { + childTbl = stb->childTblArray[tbIdx]; + } + char* tbName = childTbl->name; SMixRatio mixRatio; mixRatioInit(&mixRatio, stb); @@ -843,7 +851,7 @@ bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) { int64_t startTs = toolsGetTimestampUs(); //g_arguments->debug_print = false; - if(execInsert(info, batchRows) != 0) { + if(execInsert(info, batchRows, NULL) != 0) { FAILED_BREAK() } //g_arguments->debug_print = true; @@ -893,7 +901,7 @@ bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) { batTotal.delRows = genBatchDelSql(stb, &mixRatio, batStartTime, info->conn->taos, tbName, info->buffer, len, querySql); if (batTotal.delRows > 0) { // g_arguments->debug_print = false; - if (execInsert(info, batTotal.delRows) != 0) { + if (execInsert(info, batTotal.delRows, NULL) != 0) { FAILED_BREAK() } diff --git a/src/benchJsonOpt.c b/src/benchJsonOpt.c index f458b663..162db5a2 100644 --- a/src/benchJsonOpt.c +++ b/src/benchJsonOpt.c @@ -11,6 +11,7 @@ */ #include +#include #include extern char g_configDir[MAX_PATH_LEN]; @@ -304,7 +305,8 @@ static int getColumnAndTagTypeFromInsertJsonFile( } else { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_JSON - || type == TSDB_DATA_TYPE_NCHAR) { + || type == TSDB_DATA_TYPE_NCHAR + || type == TSDB_DATA_TYPE_GEOMETRY) { length = g_arguments->binwidth; } else { length = convertTypeToLength(type); @@ -456,6 +458,8 @@ static int getColumnAndTagTypeFromInsertJsonFile( } else { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_JSON + || type == TSDB_DATA_TYPE_VARBINARY + || type == TSDB_DATA_TYPE_GEOMETRY || type == TSDB_DATA_TYPE_NCHAR) { length = g_arguments->binwidth; } else { @@ -1443,6 +1447,27 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) { } } + g_arguments->csvPath[0] = 0; + tools_cJSON *csv = tools_cJSON_GetObjectItem(json, "csvPath"); + if (csv && (csv->type == tools_cJSON_String) + && (csv->valuestring != NULL)) { + tstrncpy(g_arguments->csvPath, csv->valuestring, MAX_FILE_NAME_LEN); + } + + size_t len = strlen(g_arguments->csvPath); + + if(len == 0) { + // set default with current path + strcpy(g_arguments->csvPath, "./output/"); + mkdir(g_arguments->csvPath, 0775); + } else { + // append end + if (g_arguments->csvPath[len-1] != '/' ) { + strcat(g_arguments->csvPath, "/"); + } + mkdir(g_arguments->csvPath, 0775); + } + code = 0; return code; } @@ -1485,6 +1510,13 @@ static int getMetaFromInsertJsonFile(tools_cJSON *json) { g_arguments->nthreads = (uint32_t)threads->valueint; } + tools_cJSON *bindVGroup = tools_cJSON_GetObjectItem(json, "thread_bind_vgroup"); + if (tools_cJSON_IsString(bindVGroup)) { + if (0 == strcasecmp(bindVGroup->valuestring, "yes")) { + g_arguments->bind_vgroup = true; + } + } + tools_cJSON *keepTrying = tools_cJSON_GetObjectItem(json, "keep_trying"); if (keepTrying && keepTrying->type == tools_cJSON_Number) { g_arguments->keep_trying = (int32_t)keepTrying->valueint; @@ -2096,7 +2128,7 @@ static int getMetaFromTmqJsonFile(tools_cJSON *json) { if (tools_cJSON_IsString(groupMode)) { g_tmqInfo.consumerInfo.groupMode = groupMode->valuestring; } - + tools_cJSON *pollDelay = tools_cJSON_GetObjectItem(tmqInfo, "poll_delay"); if (tools_cJSON_IsNumber(pollDelay)) { @@ -2248,6 +2280,8 @@ int getInfoFromJsonFile() { g_arguments->test_mode = QUERY_TEST; } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { g_arguments->test_mode = SUBSCRIBE_TEST; + } else if (0 == strcasecmp("csvfile", filetype->valuestring)) { + g_arguments->test_mode = CSVFILE_TEST; } else { errorPrint("%s", "failed to read json, filetype not support\n"); @@ -2259,7 +2293,7 @@ int getInfoFromJsonFile() { // read common item code = getMetaFromCommonJsonFile(root); - if (INSERT_TEST == g_arguments->test_mode) { + if (INSERT_TEST == g_arguments->test_mode || CSVFILE_TEST == g_arguments->test_mode) { code = getMetaFromInsertJsonFile(root); #ifdef TD_VER_COMPATIBLE_3_0_0_0 } else if (QUERY_TEST == g_arguments->test_mode) { diff --git a/src/benchMain.c b/src/benchMain.c index 82e45ddb..04b21191 100644 --- a/src/benchMain.c +++ b/src/benchMain.c @@ -11,6 +11,7 @@ */ #include +#include #include SArguments* g_arguments; @@ -86,7 +87,6 @@ int main(int argc, char* argv[]) { if (dsn != NULL) { g_arguments->dsn = dsn; g_arguments->websocket = true; - g_arguments->nthreads_auto = false; } else { g_arguments->dsn = false; } @@ -117,6 +117,11 @@ int main(int argc, char* argv[]) { errorPrint("%s", "insert test process failed\n"); ret = -1; } + } else if (g_arguments->test_mode == CSVFILE_TEST) { + if (csvTestProcess()) { + errorPrint("%s", "query test process failed\n"); + ret = -1; + } } else if (g_arguments->test_mode == QUERY_TEST) { if (queryTestProcess(g_arguments)) { errorPrint("%s", "query test process failed\n"); diff --git a/src/benchSys.c b/src/benchSys.c index 59c9b4e1..d848155a 100644 --- a/src/benchSys.c +++ b/src/benchSys.c @@ -254,13 +254,11 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { case 'f': g_arguments->demo_mode = false; g_arguments->metaFile = arg; - g_arguments->nthreads_auto = false; break; case 'h': g_arguments->host = arg; g_arguments->host_auto = false; - g_arguments->nthreads_auto = false; break; case 'P': @@ -290,7 +288,6 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { stbInfo->iface = STMT_IFACE; } else if (0 == strcasecmp(arg, "rest")) { stbInfo->iface = REST_IFACE; - g_arguments->nthreads_auto = false; if (false == g_arguments->port_inputted) { g_arguments->port = DEFAULT_REST_PORT; } @@ -311,19 +308,15 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { || (0 == strcasecmp(arg, "sml-rest-line"))) { stbInfo->iface = SML_REST_IFACE; stbInfo->lineProtocol = TSDB_SML_LINE_PROTOCOL; - g_arguments->nthreads_auto = false; } else if (0 == strcasecmp(arg, "sml-rest-telnet")) { stbInfo->iface = SML_REST_IFACE; stbInfo->lineProtocol = TSDB_SML_TELNET_PROTOCOL; - g_arguments->nthreads_auto = false; } else if (0 == strcasecmp(arg, "sml-rest-json")) { stbInfo->iface = SML_REST_IFACE; stbInfo->lineProtocol = TSDB_SML_JSON_PROTOCOL; - g_arguments->nthreads_auto = false; } else if (0 == strcasecmp(arg, "sml-rest-taosjson")) { stbInfo->iface = SML_REST_IFACE; stbInfo->lineProtocol = SML_JSON_TAOS_FORMAT; - g_arguments->nthreads_auto = false; } else { errorPrint( "Invalid -I: %s, will auto set to default (taosc)\n", @@ -361,8 +354,6 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { "Invalid -T: %s, will auto set to default(8)\n", arg); g_arguments->nthreads = DEFAULT_NTHREADS; - } else { - g_arguments->nthreads_auto = false; } break; @@ -448,7 +439,6 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { case 'U': g_arguments->supplementInsert = true; - g_arguments->nthreads_auto = false; break; case 't': @@ -652,7 +642,6 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { #ifdef WEBSOCKET case 'W': - g_arguments->nthreads_auto = false; g_arguments->dsn = arg; break; @@ -669,7 +658,6 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) { if (!toolsIsStringNumber(arg)) { errorPrintReqArg2(CUS_PROMPT"Benchmark", "v"); } - g_arguments->nthreads_auto = false; g_arguments->inputted_vgroups = atoi(arg); break; #endif diff --git a/src/benchUtil.c b/src/benchUtil.c index 5ebe0913..e860ef55 100644 --- a/src/benchUtil.c +++ b/src/benchUtil.c @@ -341,7 +341,7 @@ void closeBenchConn(SBenchConn* conn) { if(conn->taos) { taos_close(conn->taos); conn->taos = NULL; - } + } if (conn->ctaos) { taos_close(conn->ctaos); conn->ctaos = NULL; @@ -870,6 +870,8 @@ char *convertDatatypeToString(int type) { return "double"; case TSDB_DATA_TYPE_JSON: return "json"; + case TSDB_DATA_TYPE_GEOMETRY: + return "geometry"; default: break; } @@ -1002,6 +1004,10 @@ int convertStringToDatatype(char *type, int length) { return TSDB_DATA_TYPE_JSON; } else if (0 == strcasecmp(type, "varchar")) { return TSDB_DATA_TYPE_BINARY; + } else if (0 == strcasecmp(type, "varbinary")) { + return TSDB_DATA_TYPE_VARBINARY; + } else if (0 == strcasecmp(type, "geometry")) { + return TSDB_DATA_TYPE_GEOMETRY; } else { errorPrint("unknown data type: %s\n", type); exit(EXIT_FAILURE); @@ -1039,6 +1045,10 @@ int convertStringToDatatype(char *type, int length) { return TSDB_DATA_TYPE_JSON; } else if (0 == strncasecmp(type, "varchar", length)) { return TSDB_DATA_TYPE_BINARY; + } else if (0 == strncasecmp(type, "varbinary", length)) { + return TSDB_DATA_TYPE_VARBINARY; + } else if (0 == strncasecmp(type, "geometry", length)) { + return TSDB_DATA_TYPE_GEOMETRY; } else { errorPrint("unknown data type: %s\n", type); exit(EXIT_FAILURE); @@ -1046,6 +1056,7 @@ int convertStringToDatatype(char *type, int length) { } } + int compare(const void *a, const void *b) { return *(int64_t *)a - *(int64_t *)b; } @@ -1095,7 +1106,7 @@ void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems) { void* dst = BARRAY_GET_ELEM(pArray, pArray->size); memcpy(dst, pData, pArray->elemSize * elems); - tmfree(pData); + tmfree(pData); // TODO remove this pArray->size += elems; return dst; } @@ -1221,7 +1232,7 @@ void destroySockFd(int sockfd) { if (sockfd < 0) { return; } - + // shutdown the connection since no more data will be sent int result; result = shutdown(sockfd, SHUT_WR); diff --git a/src/taosdump.c b/src/taosdump.c index 3c14c405..f0c92ac5 100644 --- a/src/taosdump.c +++ b/src/taosdump.c @@ -656,8 +656,6 @@ static uint64_t getUniqueIDFromEpoch() { return id; } -// libtaos.so -extern char buildinfo[]; static void printVersion(FILE *file) { char taostools_longver[] = TAOSDUMP_TAG; @@ -670,9 +668,7 @@ static void printVersion(FILE *file) { char taosdump_commit[] = TAOSDUMP_COMMIT_SHA1; fprintf(file,"taosdump version: %s\ngit: %s\n", taostools_ver, taosdump_commit); -#ifdef LINUX - printf("build: %s\n ", buildinfo); -#endif + printf("build: %s\n", getBuildInfo()); if (strlen(taosdump_status) > 0) { fprintf(file, "status:%s\n", taosdump_status); } diff --git a/tests/taosbenchmark/commandline-sml.py b/tests/taosbenchmark/commandline-sml.py index 436720a4..aebe88cb 100644 --- a/tests/taosbenchmark/commandline-sml.py +++ b/tests/taosbenchmark/commandline-sml.py @@ -88,9 +88,9 @@ def run(self): tdSql.query("select count(*) from test.meters") tdSql.checkData(0, 0, 1) - cmd = "%s -N -I sml -y" % binPath + cmd = "%s -I sml -t 10 -n 10000 -y" % binPath tdLog.info("%s" % cmd) - assert os.system("%s" % cmd) != 0 + tdSql.checkData(0, 0, 10*10000) def stop(self): tdSql.close() diff --git a/tests/taosbenchmark/commandline.py b/tests/taosbenchmark/commandline.py index 7e03b0c9..fe7b5cae 100644 --- a/tests/taosbenchmark/commandline.py +++ b/tests/taosbenchmark/commandline.py @@ -338,7 +338,7 @@ def run(self): tdLog.info("%s" % cmd) assert os.system("%s" % cmd) != 0 - cmd = "%s -n 1 -t 1 -y -A int,json" % binPath + cmd = "%s -n 1 -t 1 -y -A json" % binPath tdLog.info("%s" % cmd) assert os.system("%s" % cmd) != 0 diff --git a/tests/taosbenchmark/sml_telnet_alltypes.py b/tests/taosbenchmark/sml_telnet_alltypes.py index 2536763c..cea2fa1f 100644 --- a/tests/taosbenchmark/sml_telnet_alltypes.py +++ b/tests/taosbenchmark/sml_telnet_alltypes.py @@ -93,7 +93,7 @@ def run(self): if major_ver == "3": tdSql.checkData(1, 1, "VARCHAR") tdSql.checkData( - 1, 2, 16 + 1, 2, 8 ) # 3.0 will use a bit more space for schemaless create table else: tdSql.checkData(1, 1, "BINARY") @@ -102,7 +102,7 @@ def run(self): tdSql.query("describe db.stb13") tdSql.checkData(1, 1, "NCHAR") if major_ver == "3": - tdSql.checkData(1, 2, 16) + tdSql.checkData(1, 2, 8) else: tdSql.checkData(1, 2, 8)