
[Bug] [dinky-cdc-core] The use of lambda expressions in AbstractSqlSinkBuilder causes deserialization issues in Flink #4388

@Selegant

Description


Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Dinky version: 1.2.3
Flink version: 1.20 (JDK 11)
Submission mode: standalone
Task executed:
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.255.158',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'nbods.ab_beddict',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.sink.db' = 'ods_copy',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
Error:
2025-06-06 01:38:56,435 WARN org.apache.flink.runtime.taskmanager.Task [] - SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.InvalidClassException: org.apache.flink.connector.jdbc.dialect.AbstractDialect; local class incompatible: stream classdesc serialVersionUID = -5510372127161093008, local class serialVersionUID = -1079348137343042861
at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0]
... 12 more
2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851).
2025-06-06 01:38:56,435 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task SinkMaterializer[6] -> Sink: test_ab_beddict[6] (1/1)#1851 1c69df105b850081f72b1bed6fbe9522_a5d048cc3217147772867b2cb3a508fa_0_1851.
2025-06-06 01:38:56,437 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: MySQL CDC Source -> PartitionByPrimaryKey -> Shunt -> FlatMapRow -> anonymous_datastream_source$2[4] -> Calc[5] -> ConstraintEnforcer[6] (1/1)#1851 (1c69df105b850081f72b1bed6fbe9522_cbc357ccb763df2852fee8c4fc7d55f2_0_1851) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.0.jar:1.20.0]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamFlatMap
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0]
... 16 more
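The first failure above is a java.io.InvalidClassException for org.apache.flink.connector.jdbc.dialect.AbstractDialect: the serialVersionUID recorded when the job was serialized (-5510372127161093008) differs from that of the class the TaskManager loaded (-1079348137343042861), so the two sides are working with different definitions of that class. A minimal diagnostic sketch, assuming it is run with the same classpath as the TaskManager (for example Flink's lib/ directory plus the job jars). SerialVersionCheck is a hypothetical helper built only on the JDK's ObjectStreamClass.lookup(...).getSerialVersionUID():

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

public class SerialVersionCheck {

    // Loads the class without running its static initializers (initialize = false).
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, SerialVersionCheck.class.getClassLoader());
    }

    // Returns the serialVersionUID the current classpath assigns to className,
    // or null if the class does not implement java.io.Serializable.
    static Long localSerialVersionUid(String className) throws ClassNotFoundException {
        ObjectStreamClass desc = ObjectStreamClass.lookup(load(className));
        return desc == null ? null : desc.getSerialVersionUID();
    }

    // Returns the jar or directory the class was loaded from; JDK classes have no CodeSource.
    static String loadedFrom(String className) throws ClassNotFoundException {
        CodeSource source = load(className).getProtectionDomain().getCodeSource();
        return source == null ? "(bootstrap / JDK)" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        // Default is the class named in the InvalidClassException above.
        String className = args.length > 0 ? args[0]
                : "org.apache.flink.connector.jdbc.dialect.AbstractDialect";
        try {
            System.out.println(className + " serialVersionUID = " + localSerialVersionUid(className));
            System.out.println("loaded from " + loadedFrom(className));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on this classpath");
        }
    }
}
```

Running it once with the client's classpath and once with the TaskManager's, then comparing the printed values and jar locations, can show which flink-connector-jdbc build each side is loading.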

What you expected to happen

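I went through the Dinky 1.2.3 source code carefully, and the cause appears to be the lambda expression passed to flatMap in the buildRow method of AbstractSqlSinkBuilder. I hope this can be fixed as soon as possible.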
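The change suggested above (replacing the lambda with a named function class) can be sketched in plain Java. This is not Dinky's actual buildRow code: SerializableFlatMap is a hypothetical stand-in for Flink's org.apache.flink.api.common.functions.FlatMapFunction (whose method is flatMap(IN value, Collector<OUT> out)), Consumer stands in for Collector, and SplitRowFunction is a made-up example function. What it shows is that Java serialization writes a named class as an ordinary object, whereas a lambda is written as a java.lang.invoke.SerializedLambda that must be resolved back through the class that defined it when the operator is read on the TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NamedFlatMapSketch {

    // Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>, which also extends Serializable.
    interface SerializableFlatMap<I, O> extends Serializable {
        void flatMap(I value, Consumer<O> out);
    }

    // Named static class used instead of a lambda: it is serialized as a normal object,
    // so deserialization only needs this class, not a SerializedLambda resolution step.
    static final class SplitRowFunction implements SerializableFlatMap<String, String> {
        private static final long serialVersionUID = 1L;
        private final String separator;

        SplitRowFunction(String separator) {
            this.separator = separator;
        }

        @Override
        public void flatMap(String value, Consumer<String> out) {
            // Emit one output element per field, as a flatMap may emit zero or more records.
            for (String field : value.split(separator)) {
                out.accept(field);
            }
        }
    }

    // Round-trips an object through Java serialization, similar to how operators are shipped to TaskManagers.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    // Serializes the function, reads it back, and applies it to one input row.
    static List<String> run(String row) throws Exception {
        SerializableFlatMap<String, String> fn = roundTrip(new SplitRowFunction(","));
        List<String> collected = new ArrayList<>();
        fn.flatMap(row, collected::add);
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run("1,bed,ward")); // prints [1, bed, ward]
    }
}
```

In Flink terms, the equivalent would be passing an instance of a named FlatMapFunction class (or an anonymous inner class) to DataStream#flatMap in place of the lambda. Whether that change alone resolves the failure in this issue is an assumption that the logs above do not confirm.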

How to reproduce

Dinky version: 1.2.3
Flink version: 1.20 (JDK 11)
Submission mode: standalone
Task executed:
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.255.158',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'nbods.ab_beddict',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://172.16.255.158:3306/ods_copy?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = 'root',
'sink.sink.db' = 'ods_copy',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '#{tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)

Anything else

No response

Version

1.2.3

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Labels: Bug (Something isn't working), Invalid
Status: ToDo
Relationships: None yet
Development: No branches or pull requests