Skip to content

Commit

Permalink
Feedback from HP
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Dec 10, 2024
1 parent 07ddf2f commit eb7b2b8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
22 changes: 19 additions & 3 deletions flink-cdc/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mysql: [Warning] Using a password on the command line interface can be insecure.

=== Observe the automagically created Elasticsearch indices

_This uses https://httpie.io/[httpie]_.

[source,bash]
----
❯ http -b localhost:9200/_cat/indices
Expand Down Expand Up @@ -100,7 +102,7 @@ Change the source data:
[source,bash]
----
docker-compose exec mysql mysql -uroot -phunter2 -e "INSERT INTO customers VALUES (42,'foo','bar','');" inventory
docker-compose exec mysql mysql -uroot -phunter2 -e "UPDATE CUSTOMERS SET last_name='foo' WHERE id=1001;" inventory
docker-compose exec mysql mysql -uroot -phunter2 -e "UPDATE customers SET last_name='foo' WHERE id=1001;" inventory
----

Look at the resulting changes in Elasticsearch:
Expand Down Expand Up @@ -145,6 +147,8 @@ Look at the resulting changes in Elasticsearch:

== Let's do the same in Flink SQL

_You might want to clean up the running Flink CDC jobs before trying this—restarting the Docker Compose stack is easiest_.

The first step is to manually configure an instance of the [MySQL CDC connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mysql-cdc/) for each table.

=== Figure out source schema
Expand Down Expand Up @@ -173,6 +177,7 @@ Manually:
(if you miss PK you get `org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.`)

* ` `type` enum('SHIPPING','BILLING','LIVING') NOT NULL,` - use string instead
* Closing brackets, commas, etc etc

Handle data type conversions, e.g. `enum`, `geometry` and column characteristics e.g. `AUTO_INCREMENT`

Expand All @@ -194,6 +199,10 @@ In Flink SQL, add the JARs manually to avoid watch out for https://issues.apache
[source,sql]
----
ADD JAR '/opt/flink/jars/flink-sql-connector-mysql-cdc-3.2.1.jar';
----

[source,sql]
----
ADD JAR '/opt/flink/jars/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar';
----

Expand Down Expand Up @@ -376,7 +385,7 @@ Check the result:
"email": "[email protected]",
"first_name": "Sally",
"id": 1001,
"last_name": "bar"
"last_name": "Thomas"
},
"_type": "_doc"
}
Expand All @@ -392,7 +401,14 @@ Check the result:
}
----

Update the source MySQL row again
Update the source MySQL row again:

[source,bash]
----
docker-compose exec mysql mysql -uroot -phunter2 -e "UPDATE customers SET last_name='foo' WHERE id=1001;" inventory
----

Inspect the changed data:

[source,bash]
----
Expand Down
2 changes: 1 addition & 1 deletion flink-cdc/flink/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/flink:1.18.1-scala_2.12-java11
FROM flink:1.18.1-scala_2.12-java11
SHELL ["/bin/bash", "-c"]


Expand Down

0 comments on commit eb7b2b8

Please sign in to comment.