Skip to content

Commit

Permalink
Merge pull request #703 from taosdata/feat/TS-3949
Browse files Browse the repository at this point in the history
feat: taosdump support rename database during importing
  • Loading branch information
DuanKuanJun committed Sep 14, 2023
2 parents c7cd35a + 4148259 commit df061ca
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 44 deletions.
4 changes: 2 additions & 2 deletions VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
taosbenchmark-3.2.3
taosdump-2.5.3
2.5.3
taosdump-2.5.4
2.5.4
240 changes: 220 additions & 20 deletions src/taosdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ static struct argp_option options[] = {
"Server host from which to dump data. Default is localhost.", 0},
{"user", 'u', "USER", 0,
"User name used to connect to server. Default is root.", 0},
{"password", 'p', 0, 0,
{"password", 'p', 0, 0,
"User password to connect to server. Default is taosdata.", 0},
{"port", 'P', "PORT", 0, "Port to connect", 0},
// input/output file
Expand Down Expand Up @@ -494,12 +494,21 @@ static struct argp_option options[] = {
#endif
{"debug", 'g', 0, 0, "Print debug info.", 15},
{"dot-replace", 'Q', 0, 0, "Repalce dot character with underline character in the table name.", 10},
{"rename", 'W', "RENAME-LIST", 0, "Rename database name with new name during importing data. RENAME-LIST: \"db1=newDB1|db2=newDB2\" means rename db1 to newDB1 and rename db2 to newDB2", 10},
{0}
};

#define HUMAN_TIME_LEN 28
#define DUMP_DIR_LEN (MAX_DIR_LEN - (TSDB_DB_NAME_LEN + 10))

// rename db
struct SRenameDB;
typedef struct SRenameDB {
char* old;
char* new;
void* next;
}SRenameDB;

/* Used by main to communicate with parse_opt. */
typedef struct arguments {
// connection option
Expand Down Expand Up @@ -545,9 +554,7 @@ typedef struct arguments {
bool verbose_print;
bool performance_print;
bool dotReplace;

int dumpDbCount;

#ifdef WEBSOCKET
bool restful;
bool cloud;
Expand All @@ -557,6 +564,10 @@ typedef struct arguments {
int cloudPort;
char cloudHost[MAX_HOSTNAME_LEN];
#endif

// put rename db string
char * renameBuf;
SRenameDB * renameHead;
} SArguments;

static resultStatistics g_resultStatistics = {0};
Expand Down Expand Up @@ -611,6 +622,7 @@ struct arguments g_args = {
false, // performance_print
false, // dotRepalce
0, // dumpDbCount

#ifdef WEBSOCKET
false, // restful
false, // cloud
Expand All @@ -620,6 +632,9 @@ struct arguments g_args = {
0, // cloudPort
{0}, // cloudHost
#endif // WEBSOCKET

NULL, // renameBuf
NULL // renameHead
};


Expand Down Expand Up @@ -781,6 +796,158 @@ int64_t getEndTime(int precision) {
return end_time;
}

SRenameDB* newNode(char* first, SRenameDB* prev) {
SRenameDB* node = (SRenameDB*) malloc(sizeof(SRenameDB));
memset(node, 0, sizeof(SRenameDB));
node->old = first;
// link to list
if(prev) {
prev->next = node;
}

return node;
}

void setRenameDbs(char* arg) {
if (arg == NULL) return ;
// malloc new
int len = strlen(arg);
if(len <= 2) {
return ;
}
len += 1; // include \0

// malloc
char* p = malloc(len);
int j = 0; // j is p pos
for (int i = 0; i < len; i++) {
if (arg[i] == ' ') {
// do nothing
} else if (arg[i] == '=' || arg[i] == '|') {
// set zero
p[j++] = 0;
} else {
// copy
p[j++] = arg[i];
}
}

// splite
SRenameDB* node = newNode(p, NULL);
g_args.renameHead = node;
for (int k = 0; k < j; k++) {
if(p[k] == 0 && k + 1 != j && k > 0) {
// string end and not last end
char* name = &p[k] + 1;
if (node->new == NULL) {
node->new = name;
} else {
node = newNode(name, node);
}
}
}

// end
g_args.renameBuf = p;
}

// find newName
char* findNewName(char* oldName) {
SRenameDB* node = g_args.renameHead;
while(node) {
if (strcmp(node->old, oldName) == 0) {
return node->new;
}
node = (SRenameDB* )node->next;
}
return NULL;
}

bool replaceCopy(char *des, char *src) {
size_t len = strlen(src);
bool replace = false;
for (size_t i = 0; i <= len; i++) {
if (src[i] == '.') {
des[i] = '_';
replace = true;
} else {
des[i] = src[i];
}
}

return replace;
}

// repalce old name with new
char * replaceNewName(char* cmd, int len) {
// database name left char and right char
int nLeftSql = len;
char left = cmd[len];
char right = '.';
if(left == '`') {
right = left;
nLeftSql += 1;
}

// get old database name
char oldName[TSDB_DB_NAME_LEN];
char* s = &cmd[nLeftSql];
char* e = strchr(s, right);
char* e1 = strchr(s, ' ');
if(e == NULL && e1 == NULL) {
return NULL;
} else if(e == NULL && e1) {
e = e1;
} else if(e && e1 ) {
if (e > e1) {
e = e1;
}
}

int oldLen = e - s;
if(oldLen + 1 > TSDB_DB_NAME_LEN) {
return NULL;
}
memcpy(oldName, s, oldLen);
oldName[oldLen] = 0;

// macth new database
char* newName = findNewName(oldName);
if(newName == NULL){
return NULL;
}

// malloc new buff put new sql with new name
int newLen = strlen(cmd) + (strlen(newName) - oldLen) + 1;
char* newCmd = (char *)malloc(newLen);
memset(newCmd, 0, newLen);

// copy left + newName + right from cmd
memcpy(newCmd, cmd, nLeftSql); // left sql
strcat(newCmd, newName); // newName
strcat(newCmd, e); // right sql

return newCmd;
}

// if have database name rename, return new sql with new database name
// retrn value need call free() to free memory
char * afterRenameSql(char *cmd) {
// match pattern
const char* CREATE_DB = "CREATE DATABASE IF NOT EXISTS ";
const char* CREATE_TB = "CREATE TABLE IF NOT EXISTS ";

const char* pres[] = {CREATE_DB, CREATE_TB};
for (int i = 0; i < sizeof(pres); i++ ) {
int len = strlen(pres[i]);
if (strncmp(cmd, pres[i], len) == 0) {
// found
return replaceNewName(cmd, len);
}
}
return NULL;
}

/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
Expand Down Expand Up @@ -967,6 +1134,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
}
state->next = state->argc;
break;
case 'W':
setRenameDbs(arg);
break;

default:
return ARGP_ERR_UNKNOWN;
Expand Down Expand Up @@ -1805,21 +1975,6 @@ static int getDumpDbCount() {
return count;
}

bool replaceCopy(char *des, char *src) {
size_t len = strlen(src);
bool replace = false;
for (size_t i = 0; i <= len; i++) {
if (src[i] == '.') {
des[i] = '_';
replace = true;
} else {
des[i] = src[i];
}
}

return replace;
}

static int dumpCreateMTableClause(
const char* dbName,
const char *stable,
Expand Down Expand Up @@ -6007,6 +6162,13 @@ static int64_t dumpInAvroNtbImpl(
__func__, __LINE__);
continue;
}

char* newBuf = afterRenameSql(buf);
if(newBuf) {
infoPrint(" rename database name for create normal table sql: \n old=%s\n new=%s\n", buf, newBuf);
buf = newBuf;
}

#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
WS_RES *ws_res = ws_query_timeout(taos, buf, g_args.ws_timeout);
Expand All @@ -6026,6 +6188,9 @@ static int64_t dumpInAvroNtbImpl(
} else {
#endif
TAOS_RES *res = taos_query(taos, buf);
if(newBuf) {
free(newBuf);
}
int code = taos_errno(res);
if (0 != code) {
errorPrint("%s() LN%d,"
Expand Down Expand Up @@ -7320,6 +7485,13 @@ static int64_t dumpInOneAvroFile(
}

const char *namespace = avro_schema_namespace((const avro_schema_t)schema);
if(g_args.renameHead) {
char* newDbName = findNewName((char *)namespace);
if(newDbName) {
infoPrint(" ------- rename DB Name %s to %s ------\n", namespace, newDbName);
namespace = newDbName;
}
}
debugPrint("%s() LN%d, Namespace: %s\n",
__func__, __LINE__, namespace);

Expand Down Expand Up @@ -10083,14 +10255,24 @@ static int64_t dumpInOneDebugFile(
}

int ret;
char *newSql = NULL;

if(g_args.renameHead) {
// have rename database options
newSql = afterRenameSql(cmd);
}

debugPrint("%s() LN%d, cmd: %s\n", __func__, __LINE__, cmd);
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
ret = queryDbImplWS(taos, cmd);
ret = queryDbImplWS(taos, newSql?newSql:cmd);
} else {
#endif
ret = queryDbImplNative(taos, cmd);
ret = queryDbImplNative(taos, newSql?newSql:cmd);
if(newSql) {
free(newSql);
}

#ifdef WEBSOCKET
}
#endif
Expand Down Expand Up @@ -13127,6 +13309,24 @@ int main(int argc, char *argv[]) {
} else {
ret = dumpEntry();
}

// free buf
if (g_args.renameBuf) {
free(g_args.renameBuf);
g_args.renameBuf = NULL;
}

// free node
SRenameDB* node = g_args.renameHead;
g_args.renameHead = NULL;
while(node) {
SRenameDB* next = (SRenameDB*)node->next;
free(node);
node = next;
}



return ret;
}

6 changes: 3 additions & 3 deletions tests/taosdump/native/taosdumpDbNtb.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,21 @@ def run(self):
tdSql.execute("drop database db")
# sys.exit(1)

os.system("%s -i %s -T 1" % (binPath, self.tmpdir))
os.system("%s -i %s -T 1 -W db=newdb" % (binPath, self.tmpdir))

tdSql.query("show databases")
dbresult = tdSql.queryResult

found = False
for i in range(len(dbresult)):
print("Found db: %s" % dbresult[i][0])
if dbresult[i][0] == "db":
if dbresult[i][0] == "newdb":
found = True
break

assert found == True

tdSql.execute("use db")
tdSql.execute("use newdb")
tdSql.query("show stables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "st")
Expand Down
Loading

0 comments on commit df061ca

Please sign in to comment.