Skip to content

Commit

Permalink
fix: limit thread is finish
Browse files Browse the repository at this point in the history
  • Loading branch information
DuanKuanJun committed May 31, 2024
1 parent dc8f52e commit 24dbb78
Showing 1 changed file with 73 additions and 21 deletions.
94 changes: 73 additions & 21 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -3556,45 +3556,88 @@ int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthr
return ret;
}

#define EMPTY_SLOT -1
// run with limit thread
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids, int64_t *spend) {
infoPrint("run with limit insert thread. limit threads=%d nthread=%d\n", limitThread, nthreads);
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);

// slots save threadInfo array index
int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false);
int32_t t = 0; // thread index
for (int32_t i = 0; i < limitThread; i++) {
slot[i] = EMPTY_SLOT;
}

while (!g_arguments->terminate) {
int32_t emptySlot = 0;
for (int32_t i = 0; i < limitThread; i++) {
int32_t idx = slot[i];
// check slot thread end
if(idx != EMPTY_SLOT) {
if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) {
// thread is running
toolsMsleep(2000);
} else {
// thread is end , set slot empty
infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
slot[i] = EMPTY_SLOT;
}
}

if (slot[i] == EMPTY_SLOT && t < nthreads) {
// slot is empty , set new thread to running
threadInfo *pThreadInfo = infos + t;
if (stbInfo->interlaceRows > 0) {
pthread_create(pids + t, NULL, syncWriteInterlace, pThreadInfo);
} else {
pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo);
}

// save current and move next
slot[i] = t;
t++;
infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
}

// check slot empty
if(slot[i] == EMPTY_SLOT) {
emptySlot++;
}
}

// check all thread end
if(emptySlot == limitThread) {
debugPrint("all threads(%d) is run finished.\n", nthreads);
break;
} else {
debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
}
}

return 0;
}

// run
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t *spend) {
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
infoPrint("run insert thread. real nthread=%d\n", nthreads);
// create threads
int threadCnt = 0;
for (int i = 0; (i < nthreads && !g_arguments->terminate); i++) {
for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
threadInfo *pThreadInfo = infos + i;
if (stbInfo->interlaceRows > 0) {
pthread_create(pids + i, NULL,
syncWriteInterlace, pThreadInfo);
pthread_create(pids + i, NULL, syncWriteInterlace, pThreadInfo);
} else {
pthread_create(pids + i, NULL,
syncWriteProgressive, pThreadInfo);
pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
}
threadCnt ++;
}

int64_t start = toolsGetTimestampUs();
}

// wait threads
for (int i = 0; i < threadCnt; i++) {
infoPrint(" pthread_join %d ...\n", i);
pthread_join(pids[i], NULL);
}

// thread end
int64_t end = toolsGetTimestampUs();
if(end == start) {
*spend = 1;
} else {
*spend = end - start;
}

return 0;
}

Expand Down Expand Up @@ -3793,13 +3836,22 @@ static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo)
infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
prompt(0);


// run
int64_t start = toolsGetTimestampUs();
if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) {
// need many batch execute all threads
ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids, &spend);
ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids);
} else {
// only one batch execute all threads
ret = runInsertThread(database, stbInfo, nthreads, infos, pids, &spend);
ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
}

int64_t end = toolsGetTimestampUs();
if(end == start) {
spend = 1;
} else {
spend = end - start;
}

// exit
Expand Down

0 comments on commit 24dbb78

Please sign in to comment.