Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query core when concurrent threads #659

Merged
merged 8 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 170 additions & 121 deletions src/benchQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ static void *specifiedTableQuery(void *sarg) {
}

while (index < queryTimes) {
if (g_queryInfo.specifiedQueryInfo.queryInterval
&& (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval*1000) {
// check cancel
if (g_arguments->terminate) {
return NULL;
}

if (g_queryInfo.specifiedQueryInfo.queryInterval &&
(et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
toolsMsleep((int32_t)(
g_queryInfo.specifiedQueryInfo.queryInterval*1000
- (et - st))); // ms
Expand Down Expand Up @@ -392,144 +396,187 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
return ret;
}

// free g_queryInfo.specailQueryInfo memory , can re-call
void freeSpecialQueryInfo() {
// can re-call
if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
return;
}

// loop free each item memory
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
tmfree(sql->command);
tmfree(sql->delay_list);
}

// free Array
benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
g_queryInfo.specifiedQueryInfo.sqls = NULL;
}


static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
pthread_t * pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
if ((nSqlCount > 0) && (nConcurrent > 0)) {
pids = benchCalloc(1, nConcurrent*nSqlCount*sizeof(pthread_t), false);
infos = benchCalloc(1, nConcurrent*nSqlCount*sizeof(threadInfo), false);
for (uint64_t i = 0; i < nSqlCount; i++) {
SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = (int)seq;
pThreadInfo->querySeq = i;
if (iface == REST_IFACE) {
int sockfd = createSockFd();
// int iMode = 1;
// ioctl(sockfd, FIONBIO, &iMode);
if (sockfd < 0) {
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}
pThreadInfo->sockfd = sockfd;
} else {
pThreadInfo->conn = initBenchConn();
if (pThreadInfo->conn == NULL) {
destroySockFd(pThreadInfo->sockfd);
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}

// check invaid
if(nSqlCount == 0 || nConcurrent == 0 ) {
if(nSqlCount == 0)
infoPrint(" query sql count is %" PRIu64 ". must set query sqls. \n", nSqlCount);
if(nConcurrent == 0)
infoPrint(" concurrent is %d , specified_table_query->concurrent must not zero. \n", nConcurrent);
return 0;
}

// malloc funciton global memory
pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false);
infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);

bool exeError = false;
for (uint64_t i = 0; i < nSqlCount; i++) {
// reset
memset(pids, 0, nConcurrent * sizeof(pthread_t));
memset(infos, 0, nConcurrent * sizeof(threadInfo));

// get execute sql
SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);

// create threads
for (int j = 0; j < nConcurrent; j++) {
threadInfo *pThreadInfo = infos + j;
pThreadInfo->threadID = i * nConcurrent + j;
pThreadInfo->querySeq = i;
if (iface == REST_IFACE) {
int sockfd = createSockFd();
// int iMode = 1;
// ioctl(sockfd, FIONBIO, &iMode);
if (sockfd < 0) {
exeError = true;

break;
}
pThreadInfo->sockfd = sockfd;
} else {
pThreadInfo->conn = initBenchConn();
if (pThreadInfo->conn == NULL) {
destroySockFd(pThreadInfo->sockfd);
exeError = true;
break;
}
}

pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
}
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
pthread_join(pids[seq], NULL);
threadInfo *pThreadInfo = infos + seq;
if (iface == REST_IFACE) {
pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
}

// if failed, set termainte flag true like ctrl+c exit
if (exeError) {
errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
g_arguments->terminate = true;
}

// wait threads execute finished one by one
for (int j = 0; (j < nConcurrent && pids[j] > 0) ; j++) {
pthread_join(pids[j], NULL);
threadInfo *pThreadInfo = infos + j;
if (iface == REST_IFACE) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
WSACleanup();
closesocket(pThreadInfo->sockfd);
WSACleanup();
#else
close(pThreadInfo->sockfd);
close(pThreadInfo->sockfd);
#endif
} else {
closeBenchConn(pThreadInfo->conn);
}
if (g_fail) {
tmfree(pThreadInfo->query_delay_list);
}
}
} else {
closeBenchConn(pThreadInfo->conn);
pThreadInfo->conn = NULL;
}

if (g_fail) {
tmfree((char *)pids);
tmfree((char *)infos);
return -1;
}
uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueryTimes = query_times * nConcurrent;
double avg_delay = 0.0;
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *pThreadInfo = infos + seq;
avg_delay += pThreadInfo->avg_delay;
for (uint64_t k = 0;
k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
sql->delay_list[j*query_times + k] =
pThreadInfo->query_delay_list[k];
}
// need exit in loop
if (g_fail || g_arguments->terminate) {
// free BArray
tmfree(pThreadInfo->query_delay_list);
}
avg_delay /= nConcurrent;
qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes,
sizeof(uint64_t), compare);
infoPrintNoTimestamp("complete query with %d threads and %"PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times,
avg_delay/1E6, /* avg */
sql->delay_list[0]/1E6, /* min */
sql->delay_list[totalQueryTimes - 1]/1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
sql->command);
infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
"complete query with %d threads and %"PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times,
avg_delay/1E6, /* avg */
sql->delay_list[0]/1E6, /* min */
sql->delay_list[totalQueryTimes - 1]/1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
sql->command);
pThreadInfo->query_delay_list = NULL;
}
}
} else {
return 0;

// cancel or need exit check
if (g_fail || g_arguments->terminate) {
// free current funciton malloc memory
tmfree((char *)pids);
tmfree((char *)infos);
// free global
freeSpecialQueryInfo();
return -1;
}

// execute successfully
uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
uint64_t totalQueryTimes = query_times * nConcurrent;
double avg_delay = 0.0;
for (int j = 0; j < nConcurrent; j++) {
threadInfo *pThreadInfo = infos + j;
avg_delay += pThreadInfo->avg_delay;
for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
}

// free BArray
tmfree(pThreadInfo->query_delay_list);
pThreadInfo->query_delay_list = NULL;
}
avg_delay /= nConcurrent;
qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times, avg_delay / 1E6, /* avg */
sql->delay_list[0] / 1E6, /* min */
sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
"complete query with %d threads and %" PRIu64
" query delay "
"avg: \t%.6fs "
"min: \t%.6fs "
"max: \t%.6fs "
"p90: \t%.6fs "
"p95: \t%.6fs "
"p99: \t%.6fs "
"SQL command: %s"
"\n",
nConcurrent, query_times, avg_delay / 1E6, /* avg */
sql->delay_list[0] / 1E6, /* min */
sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */
/* p90 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
/* p95 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
/* p99 */
sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
}

g_queryInfo.specifiedQueryInfo.totalQueried =
g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
tmfree((char *)pids);
tmfree((char *)infos);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
tmfree(sql->command);
tmfree(sql->delay_list);
}
benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);

// free specialQueryInfo
freeSpecialQueryInfo();
return 0;
}

Expand Down Expand Up @@ -587,6 +634,8 @@ static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
pThreadInfo->query_delay_list->size);
total_delay += pThreadInfo->total_delay;
tmfree(pThreadInfo->query_delay_list);
pThreadInfo->query_delay_list = NULL;

if (iface == REST_IFACE) {
#ifdef WINDOWS
closesocket(pThreadInfo->sockfd);
Expand Down
12 changes: 11 additions & 1 deletion src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,17 @@ SBenchConn* initBenchConn() {
}

void closeBenchConn(SBenchConn* conn) {
if(conn == NULL)
return ;
#ifdef WEBSOCKET
if (g_arguments->websocket) {
ws_close(conn->taos_ws);
} else {
#endif
taos_close(conn->taos);
if(conn->taos) {
taos_close(conn->taos);
conn->taos = NULL;
}
if (conn->ctaos) {
taos_close(conn->ctaos);
conn->ctaos = NULL;
Expand Down Expand Up @@ -1166,6 +1171,11 @@ static void closeSockFd(int sockfd) {
}

void destroySockFd(int sockfd) {
// check valid
if (sockfd < 0) {
return;
}

// shutdown the connection since no more data will be sent
int result;
result = shutdown(sockfd, SHUT_WR);
Expand Down