diff --git a/src/benchQuery.c b/src/benchQuery.c index 87096796..63816b81 100644 --- a/src/benchQuery.c +++ b/src/benchQuery.c @@ -184,9 +184,13 @@ static void *specifiedTableQuery(void *sarg) { } while (index < queryTimes) { - if (g_queryInfo.specifiedQueryInfo.queryInterval - && (et - st) < - (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval*1000) { + // check cancel + if (g_arguments->terminate) { + return NULL; + } + + if (g_queryInfo.specifiedQueryInfo.queryInterval && + (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) { toolsMsleep((int32_t)( g_queryInfo.specifiedQueryInfo.queryInterval*1000 - (et - st))); // ms @@ -392,144 +396,187 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) { return ret; } +// free g_queryInfo.specailQueryInfo memory , can re-call +void freeSpecialQueryInfo() { + // can re-call + if (g_queryInfo.specifiedQueryInfo.sqls == NULL) { + return; + } + + // loop free each item memory + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { + SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); + tmfree(sql->command); + tmfree(sql->delay_list); + } + + // free Array + benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); + g_queryInfo.specifiedQueryInfo.sqls = NULL; +} + + static int multi_thread_specified_table_query(uint16_t iface, char* dbName) { pthread_t * pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from specify table int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size; - if ((nSqlCount > 0) && (nConcurrent > 0)) { - pids = benchCalloc(1, nConcurrent*nSqlCount*sizeof(pthread_t), false); - infos = benchCalloc(1, nConcurrent*nSqlCount*sizeof(threadInfo), false); - for (uint64_t i = 0; i < nSqlCount; i++) { - SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); - for (int j = 0; j < nConcurrent; j++) { - uint64_t seq = i * nConcurrent + j; - threadInfo *pThreadInfo = infos + seq; - pThreadInfo->threadID = (int)seq; - pThreadInfo->querySeq = i; - if (iface == REST_IFACE) { - int sockfd = createSockFd(); - // int iMode = 1; - // ioctl(sockfd, FIONBIO, &iMode); - if (sockfd < 0) { - tmfree((char *)pids); - tmfree((char *)infos); - return -1; - } - pThreadInfo->sockfd = sockfd; - } else { - pThreadInfo->conn = initBenchConn(); - if (pThreadInfo->conn == NULL) { - destroySockFd(pThreadInfo->sockfd); - tmfree((char *)pids); - tmfree((char *)infos); - return -1; - } + + // check invaid + if(nSqlCount == 0 || nConcurrent == 0 ) { + if(nSqlCount == 0) + infoPrint(" query sql count is %" PRIu64 ". must set query sqls. \n", nSqlCount); + if(nConcurrent == 0) + infoPrint(" concurrent is %d , specified_table_query->concurrent must not zero. \n", nConcurrent); + return 0; + } + + // malloc funciton global memory + pids = benchCalloc(1, nConcurrent * sizeof(pthread_t), false); + infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false); + + bool exeError = false; + for (uint64_t i = 0; i < nSqlCount; i++) { + // reset + memset(pids, 0, nConcurrent * sizeof(pthread_t)); + memset(infos, 0, nConcurrent * sizeof(threadInfo)); + + // get execute sql + SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); + + // create threads + for (int j = 0; j < nConcurrent; j++) { + threadInfo *pThreadInfo = infos + j; + pThreadInfo->threadID = i * nConcurrent + j; + pThreadInfo->querySeq = i; + if (iface == REST_IFACE) { + int sockfd = createSockFd(); + // int iMode = 1; + // ioctl(sockfd, FIONBIO, &iMode); + if (sockfd < 0) { + exeError = true; + + break; } + pThreadInfo->sockfd = sockfd; + } else { + pThreadInfo->conn = initBenchConn(); + if (pThreadInfo->conn == NULL) { + destroySockFd(pThreadInfo->sockfd); + exeError = true; + break; + } + } - pthread_create(pids + seq, NULL, specifiedTableQuery, - pThreadInfo); - } - for (int j = 0; j < nConcurrent; j++) { - uint64_t seq = i * nConcurrent + j; - pthread_join(pids[seq], NULL); - threadInfo *pThreadInfo = infos + seq; - if (iface == REST_IFACE) { + pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo); + } + + // if failed, set termainte flag true like ctrl+c exit + if (exeError) { + errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i); + g_arguments->terminate = true; + } + + // wait threads execute finished one by one + for (int j = 0; (j < nConcurrent && pids[j] > 0) ; j++) { + pthread_join(pids[j], NULL); + threadInfo *pThreadInfo = infos + j; + if (iface == REST_IFACE) { #ifdef WINDOWS - closesocket(pThreadInfo->sockfd); - WSACleanup(); + closesocket(pThreadInfo->sockfd); + WSACleanup(); #else - close(pThreadInfo->sockfd); + close(pThreadInfo->sockfd); #endif - } else { - closeBenchConn(pThreadInfo->conn); - } - if (g_fail) { - tmfree(pThreadInfo->query_delay_list); - } - } + } else { + closeBenchConn(pThreadInfo->conn); + pThreadInfo->conn = NULL; + } - if (g_fail) { - tmfree((char *)pids); - tmfree((char *)infos); - return -1; - } - uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes; - uint64_t totalQueryTimes = query_times * nConcurrent; - double avg_delay = 0.0; - for (int j = 0; j < nConcurrent; j++) { - uint64_t seq = i * nConcurrent + j; - threadInfo *pThreadInfo = infos + seq; - avg_delay += pThreadInfo->avg_delay; - for (uint64_t k = 0; - k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) { - sql->delay_list[j*query_times + k] = - pThreadInfo->query_delay_list[k]; - } + // need exit in loop + if (g_fail || g_arguments->terminate) { + // free BArray tmfree(pThreadInfo->query_delay_list); - } - avg_delay /= nConcurrent; - qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, - sizeof(uint64_t), compare); - infoPrintNoTimestamp("complete query with %d threads and %"PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, - avg_delay/1E6, /* avg */ - sql->delay_list[0]/1E6, /* min */ - sql->delay_list[totalQueryTimes - 1]/1E6, /* max */ - /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6, - /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6, - /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6, - sql->command); - infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult, - "complete query with %d threads and %"PRIu64 - " query delay " - "avg: \t%.6fs " - "min: \t%.6fs " - "max: \t%.6fs " - "p90: \t%.6fs " - "p95: \t%.6fs " - "p99: \t%.6fs " - "SQL command: %s" - "\n", - nConcurrent, query_times, - avg_delay/1E6, /* avg */ - sql->delay_list[0]/1E6, /* min */ - sql->delay_list[totalQueryTimes - 1]/1E6, /* max */ - /* p90 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6, - /* p95 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6, - /* p99 */ - sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6, - sql->command); + pThreadInfo->query_delay_list = NULL; + } } - } else { - return 0; + + // cancel or need exit check + if (g_fail || g_arguments->terminate) { + // free current funciton malloc memory + tmfree((char *)pids); + tmfree((char *)infos); + // free global + freeSpecialQueryInfo(); + return -1; + } + + // execute successfully + uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes; + uint64_t totalQueryTimes = query_times * nConcurrent; + double avg_delay = 0.0; + for (int j = 0; j < nConcurrent; j++) { + threadInfo *pThreadInfo = infos + j; + avg_delay += pThreadInfo->avg_delay; + for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) { + sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k]; + } + + // free BArray + tmfree(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + } + avg_delay /= nConcurrent; + qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare); + infoPrintNoTimestamp("complete query with %d threads and %" PRIu64 + " query delay " + "avg: \t%.6fs " + "min: \t%.6fs " + "max: \t%.6fs " + "p90: \t%.6fs " + "p95: \t%.6fs " + "p99: \t%.6fs " + "SQL command: %s" + "\n", + nConcurrent, query_times, avg_delay / 1E6, /* avg */ + sql->delay_list[0] / 1E6, /* min */ + sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ + /* p90 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, + /* p95 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, + /* p99 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); + infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult, + "complete query with %d threads and %" PRIu64 + " query delay " + "avg: \t%.6fs " + "min: \t%.6fs " + "max: \t%.6fs " + "p90: \t%.6fs " + "p95: \t%.6fs " + "p99: \t%.6fs " + "SQL command: %s" + "\n", + nConcurrent, query_times, avg_delay / 1E6, /* avg */ + sql->delay_list[0] / 1E6, /* min */ + sql->delay_list[totalQueryTimes - 1] / 1E6, /* max */ + /* p90 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6, + /* p95 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6, + /* p99 */ + sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command); } g_queryInfo.specifiedQueryInfo.totalQueried = g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent; tmfree((char *)pids); tmfree((char *)infos); - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) { - SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i); - tmfree(sql->command); - tmfree(sql->delay_list); - } - benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls); + + // free specialQueryInfo + freeSpecialQueryInfo(); return 0; } @@ -587,6 +634,8 @@ static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) { pThreadInfo->query_delay_list->size); total_delay += pThreadInfo->total_delay; tmfree(pThreadInfo->query_delay_list); + pThreadInfo->query_delay_list = NULL; + if (iface == REST_IFACE) { #ifdef WINDOWS closesocket(pThreadInfo->sockfd); diff --git a/src/benchUtil.c b/src/benchUtil.c index 9cc86ca9..69aeb47c 100644 --- a/src/benchUtil.c +++ b/src/benchUtil.c @@ -310,12 +310,17 @@ SBenchConn* initBenchConn() { } void closeBenchConn(SBenchConn* conn) { + if(conn == NULL) + return ; #ifdef WEBSOCKET if (g_arguments->websocket) { ws_close(conn->taos_ws); } else { #endif - taos_close(conn->taos); + if(conn->taos) { + taos_close(conn->taos); + conn->taos = NULL; + } if (conn->ctaos) { taos_close(conn->ctaos); conn->ctaos = NULL; @@ -1166,6 +1171,11 @@ static void closeSockFd(int sockfd) { } void destroySockFd(int sockfd) { + // check valid + if (sockfd < 0) { + return; + } + // shutdown the connection since no more data will be sent int result; result = shutdown(sockfd, SHUT_WR);