Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stmt delay1 delay2 delay3 feature #741

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,9 @@ typedef struct SThreadInfo_S {
uint64_t totalInsertRows;
uint64_t totalQueried;
int64_t totalDelay;
int64_t totalDelay1;
int64_t totalDelay2;
int64_t totalDelay3;
uint64_t querySeq;
TAOS_SUB *tsub;
char ** lines;
Expand Down
3 changes: 2 additions & 1 deletion inc/benchData.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf,
bool tag, BArray *childCols);
int prepareStmt(SSuperTable *stbInfo, TAOS_STMT *stmt, char* tagData, uint64_t tableSeq);
uint32_t bindParamBatch(threadInfo *pThreadInfo,
uint32_t batch, int64_t startTime, SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n);
uint32_t batch, int64_t startTime,
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3);
int prepareSampleData(SDataBase* database, SSuperTable* stbInfo);
void generateSmlJsonTags(tools_cJSON *tagsList,
char **sml_tags_json_array,
Expand Down
6 changes: 5 additions & 1 deletion src/benchData.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,

uint32_t bindParamBatch(threadInfo *pThreadInfo,
uint32_t batch, int64_t startTime,
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n) {
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3) {
TAOS_STMT *stmt = pThreadInfo->conn->stmt;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
uint32_t columnCount = stbInfo->cols->size;
Expand Down Expand Up @@ -1858,12 +1858,14 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo,
}
}

int64_t start = toolsGetTimestampUs();
if (taos_stmt_bind_param_batch(
stmt, (TAOS_MULTI_BIND *)pThreadInfo->bindParams)) {
errorPrint("taos_stmt_bind_param_batch() failed! reason: %s\n",
taos_stmt_errstr(stmt));
return 0;
}
*delay2 += toolsGetTimestampUs() - start;

for (int c = 0; c < stbInfo->cols->size + 1; c++) {
TAOS_MULTI_BIND *param =
Expand All @@ -1873,11 +1875,13 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo,
}

// if msg > 3MB, break
start = toolsGetTimestampUs();
if (taos_stmt_add_batch(stmt)) {
errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
taos_stmt_errstr(stmt));
return 0;
}
*delay3 += toolsGetTimestampUs() - start;
return batch;
}

Expand Down
64 changes: 51 additions & 13 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,10 @@ static void *syncWriteInterlace(void *sarg) {
if (stbInfo->autoTblCreating) {
csvFile = openTagCsv(stbInfo);
tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
}
}
int64_t delay1 = 0;
int64_t delay2 = 0;
int64_t delay3 = 0;

while (insertRows > 0) {
int64_t tmp_total_insert_rows = 0;
Expand Down Expand Up @@ -1693,6 +1696,7 @@ static void *syncWriteInterlace(void *sarg) {
snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
tableName);
}
int64_t start = toolsGetTimestampUs();
if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
escapedTbName)) {
errorPrint(
Expand All @@ -1702,9 +1706,12 @@ static void *syncWriteInterlace(void *sarg) {
g_fail = true;
goto free_of_interlace;
}
delay1 += toolsGetTimestampUs() - start;

int32_t n = 0;
generated = bindParamBatch(pThreadInfo, interlaceRows,
childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);
childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);

childTbl->ts += stbInfo->timestamp_step * n;
break;
}
Expand Down Expand Up @@ -1852,7 +1859,8 @@ static void *syncWriteInterlace(void *sarg) {
break;
}

int64_t delay = endTs - startTs;
int64_t delay4 = endTs - startTs;
int64_t delay = delay1 + delay2 + delay3 + delay4;
if (delay <=0) {
debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
pThreadInfo->threadID, startTs, endTs);
Expand All @@ -1866,7 +1874,11 @@ static void *syncWriteInterlace(void *sarg) {
tmfree(pdelay);
}
pThreadInfo->totalDelay += delay;
pThreadInfo->totalDelay1 += delay1;
pThreadInfo->totalDelay2 += delay2;
pThreadInfo->totalDelay3 += delay3;
}
delay1 = delay2 = delay3 = 0;

int64_t currentPrintTime = toolsGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
Expand All @@ -1876,7 +1888,7 @@ static void *syncWriteInterlace(void *sarg) {
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
lastPrintTime = currentPrintTime;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
}
}
free_of_interlace:
Expand All @@ -1891,7 +1903,7 @@ static void *syncWriteInterlace(void *sarg) {
static int32_t prepareProgressDataStmt(
threadInfo *pThreadInfo,
SChildTable *childTbl,
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
SSuperTable *stbInfo = pThreadInfo->stbInfo;
char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
if (g_arguments->escape_character) {
Expand All @@ -1901,6 +1913,7 @@ static int32_t prepareProgressDataStmt(
snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
childTbl->name);
}
int64_t start = toolsGetTimestampUs();
if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
escapedTbName)) {
errorPrint(
Expand All @@ -1909,13 +1922,14 @@ static int32_t prepareProgressDataStmt(
taos_stmt_errstr(pThreadInfo->conn->stmt));
return -1;
}
*delay1 = toolsGetTimestampUs() - start;
int32_t n =0;
int32_t generated = bindParamBatch(
pThreadInfo,
(g_arguments->reqPerReq > (stbInfo->insertRows - i))
? (stbInfo->insertRows - i)
: g_arguments->reqPerReq,
*timestamp, childTbl, pkCur, pkCnt, &n);
*timestamp, childTbl, pkCur, pkCnt, &n, delay2, delay3);
*timestamp += n * stbInfo->timestamp_step;
return generated;
}
Expand Down Expand Up @@ -2398,6 +2412,9 @@ void *syncWriteProgressive(void *sarg) {
int32_t pos = 0;
int32_t pkCur = 0; // record generate same timestamp current count
int32_t pkCnt = 0; // record generate same timestamp count
int64_t delay1 = 0;
int64_t delay2 = 0;
int64_t delay3 = 0;
if (stmt) {
taos_stmt_close(pThreadInfo->conn->stmt);
pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos);
Expand Down Expand Up @@ -2457,7 +2474,7 @@ void *syncWriteProgressive(void *sarg) {
case STMT_IFACE: {
generated = prepareProgressDataStmt(
pThreadInfo,
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt);
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
break;
}
case SML_REST_IFACE:
Expand Down Expand Up @@ -2597,7 +2614,8 @@ void *syncWriteProgressive(void *sarg) {
break;
}

int64_t delay = endTs - startTs;
int64_t delay4 = endTs - startTs;
int64_t delay = delay1 + delay2 + delay3 + delay4;
if (delay <= 0) {
debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
pThreadInfo->threadID, startTs, endTs);
Expand All @@ -2611,7 +2629,11 @@ void *syncWriteProgressive(void *sarg) {
tmfree(pDelay);
}
pThreadInfo->totalDelay += delay;
pThreadInfo->totalDelay1 += delay1;
pThreadInfo->totalDelay2 += delay2;
pThreadInfo->totalDelay3 += delay3;
}
delay1 = delay2 = delay3 = 0;

int64_t currentPrintTime = toolsGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
Expand Down Expand Up @@ -2991,6 +3013,9 @@ static void preProcessArgument(SSuperTable *stbInfo) {

static int printTotalDelay(SDataBase *database,
int64_t totalDelay,
int64_t totalDelay1,
int64_t totalDelay2,
int64_t totalDelay3,
BArray *total_delay_list,
int threads,
int64_t totalInsertRows,
Expand All @@ -3000,9 +3025,17 @@ static int printTotalDelay(SDataBase *database,
return -1;
}

succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
char subDelay[128] = "";
if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
sprintf(subDelay, "delay1=%.2f delay2=%.2f delay3=%.2f",
totalDelay1/threads/1E6,
totalDelay2/threads/1E6,
totalDelay3/threads/1E6);
}

succPrint("Spent %.6f ( real %.6f %s) seconds to insert rows: %" PRIu64
" with %d thread(s) into %s %.2f (real %.2f) records/second\n",
(end - start)/1E6, totalDelay/threads/1E6, totalInsertRows, threads,
(end - start)/1E6, totalDelay/threads/1E6, subDelay, totalInsertRows, threads,
database->dbName,
(double)(totalInsertRows / ((end - start)/1E6)),
(double)(totalInsertRows / (totalDelay/threads/1E6)));
Expand Down Expand Up @@ -3591,6 +3624,9 @@ static int startMultiThreadInsertData(SDataBase* database,

BArray * total_delay_list = benchArrayInit(1, sizeof(int64_t));
int64_t totalDelay = 0;
int64_t totalDelay1 = 0;
int64_t totalDelay2 = 0;
int64_t totalDelay3 = 0;
uint64_t totalInsertRows = 0;

// free threads resource
Expand Down Expand Up @@ -3682,6 +3718,9 @@ static int startMultiThreadInsertData(SDataBase* database,
}
totalInsertRows += pThreadInfo->totalInsertRows;
totalDelay += pThreadInfo->totalDelay;
totalDelay1 += pThreadInfo->totalDelay1;
totalDelay2 += pThreadInfo->totalDelay2;
totalDelay3 += pThreadInfo->totalDelay3;
benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
pThreadInfo->delayList->size);
tmfree(pThreadInfo->delayList);
Expand All @@ -3702,9 +3741,8 @@ static int startMultiThreadInsertData(SDataBase* database,
free(pids);
free(infos);

int ret = printTotalDelay(database, totalDelay,
total_delay_list, threads,
totalInsertRows, start, end);
int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
total_delay_list, threads, totalInsertRows, start, end);
benchArrayDestroy(total_delay_list);
if (g_fail || ret) {
return -1;
Expand Down
14 changes: 10 additions & 4 deletions src/benchMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ int main(int argc, char* argv[]) {
modifyArgument();
}

g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a");
if (NULL == g_arguments->fpOfInsertResult) {
errorPrint("failed to open %s for save result\n",
g_arguments->output_file);
if(g_arguments->output_file[0] == 0) {
infoPrint("%s","result_file is empty, ignore output.");
g_arguments->fpOfInsertResult = NULL;
} else {
g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a");
if (NULL == g_arguments->fpOfInsertResult) {
errorPrint("failed to open %s for save result\n",
g_arguments->output_file);
}
}

infoPrint("client version: %s\n", taos_get_client_info());

if (g_arguments->test_mode == INSERT_TEST) {
Expand Down
Loading