[FLINK-34930] Fork existing code from bahir-flink #1

Merged: 44 commits, May 14, 2024

Commits:
afd3f13
[FLINK-34930] Initialize tools/ and vcs.xml
ferenc-csaky Mar 26, 2024
3e1045b
[FLINK-34930] Remove Python CI
ferenc-csaky Apr 11, 2024
64df367
[BAHIR-99] kudu connector
eskabetxe Jul 25, 2018
380d979
[BAHIR-179] Fail Docker integration tests silently
eskabetxe Jan 9, 2019
d09e3d7
[BAHIR-194] bump kudu version to 1.8.0
eskabetxe Feb 10, 2019
cee8a73
[BAHIR-180] Improve eventual consistence for Kudu connector
eskabetxe Jan 15, 2019
90c0660
[BAHIR-199] Bump kudu version to 1.9.0
eskabetxe Mar 7, 2019
2b34864
[BAHIR-202] Improve KuduSink throughput using async FlushMode
suxinglee Mar 22, 2019
83ae04b
[BAHIR-200] Move tests from docker to kudu-test-utils (#49)
eskabetxe May 26, 2019
1799957
[BAHIR-209] upgrade kudu version to 1.10.0 (#61)
eskabetxe Jul 3, 2019
4e71c17
[BAHIR-207] Add tests for scala 2.12 on travis (#59)
eskabetxe Jul 3, 2019
eb7d959
[BAHIR-214] Improve speed and solve eventual consistence issues (#64)
eskabetxe Sep 3, 2019
e9c1f21
Add support "upsert part of columns of a kudu table" (#70)
lintingbin Nov 29, 2019
e58d42f
Fix "the client has already been closed" bug (#75)
CruxWW Jan 19, 2020
113e78c
Fix NotSerializableException in Kudu connector (#74)
lintingbin Jan 19, 2020
f58f0d9
Kudu Connector rework (#78)
mbalassi May 19, 2020
0bb7c4f
Add batch table env support and filter push down to Kudu connector (#82)
sebastianliu Jul 14, 2020
0edb468
BAHIR-240: replace docker test by testcontainer (#89)
eskabetxe Jul 28, 2020
1344d41
[BAHIR-241] Upgrade all connectors to Flink 1.11 (#99)
gyfora Sep 24, 2020
4f38647
[BAHIR-263] Update flink to 1.12.2 (#115)
eskabetxe Mar 12, 2021
49cbc96
[BAHIR-260] Add kudu table writer config (#109)
hackergin Apr 7, 2021
625568e
[BAHIR-293] Fix documentation tables
eskabetxe Dec 7, 2021
29fdcf7
[BAHIR-291] Bump flink to 1.14.0 (#136)
RocMarshal Dec 26, 2021
0619722
[BAHIR-296] Unify log4j libs
eskabetxe Dec 26, 2021
e22b414
BAHIR-296: Unify mockito version to 1.10.19
eskabetxe Dec 27, 2021
cc90226
[BAHIR-302] Add enforcers
eskabetxe Mar 28, 2022
02d2990
[BAHIR-243] Change KuduTestHarness with TestContainers
eskabetxe Mar 11, 2021
a45f81d
[BAHIR-302] Group declaration of flink dependencies on parent pom
eskabetxe May 2, 2022
191070a
[maven-release-plugin] prepare for next development iteration
eskabetxe Jun 16, 2022
575eb1a
[BAHIR-305] Kudu Flink SQL Support DynamicSource/Sink&LookupFunction
eskabetxe Jul 17, 2022
258b8fe
[BAHIR-312] Add license header to README.md files
eskabetxe Jul 30, 2022
606b16f
[BAHIR-321] Make KuduFilterInfo handle String literals
thebalu Jan 3, 2023
011e8fa
[BAHIR-308] Remove support for scala 2.11
eskabetxe Jul 6, 2022
228330f
[BAHIR-308] Bump flink version to 1.15.3
eskabetxe Mar 8, 2023
1970751
[BAHIR-308] Remove scala prefix where we can
eskabetxe Mar 23, 2023
b58b690
[BAHIR-324] Closing KuduReader at JobManager
collabH May 30, 2023
a4d2fb9
[FLINK-34930] Adapt POM files to the new structure, remove flink-shad…
ferenc-csaky Mar 28, 2024
c70031d
[FLINK-34930] Apply spotless code format and checkstyle rules
ferenc-csaky Mar 28, 2024
58bf05e
[FLINK-34930] Rename base package
ferenc-csaky Apr 10, 2024
0d8e1c4
[FLINK-34930] Fix KuduTableSourceITCase
ferenc-csaky Mar 28, 2024
73178c8
[FLINK-34930] State Bahir fork
ferenc-csaky Apr 2, 2024
2385c8e
[FLINK-34930] Skip spotless for JDK 21+
ferenc-csaky Apr 8, 2024
121eecb
[FLINK-34930] Enable module opens for tests for newer JDKs
ferenc-csaky Apr 11, 2024
519e0af
[FLINK-34930] Migrate Bahir NOTICE
ferenc-csaky Apr 17, 2024
7 changes: 0 additions & 7 deletions .github/workflows/push_pr.yml
@@ -36,10 +36,3 @@ jobs:
with:
flink_version: ${{ matrix.flink }}
jdk_version: ${{ matrix.jdk }}
python_test:
strategy:
matrix:
flink: [ 1.17.2, 1.18.1, 1.19-SNAPSHOT ]
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
4 changes: 4 additions & 0 deletions .gitmodules
@@ -0,0 +1,4 @@
[submodule "tools/releasing/shared"]
path = tools/releasing/shared
url = https://github.com/apache/flink-connector-shared-utils
branch = release_utils
25 changes: 25 additions & 0 deletions .idea/vcs.xml


6 changes: 2 additions & 4 deletions NOTICE
@@ -1,5 +1,5 @@
Apache Flink Kudu Connector
Copyright 2014-2024 The Apache Software Foundation
Apache Bahir
Copyright (c) 2016-2024 The Apache Software Foundation.

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
@@ -12,5 +12,3 @@ ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUT
DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
USE OR PERFORMANCE OF THIS SOFTWARE.


6 changes: 5 additions & 1 deletion README.md
@@ -2,6 +2,10 @@

This repository contains the official Apache Flink Kudu connector.

## Forked from Bahir

The connector code is forked from [Apache Bahir](https://bahir.apache.org/) project after its retirement.

## Apache Flink

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
@@ -65,4 +69,4 @@ This article describes [how to contribute to Apache Flink](https://flink.apache.
## About

Apache Flink is an open source project of The Apache Software Foundation (ASF).
The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project.
The Apache Flink project originated from the [Stratosphere](http://stratosphere.eu) research project.
324 changes: 324 additions & 0 deletions flink-connector-kudu/README.md
@@ -0,0 +1,324 @@
<!--
{% comment %}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
{% endcomment %}
-->

# Flink Kudu Connector

This connector provides a source (`KuduInputFormat`), a sink/output (`KuduSink` and `KuduOutputFormat`, respectively),
as well as a table source (`KuduTableSource`), an upsert table sink (`KuduTableSink`), and a catalog (`KuduCatalog`),
to allow reading from and writing to [Kudu](https://kudu.apache.org/).

To use this connector, add the following dependency to your project:

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-kudu_2.11</artifactId>
      <version>1.1-SNAPSHOT</version>
    </dependency>

*Version Compatibility*: This module is compatible with Apache Kudu *1.11.1* (last stable version) and Apache Flink 1.10+.

Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/dependencies.html).

## Installing Kudu

Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html).
Optionally, you can use the Docker images provided in the `dockers` folder.

## SQL and Table API

The Kudu connector is fully integrated with the Flink Table and SQL APIs. Once we configure the Kudu catalog (see next section)
we can start querying or inserting into existing Kudu tables using the Flink SQL or Table API.

For more information about the possible queries, please check the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/).

### Kudu Catalog

The connector comes with a catalog implementation to handle metadata about your Kudu setup and perform table management.
By using the Kudu catalog, you can access all the tables already created in Kudu from Flink SQL queries. The Kudu catalog only
allows users to create or access existing Kudu tables. Tables using other data sources must be defined in other catalogs such as
in-memory catalog or Hive catalog.

When using the SQL CLI, you can easily add the Kudu catalog to your environment YAML file:

```
catalogs:
  - name: kudu
    type: kudu
    kudu.masters: <host>:7051
```
Once the SQL CLI is started, you can simply switch to the Kudu catalog by calling `USE CATALOG kudu;`.

You can also create and use the KuduCatalog directly in the Table environment:

```java
String KUDU_MASTERS = "host1:port1,host2:port2";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);
tableEnv.registerCatalog("kudu", catalog);
tableEnv.useCatalog("kudu");
```
### DDL operations using SQL

It is possible to manipulate Kudu tables using SQL DDL.

When not using the Kudu catalog, the following additional properties must be specified in the `WITH` clause:

* `'connector.type'='kudu'`
* `'kudu.masters'='host1:port1,host2:port2,...'`: comma-delimited list of Kudu masters
* `'kudu.table'='...'`: The table's name within the Kudu database.

If you have registered and are using the Kudu catalog, these properties are handled automatically.

To create a table, the additional properties `kudu.primary-key-columns` and `kudu.hash-columns` must be specified
as comma-delimited lists. Optionally, you can set the `kudu.replicas` property (defaults to 1).
Other properties, such as range partitioning, cannot be configured here; for more flexibility, please use
`catalog.createTable` as described in [this](#Creating-a-KuduTable-directly-with-KuduCatalog) section or create the table directly in Kudu.

The `NOT NULL` constraint can be added to any of the column definitions.
A column that is set as a primary key is automatically created with the `NOT NULL` constraint.
Hash columns must be a subset of primary key columns.

Kudu Catalog

```sql
CREATE TABLE TestTable (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)
```
Other catalogs

```sql
CREATE TABLE TestTable (
  first STRING,
  second STRING,
  third INT NOT NULL
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = '...',
  'kudu.table' = 'TestTable',
  'kudu.hash-columns' = 'first',
  'kudu.primary-key-columns' = 'first,second'
)
```
Renaming a table:

```sql
ALTER TABLE TestTable RENAME TO TestTableRen
```
Dropping a table:

```sql
DROP TABLE TestTableRen
```
#### Creating a KuduTable directly with KuduCatalog

The `KuduCatalog` also exposes a simple `createTable` method that requires only a `KuduTableInfo` object, in which the table configuration,
including schema, partitioning, replication, etc., can be specified.

Use the `createTableIfNotExists` method, which takes a `ColumnSchemasFactory` and
a `CreateTableOptionsFactory` parameter. These implement, respectively, `getColumnSchemas()`,
returning a list of Kudu [ColumnSchema](https://kudu.apache.org/apidocs/org/apache/kudu/ColumnSchema.html) objects,
and `getCreateTableOptions()`, returning a
[CreateTableOptions](https://kudu.apache.org/apidocs/org/apache/kudu/client/CreateTableOptions.html) object.

The following example creates a table called `ExampleTable` with two columns,
where `first` is the primary key, and configures the number of replicas and hash partitioning.

```java
KuduTableInfo tableInfo = KuduTableInfo
    .forTable("ExampleTable")
    .createTableIfNotExists(
        () ->
            Lists.newArrayList(
                new ColumnSchema.ColumnSchemaBuilder("first", Type.INT32)
                    .key(true)
                    .build(),
                new ColumnSchema.ColumnSchemaBuilder("second", Type.STRING)
                    .build()),
        () ->
            new CreateTableOptions()
                .setNumReplicas(1)
                .addHashPartitions(Lists.newArrayList("first"), 2));

catalog.createTable(tableInfo, false);
```
The example uses lambda expressions to implement the functional interfaces.

Read more about Kudu schema design in the [Kudu docs](https://kudu.apache.org/docs/schema_design.html).

### Supported data types


| Flink/SQL | Kudu |
| ---------------- | :---------------: |
| `STRING` | STRING |
| `BOOLEAN` | BOOL |
| `TINYINT` | INT8 |
| `SMALLINT` | INT16 |
| `INT` | INT32 |
| `BIGINT` | INT64 |
| `FLOAT` | FLOAT |
| `DOUBLE` | DOUBLE |
| `BYTES` | BINARY |
| `TIMESTAMP(3)` | UNIXTIME_MICROS |

Note:

* `TIMESTAMP`s are fixed to a precision of 3, and the corresponding Java conversion class is `java.sql.Timestamp`
* `BINARY` and `VARBINARY` are not yet supported - use `BYTES`, which is a `VARBINARY(2147483647)`
* `CHAR` and `VARCHAR` are not yet supported - use `STRING`, which is a `VARCHAR(2147483647)`
* `DECIMAL` types are not yet supported
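
As an illustration, a table definition that exercises several of these mappings might look like the following sketch (the table and column names are hypothetical, and the Kudu catalog is assumed to be in use, so no connector properties are needed):

```sql
CREATE TABLE SensorReadings (
  sensor_id   INT,           -- Kudu INT32
  device      STRING,        -- Kudu STRING
  healthy     BOOLEAN,       -- Kudu BOOL
  temperature DOUBLE,        -- Kudu DOUBLE
  payload     BYTES,         -- Kudu BINARY
  event_time  TIMESTAMP(3)   -- Kudu UNIXTIME_MICROS
) WITH (
  'kudu.hash-columns' = 'sensor_id',
  'kudu.primary-key-columns' = 'sensor_id,event_time'
)
```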

### Lookup Cache

The Kudu connector can be used in a temporal join as a lookup source (also known as a dimension table). Currently, only synchronous lookup mode is supported.

By default, the lookup cache is not enabled. You can enable it by setting both `kudu.lookup.cache.max-rows` and `kudu.lookup.cache.ttl`.

The lookup cache is used to improve the performance of temporal joins with the Kudu connector. By default, the lookup cache is not enabled, so all requests are sent to the external database. When the lookup cache is enabled, each process (i.e., each TaskManager) holds a cache. Flink looks up the cache first and only sends requests to the external database on a cache miss, updating the cache with the rows returned. The oldest rows in the cache expire when the cache reaches the maximum number of cached rows (`kudu.lookup.cache.max-rows`) or when a row exceeds the maximum time to live (`kudu.lookup.cache.ttl`). The cached rows might not be the latest; users can tune `kudu.lookup.cache.ttl` to a smaller value for fresher data, but this may increase the number of requests sent to the database. So this is a balance between throughput and correctness.
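
A minimal sketch of such a lookup join, assuming a hypothetical `Orders` stream with a processing-time attribute `proc_time` and a Kudu-backed `Customers` dimension table (the option values below are illustrative and should be checked against the connector version in use):

```sql
CREATE TABLE Customers (
  id      INT,
  name    STRING,
  country STRING
) WITH (
  'connector.type' = 'kudu',
  'kudu.masters' = 'host1:7051',
  'kudu.table' = 'Customers',
  'kudu.hash-columns' = 'id',
  'kudu.primary-key-columns' = 'id',
  -- illustrative cache settings; verify the exact option names and value
  -- formats against the connector version in use
  'kudu.lookup.cache.max-rows' = '10000',
  'kudu.lookup.cache.ttl' = '10min'
);

-- Enrich each order with customer attributes via a processing-time lookup join.
SELECT o.order_id, o.amount, c.name, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```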

Reference: [Flink JDBC Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/#lookup-cache)


### Known limitations

* Data type limitations (see above).
* SQL Create table: primary keys can only be set by the `kudu.primary-key-columns` property; using the
  `PRIMARY KEY` constraint is not yet possible.
* SQL Create table: range partitioning is not supported.
* When getting a table through the catalog, `NOT NULL` and `PRIMARY KEY` constraints are ignored. All columns
  are described as nullable and not part of the primary key.
* Kudu tables cannot be altered through the catalog other than simple renaming.

## DataStream API

It is also possible to use the Kudu connector directly from the DataStream API; however, we
encourage all users to explore the Table API, as it provides a lot of useful tooling when working
with Kudu data.

### Reading tables into a DataStream

There are two main ways of reading a Kudu table into a DataStream:

1. Using the `KuduCatalog` and the Table API
2. Using the `KuduRowInputFormat` directly

Using the `KuduCatalog` and Table API is the recommended way of reading tables, as it automatically
guarantees type safety and takes care of configuring the readers.

This is how it works in practice:

```java
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);

tableEnv.registerCatalog("kudu", new KuduCatalog("master:port"));
tableEnv.useCatalog("kudu");

Table table = tableEnv.sqlQuery("SELECT * FROM MyKuduTable");
DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);
```
The second way of achieving the same thing is by using the `KuduRowInputFormat` directly.
In this case, we have to manually provide all the information about our table:

```java
KuduTableInfo tableInfo = ...
KuduReaderConfig readerConfig = ...
KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);

DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
```
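
For illustration, the placeholders above might be filled in roughly as follows; the master address, table name, and schema are made up, and `KuduReaderConfig` is assumed to expose a builder analogous to the `KuduWriterConfig` builder shown later in this document:

```java
// Made-up master address and table name, for illustration only.
KuduReaderConfig readerConfig = KuduReaderConfig.Builder
        .setMasters("master:7051")
        .build();
KuduTableInfo tableInfo = KuduTableInfo.forTable("MyKuduTable");

KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);

// Row type information matching the (assumed) two-column table schema.
RowTypeInfo rowTypeInfo = new RowTypeInfo(
        new TypeInformation<?>[] {Types.INT, Types.STRING},
        new String[] {"id", "name"});

DataStream<Row> rowStream = env.createInput(inputFormat, rowTypeInfo);
```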
At the end of the day the `KuduTableSource` is just a convenient wrapper around the `KuduRowInputFormat`.

### Kudu Sink

The connector provides a `KuduSink` class that can be used to consume DataStreams
and write the results into a Kudu table.

The constructor takes 3 or 4 arguments.

* `KuduWriterConfig` is used to specify the Kudu masters and the flush mode.
* `KuduTableInfo` identifies the table to be written.
* `KuduOperationMapper` maps the records coming from the DataStream to a list of Kudu operations.
* `KuduFailureHandler` (optional): lets you provide your own logic for handling write failures.

The example below shows the creation of a sink for Row type records with 3 fields. It upserts each record.
It is assumed that a Kudu table with columns `col1, col2, col3` called `AlreadyExistingTable` exists. Note that if this were not the case,
we could pass a `KuduTableInfo` as described in the [Creating a KuduTable directly with KuduCatalog](#Creating-a-KuduTable-directly-with-KuduCatalog) section,
and the sink would create the table with the provided configuration.

```java
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();

KuduSink<Row> sink = new KuduSink<>(
    writerConfig,
    KuduTableInfo.forTable("AlreadyExistingTable"),
    new RowOperationMapper<>(
        new String[]{"col1", "col2", "col3"},
        AbstractSingleOperationMapper.KuduOperation.UPSERT));
```
#### KuduOperationMapper

This section describes the Operation mapping logic in more detail.

The connector supports insert, upsert, update, and delete operations.
The operation to be performed can vary dynamically based on the record.
To allow for more flexibility, it is also possible for one record to trigger
0, 1, or more operations.
For the highest level of control, implement the `KuduOperationMapper` interface.

If one record from the DataStream corresponds to one table operation,
extend the `AbstractSingleOperationMapper` class. An array of column
names must be provided. This must match the Kudu table's schema.

The `getField` method must be overridden; it extracts the value for the table column whose name is
at the `i`th position in the `columnNames` array.
If the operation is one of (`CREATE, UPSERT, UPDATE, DELETE`)
and doesn't depend on the input record (constant during the life of the sink), it can be set in the constructor
of `AbstractSingleOperationMapper`.
It is also possible to implement your own logic by overriding the
`createBaseOperation` method that returns a Kudu [Operation](https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html).
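
As a sketch, a custom 1-to-1 mapper for a hypothetical `SensorReading` POJO could extend `AbstractSingleOperationMapper` as described above (the exact constructor and `getField` signatures should be verified against the classes in this connector version):

```java
public class SensorReadingOperationMapper extends AbstractSingleOperationMapper<SensorReading> {

    public SensorReadingOperationMapper() {
        // Constant UPSERT operation for every record; the column order defines the mapping.
        super(new String[] {"id", "name", "temperature"}, KuduOperation.UPSERT);
    }

    @Override
    public Object getField(SensorReading reading, int i) {
        // Return the value for the column at position i in the columnNames array.
        switch (i) {
            case 0:
                return reading.getId();
            case 1:
                return reading.getName();
            case 2:
                return reading.getTemperature();
            default:
                throw new IllegalArgumentException("Unknown column index: " + i);
        }
    }
}
```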

There are pre-defined operation mappers for POJO, Flink Row, and Flink Tuple types for constant-operation, 1-to-1 sinks; a usage sketch follows the list below.

* `PojoOperationMapper`: Each table column must correspond to a POJO field
with the same name. The `columnNames` array should contain those fields of the POJO that
are present as table columns (the POJO fields can be a superset of table columns).
* `RowOperationMapper` and `TupleOperationMapper`: the mapping is based on position. The
`i`th field of the Row/Tuple corresponds to the column of the table at the `i`th
position in the `columnNames` array.
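
As a usage sketch, wiring the hypothetical `SensorReading` POJO into a sink through `PojoOperationMapper` might look like the following; the constructor arguments mirror the Row-based example above and should be verified against the actual class:

```java
KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(KUDU_MASTERS).build();

KuduSink<SensorReading> sink = new KuduSink<>(
        writerConfig,
        KuduTableInfo.forTable("SensorReadings"),
        new PojoOperationMapper<>(
                SensorReading.class,
                new String[] {"id", "name", "temperature"},
                AbstractSingleOperationMapper.KuduOperation.UPSERT));

// Attach the sink to a DataStream<SensorReading> (hypothetical stream variable).
sensorReadingStream.addSink(sink);
```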

## Building the connector

The connector can be easily built using Maven:

```bash
cd bahir-flink
mvn clean install
```
### Running the tests

The integration tests rely on the Kudu test harness, which requires the current user to be able to ssh to localhost.

This might not work out of the box on some operating systems (such as Mac OS X).
To solve this problem, go to *System Preferences/Sharing* and enable Remote Login for your user.