Skip to content

Commit 5189991

Browse files
[Feature][Doris] Add Doris type converter (apache#6354)
--------- Co-authored-by: Jia Fan <[email protected]>
1 parent 52d1020 commit 5189991

File tree

54 files changed

+4516
-843
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+4516
-843
lines changed

docs/en/connector-v2/sink/Doris.md

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ You can use the following placeholders
116116
description of Doris
117117
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
118118
- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list)
119+
- rowtype_duplicate_key: Used to get the duplicate key in the upstream schema (only for doris source, maybe a list)
119120

120121
## Data Type Mapping
121122

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
public enum SaveModePlaceHolder {
2424
ROWTYPE_PRIMARY_KEY("rowtype_primary_key", "primary keys"),
2525
ROWTYPE_UNIQUE_KEY("rowtype_unique_key", "unique keys"),
26+
ROWTYPE_DUPLICATE_KEY("rowtype_duplicate_key", "duplicate keys"),
2627
ROWTYPE_FIELDS("rowtype_fields", "fields"),
2728
TABLE_NAME("table_name", "table name"),
2829
DATABASE("database", "database");

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java

+15
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,21 @@ default Optional<Factory> getFactory() {
131131
*/
132132
CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException;
133133

134+
/**
135+
* Return a {@link CatalogTable} identified by the given {@link TablePath} and field names. The
136+
* framework will resolve the metadata objects when necessary.
137+
*
138+
* @param tablePath Path of the table
139+
* @param fieldNames The field names need read
140+
* @return The requested table
141+
* @throws CatalogException in case of any runtime exception
142+
*/
143+
default CatalogTable getTable(TablePath tablePath, List<String> fieldNames)
144+
throws CatalogException, TableNotExistException {
145+
throw CommonError.unsupportedOperation(
146+
name(), "get table with tablePath " + tablePath + ", fieldNames: " + fieldNames);
147+
}
148+
134149
default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogException {
135150
// Get the list of specified tables
136151
List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicDataConverter.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.seatunnel.api.table.catalog.Column;
2121
import org.apache.seatunnel.api.table.type.ArrayType;
22-
import org.apache.seatunnel.api.table.type.BasicType;
2322
import org.apache.seatunnel.api.table.type.MapType;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2524
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -164,7 +163,7 @@ default Object[] convertArray(T typeDefine, Column columnDefine, Object value)
164163
default Object[] convertArray(ArrayType typeDefine, Object value)
165164
throws UnsupportedOperationException {
166165
if (value.getClass().isArray()) {
167-
BasicType elementType = typeDefine.getElementType();
166+
SeaTunnelDataType elementType = typeDefine.getElementType();
168167

169168
Object[] array = (Object[]) value;
170169
for (int i = 0; i < array.length; i++) {
@@ -173,7 +172,7 @@ default Object[] convertArray(ArrayType typeDefine, Object value)
173172
return array;
174173
}
175174
if (value instanceof List) {
176-
BasicType elementType = typeDefine.getElementType();
175+
SeaTunnelDataType elementType = typeDefine.getElementType();
177176

178177
List<Object> list = (List<Object>) value;
179178
int elements = list.size();
@@ -183,7 +182,7 @@ default Object[] convertArray(ArrayType typeDefine, Object value)
183182
return list.toArray();
184183
}
185184
if (value instanceof Set) {
186-
BasicType elementType = typeDefine.getElementType();
185+
SeaTunnelDataType elementType = typeDefine.getElementType();
187186

188187
return ((Set) value).stream().map(e -> convert(elementType, e)).toArray();
189188
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,26 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
3939
public static final ArrayType<Double[], Double> DOUBLE_ARRAY_TYPE =
4040
new ArrayType<>(Double[].class, BasicType.DOUBLE_TYPE);
4141

42+
public static final ArrayType<LocalTimeType[], LocalTimeType> LOCAL_DATE_ARRAY_TYPE =
43+
new ArrayType(LocalTimeType[].class, LocalTimeType.LOCAL_DATE_TYPE);
44+
45+
public static final ArrayType<LocalTimeType[], LocalTimeType> LOCAL_TIME_ARRAY_TYPE =
46+
new ArrayType(LocalTimeType[].class, LocalTimeType.LOCAL_TIME_TYPE);
47+
48+
public static final ArrayType<LocalTimeType[], LocalTimeType> LOCAL_DATE_TIME_ARRAY_TYPE =
49+
new ArrayType(LocalTimeType[].class, LocalTimeType.LOCAL_DATE_TIME_TYPE);
50+
4251
// --------------------------------------------------------------------------------------------
4352

4453
private final Class<T> arrayClass;
45-
private final BasicType<E> elementType;
54+
private final SeaTunnelDataType<E> elementType;
4655

47-
private ArrayType(Class<T> arrayClass, BasicType<E> elementType) {
56+
protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
4857
this.arrayClass = arrayClass;
4958
this.elementType = elementType;
5059
}
5160

52-
public BasicType<E> getElementType() {
61+
public SeaTunnelDataType<E> getElementType() {
5362
return elementType;
5463
}
5564

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.type;
19+
20+
public class DecimalArrayType extends ArrayType {
21+
private static final long serialVersionUID = 1L;
22+
23+
public static final Class arrayClass = DecimalType[].class;
24+
25+
public DecimalArrayType(DecimalType elementType) {
26+
super(arrayClass, elementType);
27+
}
28+
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,26 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
142142
case TIMESTAMP:
143143
return 48;
144144
case ARRAY:
145+
SeaTunnelDataType elementType = ((ArrayType) dataType).getElementType();
146+
if (elementType instanceof DecimalType) {
147+
return ((Object[]) v).length * 36;
148+
}
149+
150+
if (elementType instanceof LocalTimeType) {
151+
SqlType eleSqlType = elementType.getSqlType();
152+
switch (eleSqlType) {
153+
case DATE:
154+
return ((Object[]) v).length * 24;
155+
case TIME:
156+
return ((Object[]) v).length * 12;
157+
case TIMESTAMP:
158+
return ((Object[]) v).length * 48;
159+
default:
160+
throw new UnsupportedOperationException(
161+
"Unsupported type in LocalTimeArrayType: " + eleSqlType);
162+
}
163+
}
164+
145165
return getBytesForArray(v, ((ArrayType) dataType).getElementType());
146166
case MAP:
147167
int size = 0;
@@ -166,7 +186,7 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
166186
}
167187
}
168188

169-
private int getBytesForArray(Object v, BasicType<?> dataType) {
189+
private int getBytesForArray(Object v, SeaTunnelDataType<?> dataType) {
170190
switch (dataType.getSqlType()) {
171191
case STRING:
172192
int s = 0;

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java

+17
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@
3636
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
3737
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
3838
import static org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
39+
import static org.apache.seatunnel.common.exception.CommonErrorCode.OPERATION_NOT_SUPPORTED;
3940
import static org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
4041
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
4142
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
43+
import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
4244
import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
4345

4446
/**
@@ -96,6 +98,13 @@ public static SeaTunnelRuntimeException unsupportedDataType(
9698
return new SeaTunnelRuntimeException(UNSUPPORTED_DATA_TYPE, params);
9799
}
98100

101+
public static SeaTunnelRuntimeException unsupportedVersion(String identifier, String version) {
102+
Map<String, String> params = new HashMap<>();
103+
params.put("identifier", identifier);
104+
params.put("version", version);
105+
return new SeaTunnelRuntimeException(VERSION_NOT_SUPPORTED, params);
106+
}
107+
99108
public static SeaTunnelRuntimeException unsupportedEncoding(String encoding) {
100109
Map<String, String> params = new SingletonMap<>("encoding", encoding);
101110
return new SeaTunnelRuntimeException(UNSUPPORTED_ENCODING, params);
@@ -185,6 +194,14 @@ public static SeaTunnelRuntimeException jsonOperationError(
185194
}
186195
}
187196

197+
public static SeaTunnelRuntimeException unsupportedOperation(
198+
String identifier, String operation) {
199+
Map<String, String> params = new HashMap<>();
200+
params.put("identifier", identifier);
201+
params.put("operation", operation);
202+
return new SeaTunnelRuntimeException(OPERATION_NOT_SUPPORTED, params);
203+
}
204+
188205
public static SeaTunnelRuntimeException sqlTemplateHandledError(
189206
String tableName,
190207
String keyName,

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,14 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
4949
WRITE_SEATUNNEL_ROW_ERROR(
5050
"COMMON-23",
5151
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."),
52+
5253
SQL_TEMPLATE_HANDLED_ERROR(
5354
"COMMON-24",
54-
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template");
55+
"The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"),
56+
57+
VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),
58+
59+
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported.");
5560

5661
private final String code;
5762
private final String description;

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize;
1919

2020
import org.apache.seatunnel.api.table.type.ArrayType;
21-
import org.apache.seatunnel.api.table.type.BasicType;
2221
import org.apache.seatunnel.api.table.type.MapType;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -171,7 +170,7 @@ private AttributeValue convertItem(
171170
return AttributeValue.builder().m(resultMap).build();
172171
case L:
173172
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelDataType;
174-
BasicType<?> elementType = arrayType.getElementType();
173+
SeaTunnelDataType<?> elementType = arrayType.getElementType();
175174
Object[] l = (Object[]) value;
176175
return AttributeValue.builder()
177176
.l(

0 commit comments

Comments
 (0)