Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/TD-26260: support geometry (not include schemaless) #704

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions inc/benchData.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
/***** Global variables ******/
/***** Declare functions *****/
void rand_string(char *str, int size, bool chinese);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,
int disorderRange);
void rand_geometry(char *str, int fieldLen, int maxType);
int geoCalcBufferSize(int fieldLen);
int getGeoMaxType(int fieldLen);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, int disorderRange);
int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf,
int64_t bufLen,
int lenOfOneRow, BArray * fields, int64_t loop,
Expand Down
12 changes: 6 additions & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
ADD_DEPENDENCIES(taosdump deps-jansson)
ADD_DEPENDENCIES(taosdump deps-snappy)
IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ELSE()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF()
ELSE ()
INCLUDE_DIRECTORIES(/usr/local/include)
Expand All @@ -233,9 +233,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(OS_ID "Darwin")

IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ELSE()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF()
ENDIF ()

Expand Down Expand Up @@ -427,9 +427,9 @@ ELSE ()
SET(CMAKE_C_STANDARD 11)
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /utf-8")
IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c)
ELSE ()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF ()

ADD_EXECUTABLE(taosdump taosdump.c toolsSys.c toolstime.c toolsDir.c toolsString.c)
Expand Down
34 changes: 34 additions & 0 deletions src/benchData.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ uint32_t accumulateRowLen(BArray *fields, int iface) {
switch (field->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY:
len += field->length + 3;
break;
case TSDB_DATA_TYPE_INT:
Expand Down Expand Up @@ -387,6 +388,28 @@ static int tmpStr(char *tmp, int iface, Field *field, int i) {
return 0;
}

static int tmpGeometry(char *tmp, int iface, Field *field, int i) {
if (g_arguments->demo_mode) {
// TODO
} else if (field->values) {
int arraySize = tools_cJSON_GetArraySize(field->values);
if (arraySize) {
tools_cJSON *buf = tools_cJSON_GetArrayItem(field->values, taosRandom() % arraySize);
snprintf(tmp, field->length, "%s", buf->valuestring);
} else {
errorPrint(
"%s() cannot read correct value "
"from json file. array size: %d\n",
__func__, arraySize);
return -1;
}
} else {
int maxType = getGeoMaxType(field->length);
rand_geometry(tmp, field->length, maxType);
}
return 0;
}

FORCE_INLINE double tmpDoubleImpl(Field *field, int32_t angle) {
double doubleTmp = (double)(field->min);

Expand Down Expand Up @@ -708,6 +731,17 @@ static int generateRandDataSQL(SSuperTable *stbInfo, char *sampleDataBuf,
tmfree(tmp);
break;
}
case TSDB_DATA_TYPE_GEOMETRY: {
int bufferSize = geoCalcBufferSize(field->length);
char *tmp = benchCalloc(1, bufferSize, false);
if (0 != tmpGeometry(tmp, stbInfo->iface, field, i)) {
free(tmp);
return -1;
}
n = snprintf(sampleDataBuf + pos, bufLen - pos, "'%s',", tmp);
tmfree(tmp);
break;
}
case TSDB_DATA_TYPE_JSON: {
pos += tmpJson(sampleDataBuf, bufLen, pos,
fieldsSize, field);
Expand Down
158 changes: 158 additions & 0 deletions src/benchDataGeometry.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the MIT license as published by the Free Software
* Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*/

#include <bench.h>
#include <benchData.h>

typedef struct {
double x;
double y;
} SGeoCoord2D;

typedef enum { GEO_SUB_TYPE_POINT = 0, GEO_SUB_TYPE_LINESTRING, GEO_SUB_TYPE_POLYGON, GEO_SUB_TYPE_COUNT } EGeoSubType;

const int CCoordSize = 16;

typedef struct {
const char *preifx;
const char *suffix;
int baseLen;
int minCoordNum;
int maxCoordNum;
bool isClosed; // if true, first coord and last coord must be the same
} SGeoSubTypeInfo;

// should be ordered by (baseLen + minCoordNum * CCoordSize), ASC
const SGeoSubTypeInfo GeoInfo[] = {
{"POINT(", ")", 5, 1, 1, false}, {"LINESTRING(", ")", 9, 2, 4094, false}, {"POLYGON((", "))", 13, 3, 4094, true}};

typedef struct {
EGeoSubType type;
BArray *coordArray; // element: SGeoCoord2D
} SGeoObj2D;

static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray);
static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum);
static void geoObject2DDestory(SGeoObj2D *pObj);

static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord);
static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray);
static int geoObject2DToStr(char *buffer, SGeoObj2D *object);

static BArray *randCoordArray(int count);

/*--- init & destory ---*/
static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray) {
SGeoObj2D *pObj = (SGeoObj2D *)benchCalloc(1, sizeof(SGeoObj2D), true);
pObj->type = subType;
pObj->coordArray = coordArray;
return pObj;
}

static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum) {
const SGeoSubTypeInfo *info = &GeoInfo[subType];

if (info->isClosed) {
coordNum = coordNum - 1;
}
BArray *array = randCoordArray(coordNum);
if (info->isClosed) {
SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true);
SGeoCoord2D *pFirstCoord= benchArrayGet(array, 0);
memcpy(pCoord, pFirstCoord, sizeof(SGeoCoord2D));
benchArrayPush(array, pCoord);
}
return geoObject2DInit(subType, array);
}

static void geoObject2DDestory(SGeoObj2D *pObj) {
if (!pObj) return;
benchArrayDestroy(pObj->coordArray);
tmfree(pObj);
}

/*--- string formatters ---*/
static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord) { return sprintf(buffer, "%10.6lf %10.6lf", pCoord->x, pCoord->y); }

static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray) {
int pos = 0;
bool firstCoord = true;
for (int i = 0; i < coordArray->size; i++) {
int size = 0;
if (firstCoord) {
firstCoord = false;
} else {
size = sprintf(buffer + pos, "%s", ", ");
pos += size;
}
size = geoCoord2DToStr(buffer + pos, benchArrayGet(coordArray, i));
pos += size;
}
return pos;
}

static int geoObject2DToStr(char *buffer, SGeoObj2D *object) {
int pos = sprintf(buffer, "%s", GeoInfo[object->type].preifx);
pos += geoCoord2DArrayToStr(buffer + pos, object->coordArray);
pos += sprintf(buffer + pos, "%s", GeoInfo[object->type].suffix);
return pos;
}

static BArray *randCoordArray(int count) {
BArray *array = benchArrayInit(8, sizeof(SGeoCoord2D));
int minVal = -1000, maxVal = 1000;
for (int i = 0; i < count; i++) {
double x = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal);
double y = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal);
SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true);
pCoord->x = x;
pCoord->y = y;
benchArrayPush(array, pCoord);
}
return array;
}

int geoCalcBufferSize(int fieldLen) {
// not accurate, but enough
return fieldLen + 20;
}

int getGeoMaxType(int fieldLen) {
int maxType = -1;
for (int type = GEO_SUB_TYPE_COUNT - 1; type >= 0; type--) {
const SGeoSubTypeInfo *info = &GeoInfo[type];

int minLen = info->baseLen + info->minCoordNum * CCoordSize;
if (fieldLen >= minLen) {
maxType = type;
break;
}
}
return maxType;
}

void rand_geometry(char *str, int fieldLen, int maxType) {
EGeoSubType type = taosRandom() % (maxType + 1);
const SGeoSubTypeInfo *info = &GeoInfo[type];

int maxCoordNum = (fieldLen - info->baseLen) / CCoordSize;
maxCoordNum = min(maxCoordNum, info->maxCoordNum);

int coordNum = info->minCoordNum;
if (maxCoordNum > info->minCoordNum) {
coordNum = info->minCoordNum + taosRandom() % (maxCoordNum - info->minCoordNum);
}

SGeoObj2D *pObj = geoObject2DRandInit(type, coordNum);
geoObject2DToStr(str, pObj);
geoObject2DDestory(pObj);
}
18 changes: 15 additions & 3 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,16 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
Field * col = benchArrayGet(stbInfo->cols, colIndex);
int n;
if (col->type == TSDB_DATA_TYPE_BINARY ||
col->type == TSDB_DATA_TYPE_NCHAR) {
col->type == TSDB_DATA_TYPE_NCHAR ||
col->type == TSDB_DATA_TYPE_GEOMETRY) {
n = snprintf(colsBuf + len, col_buffer_len - len,
",%s %s(%d)", col->name,
convertDatatypeToString(col->type), col->length);
if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
colIndex);
return -1;
}
} else {
n = snprintf(colsBuf + len, col_buffer_len - len,
",%s %s", col->name,
Expand Down Expand Up @@ -282,10 +288,16 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
Field *tag = benchArrayGet(stbInfo->tags, tagIndex);
if (tag->type == TSDB_DATA_TYPE_BINARY ||
tag->type == TSDB_DATA_TYPE_NCHAR) {
tag->type == TSDB_DATA_TYPE_NCHAR ||
tag->type == TSDB_DATA_TYPE_GEOMETRY) {
n = snprintf(tagsBuf + len, tag_buffer_len - len,
"%s %s(%d),", tag->name,
convertDatatypeToString(tag->type), tag->length);
if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
tagIndex);
return -1;
}
} else if (tag->type == TSDB_DATA_TYPE_JSON) {
n = snprintf(tagsBuf + len, tag_buffer_len - len,
"%s json", tag->name);
Expand Down Expand Up @@ -1738,7 +1750,7 @@ static void *syncWriteInterlace(void *sarg) {
infoPrint(
"thread[%d] has currently inserted rows: %" PRIu64
", peroid insert rate: %.3f rows/s \n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
lastPrintTime = currentPrintTime;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
Expand Down
5 changes: 3 additions & 2 deletions src/benchJsonOpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ static int getColumnAndTagTypeFromInsertJsonFile(
} else {
if (type == TSDB_DATA_TYPE_BINARY
|| type == TSDB_DATA_TYPE_JSON
|| type == TSDB_DATA_TYPE_NCHAR) {
|| type == TSDB_DATA_TYPE_NCHAR
|| type == TSDB_DATA_TYPE_GEOMETRY) {
length = g_arguments->binwidth;
} else {
length = convertTypeToLength(type);
Expand Down Expand Up @@ -1840,7 +1841,7 @@ static int getMetaFromTmqJsonFile(tools_cJSON *json) {
if (tools_cJSON_IsString(groupMode)) {
g_tmqInfo.consumerInfo.groupMode = groupMode->valuestring;
}


tools_cJSON *pollDelay = tools_cJSON_GetObjectItem(tmqInfo, "poll_delay");
if (tools_cJSON_IsNumber(pollDelay)) {
Expand Down
12 changes: 9 additions & 3 deletions src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void closeBenchConn(SBenchConn* conn) {
if(conn->taos) {
taos_close(conn->taos);
conn->taos = NULL;
}
}
if (conn->ctaos) {
taos_close(conn->ctaos);
conn->ctaos = NULL;
Expand Down Expand Up @@ -840,6 +840,8 @@ char *convertDatatypeToString(int type) {
return "double";
case TSDB_DATA_TYPE_JSON:
return "json";
case TSDB_DATA_TYPE_GEOMETRY:
return "geometry";
default:
break;
}
Expand Down Expand Up @@ -972,6 +974,8 @@ int convertStringToDatatype(char *type, int length) {
return TSDB_DATA_TYPE_JSON;
} else if (0 == strcasecmp(type, "varchar")) {
return TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(type, "geometry")) {
return TSDB_DATA_TYPE_GEOMETRY;
} else {
errorPrint("unknown data type: %s\n", type);
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -1009,6 +1013,8 @@ int convertStringToDatatype(char *type, int length) {
return TSDB_DATA_TYPE_JSON;
} else if (0 == strncasecmp(type, "varchar", length)) {
return TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(type, "geometry")) {
return TSDB_DATA_TYPE_GEOMETRY;
} else {
errorPrint("unknown data type: %s\n", type);
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -1065,7 +1071,7 @@ void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems) {

void* dst = BARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize * elems);
tmfree(pData);
tmfree(pData); // TODO remove this
pArray->size += elems;
return dst;
}
Expand Down Expand Up @@ -1191,7 +1197,7 @@ void destroySockFd(int sockfd) {
if (sockfd < 0) {
return;
}

// shutdown the connection since no more data will be sent
int result;
result = shutdown(sockfd, SHUT_WR);
Expand Down
Loading