From b4c43235c933bb715aa1f901d97bd7b8b5e442d2 Mon Sep 17 00:00:00 2001
From: Omri Fried
Date: Sun, 21 Sep 2025 18:03:45 -0700
Subject: [PATCH 1/5] timestamp handling
---
.../io/jdbc/BaseJdbcAutoSchemaSink.java | 95 ++++++++++++++++++-
.../io/jdbc/PostgresJdbcAutoSchemaSink.java | 42 ++++++++
.../io/jdbc/SqliteJdbcAutoSchemaSink.java | 26 +++++
3 files changed, 162 insertions(+), 1 deletion(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 015a62be8a1ba..882c9f65f1c7d 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -22,6 +22,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -56,6 +59,51 @@ public List getColumnsForUpsert() {
throw new IllegalStateException("UPSERT not supported");
}
+ /**
+ * Handles datetime value binding for database-specific date, time, and timestamp types.
+ *
+ * This method is called when an input value is of datetime or timestamp value.
+ * Implementations should convert the timestamp or datetime value to the appropriate
+ * database-specific type or revert to primitive types. The statement is passed into the function to handle
+ * field conversion and set it accordingly.
+ *
+ *
+ * The method is invoked automatically by {@link #setColumnValue(PreparedStatement, int, Object, String)}
+ * when it detects an input of type java.sql.Timestamp, java.sql.Date, or java.sql.Time.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If not available, convert timestamp values to long. Alternatively use setTimestamp on the PreparedStatement
+ * - If not available, convert date values to int. Alternatively use setDate on the PreparedStatement
+ * - If not available, convert time values to int or long. Alternatively user setTime on the PreparedStatement
+ * - Provide informative logs and wrap JDBC exceptions with contextual information
+ *
+ *
+ *
+ * Example Usage:
+ *
{@code
+ * // For PostgreSQL implementation of timestamps:
+ * if (value instanceof Timestamp) {
+ * statement.setTimestamp(index, (Timestamp) value);
+ * }
+ * }
+ *
+ *
+ * @param statement the PreparedStatement to bind the value to
+ * @param index the parameter index (1-based) in the PreparedStatement
+ * @param value the value to be bound
+ * @param targetSqlType the target SQL type name for the column (e.g., "Timestamp", "Date", "Time")
+ * @return true if the value is handled, false otherwise. Databases that do not support datetime will return false and the value will be bound to its original type.
+ * @throws Exception if conversion or binding fails, including:
+ *
+ * - {@code SQLException} for JDBC creation or binding failures
+ *
+ * @see #setColumnValue(PreparedStatement, int, Object, String)
+ */
+ protected abstract boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception;
+
/**
* Handles array value binding for database-specific array types.
*
@@ -200,7 +248,14 @@ public Mutation createMutation(Record message) {
if (schemaType.isPrimitive()) {
throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
}
- recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+ if (schemaType == SchemaType.AVRO || schemaType == SchemaType.JSON) {
+ Map data = new HashMap<>();
+ fillKeyValueSchemaData(message.getSchema(), record, data);
+ recordValueGetter = (k) -> data.get(k);
+ }
+ else {
+ recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+ }
}
String action = message.getProperties().get(ACTION_PROPERTY);
if (action != null) {
@@ -298,6 +353,16 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
return;
}
+ if (value instanceof Timestamp || value instanceof Date || value instanceof Time) {
+ String typeName = value instanceof Timestamp ? "Timestamp"
+ : value instanceof Date ? "Date"
+ : "Time";
+ boolean timestamp_converted = handleDateTime(statement, index, value, typeName);
+ if (timestamp_converted) {
+ return;
+ }
+ }
+
if (value instanceof Integer) {
statement.setInt(index, (Integer) value);
} else if (value instanceof Long) {
@@ -447,7 +512,35 @@ static Object convertAvroField(Object avroValue, Schema schema) {
switch (schema.getType()) {
case NULL:
case INT:
+ if (schema.getLogicalType() != null) {
+ String logicalTypeName = schema.getLogicalType().getName();
+ int time = (Integer) avroValue;
+ if ("time-millis".equals(logicalTypeName)) {
+ return new Time(time);
+ }
+ else if ("date".equals(logicalTypeName)) {
+ return new Date(time);
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported avro integer logical type=" + logicalTypeName
+ + " for value field schema " + schema.getName());
+ }
+ }
case LONG:
+ if (schema.getLogicalType() != null) {
+ String logicalTypeName = schema.getLogicalType().getName();
+ long timestamp = (Long) avroValue;
+ if ("timestamp-millis".equals(logicalTypeName) || "timestamp-micros".equals(logicalTypeName) || "local-timestamp-millis".equals(logicalTypeName) || "local-timestamp-micros".equals(logicalTypeName)) {
+ return new Timestamp(timestamp);
+ }
+ else if ("time-micros".equals(logicalTypeName)) {
+ return new Time(timestamp);
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported avro long logical type=" + logicalTypeName
+ + " for value field schema " + schema.getName());
+ }
+ }
case DOUBLE:
case FLOAT:
case BOOLEAN:
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index 2ce2afb57d5b7..5aa5a36fbd3d6 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -23,6 +23,9 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
@@ -805,4 +808,43 @@ private void validateArrayElements(Object[] elements, String postgresArrayType,
+ "bigint, text, boolean, numeric, real, float8, timestamp.");
}
}
+
+ /**
+ * PostgreSQL supports Date, Timestamp, Timestamptz, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp or Timestamp with time zone, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index a579880671339..83e60a01f75f7 100644
--- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -82,4 +82,30 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by SQLite JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * SQLite does not support native datetime types.
+ *
+ * SQLite does not have native datetime data types like PostgreSQL. These should be stored as strings or
+ * their default numeric types.
+ *
+ *
+ * Alternatives:
+ *
+ * - Use PostgreSQL JDBC sink for native datetime support
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ return false;
+ }
}
From 166c8e87dd3c5781726606653800ce99b7161825 Mon Sep 17 00:00:00 2001
From: Omri Fried
Date: Sun, 21 Sep 2025 18:24:00 -0700
Subject: [PATCH 2/5] Remaining dbs
---
.../io/jdbc/ClickHouseJdbcAutoSchemaSink.java | 40 ++++++++++++++++++
.../io/jdbc/MariadbJdbcAutoSchemaSink.java | 42 +++++++++++++++++++
.../io/jdbc/OpenMLDBJdbcAutoSchemaSink.java | 42 +++++++++++++++++++
3 files changed, 124 insertions(+)
diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
index 36a0b3210bd7a..2c3921e040f9b 100644
--- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.io.jdbc;
import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -65,4 +67,42 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by ClickHouse JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * ClickHouse supports various Date types.
+ *
+ * Inputs are converted to the appropriate Date type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it not supported
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ return false;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 3302312b7122f..d0f714d61f300 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -21,6 +21,9 @@
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -76,4 +79,43 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by MariaDB JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * MariaDB supports Date, Timestamp, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
index 0b7028a2417c8..3732d748c0670 100644
--- a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.io.jdbc;
import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -60,4 +63,43 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by OpenMLDB JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * OpenMLDB supports Date, Timestamp, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
From d07fdeebfbdbd07a32fd1fc3782f092ea579878e Mon Sep 17 00:00:00 2001
From: Omri Fried
Date: Sun, 21 Sep 2025 18:47:35 -0700
Subject: [PATCH 3/5] add tests
---
.../io/jdbc/ClickHouseJdbcAutoSchemaSink.java | 3 +
.../jdbc/ClickHouseDateTimeSupportTest.java | 93 +++++++++++++++++
.../io/jdbc/BaseJdbcAutoSchemaSink.java | 3 +-
.../io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 30 ++++++
.../io/jdbc/MariadbJdbcAutoSchemaSink.java | 3 +
.../io/jdbc/MariadbDateTimeSupportTest.java | 94 ++++++++++++++++++
.../io/jdbc/OpenMLDBJdbcAutoSchemaSink.java | 3 +
.../io/jdbc/OpenMLDBDateTimeSupportTest.java | 94 ++++++++++++++++++
.../io/jdbc/PostgresJdbcAutoSchemaSink.java | 3 +
.../io/jdbc/PostgresDateTimeSupportTest.java | 99 +++++++++++++++++++
.../io/jdbc/SqliteDateTimeSupportTest.java | 89 +++++++++++++++++
11 files changed, 512 insertions(+), 2 deletions(-)
create mode 100644 pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
create mode 100644 pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
create mode 100644 pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
create mode 100644 pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
create mode 100644 pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
index 2c3921e040f9b..2ea1844f567f3 100644
--- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
@@ -92,6 +92,9 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
switch (targetSqlType) {
case "Timestamp":
statement.setTimestamp(index, (Timestamp) value);
diff --git a/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java b/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
new file mode 100644
index 0000000000000..4fe138dc7d91f
--- /dev/null
+++ b/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ClickHouseDateTimeSupportTest {
+
+ private ClickHouseJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new ClickHouseJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandlingNotSupported() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 882c9f65f1c7d..ef7cc76dc1f80 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -409,8 +409,7 @@ private static Object getValueFromJsonNode(final JsonNode fn) {
return null;
}
if (fn.isContainerNode()) {
- throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only "
- + "first level fields.");
+ return fn.toString();
} else if (fn.isBoolean()) {
return fn.asBoolean();
} else if (fn.isFloatingPointNumber()) {
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index 3979ee79ef84f..823fd9e719153 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -217,6 +217,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
@@ -244,6 +250,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
Field field = JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
@@ -293,6 +305,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
// Test all primitive type conversions still work
@@ -337,6 +355,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
PreparedStatement mockStatement = mock(PreparedStatement.class);
@@ -363,6 +387,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
PreparedStatement mockStatement = mock(PreparedStatement.class);
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index d0f714d61f300..6c81548abf91c 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -104,6 +104,9 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
switch (targetSqlType) {
case "Timestamp":
statement.setTimestamp(index, (Timestamp) value);
diff --git a/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
new file mode 100644
index 0000000000000..2766fa731e159
--- /dev/null
+++ b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MariadbDateTimeSupportTest {
+
+ private MariadbJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new MariadbJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
index 3732d748c0670..a2ba66e103c7c 100644
--- a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
@@ -88,6 +88,9 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
switch (targetSqlType) {
case "Timestamp":
statement.setTimestamp(index, (Timestamp) value);
diff --git a/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
new file mode 100644
index 0000000000000..746730fd27633
--- /dev/null
+++ b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenMLDBDateTimeSupportTest {
+
+ private OpenMLDBJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new OpenMLDBJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index 5aa5a36fbd3d6..f59b9c809f835 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -833,6 +833,9 @@ private void validateArrayElements(Object[] elements, String postgresArrayType,
@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
switch (targetSqlType) {
case "Timestamp":
statement.setTimestamp(index, (Timestamp) value);
diff --git a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
new file mode 100644
index 0000000000000..7cb27f8760e33
--- /dev/null
+++ b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PostgresDateTimeSupportTest {
+
+ private PostgresJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+ private Connection mockConnection;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new PostgresJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ mockConnection = mock(Connection.class);
+
+ PostgresArrayTestConfig.configureSinkWithConnection(sink, mockConnection);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
new file mode 100644
index 0000000000000..8abf09d48eb0e
--- /dev/null
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Date;
+import java.sql.Time;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SqliteDateTimeSupportTest {
+
+ private SqliteJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new SqliteJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandlingNotSupported() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testDateHandlingNotSupported() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testTimeHandlingNotSupported() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
From dad6c1e24488f63ed02a7980b1c41be36c8cfcb8 Mon Sep 17 00:00:00 2001
From: Omri Fried
Date: Sun, 28 Sep 2025 17:06:41 -0700
Subject: [PATCH 4/5] linting
---
.../io/jdbc/ClickHouseJdbcAutoSchemaSink.java | 2 +-
.../jdbc/ClickHouseDateTimeSupportTest.java | 7 +--
.../io/jdbc/BaseJdbcAutoSchemaSink.java | 56 ++++++++++---------
.../io/jdbc/MariadbJdbcAutoSchemaSink.java | 6 +-
.../io/jdbc/MariadbDateTimeSupportTest.java | 7 +--
.../io/jdbc/OpenMLDBJdbcAutoSchemaSink.java | 10 ++--
.../io/jdbc/OpenMLDBDateTimeSupportTest.java | 7 +--
.../io/jdbc/PostgresJdbcAutoSchemaSink.java | 6 +-
.../io/jdbc/PostgresArraySupportTest.java | 1 +
.../io/jdbc/PostgresDateTimeSupportTest.java | 7 +--
.../io/jdbc/SqliteDateTimeSupportTest.java | 5 +-
11 files changed, 58 insertions(+), 56 deletions(-)
diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
index 2ea1844f567f3..81a27ab678d68 100644
--- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
-import java.sql.Date;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
diff --git a/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java b/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
index 4fe138dc7d91f..bd555cbc3e69a 100644
--- a/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
+++ b/pulsar-io/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseDateTimeSupportTest.java
@@ -20,13 +20,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
-
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
+import static org.testng.Assert.assertTrue;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index ef7cc76dc1f80..c0c1f95e5e049 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -21,10 +21,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -62,7 +62,7 @@ public List getColumnsForUpsert() {
/**
* Handles datetime value binding for database-specific date, time, and timestamp types.
*
- * This method is called when an input value is of datetime or timestamp value.
+ * This method is called when an input value is of datetime or timestamp value.
* Implementations should convert the timestamp or datetime value to the appropriate
* database-specific type or revert to primitive types. The statement is passed into the function to handle
* field conversion and set it accordingly.
@@ -74,9 +74,12 @@ public List getColumnsForUpsert() {
*
* Implementation Guidelines:
*
- * - If not available, convert timestamp values to long. Alternatively use setTimestamp on the PreparedStatement
- * - If not available, convert date values to int. Alternatively use setDate on the PreparedStatement
- * - If not available, convert time values to int or long. Alternatively user setTime on the PreparedStatement
+ * - If not available, convert timestamp values to long. Alternatively use setTimestamp on the
+ * PreparedStatement
+ * - If not available, convert date values to int. Alternatively use setDate on the
+ * PreparedStatement
+ * - If not available, convert time values to int or long. Alternatively user setTime on the
+ * PreparedStatement
* - Provide informative logs and wrap JDBC exceptions with contextual information
*
*
@@ -94,7 +97,8 @@ public List getColumnsForUpsert() {
* @param index the parameter index (1-based) in the PreparedStatement
* @param value the value to be bound
* @param targetSqlType the target SQL type name for the column (e.g., "Timestamp", "Date", "Time")
- * @return true if the value is handled, false otherwise. Databases that do not support datetime will return false and the value will be bound to its original type.
+ * @return true if the value is handled, false otherwise. Databases that do not support datetime will return
+ * false and the value will be bound to its original type.
* @throws Exception if conversion or binding fails, including:
*
* - {@code SQLException} for JDBC creation or binding failures
@@ -252,8 +256,7 @@ public Mutation createMutation(Record message) {
Map data = new HashMap<>();
fillKeyValueSchemaData(message.getSchema(), record, data);
recordValueGetter = (k) -> data.get(k);
- }
- else {
+ } else {
recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
}
}
@@ -354,11 +357,11 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
}
if (value instanceof Timestamp || value instanceof Date || value instanceof Time) {
- String typeName = value instanceof Timestamp ? "Timestamp"
- : value instanceof Date ? "Date"
+ String typeName = value instanceof Timestamp ? "Timestamp"
+ : value instanceof Date ? "Date"
: "Time";
- boolean timestamp_converted = handleDateTime(statement, index, value, typeName);
- if (timestamp_converted) {
+ boolean timestampConverted = handleDateTime(statement, index, value, typeName);
+ if (timestampConverted) {
return;
}
}
@@ -516,30 +519,31 @@ static Object convertAvroField(Object avroValue, Schema schema) {
int time = (Integer) avroValue;
if ("time-millis".equals(logicalTypeName)) {
return new Time(time);
- }
- else if ("date".equals(logicalTypeName)) {
+ } else if ("date".equals(logicalTypeName)) {
return new Date(time);
- }
- else {
- throw new UnsupportedOperationException("Unsupported avro integer logical type=" + logicalTypeName
- + " for value field schema " + schema.getName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported avro integer logical type="
+ + logicalTypeName + " for value field schema " + schema.getName());
}
}
+ break;
case LONG:
if (schema.getLogicalType() != null) {
String logicalTypeName = schema.getLogicalType().getName();
long timestamp = (Long) avroValue;
- if ("timestamp-millis".equals(logicalTypeName) || "timestamp-micros".equals(logicalTypeName) || "local-timestamp-millis".equals(logicalTypeName) || "local-timestamp-micros".equals(logicalTypeName)) {
+ if ("timestamp-millis".equals(logicalTypeName)
+ || "timestamp-micros".equals(logicalTypeName)
+ || "local-timestamp-millis".equals(logicalTypeName)
+ || "local-timestamp-micros".equals(logicalTypeName)) {
return new Timestamp(timestamp);
- }
- else if ("time-micros".equals(logicalTypeName)) {
+ } else if ("time-micros".equals(logicalTypeName)) {
return new Time(timestamp);
- }
- else {
- throw new UnsupportedOperationException("Unsupported avro long logical type=" + logicalTypeName
- + " for value field schema " + schema.getName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported avro long logical type="
+ + logicalTypeName + " for value field schema " + schema.getName());
}
}
+ break;
case DOUBLE:
case FLOAT:
case BOOLEAN:
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 6c81548abf91c..2c0c1080bb26c 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
-import java.sql.Timestamp;
-import java.sql.Date;
-import java.sql.Time;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
diff --git a/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
index 2766fa731e159..466e683a97dc7 100644
--- a/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
+++ b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
@@ -20,13 +20,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
-
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
+import static org.testng.Assert.assertTrue;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
index a2ba66e103c7c..d3d8e39adcfd9 100644
--- a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.io.jdbc;
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -36,7 +36,8 @@ public class OpenMLDBJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
/**
* OpenMLDB does not support native array types.
*
- * OpenMLDB is focused on time-series and real-time analytics workloads and does not
+ * OpenMLDB is focused on time-series and real-time analytics workloads and does
+ * not
* provide native array data types like PostgreSQL. This implementation does not
* provide automatic array conversion to maintain consistency with OpenMLDB's
* data model and type system.
@@ -55,7 +56,8 @@ public class OpenMLDBJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
* @param index the parameter index (not used)
* @param arrayValue the array value (not used)
* @param targetSqlType the target SQL type (not used)
- * @throws UnsupportedOperationException always thrown as OpenMLDB doesn't support arrays
+ * @throws UnsupportedOperationException always thrown as OpenMLDB doesn't
+ * support arrays
*/
@Override
protected void handleArrayValue(PreparedStatement statement, int index, Object arrayValue, String targetSqlType)
diff --git a/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
index 746730fd27633..a38be16c609ac 100644
--- a/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
+++ b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
@@ -20,13 +20,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
-
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
+import static org.testng.Assert.assertTrue;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index f59b9c809f835..8446afa7ef677 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -19,13 +19,13 @@
package org.apache.pulsar.io.jdbc;
import java.sql.Array;
+import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
diff --git a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
index 5cda22b576ccb..101ea67ff1fd1 100644
--- a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
+++ b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
@@ -17,6 +17,7 @@
* under the License.
*/
package org.apache.pulsar.io.jdbc;
+
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doThrow;
diff --git a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
index 7cb27f8760e33..a90bd39cb7fb2 100644
--- a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
+++ b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
@@ -20,14 +20,13 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
-
+import static org.testng.Assert.assertTrue;
import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
index 8abf09d48eb0e..ffcb69317bbe4 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
@@ -20,11 +20,10 @@
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
-
-import java.sql.PreparedStatement;
-import java.sql.Timestamp;
import java.sql.Date;
+import java.sql.PreparedStatement;
import java.sql.Time;
+import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
From 42d31c0735e78f5e968b3205048b4af6cea9a492 Mon Sep 17 00:00:00 2001
From: Omri Fried
Date: Fri, 3 Oct 2025 15:39:35 -0700
Subject: [PATCH 5/5] fixes
---
.../java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index c0c1f95e5e049..b0b78571f6e52 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -526,7 +526,6 @@ static Object convertAvroField(Object avroValue, Schema schema) {
+ logicalTypeName + " for value field schema " + schema.getName());
}
}
- break;
case LONG:
if (schema.getLogicalType() != null) {
String logicalTypeName = schema.getLogicalType().getName();
@@ -543,7 +542,6 @@ static Object convertAvroField(Object avroValue, Schema schema) {
+ logicalTypeName + " for value field schema " + schema.getName());
}
}
- break;
case DOUBLE:
case FLOAT:
case BOOLEAN: