From 9071af796a1c8692ad36ef23aea9412c9ccb2d42 Mon Sep 17 00:00:00 2001 From: zengyi Date: Mon, 1 Dec 2025 15:27:43 +0800 Subject: [PATCH 1/6] [Fix][Elasticsearch] Format LocalDateTime timestamps with ES-compatible pattern --- .../serialize/ElasticsearchRowSerializer.java | 8 +++++ .../ElasticsearchRowSerializerTest.java | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index aa1e46d2837..d85988ae7a1 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -36,6 +36,8 @@ import lombok.NonNull; import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.time.temporal.Temporal; import java.util.Collections; import java.util.HashMap; @@ -45,6 +47,9 @@ /** use in elasticsearch version >= 2.x and <= 8.x */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { + private static final DateTimeFormatter LOCAL_DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private final SeaTunnelRowType seaTunnelRowType; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -203,6 +208,9 @@ private Map toDocumentMap(SeaTunnelRow row, SeaTunnelRowType row private Object convertValue(String fieldName, Object value) { if (value instanceof Temporal) { // jackson not support jdk8 new time api + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).format(LOCAL_DATE_TIME_FORMATTER); + } return value.toString(); } else if (value instanceof Map) { for (Map.Entry entry : ((Map) value).entrySet()) { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java index 2131bdc942b..e55d91242cb 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -184,4 +185,33 @@ public void testSerializeDelete() { String upsertStr = serializer.serializeRow(row); Assertions.assertEquals(expected, upsertStr); } + + @Test + public void testSerializeLocalDateTimeFieldFormat() { + String index = "st_index"; + Map confMap = new HashMap<>(); + confMap.put(ElasticsearchSinkOptions.INDEX.key(), index); + + ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(index, pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"id", "ts"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + LocalDateTime ts = LocalDateTime.of(2023, 1, 2, 3, 4, 5); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, ts}); + row.setRowKind(RowKind.UPDATE_AFTER); + + String result = serializer.serializeRow(row); + Assertions.assertTrue( + result.contains("\"ts\":\"2023-01-02 03:04:05\""), + "LocalDateTime field should be formatted with space separator"); + } } From 4d87866a53eb47c00599c9ec1eef766b07f1c26d Mon Sep 17 00:00:00 2001 From: zengyi Date: Tue, 2 Dec 2025 09:02:40 +0800 Subject: [PATCH 2/6] [Fix][Elasticsearch] increase timeout --- .github/workflows/backend.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 07e19fc13b5..152eb1efa9b 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -387,7 +387,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 180 + timeout-minutes: 210 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} From f1ef5ef18801d4990a4d95d46e2b556f779d606e Mon Sep 17 00:00:00 2001 From: zengyi Date: Wed, 3 Dec 2025 11:35:19 +0800 Subject: [PATCH 3/6] [Fix][Elasticsearch] restore --- .github/workflows/backend.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 152eb1efa9b..07e19fc13b5 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -387,7 +387,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 210 + timeout-minutes: 180 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} From 00c4433666eecaec43af14b8f20780d671167e3f Mon Sep 17 00:00:00 2001 From: zengyi Date: Wed, 3 Dec 2025 16:37:34 +0800 Subject: [PATCH 4/6] [Fix][Elasticsearch] Format LocalDate/LocalDateTime with ES-compatible pattern --- .../serialize/ElasticsearchRowSerializer.java | 9 ++++++++- .../serialize/ElasticsearchRowSerializerTest.java | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index d85988ae7a1..e1c367b65e9 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -36,6 +36,7 @@ import lombok.NonNull; import java.nio.ByteBuffer; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.Temporal; @@ -47,8 +48,11 @@ /** use in elasticsearch version >= 2.x and <= 8.x */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { + // Use ISO 8601 format which is compatible with Elasticsearch's strict_date_optional_time private static final DateTimeFormatter LOCAL_DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); + private static final DateTimeFormatter LOCAL_DATE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd"); private final SeaTunnelRowType seaTunnelRowType; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -209,7 +213,10 @@ private Object convertValue(String fieldName, Object value) { if (value instanceof Temporal) { // jackson not support jdk8 new time api if (value instanceof LocalDateTime) { + // Use ISO 8601 format compatible with Elasticsearch's strict_date_optional_time return ((LocalDateTime) value).format(LOCAL_DATE_TIME_FORMATTER); + } else if (value instanceof LocalDate) { + return ((LocalDate) value).format(LOCAL_DATE_FORMATTER); } return value.toString(); } else if (value instanceof Map) { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java index e55d91242cb..6f95a3706de 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java @@ -211,7 +211,7 @@ public void testSerializeLocalDateTimeFieldFormat() { String result = serializer.serializeRow(row); Assertions.assertTrue( - result.contains("\"ts\":\"2023-01-02 03:04:05\""), - "LocalDateTime field should be formatted with space separator"); + result.contains("\"ts\":\"2023-01-02T03:04:05\""), + "LocalDateTime field should be formatted with ISO-8601 'T' separator"); } } From d55c168de228e42742187acb886b2cc6494121a2 Mon Sep 17 00:00:00 2001 From: zengyi Date: Thu, 4 Dec 2025 11:18:32 +0800 Subject: [PATCH 5/6] [Fix][Elasticsearch] Align date/time serialization with ES and e2e parsing --- .../serialize/ElasticsearchRowSerializer.java | 57 ++++++++++--------- .../elasticsearch/ElasticsearchIT.java | 26 ++++++--- 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index e1c367b65e9..f4558e4ea45 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -210,55 +210,56 @@ private Map toDocumentMap(SeaTunnelRow row, SeaTunnelRowType row } private Object convertValue(String fieldName, Object value) { + if (value == null) { + return null; + } + if (value instanceof Temporal) { // jackson not support jdk8 new time api if (value instanceof LocalDateTime) { // Use ISO 8601 format compatible with Elasticsearch's strict_date_optional_time return ((LocalDateTime) value).format(LOCAL_DATE_TIME_FORMATTER); - } else if (value instanceof LocalDate) { + } + if (value instanceof LocalDate) { return ((LocalDate) value).format(LOCAL_DATE_FORMATTER); } return value.toString(); - } else if (value instanceof Map) { + } + + if (value instanceof Map) { for (Map.Entry entry : ((Map) value).entrySet()) { ((Map) value).put(entry.getKey(), convertValue(fieldName, entry.getValue())); } return value; - } else if (value instanceof List) { + } + + if (value instanceof List) { for (int i = 0; i < ((List) value).size(); i++) { ((List) value).set(i, convertValue(fieldName, ((List) value).get(i))); } return value; - } else if (value instanceof ByteBuffer) { - // Check if this field is configured as a vectorization field - if (vectorizationFields != null && vectorizationFields.contains(fieldName)) { - ByteBuffer buffer = (ByteBuffer) value; - Float[] floats = VectorUtils.toFloatArray(buffer); - - // Use the configured dimension or calculate it from the buffer size - int dimension = vectorDimension > 0 ? vectorDimension : buffer.remaining() / 4; - - // Read the floats from the buffer - for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) { - floats[i] = buffer.getFloat(); - } + } - return floats; - } else { - // Default behavior for ByteBuffer fields not specified as vectorization fields - ByteBuffer buffer = (ByteBuffer) value; - Float[] floats = VectorUtils.toFloatArray(buffer); - int floatCount = buffer.remaining() / 4; + if (value instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) value; + Float[] floats = VectorUtils.toFloatArray(buffer); - for (int i = 0; i < floatCount; i++) { - floats[i] = buffer.getFloat(); - } + // Use configured dimension for vectorization fields, otherwise calculate from buffer + int dimension = + (vectorizationFields != null + && vectorizationFields.contains(fieldName) + && vectorDimension > 0) + ? vectorDimension + : buffer.remaining() / 4; - return floats; + for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) { + floats[i] = buffer.getFloat(); } - } else { - return value; + + return floats; } + + return value; } private Map createMetadata(@NonNull SeaTunnelRow row, @NonNull String key) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 2479be67b02..4804af474dd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -67,6 +67,7 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -90,6 +91,12 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { private static final long INDEX_REFRESH_MILL_DELAY = 5000L; + private static final DateTimeFormatter DATE_TIME_MINUTE_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm"); + + private static final DateTimeFormatter DATE_TIME_SECOND_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); + private List testDataset1; private List testDataset2; @@ -857,11 +864,7 @@ private List getDocsWithTransformDate( } x.replace( "c_date", - LocalDate.parse( - x.get("c_date").toString(), - DateTimeFormatter.ofPattern( - "yyyy-MM-dd'T'HH:mm")) - .toString()); + parseToLocalDate(x.get("c_date").toString()).toString()); }); List docs = scrollResult.getDocs().stream() @@ -908,10 +911,7 @@ private List getDocsWithTransformDate( if (x.containsKey(dateField)) { x.replace( dateField, - LocalDate.parse( - x.get(dateField).toString(), - DateTimeFormatter.ofPattern( - "yyyy-MM-dd'T'HH:mm")) + parseToLocalDate(x.get(dateField).toString()) .toString()); } } @@ -982,6 +982,14 @@ private List mapTestDatasetForDSL( .collect(Collectors.toList()); } + private LocalDate parseToLocalDate(String value) { + try { + return LocalDateTime.parse(value, DATE_TIME_SECOND_FORMATTER).toLocalDate(); + } catch (DateTimeParseException e) { + return LocalDateTime.parse(value, DATE_TIME_MINUTE_FORMATTER).toLocalDate(); + } + } + @AfterEach @Override public void tearDown() { From c1191efbe10130bd7bd5952bfde7818b37fb97b3 Mon Sep 17 00:00:00 2001 From: zengyi Date: Fri, 5 Dec 2025 10:27:54 +0800 Subject: [PATCH 6/6] [Fix][Elasticsearch] Revert custom date formatting and add LocalDateTime test --- .../serialize/ElasticsearchRowSerializer.java | 15 ----------- .../elasticsearch/ElasticsearchIT.java | 26 +++++++------------ 2 files changed, 9 insertions(+), 32 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index f4558e4ea45..5b97c12c31c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -36,9 +36,6 @@ import lombok.NonNull; import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.time.temporal.Temporal; import java.util.Collections; import java.util.HashMap; @@ -48,11 +45,6 @@ /** use in elasticsearch version >= 2.x and <= 8.x */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { - // Use ISO 8601 format which is compatible with Elasticsearch's strict_date_optional_time - private static final DateTimeFormatter LOCAL_DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); - private static final DateTimeFormatter LOCAL_DATE_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd"); private final SeaTunnelRowType seaTunnelRowType; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -216,13 +208,6 @@ private Object convertValue(String fieldName, Object value) { if (value instanceof Temporal) { // jackson not support jdk8 new time api - if (value instanceof LocalDateTime) { - // Use ISO 8601 format compatible with Elasticsearch's strict_date_optional_time - return ((LocalDateTime) value).format(LOCAL_DATE_TIME_FORMATTER); - } - if (value instanceof LocalDate) { - return ((LocalDate) value).format(LOCAL_DATE_FORMATTER); - } return value.toString(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 4804af474dd..2479be67b02 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -67,7 +67,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -91,12 +90,6 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { private static final long INDEX_REFRESH_MILL_DELAY = 5000L; - private static final DateTimeFormatter DATE_TIME_MINUTE_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm"); - - private static final DateTimeFormatter DATE_TIME_SECOND_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); - private List testDataset1; private List testDataset2; @@ -864,7 +857,11 @@ private List getDocsWithTransformDate( } x.replace( "c_date", - parseToLocalDate(x.get("c_date").toString()).toString()); + LocalDate.parse( + x.get("c_date").toString(), + DateTimeFormatter.ofPattern( + "yyyy-MM-dd'T'HH:mm")) + .toString()); }); List docs = scrollResult.getDocs().stream() @@ -911,7 +908,10 @@ private List getDocsWithTransformDate( if (x.containsKey(dateField)) { x.replace( dateField, - parseToLocalDate(x.get(dateField).toString()) + LocalDate.parse( + x.get(dateField).toString(), + DateTimeFormatter.ofPattern( + "yyyy-MM-dd'T'HH:mm")) .toString()); } } @@ -982,14 +982,6 @@ private List mapTestDatasetForDSL( .collect(Collectors.toList()); } - private LocalDate parseToLocalDate(String value) { - try { - return LocalDateTime.parse(value, DATE_TIME_SECOND_FORMATTER).toLocalDate(); - } catch (DateTimeParseException e) { - return LocalDateTime.parse(value, DATE_TIME_MINUTE_FORMATTER).toLocalDate(); - } - } - @AfterEach @Override public void tearDown() {