Skip to content

Commit 2f52c66

Browse files
committed
add iceberg support
1 parent c1cd338 commit 2f52c66

File tree

7 files changed

+943
-0
lines changed

7 files changed

+943
-0
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the Alink Iceberg bridge module (jar packaging).

  Every dependency declared here uses the "provided" scope, so none of them is
  bundled with, or passed on transitively by, this artifact.

  The properties alink.flink.major.version, alink.scala.major.version and
  flink.version are referenced below but not defined in this file.
  NOTE(review): presumably they are inherited from the parent POM chain - confirm.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>alink_connectors_iceberg</artifactId>
        <groupId>com.alibaba.alink</groupId>
        <version>1.5-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- The artifact id embeds the Flink and Scala major versions. -->
    <artifactId>alink_iceberg_bridge_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
    <packaging>jar</packaging>
    <name>alink-iceberg-bridge</name>

    <properties>
        <!-- Joined as "${hivemetastore.hadoop.version}-${flink.shaded.version}" to form
             the flink-shaded-hadoop-2-uber version below. -->
        <hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
        <flink.shaded.version>9.0</flink.shaded.version>
        <!-- Version of hive-exec. -->
        <hive.version>2.3.4</hive.version>
        <!-- Version of iceberg-flink-runtime. -->
        <iceberg.version>0.12.0</iceberg.version>
    </properties>

    <!--
      NOTE(review): the InputOutputFormat class added in the same commit imports
      org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, but no
      flink-streaming-java dependency is declared here. Confirm it is supplied by the
      parent POM or transitively.
    -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>${hivemetastore.hadoop.version}-${flink.shaded.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${alink.scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${alink.scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_${alink.scala.major.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- hive-exec with the transitive artifacts listed under <exclusions> removed. -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-vector-code-gen</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-llap-tez</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-shims</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-codec</groupId>
                    <artifactId>commons-codec</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>commons-httpclient</groupId>
                    <artifactId>commons-httpclient</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.antlr</groupId>
                    <artifactId>antlr-runtime</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.antlr</groupId>
                    <artifactId>ST4</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.ant</groupId>
                    <artifactId>ant</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-compress</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.ivy</groupId>
                    <artifactId>ivy</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>apache-curator</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-framework</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.groovy</groupId>
                    <artifactId>groovy-all</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>calcite-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>calcite-druid</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.calcite.avatica</groupId>
                    <artifactId>avatica</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>calcite-avatica</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.gson</groupId>
                    <artifactId>gson</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>stax</groupId>
                    <artifactId>stax-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>apache-log4j-extras</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>${iceberg.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 8 source and bytecode level. No plugin version is set here. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package org.apache.iceberg.flink;
2+
3+
import org.apache.flink.api.common.io.RichInputFormat;
4+
import org.apache.flink.api.common.typeinfo.TypeInformation;
5+
import org.apache.flink.api.java.tuple.Tuple2;
6+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7+
import org.apache.flink.table.catalog.Catalog;
8+
import org.apache.flink.table.catalog.ObjectPath;
9+
import org.apache.flink.table.data.RowData;
10+
import org.apache.flink.util.Preconditions;
11+
import org.apache.iceberg.Schema;
12+
import org.apache.iceberg.Table;
13+
import org.apache.iceberg.flink.source.FlinkInputFormat;
14+
import org.apache.iceberg.flink.source.FlinkSource;
15+
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
16+
17+
import java.io.IOException;
18+
import java.io.UncheckedIOException;
19+
20+
public class InputOutputFormat {
21+
22+
public static Tuple2<TypeInformation<RowData>, RichInputFormat<RowData, ?>> createInputFormat(
23+
StreamExecutionEnvironment execEnv, Catalog catalog, ObjectPath objectPath) {
24+
25+
if (!(catalog instanceof FlinkCatalog)) {
26+
throw new RuntimeException("Catalog should be iceberg catalog.");
27+
}
28+
29+
TableLoader tableLoader = createTableLoader((FlinkCatalog) catalog, objectPath);
30+
31+
Table table;
32+
Schema icebergSchema;
33+
tableLoader.open();
34+
try (TableLoader loader = tableLoader) {
35+
table = loader.loadTable();
36+
icebergSchema = table.schema();
37+
} catch (IOException e) {
38+
throw new UncheckedIOException(e);
39+
}
40+
41+
TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(icebergSchema));
42+
FlinkInputFormat flinkInputFormat = FlinkSource.forRowData()
43+
.env(execEnv)
44+
.tableLoader(tableLoader)
45+
.table(table)
46+
.buildFormat();
47+
return Tuple2.of(typeInfo, flinkInputFormat);
48+
}
49+
50+
private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
51+
Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
52+
return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
53+
}
54+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?xml version="1.0" encoding="UTF-8"?>

<!--
  Aggregator POM for the Alink Iceberg connector.

  Packaging is "pom", so this project produces no jar of its own; it only lists the
  modules to build. The alink_iceberg_bridge POM names this artifact
  (alink_connectors_iceberg) as its parent.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>alink_connectors</artifactId>
        <groupId>com.alibaba.alink</groupId>
        <version>1.5-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>alink_connectors_iceberg</artifactId>
    <packaging>pom</packaging>
    <name>alink-connector-iceberg</name>

    <!-- Each entry is a sub-directory, relative to this POM, that contains a module POM. -->
    <modules>
        <module>iceberg-bridge</module>
    </modules>
</project>

connectors/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<module>connector-odps</module>
2020
<module>connector-jdbc</module>
2121
<module>connector-hive</module>
22+
<module>connector-iceberg</module>
2223
<module>filesystem</module>
2324
</modules>
2425

0 commit comments

Comments
 (0)