
Flink TableSource TableSink


THIS IS DEPRECATED DUE TO LACK OF SUPPORT OF THE TABLE API IN NEWER FLINK VERSIONS. SUPPORTED UP TO FLINK 1.10.x

The Flink TableSource/TableSink uses the Table API of Apache Flink to read/write Office documents, such as MS Excel, using SQL (recommended for most Flink applications). It supports all features of the HadoopOffice library, such as encryption, signing, linked workbooks, templates or low footprint mode.

The following TableSources/TableSinks are provided:

  • ExcelFlinkTableSource - to read an Excel file into a Flink table that can be queried using SQL
  • ExcelFlinkTableSink - to write an Excel file from a Flink Table

The TableSource/TableSink is available for Scala 2.11 and Flink 1.3+, and for Scala 2.12 starting with Flink 1.7 and HadoopOffice 1.2.3.

Build

Note that this Flink TableSource/TableSink is available on Maven Central, so you no longer need to build and publish it to a local Maven repository in order to use it. The Flink TableSource/TableSink is based on the Flink DataSource/DataSink provided by the HadoopOffice library.
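For example, with sbt you can simply add it as a dependency. The coordinates below are an assumption (the groupId is com.github.zuinnote, but verify the exact artifact name and version on Maven Central for your Scala and Flink version):

    // sbt dependency on the Flink TableSource/TableSink (artifact name and version are
    // assumptions - check Maven Central for the coordinates matching your Scala/Flink version)
    libraryDependencies += "com.github.zuinnote" %% "hadoopoffice-flinkts" % "1.3.1"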

If you nevertheless want to build the Flink TableSource/TableSink yourself, execute:

git clone https://github.com/ZuInnoTe/hadoopoffice.git hadoopoffice

You can build the application by changing to the directory hadoopoffice/flinkts and using the following command:

sbt +clean +it:test +publishLocal

This command also publishes it to the local Maven repository so you can easily use it in your own projects.

Use

Configure

ExcelFlinkTableSource

This TableSource can use the following parameters:

  • path: the path to the Excel file to read
  • conf: configuration of the reading options (e.g. encryption etc.), see HadoopOfficeReadConfiguration.java and Hadoop File Format options
  • field: for each column you need to define its name and datatype. The order of the field calls determines which column of the Excel file is meant, e.g. the first field refers to the first column, the second field to the second column. The following Flink types are supported: BOOLEAN, STRING, SQL_DATE, SQL_TIMESTAMP, DECIMAL, BYTE, SHORT, INT, LONG, DOUBLE, FLOAT.
  • dateFormat: format of dates. Note that in most cases - even for files written with Excel in non-US countries - you may want to choose US, because this is the Excel default format. However, if your dates are stored as Excel strings you may need to select another format. Using SimpleDateFormat you can define any date format pattern (see the sketch after this list). Default: Locale.US.
  • decimalFormat: format of decimals. This can vary from country to country; for instance, in Germany the comma is the equivalent of the US dot. Default: locale of the system.
  • dateTimeFormat (as of 1.2.0): format of timestamps. You can define any Locale here to read timestamps; using SimpleDateFormat you can also define any date format pattern. Default: null (this means the pattern defined in java.sql.Timestamp is used).
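A minimal sketch of how such format objects can be constructed with the standard java.text classes (the patterns and locales below are only illustrative):

    import java.text.{DecimalFormat, NumberFormat, SimpleDateFormat}
    import java.util.Locale

    // dateFormat with an explicit SimpleDateFormat pattern (illustrative pattern)
    val customDateFormat = new SimpleDateFormat("dd.MM.yyyy")
    // dateTimeFormat for timestamps (HadoopOffice 1.2.0+), also a SimpleDateFormat pattern
    val customDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // decimalFormat based on the German locale (comma as decimal separator)
    val customDecimalFormat = NumberFormat.getInstance(Locale.GERMANY).asInstanceOf[DecimalFormat]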

Example for the file testsimple.xlsx. This file has dates stored in the Excel standard US format and decimals stored in German format (i.e. with a comma instead of a dot as decimal separator). The first line of the first sheet is skipped, because it is the header line and does not contain data.

    import java.text.{DateFormat, DecimalFormat, NumberFormat, SimpleDateFormat}
    import java.util.Locale
    // plus the imports for HadoopOfficeReadConfiguration, ExcelFlinkTableSource and Flink's
    // Types - their packages depend on the HadoopOffice and Flink version you use

    // read configuration, e.g. for encryption or low footprint mode
    val hocr: HadoopOfficeReadConfiguration = new HadoopOfficeReadConfiguration()
    // dates are stored in the Excel default (US) format
    val dateFormat: SimpleDateFormat = DateFormat.getDateInstance(DateFormat.SHORT, Locale.US).asInstanceOf[SimpleDateFormat]
    // decimals are stored in German format (comma as decimal separator)
    val decimalFormat: DecimalFormat = NumberFormat.getInstance(Locale.GERMANY).asInstanceOf[DecimalFormat]
    // treat the first line of the first sheet as header, i.e. skip it as data
    hocr.setReadHeader(true)
    val source: ExcelFlinkTableSource = ExcelFlinkTableSource.builder()
      .path("hdfs:///user/input/office")
      .field("decimalsc1", Types.DECIMAL)
      .field("booleancolumn", Types.BOOLEAN)
      .field("datecolumn", Types.SQL_DATE)
      .field("stringcolumn", Types.STRING)
      .field("decimalp8sc3", Types.DECIMAL)
      .field("bytecolumn", Types.BYTE)
      .field("shortcolumn", Types.SHORT)
      .field("intcolumn", Types.INT)
      .field("longcolumn", Types.LONG)
      .conf(hocr)
      .dateFormat(dateFormat)
      .decimalFormat(decimalFormat)
      .build()
    tableEnvironment.registerTableSource("testsimple", source)
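Once the TableSource is registered, the Excel data can be queried like any other table. A minimal sketch, assuming tableEnvironment is a Flink 1.5+ (Batch)TableEnvironment (older versions use sql instead of sqlQuery); the query itself is only illustrative:

    // run a SQL query against the registered Excel-backed table (column names from the example above)
    val filtered = tableEnvironment.sqlQuery(
      "SELECT stringcolumn, longcolumn FROM testsimple WHERE booleancolumn = true")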

ExcelFlinkTableSink

This TableSink can use the following parameters:

  • path: the path where the Excel file should be written
  • conf: configuration of the writing options (e.g. encryption etc.), see HadoopOfficeWriteConfiguration.java and Hadoop File Format options (see also the MIME type sketch after this list)
  • useHeader: if true, the field names of the Flink Table are written as the first row of the Excel file
  • defaultSheetName: name of the sheet where the data should be written
  • dateFormat: format of dates. Note that in most cases - even for files written with Excel in non-US countries - you may want to choose US, because this is the Excel default format. However, if your dates are stored as Excel strings you may need to select another format. Default: Locale.US.
  • decimalFormat: format of decimals. This can vary from country to country; for instance, in Germany the comma is the equivalent of the US dot. Default: locale of the system.
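The MIME type set on the write configuration (conf) determines whether the new .xlsx or the old binary .xls format is written. A small sketch, using the same setMimeType call as in the example below (howc is the HadoopOfficeWriteConfiguration instance from that example):

    // new OOXML Excel format (.xlsx)
    howc.setMimeType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    // old binary Excel format (.xls)
    // howc.setMimeType("application/vnd.ms-excel")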

The following example shows how the result of a query ("testSimpleResult") on a Flink Table is stored as a file in the new Excel format (see the MIME type in Hadoop File Format). It uses the US locale as dateFormat and the German locale as decimalFormat. It writes the field names of the Flink Table as the first row of the Excel file (useHeader: true). All data is written to the sheet "Sheet1".

    tableEnvironment.registerTableSource("testsimple", source)
    // scan the registered table and select all columns
    val testSimpleScan = tableEnvironment.scan("testsimple")
    val testSimpleResult = testSimpleScan.select("*")
    // write table using sink
    // write configuration, e.g. to select the MIME type of the output file
    val howc = new HadoopOfficeWriteConfiguration(DFS_OUTPUT_DIR_NAME)
    howc.setMimeType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
    val useHeader = true
    val defaultSheetName = "Sheet1"
    val dateFormat: SimpleDateFormat = DateFormat.getDateInstance(DateFormat.SHORT, Locale.US).asInstanceOf[SimpleDateFormat]
    val decimalFormat: DecimalFormat = NumberFormat.getInstance(Locale.GERMANY).asInstanceOf[DecimalFormat]
    // pass the write configuration (howc) to the sink and do not overwrite existing files
    val sink = new ExcelFlinkTableSink("hdfs:///user/output/office", howc, useHeader, defaultSheetName, dateFormat, decimalFormat, Some(WriteMode.NO_OVERWRITE))
    testSimpleResult.writeToSink(sink)
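Note that, as with other Flink batch jobs, the file is only written when the job is actually executed; a minimal sketch, assuming env is the ExecutionEnvironment from which the table environment was created:

    // trigger job execution so the sink actually writes the Excel file
    env.execute("write Excel file via ExcelFlinkTableSink")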
