diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
index 36a0b3210bd7a..81a27ab678d68 100644
--- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.Timestamp;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -65,4 +67,45 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by ClickHouse JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * ClickHouse supports various Date types.
+ *
getColumnsForUpsert() {
throw new IllegalStateException("UPSERT not supported");
}
+ /**
+ * Handles datetime value binding for database-specific date, time, and timestamp types.
+ *
+ * This method is called when an input value is of datetime or timestamp value.
+ * Implementations should convert the timestamp or datetime value to the appropriate
+ * database-specific type or revert to primitive types. The statement is passed into the function to handle
+ * field conversion and set it accordingly.
+ *
+ *
+ * The method is invoked automatically by {@link #setColumnValue(PreparedStatement, int, Object, String)}
+ * when it detects an input of type java.sql.Timestamp, java.sql.Date, or java.sql.Time.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If not available, convert timestamp values to long. Alternatively use setTimestamp on the
+ * PreparedStatement
+ * - If not available, convert date values to int. Alternatively use setDate on the
+ * PreparedStatement
+ * - If not available, convert time values to int or long. Alternatively user setTime on the
+ * PreparedStatement
+ * - Provide informative logs and wrap JDBC exceptions with contextual information
+ *
+ *
+ *
+ * Example Usage:
+ *
{@code
+ * // For PostgreSQL implementation of timestamps:
+ * if (value instanceof Timestamp) {
+ * statement.setTimestamp(index, (Timestamp) value);
+ * }
+ * }
+ *
+ *
+ * @param statement the PreparedStatement to bind the value to
+ * @param index the parameter index (1-based) in the PreparedStatement
+ * @param value the value to be bound
+ * @param targetSqlType the target SQL type name for the column (e.g., "Timestamp", "Date", "Time")
+ * @return true if the value is handled, false otherwise. Databases that do not support datetime will return
+ * false and the value will be bound to its original type.
+ * @throws Exception if conversion or binding fails, including:
+ *
+ * - {@code SQLException} for JDBC creation or binding failures
+ *
+ * @see #setColumnValue(PreparedStatement, int, Object, String)
+ */
+ protected abstract boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception;
+
/**
* Handles array value binding for database-specific array types.
*
@@ -200,7 +252,13 @@ public Mutation createMutation(Record message) {
if (schemaType.isPrimitive()) {
throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
}
- recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+ if (schemaType == SchemaType.AVRO || schemaType == SchemaType.JSON) {
+ Map data = new HashMap<>();
+ fillKeyValueSchemaData(message.getSchema(), record, data);
+ recordValueGetter = (k) -> data.get(k);
+ } else {
+ recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
+ }
}
String action = message.getProperties().get(ACTION_PROPERTY);
if (action != null) {
@@ -298,6 +356,16 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
return;
}
+ if (value instanceof Timestamp || value instanceof Date || value instanceof Time) {
+ String typeName = value instanceof Timestamp ? "Timestamp"
+ : value instanceof Date ? "Date"
+ : "Time";
+ boolean timestampConverted = handleDateTime(statement, index, value, typeName);
+ if (timestampConverted) {
+ return;
+ }
+ }
+
if (value instanceof Integer) {
statement.setInt(index, (Integer) value);
} else if (value instanceof Long) {
@@ -344,8 +412,7 @@ private static Object getValueFromJsonNode(final JsonNode fn) {
return null;
}
if (fn.isContainerNode()) {
- throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only "
- + "first level fields.");
+ return fn.toString();
} else if (fn.isBoolean()) {
return fn.asBoolean();
} else if (fn.isFloatingPointNumber()) {
@@ -447,7 +514,34 @@ static Object convertAvroField(Object avroValue, Schema schema) {
switch (schema.getType()) {
case NULL:
case INT:
+ if (schema.getLogicalType() != null) {
+ String logicalTypeName = schema.getLogicalType().getName();
+ int time = (Integer) avroValue;
+ if ("time-millis".equals(logicalTypeName)) {
+ return new Time(time);
+ } else if ("date".equals(logicalTypeName)) {
+ return new Date(time);
+ } else {
+ throw new UnsupportedOperationException("Unsupported avro integer logical type="
+ + logicalTypeName + " for value field schema " + schema.getName());
+ }
+ }
case LONG:
+ if (schema.getLogicalType() != null) {
+ String logicalTypeName = schema.getLogicalType().getName();
+ long timestamp = (Long) avroValue;
+ if ("timestamp-millis".equals(logicalTypeName)
+ || "timestamp-micros".equals(logicalTypeName)
+ || "local-timestamp-millis".equals(logicalTypeName)
+ || "local-timestamp-micros".equals(logicalTypeName)) {
+ return new Timestamp(timestamp);
+ } else if ("time-micros".equals(logicalTypeName)) {
+ return new Time(timestamp);
+ } else {
+ throw new UnsupportedOperationException("Unsupported avro long logical type="
+ + logicalTypeName + " for value field schema " + schema.getName());
+ }
+ }
case DOUBLE:
case FLOAT:
case BOOLEAN:
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index 3979ee79ef84f..823fd9e719153 100644
--- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -217,6 +217,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
@@ -244,6 +250,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
Field field = JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
@@ -293,6 +305,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
// Test all primitive type conversions still work
@@ -337,6 +355,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
PreparedStatement mockStatement = mock(PreparedStatement.class);
@@ -363,6 +387,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}
+
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
+ String targetSqlType) throws Exception {
+ return false;
+ }
};
PreparedStatement mockStatement = mock(PreparedStatement.class);
diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
index 3302312b7122f..2c0c1080bb26c 100644
--- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import org.apache.pulsar.io.core.annotations.Connector;
@@ -76,4 +79,46 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by MariaDB JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * MariaDB supports Date, Timestamp, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
new file mode 100644
index 0000000000000..466e683a97dc7
--- /dev/null
+++ b/pulsar-io/jdbc/mariadb/src/test/java/org/apache/pulsar/io/jdbc/MariadbDateTimeSupportTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MariadbDateTimeSupportTest {
+
+ private MariadbJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new MariadbJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
index 0b7028a2417c8..d3d8e39adcfd9 100644
--- a/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/openmldb/src/main/java/org/apache/pulsar/io/jdbc/OpenMLDBJdbcAutoSchemaSink.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
@@ -33,7 +36,8 @@ public class OpenMLDBJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
/**
* OpenMLDB does not support native array types.
*
- * OpenMLDB is focused on time-series and real-time analytics workloads and does not
+ * OpenMLDB is focused on time-series and real-time analytics workloads and does
+ * not
* provide native array data types like PostgreSQL. This implementation does not
* provide automatic array conversion to maintain consistency with OpenMLDB's
* data model and type system.
@@ -52,7 +56,8 @@ public class OpenMLDBJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
* @param index the parameter index (not used)
* @param arrayValue the array value (not used)
* @param targetSqlType the target SQL type (not used)
- * @throws UnsupportedOperationException always thrown as OpenMLDB doesn't support arrays
+ * @throws UnsupportedOperationException always thrown as OpenMLDB doesn't
+ * support arrays
*/
@Override
protected void handleArrayValue(PreparedStatement statement, int index, Object arrayValue, String targetSqlType)
@@ -60,4 +65,46 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by OpenMLDB JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * OpenMLDB supports Date, Timestamp, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
new file mode 100644
index 0000000000000..a38be16c609ac
--- /dev/null
+++ b/pulsar-io/jdbc/openmldb/src/test/java/org/apache/pulsar/io/jdbc/OpenMLDBDateTimeSupportTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenMLDBDateTimeSupportTest {
+
+ private OpenMLDBJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new OpenMLDBJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
index 2ce2afb57d5b7..8446afa7ef677 100644
--- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.io.jdbc;
import java.sql.Array;
+import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -805,4 +808,46 @@ private void validateArrayElements(Object[] elements, String postgresArrayType,
+ "bigint, text, boolean, numeric, real, float8, timestamp.");
}
}
+
+ /**
+ * PostgreSQL supports Date, Timestamp, Timestamptz, and Time types.
+ *
+ * Inputs are converted to the appropriate datetime type.
+ *
+ *
+ * Implementation Guidelines:
+ *
+ * - If the value is a Timestamp or Timestamp with time zone, it is converted to a Timestamp
+ * - If the value is a Date, it is converted to a Date
+ * - If the value is a Time, it is converted to a Time
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ if (targetSqlType == null) {
+ return false;
+ }
+ switch (targetSqlType) {
+ case "Timestamp":
+ statement.setTimestamp(index, (Timestamp) value);
+ return true;
+ case "Date":
+ statement.setDate(index, (Date) value);
+ return true;
+ case "Time":
+ statement.setTime(index, (Time) value);
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
index 5cda22b576ccb..101ea67ff1fd1 100644
--- a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
+++ b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresArraySupportTest.java
@@ -17,6 +17,7 @@
* under the License.
*/
package org.apache.pulsar.io.jdbc;
+
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doThrow;
diff --git a/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
new file mode 100644
index 0000000000000..a90bd39cb7fb2
--- /dev/null
+++ b/pulsar-io/jdbc/postgres/src/test/java/org/apache/pulsar/io/jdbc/PostgresDateTimeSupportTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PostgresDateTimeSupportTest {
+
+ private PostgresJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+ private Connection mockConnection;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new PostgresJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ mockConnection = mock(Connection.class);
+
+ PostgresArrayTestConfig.configureSinkWithConnection(sink, mockConnection);
+ }
+
+ @Test
+ public void testTimestampHandling() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertTrue(result);
+ verify(mockStatement).setTimestamp(1, timestamp);
+ }
+
+ @Test
+ public void testDateHandling() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertTrue(result);
+ verify(mockStatement).setDate(1, date);
+ }
+
+ @Test
+ public void testTimeHandling() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertTrue(result);
+ verify(mockStatement).setTime(1, time);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
index a579880671339..83e60a01f75f7 100644
--- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java
@@ -82,4 +82,30 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by SQLite JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}
+
+ /**
+ * SQLite does not support native datetime types.
+ *
+ * SQLite does not have native datetime data types like PostgreSQL. These should be stored as strings or
+ * their default numeric types.
+ *
+ *
+ * Alternatives:
+ *
+ * - Use PostgreSQL JDBC sink for native datetime support
+ *
+ *
+ *
+ * @param statement the PreparedStatement (not used)
+ * @param index the parameter index (not used)
+ * @param value the value (not used)
+ * @param targetSqlType the target SQL type (not used)
+ * @return false as SQLite doesn't support datetime
+ * @throws Exception if conversion or binding fails
+ */
+ @Override
+ protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
+ throws Exception {
+ return false;
+ }
}
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
new file mode 100644
index 0000000000000..ffcb69317bbe4
--- /dev/null
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteDateTimeSupportTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jdbc;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SqliteDateTimeSupportTest {
+
+ private SqliteJdbcAutoSchemaSink sink;
+ private PreparedStatement mockStatement;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ sink = new SqliteJdbcAutoSchemaSink();
+ mockStatement = mock(PreparedStatement.class);
+ }
+
+ @Test
+ public void testTimestampHandlingNotSupported() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testDateHandlingNotSupported() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testTimeHandlingNotSupported() throws Exception {
+ Time time = new Time(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testUnsupportedTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testNullTargetType() throws Exception {
+ Timestamp timestamp = new Timestamp(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testEmptyTargetType() throws Exception {
+ Date date = new Date(System.currentTimeMillis());
+ boolean result = sink.handleDateTime(mockStatement, 1, date, "");
+
+ assertFalse(result);
+ }
+}
\ No newline at end of file