diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index aa1e46d2837..5b97c12c31c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -45,6 +45,7 @@ /** use in elasticsearch version >= 2.x and <= 8.x */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { + private final SeaTunnelRowType seaTunnelRowType; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -201,49 +202,49 @@ private Map toDocumentMap(SeaTunnelRow row, SeaTunnelRowType row } private Object convertValue(String fieldName, Object value) { + if (value == null) { + return null; + } + if (value instanceof Temporal) { // jackson not support jdk8 new time api return value.toString(); - } else if (value instanceof Map) { + } + + if (value instanceof Map) { for (Map.Entry entry : ((Map) value).entrySet()) { ((Map) value).put(entry.getKey(), convertValue(fieldName, entry.getValue())); } return value; - } else if (value instanceof List) { + } + + if (value instanceof List) { for (int i = 0; i < ((List) value).size(); i++) { ((List) value).set(i, convertValue(fieldName, ((List) value).get(i))); } return value; - } else if (value instanceof ByteBuffer) { - // Check if this field is configured as a vectorization field - if (vectorizationFields != null && vectorizationFields.contains(fieldName)) { - ByteBuffer buffer = (ByteBuffer) value; - Float[] floats = VectorUtils.toFloatArray(buffer); - - // Use the configured dimension or calculate it from the buffer size - int dimension = vectorDimension > 0 ? vectorDimension : buffer.remaining() / 4; - - // Read the floats from the buffer - for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) { - floats[i] = buffer.getFloat(); - } + } - return floats; - } else { - // Default behavior for ByteBuffer fields not specified as vectorization fields - ByteBuffer buffer = (ByteBuffer) value; - Float[] floats = VectorUtils.toFloatArray(buffer); - int floatCount = buffer.remaining() / 4; + if (value instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) value; + Float[] floats = VectorUtils.toFloatArray(buffer); - for (int i = 0; i < floatCount; i++) { - floats[i] = buffer.getFloat(); - } + // Use configured dimension for vectorization fields, otherwise calculate from buffer + int dimension = + (vectorizationFields != null + && vectorizationFields.contains(fieldName) + && vectorDimension > 0) + ? vectorDimension + : buffer.remaining() / 4; - return floats; + for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) { + floats[i] = buffer.getFloat(); } - } else { - return value; + + return floats; } + + return value; } private Map createMetadata(@NonNull SeaTunnelRow row, @NonNull String key) { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java index 2131bdc942b..6f95a3706de 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -184,4 +185,33 @@ public void testSerializeDelete() { String upsertStr = serializer.serializeRow(row); Assertions.assertEquals(expected, upsertStr); } + + @Test + public void testSerializeLocalDateTimeFieldFormat() { + String index = "st_index"; + Map confMap = new HashMap<>(); + confMap.put(ElasticsearchSinkOptions.INDEX.key(), index); + + ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap); + ElasticsearchClusterInfo clusterInfo = + ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build(); + IndexInfo indexInfo = new IndexInfo(index, pluginConf); + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"id", "ts"}, + new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE}); + + final ElasticsearchRowSerializer serializer = + new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema); + + String id = "0001"; + LocalDateTime ts = LocalDateTime.of(2023, 1, 2, 3, 4, 5); + SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, ts}); + row.setRowKind(RowKind.UPDATE_AFTER); + + String result = serializer.serializeRow(row); + Assertions.assertTrue( + result.contains("\"ts\":\"2023-01-02T03:04:05\""), + "LocalDateTime field should be formatted with ISO-8601 'T' separator"); + } }