
[FLINK-36271] Support reading json and jsonb types in PostgreSQL dialect #141

Open · wants to merge 3 commits into main
Conversation

grzegorz8

When using the PostgreSQL JDBC Catalog, an error is thrown if any of the tables has a column of type json or jsonb:

java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet
  • json / jsonb fields are returned as VARCHAR when reading the data.
  • Writing values (insert into ...) to a json / jsonb column is not allowed in the current design.
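Since json / jsonb columns come back as VARCHAR (Flink STRING), a downstream consumer can parse the string itself. A minimal Python sketch; the payload and function name are illustrative, not part of the connector:

```python
import json

def parse_jsonb_column(raw: str):
    """Parse the VARCHAR value produced for a json/jsonb column."""
    return json.loads(raw)

# Example value as it might arrive from a jsonb column read as VARCHAR.
row_value = '{"user": "alice", "tags": ["a", "b"]}'
parsed = parse_jsonb_column(row_value)
print(parsed["user"])  # fields of the decoded document are accessible again
```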

@grzegorz8 grzegorz8 changed the title Support reading json and jsonb types in PostgreSQL dialect [FLINK-36271] Support reading json and jsonb types in PostgreSQL dialect Sep 12, 2024
@grzegorz8 grzegorz8 marked this pull request as draft September 12, 2024 13:55
@stslingaprabu

stslingaprabu commented Oct 7, 2024

@grzegorz8
Can you give me an example of how to build this code, add the jar, and use it in PyFlink code for streaming data to JSONB in PostgreSQL?

@grzegorz8
Author

> Can you give me an example of how to build this code, add the jar, and use it in PyFlink code for streaming data to JSONB in PostgreSQL?

I'm sorry, but there is still one issue with this change: cratedb depends on the postgres module, and the following error is thrown in tests:

Caused by: java.lang.NoClassDefFoundError: org/postgresql/util/PGobject
	at org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPrimitiveConverter$bd2b50a6$1(PostgresDialectConverter.java:99)
	at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127)
	at org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter.lambda$createPostgresArrayConverter$477a3c4c$1(PostgresDialectConverter.java:87)
	at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.lambda$wrapIntoNullableInternalConverter$ff222c76$1(AbstractDialectConverter.java:127)
	at org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter.toInternal(AbstractDialectConverter.java:78)
	at org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
	at org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: java.lang.ClassNotFoundException: org.postgresql.util.PGobject
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	... 11 more

However, the build will succeed if you skip tests: mvn clean install -DskipTests.

@stslingaprabu

@grzegorz8
Yeah, I built the code, and every module generates a jar file, but I want to use the changed postgres module jar. Did you use it in PyFlink by importing the jar?
(screenshot: list of generated jar files)

This is what I include in the code:

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-python-1.17.2.jar;file:///opt/flink/lib/flink-scala_2.12-1.17.2.jar;file:///opt/flink/connector/flink-sql-connector-kafka-1.17.2.jar;file:///opt/flink/connector/flink-connector-jdbc-3.3-SNAPSHOT-shaded.jar;file:///opt/flink/connector/flink-connector-jdbc-postgres-3.3.jar;file:///opt/flink/connector/postgresql-42.6.0.jar",
)

It's not working.
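As a side note, the long semicolon-separated pipeline.jars literal above can be assembled from a list, which makes a missing or misnamed jar easier to spot. A minimal sketch with example paths; the actual jar set depends on your installation:

```python
# Hypothetical jar locations; substitute the paths from your own setup.
jar_paths = [
    "/opt/flink/lib/flink-python-1.17.2.jar",
    "/opt/flink/connector/flink-connector-jdbc-postgres-3.3.jar",
    "/opt/flink/connector/postgresql-42.6.0.jar",
]

# pipeline.jars expects file:// URIs separated by semicolons.
pipeline_jars = ";".join(f"file://{p}" for p in jar_paths)
print(pipeline_jars)
```

The resulting string is what would be passed to `set_string("pipeline.jars", ...)` in the PyFlink snippet above.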

@grzegorz8
Author

> I'm sorry, but there is still one issue with this change: cratedb depends on the postgres module, and the following error is thrown in tests:
>
> Caused by: java.lang.NoClassDefFoundError: org/postgresql/util/PGobject
> [full stack trace quoted above]
>
> However, the build will succeed if you skip tests: mvn clean install -DskipTests.

Hey @matriv! Since you are the author of CrateDB support (#29), maybe you can suggest how to deal with the error shown above? Thanks in advance.

@matriv
Contributor

matriv commented Oct 30, 2024

> Hey @matriv! Since you are the author of CrateDB support (#29), maybe you can suggest how to deal with the error shown above? Thanks in advance.

For sure! Thx for the ping. I will check it and get back to you soon with findings.

@matriv
Contributor

matriv commented Oct 31, 2024

@grzegorz8 How do you get this error: is it from inside IntelliJ, or by running mvn clean install?
If I use Java 11 ($ export JAVA_HOME=/opt/java11) and then run mvn clean install, I get a different error, which has to do with writes not being supported:

[ERROR] org.apache.flink.connector.jdbc.postgres.table.PostgresDynamicTableSinkITCase.testUpsert  Time elapsed: 0.741 s  <<< ERROR!
java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:118)
        at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:81)
        at org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase.testUpsert(JdbcDynamicTableSinkITCase.java:246)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:85)
        at org.apache.flink.table.api.internal.InsertResultProvider.isFirstRowReady(InsertResultProvider.java:71)
        at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:105)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        ... 5 more
Caused by: java.io.IOException: unable to open JDBC writer
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:98)
        at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:53)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
        at org.apache.flink.table.runtime.operators.sink.SinkOperator.open(SinkOperator.java:58)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.postgresql.util.PSQLException: FATAL: sorry, too many clients already
        at org.postgresql.core.v3.ConnectionFactoryImpl.doAuthentication(ConnectionFactoryImpl.java:698)
        at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:207)
        at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:262)
        at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)
        at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:273)
        at org.postgresql.Driver.makeConnection(Driver.java:446)
        at org.postgresql.Driver.connect(Driver.java:298)
        at org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:123)
        at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:96)
        ... 14 more

@grzegorz8
Author

@matriv I think the "Caused by: org.postgresql.util.PSQLException: FATAL: sorry, too many clients already" error is irrelevant. I often see that error when I run tests locally, even on the main branch.

Please run the CrateDB tests in IntelliJ.

Maybe let me clarify why I'm asking you for help. Namely, I modified the postgresql dialect to support the jsonb format. While running tests I noticed that CrateDB inherits from the postgresql dialect, but it does not work well with my change (NoClassDefFoundError: org/postgresql/util/PGobject). I thought you might be able to give me a hint on how to handle it.
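For what it's worth, one generic way to cope with an optional dependency is to probe for it before enabling the dependent feature, rather than failing at call time. The Python sketch below only illustrates that pattern with a stand-in module check; whether the Java converter should guard for org.postgresql.util.PGobject on the classpath in an analogous way is an assumption on my part, not something this PR implements:

```python
from importlib.util import find_spec

def optional_dependency_present(module_name: str) -> bool:
    """Return True if the optional dependency can be resolved.

    Stand-in for a classpath check such as Class.forName(
    "org.postgresql.util.PGobject") in the Java converter.
    """
    return find_spec(module_name) is not None

# Enable the feature only when the dependency resolves.
print(optional_dependency_present("json"))               # stdlib module: True
print(optional_dependency_present("no_such_module_xyz")) # missing dep: False
```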

@matriv
Contributor

matriv commented Nov 6, 2024

@grzegorz8 grzegorz8 marked this pull request as ready for review November 12, 2024 20:35
3 participants