Skip to content

Commit

Permalink
fix: taosdump use batch submit (#677)
Browse files Browse the repository at this point in the history
  • Loading branch information
DuanKuanJun committed Jun 13, 2023
1 parent ca353a9 commit df524bf
Showing 1 changed file with 121 additions and 96 deletions.
217 changes: 121 additions & 96 deletions src/taosdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -6739,112 +6739,113 @@ static int64_t dumpInAvroDataImpl(
int64_t failed = 0;
int64_t count = 0;
int64_t countTSOutOfRange = 0;
char *tbName = NULL;
int colAdj = g_dumpInLooseModeFlag ? 0 : 1;

bool printDot = true;
while (!avro_file_reader_read_value(reader, &value)) {
avro_value_t tbname_value, tbname_branch;

char *tbName = NULL;
int colAdj = 1;
if (!g_dumpInLooseModeFlag) {
avro_value_get_by_name(&value, "tbname", &tbname_value, NULL);
avro_value_get_current_branch(&tbname_value, &tbname_branch);

size_t tbname_size;
avro_value_get_string(&tbname_branch,
(const char **)&tbName, &tbname_size);
} else {
tbName = malloc(TSDB_TABLE_NAME_LEN+1);
ASSERT(tbName);

char *dupSeq = strdup(fileName);
char *running = dupSeq;
strsep(&running, ".");
char *tb = strsep(&running, ".");

strcpy(tbName, tb);
free(dupSeq);
colAdj = 0;
}
debugPrint("%s() LN%d table: %s parsed from file:%s\n",
__func__, __LINE__, tbName, fileName);

const int escapedTbNameLen = TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 3;
char *escapedTbName = calloc(1, escapedTbNameLen);
if (NULL == escapedTbName) {
errorPrint("%s() LN%d, memory allocation failed!\n", __func__, __LINE__);
free(bindArray);
free(stmtBuffer);
free(tableDes);
tfree(tbName);
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
ws_stmt_close(ws_stmt);
// setTBName
if(tbName == NULL) {
avro_value_t tbname_value, tbname_branch;
if (!g_dumpInLooseModeFlag) {
avro_value_get_by_name(&value, "tbname", &tbname_value, NULL);
avro_value_get_current_branch(&tbname_value, &tbname_branch);

size_t tbname_size;
avro_value_get_string(&tbname_branch,
(const char **)&tbName, &tbname_size);
} else {
#endif
taos_stmt_close(stmt);
#ifdef WEBSOCKET
tbName = malloc(TSDB_TABLE_NAME_LEN+1);
ASSERT(tbName);

char *dupSeq = strdup(fileName);
char *running = dupSeq;
strsep(&running, ".");
char *tb = strsep(&running, ".");

strcpy(tbName, tb);
free(dupSeq);
}
debugPrint("%s() LN%d table: %s parsed from file:%s\n",
__func__, __LINE__, tbName, fileName);

const int escapedTbNameLen = TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 3;
char *escapedTbName = calloc(1, escapedTbNameLen);
if (NULL == escapedTbName) {
errorPrint("%s() LN%d, memory allocation failed!\n", __func__, __LINE__);
free(bindArray);
free(stmtBuffer);
free(tableDes);
tfree(tbName);
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
ws_stmt_close(ws_stmt);
} else {
#endif
taos_stmt_close(stmt);
#ifdef WEBSOCKET
}
#endif
return -1;
}
#endif
return -1;
}

#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
snprintf(escapedTbName, escapedTbNameLen, "%s.%s%s%s",
namespace, g_escapeChar, tbName, g_escapeChar);
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
snprintf(escapedTbName, escapedTbNameLen, "%s.%s%s%s",
namespace, g_escapeChar, tbName, g_escapeChar);

debugPrint("%s() LN%d escaped table: %s\n",
__func__, __LINE__, escapedTbName);
debugPrint("%s() LN%d escaped table: %s\n",
__func__, __LINE__, escapedTbName);

debugPrint("%s() LN%d, stmt: %p, will call ws_stmt_set_tbname(%s)\n",
__func__, __LINE__, ws_stmt, escapedTbName);
if (0 != (code = ws_stmt_set_tbname(ws_stmt, escapedTbName))) {
errorPrint("%s() LN%d, failed to execute ws_stmt_set_tbname(%s)."
" ws_taos: %p, code: 0x%08x, reason: %s\n",
__func__, __LINE__,
escapedTbName, taos, code, ws_errstr(ws_stmt));
free(escapedTbName);
freeTbNameIfLooseMode(tbName);
continue;
}
debugPrint("%s() LN%d, stmt: %p, ws_stmt_set_tbname(%s) done\n",
__func__, __LINE__, ws_stmt, escapedTbName);
} else {
#endif
snprintf(escapedTbName, escapedTbNameLen, "%s%s%s",
g_escapeChar, tbName, g_escapeChar);
debugPrint("%s() LN%d, stmt: %p, will call ws_stmt_set_tbname(%s)\n",
__func__, __LINE__, ws_stmt, escapedTbName);
if (0 != (code = ws_stmt_set_tbname(ws_stmt, escapedTbName))) {
errorPrint("%s() LN%d, failed to execute ws_stmt_set_tbname(%s)."
" ws_taos: %p, code: 0x%08x, reason: %s\n",
__func__, __LINE__,
escapedTbName, taos, code, ws_errstr(ws_stmt));
free(escapedTbName);
freeTbNameIfLooseMode(tbName);
continue;
}
debugPrint("%s() LN%d, stmt: %p, ws_stmt_set_tbname(%s) done\n",
__func__, __LINE__, ws_stmt, escapedTbName);
} else {
#endif
snprintf(escapedTbName, escapedTbNameLen, "%s%s%s",
g_escapeChar, tbName, g_escapeChar);

debugPrint("%s() LN%d escaped table: %s\n",
__func__, __LINE__, escapedTbName);
debugPrint("%s() LN%d escaped table: %s\n",
__func__, __LINE__, escapedTbName);

if (0 != taos_stmt_set_tbname(stmt, escapedTbName)) {
errorPrint("Failed to execute taos_stmt_set_tbname(%s)."
"reason: %s\n",
escapedTbName, taos_stmt_errstr(stmt));
free(escapedTbName);
freeTbNameIfLooseMode(tbName);
continue;
if (0 != taos_stmt_set_tbname(stmt, escapedTbName)) {
errorPrint("Failed to execute taos_stmt_set_tbname(%s)."
"reason: %s\n",
escapedTbName, taos_stmt_errstr(stmt));
free(escapedTbName);
freeTbNameIfLooseMode(tbName);
tbName = NULL;
continue;
}
#ifdef WEBSOCKET
}
#ifdef WEBSOCKET
}
#endif
free(escapedTbName);

if ((0 == strlen(tableDes->name))
|| (0 != strcmp(tableDes->name, tbName))) {
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
getTableDesWS(taos, namespace,
tbName, tableDes, true);
} else {
#endif
getTableDesNative(taos, namespace,
tbName, tableDes, true);
#ifdef WEBSOCKET
#endif
free(escapedTbName);
if ((0 == strlen(tableDes->name))
|| (0 != strcmp(tableDes->name, tbName))) {
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
getTableDesWS(taos, namespace,
tbName, tableDes, true);
} else {
#endif
getTableDesNative(taos, namespace,
tbName, tableDes, true);
#ifdef WEBSOCKET
}
#endif
}
#endif
}
} // tbName

debugPrint("%s() LN%d, count: %"PRId64"\n",
__func__, __LINE__, count);
Expand Down Expand Up @@ -7096,7 +7097,9 @@ static int64_t dumpInAvroDataImpl(
countFailureAndFree(bindArray, onlyCol, &failed, tbName);
continue;
}
if (0 != (code = taos_stmt_execute(stmt))) {

// batch execute
if ( 0 == (count % g_args.data_batch) && 0 != (code = taos_stmt_execute(stmt))) {
if (code == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
countTSOutOfRange++;
} else {
Expand All @@ -7114,8 +7117,16 @@ static int64_t dumpInAvroDataImpl(
}
#endif
freeBindArray(bindArray, onlyCol);
freeTbNameIfLooseMode(tbName);
}

// last batch execute
if(0 != (count % g_args.data_batch)){
if (0 != (code = taos_stmt_execute(stmt))) {
errorPrint("error last =%s\n", taos_stmt_errstr(stmt));
}
}

freeTbNameIfLooseMode(tbName);
avro_value_decref(&value);
avro_value_iface_decref(value_class);
tfree(bindArray);
Expand Down Expand Up @@ -12387,6 +12398,9 @@ static int dumpEntry() {

time_t tTime = time(NULL);
struct tm tm = *localtime(&tTime);
printf("start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);

taos_options(TSDB_OPTION_CONFIGDIR, g_configDir);

Expand Down Expand Up @@ -12448,6 +12462,17 @@ static int dumpEntry() {
g_resultStatistics.totalRowsOfDumpOut);
}

// end time
tTime = time(NULL);
tm = *localtime(&tTime);
printf("end time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);

fprintf(g_fpOfResult, "end time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);

fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);

Expand Down

0 comments on commit df524bf

Please sign in to comment.