

[FLINK-32068] connector jdbc support clickhouse #49

Open
wants to merge 24 commits into
base: main

Conversation

WenDing-Y

  1. ClickHouse in batch mode, as both sink and source
  2. ClickHouse in streaming mode, as sink

@boring-cyborg

boring-cyborg bot commented May 18, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@WenDing-Y WenDing-Y changed the title FLINK-32068 connector jdbc support clickhouse [FLINK-32068] connector jdbc support clickhouse May 18, 2023
@MartijnVisser
Contributor

@WenDing-Y Could you please rebase?

@WenDing-Y
Copy link
Author

Okay, I have rebased, but I found there were quite a few changes in the testing section. I have pushed some code, but the unit tests do not pass yet; I am still working on that part. @MartijnVisser

@WenDing-Y
Author

The PR is ready.

@WenDing-Y
Author

Please help me review, thank you very much @snuyanzin

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>1.18.1</version>
Member

@eskabetxe eskabetxe May 30, 2023

This is not needed, as testcontainers-bom is defined in the parent.

If it is needed, ${testcontainers.version} should be used so the same version is applied.

Author

The testcontainers version in the parent pom corresponds to ru.yandex.clickhouse.ClickHouseDriver, but that driver is no longer maintained, so I declared a newer image version here, corresponding to the driver com.clickhouse.jdbc.ClickHouseDriver.

Author

I hope you can give me some advice on how to handle it. Thank you very much

Member

We can update the version in the parent.

Author

If I modify this version number in the parent pom.xml, it will affect the other testcontainers dependencies.

Author

The parent pom has:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers-bom</artifactId>
    <version>${testcontainers.version}</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
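With the BOM imported as above, a module can then declare the artifact without a version. This is a sketch of the usual Maven BOM pattern, assuming the parent's dependencyManagement is inherited:

```xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>clickhouse</artifactId>
    <!-- no <version>: resolved by the testcontainers-bom imported in the parent -->
</dependency>
```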

Member

We have updated master; if you rebase, the latest version of testcontainers will be available.

Author

@WenDing-Y WenDing-Y Jun 1, 2023

OK, I have deleted the specified version. @eskabetxe

return fields;
}

public String getName() {
Member

A getTableName() already exists.

Author

Ok, I have deleted it

getTableName(),
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
"ENGINE = MergeTree",
getStreamFields().map(TableField::getName).findFirst().get());
Member

You are ignoring the PKs defined in the table definition with this...

return String.format("truncate table %s", getTableName());
}

private Stream<TableField> getStreamFields() {
Member

TableBase already has a getStreamFields().

Author

This method is private; can I change it to protected?

return String.format(
"CREATE TABLE %s (%s) %s PRIMARY KEY (%s)",
getTableName(),
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
Member

As this is also used in TableBase, could we create a method for it there?

Author

@WenDing-Y WenDing-Y May 30, 2023

The syntax for creating a table in ClickHouse, for example
CREATE TABLE my_first_table ( user_id UInt32, message String, timestamp DateTime, metric Float32 ) ENGINE = MergeTree PRIMARY KEY (user_id, timestamp)
is somewhat different from standard SQL, so I think it is reasonable for ClickhouseTableRow to extend TableBase and override the getCreateQuery method. If I put it in TableBase, I would also need to choose different methods based on the dialect to obtain the CREATE TABLE statement.
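For illustration, a minimal sketch of how such an override could assemble the ClickHouse-style statement, where the ENGINE clause precedes PRIMARY KEY. The class and method names here are illustrative, not the actual test-fixture API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: builds a ClickHouse CREATE TABLE statement.
// Unlike standard SQL, ClickHouse requires an ENGINE clause and places
// PRIMARY KEY after it.
public class ClickhouseCreateQuerySketch {
    static String createQuery(String table, Map<String, String> fields, String primaryKey) {
        // Join "name type" pairs in declaration order.
        String columns = fields.entrySet().stream()
                .map(e -> e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(", "));
        return String.format(
                "CREATE TABLE %s (%s) ENGINE = MergeTree PRIMARY KEY (%s)",
                table, columns, primaryKey);
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("user_id", "UInt32");
        fields.put("message", "String");
        System.out.println(createQuery("my_first_table", fields, "user_id"));
        // CREATE TABLE my_first_table (user_id UInt32, message String) ENGINE = MergeTree PRIMARY KEY (user_id)
    }
}
```

Keeping this in a ClickHouse-specific override, as proposed, avoids pushing dialect branching into TableBase.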

@@ -322,6 +325,9 @@ void testBatchSink() throws Exception {

@Test
void testReadingFromChangelogSource() throws Exception {
if (!getMetadata().supportUpsert()) {
Member

Here the problem is the deletion, no? (I have this same problem in the Trino dialect.)

Author

If I remove this check, the test case fails. I would like to hear your opinion on how to handle this issue.

Member

The approach is fine; I'm just not sure the method name (supportUpsert) is the right one.

Author

OK, I will fix this later.

Author

I have updated the method name

@@ -291,4 +291,12 @@ protected <T> T getNullable(ResultSet rs, FunctionWithException<ResultSet, T, SQ
protected <T> T getNullable(ResultSet rs, T value) throws SQLException {
return rs.wasNull() ? null : value;
}

public TableField[] getFields() {
Member

For the use I see, you should use getStreamFields().

Member

This is not used; it could be removed.

@WenDing-Y
Author

compile_and_test (1.18-SNAPSHOT) fails; it doesn't seem the failure was caused by my changes.

@WenDing-Y
Author

Please help me review, where else can I improve? Thank you very much @eskabetxe

@WenDing-Y
Author

What other tasks are needed for this PR to merge

@MartijnVisser
Contributor

What other tasks are needed for this PR to merge

You will need to rebase, because there are a couple of merge commits from when you merged main into your branch. Those merge commits prevent merging; after rebasing, they should no longer be needed.

@WenDing-Y
Author

OK, I have rebased. @MartijnVisser

@MartijnVisser
Contributor

Ok, I have rebase @MartijnVisser

I don't think you have, given that there are 4 merge-commits in this PR which shouldn't be there when rebased properly: bb13e1f bced473 9abdfaf 2655b6b

Comment on lines 18 to 19
// Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
// https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
Contributor

Shouldn't we refer to the ClickHouse docs here?

Author

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

private static final int MIN_TIMESTAMP_PRECISION = 1;

// Define MAX/MIN precision of DECIMAL type according to Mysql docs:
// https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
Contributor

Shouldn't we refer to the ClickHouse docs here?

Author

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/** */
Contributor

Well, it's better to have some description of the class rather than an empty comment.

Author

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/** */
Contributor

Well, it's better to have some description of the class rather than an empty comment.

Author

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

import java.util.Optional;
import java.util.Set;

/** */
Contributor

Well, it's better to have some description of the class rather than an empty comment.

Author

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

} else if (jdbcField instanceof Long) {
return ((Long) jdbcField).longValue();
}
// UINT64 is not supported,the uint64 range exceeds the long range
Contributor

Can we use BigInteger for UInt64?

Author

@WenDing-Y WenDing-Y Jun 5, 2023

The ClickHouse unsigned integer types have the following ranges:
UInt8 — [0 : 255]
UInt16 — [0 : 65535]
UInt32 — [0 : 4294967295]
UInt64 — [0 : 18446744073709551615]
UInt128 — [0 : 340282366920938463463374607431768211455]
UInt256 — [0 : 115792089237316195423570985008687907853269984665640564039457584007913129639935]

Contributor

@snuyanzin snuyanzin Jun 7, 2023

I still don't see why we cannot return BigInteger for UInt64, UInt128, and UInt256?
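A BigInteger widening for UInt64 is indeed straightforward; a minimal sketch (the class and method names are illustrative, not part of the connector, which would plug such a conversion into the dialect's row converter):

```java
import java.math.BigInteger;

// Illustrative sketch: a Java long holds the 64 bits of a ClickHouse UInt64
// value but interprets them as signed; Long.toUnsignedString recovers the
// unsigned decimal representation, which BigInteger can then parse.
public class UnsignedSketch {
    static BigInteger toUnsignedBigInteger(long raw) {
        return new BigInteger(Long.toUnsignedString(raw));
    }

    public static void main(String[] args) {
        // -1L has the bit pattern of UInt64's maximum, 18446744073709551615
        System.out.println(toUnsignedBigInteger(-1L));
        System.out.println(toUnsignedBigInteger(42L)); // small values pass through unchanged
    }
}
```

UInt128 and UInt256 would need the driver to surface the value as a string or byte array first, since no Java primitive can hold them.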

@snuyanzin
Contributor

snuyanzin commented Jun 5, 2023

Sorry I was late here; thanks for your PR @WenDing-Y.
The thing that I do not understand:
why do we need 2 ClickhouseDialect classes

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/clickhouse/ClickhouseDialect.java
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java

and
2 ClickhouseDialectFactory

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectFactory.java
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/clickhouse/ClickhouseDialectFactory.java

@WenDing-Y
Author

Ok, I have rebase @MartijnVisser

I don't think you have, given that there are 4 merge-commits in this PR which shouldn't be there when rebased properly: bb13e1f bced473 9abdfaf 2655b6b

Sorry, it's my mistake. I have rebased it again. @MartijnVisser

@WenDing-Y
Author

Sorry i was late here, thanks for your PR @WenDing-Y The thing that I do not understand... why do we need 2 ClickhouseDialect classes

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/clickhouse/ClickhouseDialect.java
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialect.java

and 2 ClickhouseDialectFactory

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/clickhouse/dialect/ClickHouseDialectFactory.java
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/clickhouse/ClickhouseDialectFactory.java

Sorry, there was a slight error in my rebase just now, which caused this. It has now been resolved.

@MartijnVisser
Contributor

It has now been resolved

Unfortunately it hasn't been resolved. In case you want to rebase, what you should do is the following (this is assuming that you've added this repository as a remote called upstream.)

  1. Run git fetch upstream
  2. Run git rebase upstream/main and complete this until the rebase has successfully completed
  3. Force push your changes to your branch

@WenDing-Y
Author

WenDing-Y commented Jun 6, 2023

It has now been resolved

Unfortunately it hasn't been resolved. In case you want to rebase, what you should do is the following (this is assuming that you've added this repository as a remote called upstream.)

  1. Run git fetch upstream
  2. Run git rebase upstream/main and complete this until the rebase has successfully completed
  3. Force push your changes to your branch

It seems like this; in my local commit history, the new feature commits start after [FLINK-31551].
@MartijnVisser

[screenshot: local commit history]

@snuyanzin
Contributor

snuyanzin commented Jun 6, 2023

if you execute this command on your branch

git log --pretty-oneline

you'll see several merge remote-tracking commits like

c9e316596dc29fe3cb60578f53dce3b2b7fa8c1a (HEAD, WenDing/jdbc-clickhouse) Merge remote-tracking branch 'origin/jdbc-clickhouse' into jdbc-clickhouse
9098ce56b0d703cda0a1f24f69058b9ab15af556 [FLINK-32068]connector jdbc support clickhouse
8f6eff4e98003d568da8e722c06167dd491d4b53 [FLINK-32068]connector jdbc support clickhouse
79cc7987be514afd0580d377208c798ec04a9415 [FLINK-32068] jdbc support clickhouse
210968dca5a1fb47cd3268fa130f0ff5e54f299c [FLINK-32068]connector jdbc support clickhouse
c49485360a58e12ce7aad13ee627dcecc74c891e [FLINK-32068] connector jdbc support clickhouse
00c94f815e8fda71777a83d63bd1e14111bc5650 [FLINK-32068] connector jdbc support clickhouse
cd268a2111ecfd0602f4150d5b1cc44b99779456 [FLINK-32068] connector jdbc support clickhouse
86b5eeee7b88c1ef46f7d768875762ae253eec97 [FLINK-32068] connector jdbc support clickhouse
b81bc533fb98da2e5a76bddb22a4da8a52fb411e [FLINK-32068]connector jdbc support clickhouse
55f79f215456c0dbb5fdd3d99a8be6985be092ef [FLINK-32068]connector jdbc support clickhouse
1ff9e70dfbeefaa2e92c11db36fcca3c27bf9b49 Merge remote-tracking branch 'origin/jdbc-clickhouse' into jdbc-clickhouse
b6663a5fced2e4703c263827ca67c2dbacbf96bd [FLINK-32068]connector jdbc support clickhouse
2655b6badefbea699ddb2465803076ec229b884b Merge remote-tracking branch 'origin/main' into jdbc-clickhouse
9abdfafac76b17dcce371239ff5e16d9b90b9980 Merge branch 'main' into jdbc-clickhouse
d67de5f0f94b1de912470028ea1fc74376bd64b8 [FLINK-31551] Add support for CrateDB (#29)
7b8617e03048d32d44dc453bd556d9f59d58a3e1 [FLINK-32068]connector jdbc support clickhouse,delete testcontainers version
bced473fbb8a606dca287fcf9329c7f052b9c4be Merge remote-tracking branch 'upstream/main' into jdbc-clickhouse

and this is the issue

@WenDing-Y
Author

if you execute this command on your branch ... you'll see several merge remote-tracking commits ... and this is the issue

What else do I need to do? I executed:

  1. Run git fetch upstream
  2. Run git rebase upstream/main and complete this until the rebase has successfully completed
  3. Force push your changes to your branch

@snuyanzin
Contributor

snuyanzin commented Jun 6, 2023

How do you recognize that the rebase has successfully completed?
I tried it with your branch (git rebase upstream/main) and got a number of conflicts.

@WenDing-Y
Author

One question that comes to my mind now.. This will only work with a 1 server no? if we have a Clickhouse cluster with more than 1 server (that will have a zookeeper envolved) will not work no?

if this is true, I think we should have a note on documentation

I tried it, and it works normally in cluster mode.

My steps are as follows:

1. Create the ClickHouse tables

create table if not exists default.ck_flink_test_local on cluster clicks_cluster
(
    user_id Int32,
    message String
)
engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/ck_flink_test_local', '{replica}')
PRIMARY KEY (user_id)




create table if not exists default.ck_flink_test on cluster clicks_cluster
(
user_id Int32,
    message String
)
engine = Distributed('clicks_cluster', 'default', 'ck_flink_test_local', user_id);
2. In the Flink SQL client

CREATE TABLE ck_flink_test(
   user_id INTEGER,
  message String
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://xx:xx/default',
 'table-name' = 'ck_flink_test',
 'username'='xx',
 'password'='xx'
);


3. Execute the insert SQL
INSERT INTO ck_flink_test (user_id, message) VALUES
    (101, 'Hello');



4. The data can be viewed in the ClickHouse client
select * from ck_flink_test;

5. The Flink SQL client shows:

[screenshot: Flink SQL client output]


@WenDing-Y WenDing-Y closed this Jul 11, 2023
@WenDing-Y WenDing-Y reopened this Jul 11, 2023
@WenDing-Y
Copy link
Author

One question that comes to my mind now.. This will only work with a 1 server no? if we have a Clickhouse cluster with more than 1 server (that will have a zookeeper envolved) will not work no?
if this is true, I think we should have a note on documentation

@eskabetxe I guess you might have defined the table incorrectly
