Skip to content

Commit

Permalink
Merge pull request #659 from taosdata/fix/TD-24332
Browse files Browse the repository at this point in the history
query core when concurrent threads
  • Loading branch information
plum-lihui committed May 24, 2023
2 parents d39a301 + 3c11f3a commit 47eba90
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 122 deletions.
291 changes: 170 additions & 121 deletions src/benchQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ static void *specifiedTableQuery(void *sarg) {
}

while (index < queryTimes) {
if (g_queryInfo.specifiedQueryInfo.queryInterval
&& (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval*1000) {
// check cancel
if (g_arguments->terminate) {
return NULL;
}

if (g_queryInfo.specifiedQueryInfo.queryInterval &&
(et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
toolsMsleep((int32_t)(
g_queryInfo.specifiedQueryInfo.queryInterval*1000
- (et - st))); // ms
Expand Down Expand Up @@ -392,144 +396,187 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
return ret;
}

// free g_queryInfo.specailQueryInfo memory , can re-call
void freeSpecialQueryInfo() {
// can re-call
if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
return;
}

// loop free each item memory
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
tmfree(sql->command);
tmfree(sql->delay_list);
}

// free Array
benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
g_queryInfo.specifiedQueryInfo.sqls = NULL;
}


static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
pthread_t * pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
if ((nSqlCount > 0) && (nConcurrent > 0)) {
pids = benchCalloc(1, nConcurrent*nSqlCount*sizeof(pthread_t), false);
infos = benchCalloc(1, nConcurrent*nSqlCount*sizeof(threadInfo), false);
for (uint64_t i = 0; i < nSqlCount; i++) {
SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = (int)seq;
pThreadInfo->querySeq = i;
if (iface == REST_IFACE) {
int sockfd = createSockFd();
// int iMode = 1;
// ioctl(sockfd, FIONBIO, &iMode);
if (sockfd < 0) {
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}
pThreadInfo->sockfd = sockfd;
} else {
pThreadInfo->conn = initBenchConn();
if (pThreadInfo->conn == NULL) {
destroySockFd(pThreadInfo->sockfd);
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}

// check invaid
if(nSqlCount == 0 || nConcurrent == 0 ) {
if(nSqlCount == 0)
infoPrint(" query sql count is %" PRIu64 ". must set query sqls. \n", nSqlCount);
if(nConcurrent == 0)
infoPrint(" concurrent is %d , specified_table_query->concurrent must not zero. \n", nConcurrent);
return 0;
}

// malloc funciton global memory
pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false);
infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);

bool exeError = false;
for (uint64_t i = 0; i < nSqlCount; i++) {
// reset
memset(pids, 0, nConcurrent * sizeof(pthread_t));
memset(infos, 0, nConcurrent * sizeof(threadInfo));

// get execute sql
SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);

// create threads
for (int j = 0; j < nConcurrent; j++) {
threadInfo *pThreadInfo = infos + j;
pThreadInfo->threadID = i * nConcurrent + j;
pThreadInfo->querySeq = i;
if (iface == REST_IFACE) {
int sockfd = createSockFd();
// int iMode = 1;
// ioctl(sockfd, FIONBIO, &iMode);
if (sockfd < 0) {
exeError = true;

break;
}
pThreadInfo->sockfd = sockfd;
} else {
pThreadInfo->conn = initBenchConn();
if (pThreadInfo->conn == NULL) {
destroySockFd(pThreadInfo->sockfd);
exeError = true;
break;
}
}

pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
}
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
pthread_join(pids[seq], NULL);
threadInfo *pThreadInfo = infos + seq;
if (iface == REST_IFACE) {
pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
}

// if failed, set termainte flag true like ctrl+c exit
if (exeError) {
errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
g_arguments->terminate = true;
}

// wait threads execute finished one by one
for (int j = 0; (j < nConcurrent && pids[j] > 0) ; j++) {
pthread_join(pids[j], NULL);
threadInfo *pThreadInfo = infos + j;
if (iface == REST_IFACE) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
close(pThreadInfo->sockfd);
#endif
} else {
closeBenchConn(pThreadInfo->conn);
}
if (g_fail) {
tmfree(pThreadInfo->query_delay_list);
}
}
} else {
closeBenchConn(pThreadInfo->conn);
pThreadInfo->conn = NULL;
}

if (g_fail) {
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}
uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueryTimes = query_times * nConcurrent;
double avg_delay = 0.0;
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
avg_delay += pThreadInfo->avg_delay;
for (uint64_t k = 0;
k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
sql->delay_list[j*query_times + k] =
pThreadInfo->query_delay_list[k];
}
// need exit in loop
if (g_fail || g_arguments->terminate) {
// free BArray
tmfree(pThreadInfo->query_delay_list);
}
avg_delay /= nConcurrent;
qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes,
sizeof(uint64_t), compare);
infoPrintNoTimestamp("complete query with %d threads and %"PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times,
avg_delay/1E6, /* avg */
sql->delay_list[0]/1E6, /* min */
sql->delay_list[totalQueryTimes - 1]/1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
sql->command);
infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
"complete query with %d threads and %"PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times,
avg_delay/1E6, /* avg */
sql->delay_list[0]/1E6, /* min */
sql->delay_list[totalQueryTimes - 1]/1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
sql->command);
pThreadInfo->query_delay_list = NULL;
}
}
} else {
return 0;

// cancel or need exit check
if (g_fail || g_arguments->terminate) {
// free current funciton malloc memory
tmfree((char *)pids);
tmfree((char *)infos);
// free global
freeSpecialQueryInfo();
return -1;
}

// execute successfully
uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueryTimes = query_times * nConcurrent;
double avg_delay = 0.0;
for (int j = 0; j < nConcurrent; j++) {
threadInfo *pThreadInfo = infos + j;
avg_delay += pThreadInfo->avg_delay;
for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
}

// free BArray
tmfree(pThreadInfo->query_delay_list);
pThreadInfo->query_delay_list = NULL;
}
avg_delay /= nConcurrent;
qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times, avg_delay / 1E6, /* avg */
sql->delay_list[0] / 1E6, /* min */
sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
"complete query with %d threads and %" PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times, avg_delay / 1E6, /* avg */
sql->delay_list[0] / 1E6, /* min */
sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
}

g_queryInfo.specifiedQueryInfo.totalQueried =
g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
tmfree((char *)pids);
tmfree((char *)infos);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
tmfree(sql->command);
tmfree(sql->delay_list);
}
benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);

// free specialQueryInfo
freeSpecialQueryInfo();
return 0;
}

Expand Down Expand Up @@ -587,6 +634,8 @@ static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
pThreadInfo->query_delay_list->size);
total_delay += pThreadInfo->total_delay;
tmfree(pThreadInfo->query_delay_list);
pThreadInfo->query_delay_list = NULL;

if (iface == REST_IFACE) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
Expand Down
12 changes: 11 additions & 1 deletion src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,17 @@ SBenchConn* initBenchConn() {
}

void closeBenchConn(SBenchConn* conn) {
if(conn == NULL)
return ;
#ifdef WEBSOCKET
if (g_arguments->websocket) {
ws_close(conn->taos_ws);
} else {
#endif
taos_close(conn->taos);
if(conn->taos) {
taos_close(conn->taos);
conn->taos = NULL;
}
if (conn->ctaos) {
taos_close(conn->ctaos);
conn->ctaos = NULL;
Expand Down Expand Up @@ -1166,6 +1171,11 @@ static void closeSockFd(int sockfd) {
}

void destroySockFd(int sockfd) {
// check valid
if (sockfd < 0) {
return;
}

// shutdown the connection since no more data will be sent
int result;
result = shutdown(sockfd, SHUT_WR);
Expand Down

0 comments on commit 47eba90

Please sign in to comment.