Skip to content

Commit

Permalink
fix: with limit thread frame build ok
Browse files Browse the repository at this point in the history
  • Loading branch information
DuanKuanJun committed May 31, 2024
1 parent b105aa3 commit dc8f52e
Showing 1 changed file with 52 additions and 22 deletions.
74 changes: 52 additions & 22 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -3070,9 +3070,9 @@ static int printTotalDelay(SDataBase *database,
BArray *total_delay_list,
int threads,
int64_t totalInsertRows,
int64_t start, int64_t end) {
int64_t spend) {
// zero check
if (total_delay_list->size == 0 || (end - start) == 0 || threads == 0) {
if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
return -1;
}

Expand All @@ -3086,9 +3086,9 @@ static int printTotalDelay(SDataBase *database,

succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
" with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
(end - start)/1E6, totalDelay/threads/1E6, totalInsertRows, threads,
spend/1E6, totalDelay/threads/1E6, totalInsertRows, threads,
database->dbName,
(double)(totalInsertRows / ((end - start)/1E6)),
(double)(totalInsertRows / (spend/1E6)),
(double)(totalInsertRows / (totalDelay/threads/1E6)), subDelay);
if (!total_delay_list->size) {
return -1;
Expand Down Expand Up @@ -3556,9 +3556,29 @@ int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthr
return ret;
}

// run with limit thread
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids, int64_t *spend) {
infoPrint("run with limit insert thread. limit threads=%d nthread=%d\n", limitThread, nthreads);
return 0;
}

// run
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t *spend) {
infoPrint("run insert thread. real nthread=%d\n", nthreads);
// create threads
int threadCnt = 0;
for (int i = 0; (i < nthreads && !g_arguments->terminate); i++) {
threadInfo *pThreadInfo = infos + i;
if (stbInfo->interlaceRows > 0) {
pthread_create(pids + i, NULL,
syncWriteInterlace, pThreadInfo);
} else {
pthread_create(pids + i, NULL,
syncWriteProgressive, pThreadInfo);
}
threadCnt ++;
}

// free resource
int32_t waitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t threadCnt, threadInfo *infos, pthread_t *pids) {
int64_t start = toolsGetTimestampUs();

// wait threads
Expand All @@ -3568,7 +3588,19 @@ int32_t waitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthr
}

// thread end
int64_t end = toolsGetTimestampUs()+1;
int64_t end = toolsGetTimestampUs();
if(end == start) {
*spend = 1;
} else {
*spend = end - start;
}

return 0;
}


// exit and free resource
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {

if (g_arguments->terminate) toolsMsleep(100);

Expand Down Expand Up @@ -3694,7 +3726,7 @@ int32_t waitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthr

// print result
int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
total_delay_list, nthreads, totalInsertRows, start, end);
total_delay_list, nthreads, totalInsertRows, spend);
benchArrayDestroy(total_delay_list);
if (g_fail || ret != 0) {
return -1;
Expand Down Expand Up @@ -3723,6 +3755,7 @@ static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo)
int32_t nthreads = g_arguments->nthreads;
int64_t div = 0; // ntable / nthread division
int64_t mod = 0; // ntable % nthread
int64_t spend = 0;

if (g_arguments->bind_vgroup) {
nthreads = assignTableToThread(database, stbInfo);
Expand All @@ -3747,6 +3780,8 @@ static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo)
// init each thread information
pthread_t *pids = benchCalloc(1, nthreads * sizeof(pthread_t), true);
threadInfo *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true);

// init
int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod);
if( ret != 0) {
errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
Expand All @@ -3758,22 +3793,17 @@ static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo)
infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
prompt(0);

// create threads
int threadCnt = 0;
for (int i = 0; (i < nthreads && !g_arguments->terminate); i++) {
threadInfo *pThreadInfo = infos + i;
if (stbInfo->interlaceRows > 0) {
pthread_create(pids + i, NULL,
syncWriteInterlace, pThreadInfo);
} else {
pthread_create(pids + i, NULL,
syncWriteProgressive, pThreadInfo);
}
threadCnt ++;
// run
if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) {
// need many batch execute all threads
ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids, &spend);
} else {
// only one batch execute all threads
ret = runInsertThread(database, stbInfo, nthreads, infos, pids, &spend);
}

// wait insert end
ret = waitInsertThread(database, stbInfo, nthreads, threadCnt, infos, pids);
// exit
ret = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend);
return ret;
}

Expand Down

0 comments on commit dc8f52e

Please sign in to comment.