Skip to content

Conversation

omrifried
Copy link

Main Issue: #23109

Motivation

  • This PR adds abstract support for JDBC sink to enable datetime inserts into databases

Modifications

  • Added a protected method on the base jdbc sink to handle datetimes
  • Parse avro logical types to pull correct time information
  • Insert correct datetimes into databases
  • Fall back to raw data types on lack of support

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • *Added unit tests to all JDBC database connectors for handling datetimes

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 22, 2025
@Technoboy- Technoboy- added this to the 4.2.0 milestone Sep 23, 2025
@Technoboy- Technoboy- changed the title [improve][jdbc] time handling support [improve][jdbc] Add time handling support Sep 23, 2025
@lhotari
Copy link
Member

lhotari commented Sep 23, 2025

Thanks for the contribution @omrifried. There are checkstyle errors. Configuring code style and checkstyle in IntelliJ will help address those. Please check the guide at https://pulsar.apache.org/contribute/setup-ide/#configure-code-style .

You can run mvn -T 1C initialize checkstyle:check to quickly run the check on the command line.

[INFO] There are 17 errors reported by Checkstyle 10.14.2 with /home/runner/work/pulsar/pulsar/buildtools/src/main/resources/pulsar/checkstyle.xml ruleset.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[26,1] (imports) ImportOrder: Import java.sql.Date appears after other imports that it should precede
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[65] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[77] (sizes) LineLength: Line is longer than 120 characters (found 123).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[79] (sizes) LineLength: Line is longer than 120 characters (found 121).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[97] (sizes) LineLength: Line is longer than 120 characters (found 168).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[255,13] (blocks) RightCurly: '}' at column 13 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[357] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[358] (regexp) RegexpSingleline: Trailing whitespace
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[360,21] (naming) LocalVariableName: Name 'timestamp_converted' must match pattern '^[a-z][a-zA-Z0-9]*$'.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[519,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[522,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[524] (sizes) LineLength: Line is longer than 120 characters (found 122).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[528,13] (coding) FallThrough: Fall through from previous branch of the switch statement.
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[532] (sizes) LineLength: Line is longer than 120 characters (found 219).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[534,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[537,21] (blocks) RightCurly: '}' at column 21 should be on the same line as the next part of a multi-block statement (one that directly contains multiple blocks: if/else-if/else, do/while or try/catch/finally).
Error:  src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:[543,13] (coding) FallThrough: Fall through from previous branch of the switch statement.

@Technoboy- Technoboy- requested a review from freeznet September 29, 2025 01:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants