Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.jdbc;

import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

Expand Down Expand Up @@ -65,4 +67,45 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
throw new UnsupportedOperationException("Array types are not supported by ClickHouse JDBC sink. "
+ "Consider using PostgreSQL JDBC sink for array support.");
}

/**
* ClickHouse supports various Date types.
* <p>
* Inputs are converted to the appropriate Date type.
* </p>
* <p>
* <strong>Implementation Guidelines:</strong>
* <ul>
* <li>If the value is a Timestamp, it is converted to a Timestamp</li>
* <li>If the value is a Date, it is converted to a Date</li>
* <li>If the value is a Time, it not supported</li>
* </ul>
* </p>
*
* @param statement the PreparedStatement (not used)
* @param index the parameter index (not used)
* @param value the value (not used)
* @param targetSqlType the target SQL type (not used)
* @return false as SQLite doesn't support datetime
* @throws Exception if conversion or binding fails
*/
@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value, String targetSqlType)
throws Exception {
if (targetSqlType == null) {
return false;
}
switch (targetSqlType) {
case "Timestamp":
statement.setTimestamp(index, (Timestamp) value);
return true;
case "Date":
statement.setDate(index, (Date) value);
return true;
case "Time":
return false;
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jdbc;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.Time;
import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ClickHouseDateTimeSupportTest {

private ClickHouseJdbcAutoSchemaSink sink;
private PreparedStatement mockStatement;

@BeforeMethod
public void setUp() throws Exception {
sink = new ClickHouseJdbcAutoSchemaSink();
mockStatement = mock(PreparedStatement.class);
}

@Test
public void testTimestampHandling() throws Exception {
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "Timestamp");

assertTrue(result);
verify(mockStatement).setTimestamp(1, timestamp);
}

@Test
public void testDateHandling() throws Exception {
Date date = new Date(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, date, "Date");

assertTrue(result);
verify(mockStatement).setDate(1, date);
}

@Test
public void testTimeHandlingNotSupported() throws Exception {
Time time = new Time(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, time, "Time");

assertFalse(result);
}

@Test
public void testUnsupportedTargetType() throws Exception {
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, timestamp, "UnsupportedType");

assertFalse(result);
}

@Test
public void testNullTargetType() throws Exception {
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, timestamp, null);

assertFalse(result);
}

@Test
public void testEmptyTargetType() throws Exception {
Date date = new Date(System.currentTimeMillis());
boolean result = sink.handleDateTime(mockStatement, 1, date, "");

assertFalse(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -56,6 +59,55 @@ public List<ColumnId> getColumnsForUpsert() {
throw new IllegalStateException("UPSERT not supported");
}

/**
* Handles datetime value binding for database-specific date, time, and timestamp types.
* <p>
* This method is called when an input value is of datetime or timestamp value.
* Implementations should convert the timestamp or datetime value to the appropriate
* database-specific type or revert to primitive types. The statement is passed into the function to handle
* field conversion and set it accordingly.
* </p>
* <p>
* The method is invoked automatically by {@link #setColumnValue(PreparedStatement, int, Object, String)}
* when it detects an input of type java.sql.Timestamp, java.sql.Date, or java.sql.Time.
* </p>
* <p>
* <strong>Implementation Guidelines:</strong>
* <ul>
* <li>If not available, convert timestamp values to long. Alternatively use setTimestamp on the
* PreparedStatement</li>
* <li>If not available, convert date values to int. Alternatively use setDate on the
* PreparedStatement</li>
* <li>If not available, convert time values to int or long. Alternatively user setTime on the
* PreparedStatement</li>
* <li>Provide informative logs and wrap JDBC exceptions with contextual information</li>
* </ul>
* </p>
* <p>
* <strong>Example Usage:</strong>
* <pre>{@code
* // For PostgreSQL implementation of timestamps:
* if (value instanceof Timestamp) {
* statement.setTimestamp(index, (Timestamp) value);
* }
* }</pre>
* </p>
*
* @param statement the PreparedStatement to bind the value to
* @param index the parameter index (1-based) in the PreparedStatement
* @param value the value to be bound
* @param targetSqlType the target SQL type name for the column (e.g., "Timestamp", "Date", "Time")
* @return true if the value is handled, false otherwise. Databases that do not support datetime will return
* false and the value will be bound to its original type.
* @throws Exception if conversion or binding fails, including:
* <ul>
* <li>{@code SQLException} for JDBC creation or binding failures</li>
* </ul>
* @see #setColumnValue(PreparedStatement, int, Object, String)
*/
protected abstract boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception;

/**
* Handles array value binding for database-specific array types.
* <p>
Expand Down Expand Up @@ -200,7 +252,13 @@ public Mutation createMutation(Record<GenericObject> message) {
if (schemaType.isPrimitive()) {
throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
}
recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
if (schemaType == SchemaType.AVRO || schemaType == SchemaType.JSON) {
Map<String, Object> data = new HashMap<>();
fillKeyValueSchemaData(message.getSchema(), record, data);
recordValueGetter = (k) -> data.get(k);
} else {
recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
}
}
String action = message.getProperties().get(ACTION_PROPERTY);
if (action != null) {
Expand Down Expand Up @@ -298,6 +356,16 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
return;
}

if (value instanceof Timestamp || value instanceof Date || value instanceof Time) {
String typeName = value instanceof Timestamp ? "Timestamp"
: value instanceof Date ? "Date"
: "Time";
boolean timestampConverted = handleDateTime(statement, index, value, typeName);
if (timestampConverted) {
return;
}
}

if (value instanceof Integer) {
statement.setInt(index, (Integer) value);
} else if (value instanceof Long) {
Expand Down Expand Up @@ -344,8 +412,7 @@ private static Object getValueFromJsonNode(final JsonNode fn) {
return null;
}
if (fn.isContainerNode()) {
throw new IllegalArgumentException("Container nodes are not supported, the JSON must contains only "
+ "first level fields.");
return fn.toString();
} else if (fn.isBoolean()) {
return fn.asBoolean();
} else if (fn.isFloatingPointNumber()) {
Expand Down Expand Up @@ -447,7 +514,34 @@ static Object convertAvroField(Object avroValue, Schema schema) {
switch (schema.getType()) {
case NULL:
case INT:
if (schema.getLogicalType() != null) {
String logicalTypeName = schema.getLogicalType().getName();
int time = (Integer) avroValue;
if ("time-millis".equals(logicalTypeName)) {
return new Time(time);
} else if ("date".equals(logicalTypeName)) {
return new Date(time);
} else {
throw new UnsupportedOperationException("Unsupported avro integer logical type="
+ logicalTypeName + " for value field schema " + schema.getName());
}
}
case LONG:
if (schema.getLogicalType() != null) {
String logicalTypeName = schema.getLogicalType().getName();
long timestamp = (Long) avroValue;
if ("timestamp-millis".equals(logicalTypeName)
|| "timestamp-micros".equals(logicalTypeName)
|| "local-timestamp-millis".equals(logicalTypeName)
|| "local-timestamp-micros".equals(logicalTypeName)) {
return new Timestamp(timestamp);
} else if ("time-micros".equals(logicalTypeName)) {
return new Time(timestamp);
} else {
throw new UnsupportedOperationException("Unsupported avro long logical type="
+ logicalTypeName + " for value field schema " + schema.getName());
}
}
case DOUBLE:
case FLOAT:
case BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}

@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception {
return false;
}
};
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
Expand Down Expand Up @@ -244,6 +250,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}

@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception {
return false;
}
};

Field field = JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
Expand Down Expand Up @@ -293,6 +305,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}

@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception {
return false;
}
};

// Test all primitive type conversions still work
Expand Down Expand Up @@ -337,6 +355,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}

@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception {
return false;
}
};

PreparedStatement mockStatement = mock(PreparedStatement.class);
Expand All @@ -363,6 +387,12 @@ protected void handleArrayValue(PreparedStatement statement, int index, Object a
String targetSqlType) throws Exception {
throw new UnsupportedOperationException("Array handling not implemented in test");
}

@Override
protected boolean handleDateTime(PreparedStatement statement, int index, Object value,
String targetSqlType) throws Exception {
return false;
}
};

PreparedStatement mockStatement = mock(PreparedStatement.class);
Expand Down
Loading
Loading