Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP aims to contribute the existing Apache Iceberg Flink Connector to Flink. 

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table. 

Iceberg avoids unpleasant surprises. Schema evolution works and won’t inadvertently un-delete data. Users don’t need to know about partitioning to get fast queries. Iceberg was designed to solve correctness problems in eventually-consistent cloud object stores.

Iceberg supports both Flink’s DataStream API and Table API. Based on the guidelines of the Flink community, only the latest two Flink minor versions are actively maintained. See Multi-Engine Support#apache-flink for further details.

Public Interfaces

The Iceberg connector implements the following Flink interfaces for its source and sink.

  • Source:

          See Iceberg Source Design for the detailed design. It is based on FLIP-27 and implements:

    • Split Enumerator that runs on the job manager and is responsible for (a) discovering new splits and (b) assigning splits to Readers.
    • Split Readers that run in parallel on task managers and are responsible for emitting records by reading splits assigned to them.
  • Sink

          See Iceberg Sink Design for the detailed design. Its Writer and Committer operators both extend AbstractStreamOperator; a simplified sketch of both follows this list.

    • Writer extends AbstractStreamOperator
      • processElement()
        • Append the element/record to a partition file
      • prepareSnapshotPreBarrier()
        • Flush the file and complete the upload to S3
        • Emit the file metadata (including the path) to the downstream committer operator
        • The writer itself doesn’t keep any checkpoint state
    • Committer extends AbstractStreamOperator
      • snapshotState()
        • Collect the list of files (metadata, including paths)
        • Note that upstream writers flush and emit file metadata before the barrier, so by the time the committer executes its snapshotState() method, it has received all files from the upstream writers.
        • Create a single manifest file that bundles the list of data files in the current checkpoint cycle. Store the metadata of the new manifest file (such as its path) in the Flink state backend.
      • notifyCheckpointComplete()
        • Commit the manifest file (representing a list of data files)

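To make the two-operator sink concrete, here is a simplified, hypothetical sketch of the Writer/Committer pattern described above. It is not the actual Iceberg sink code: records are plain strings, the "data file" is an in-memory buffer, and the manifest and commit steps are reduced to comments; only the checkpoint choreography (pre-barrier flush in the writer, state snapshot and final commit in the committer) follows the steps listed above. (On the source side, the corresponding FLIP-27 building blocks are Flink’s SplitEnumerator and SourceReader interfaces in org.apache.flink.api.connector.source; a complete source implementation is too long to sketch here.)

```java
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

/** Writer: appends records to the open "file" and flushes it before the checkpoint barrier. */
class SketchWriter extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    // Stand-in for the currently open partition file; the real writer appends
    // records to data files (e.g., Parquet) and uploads them to object storage.
    private transient List<String> openFile;

    @Override
    public void open() throws Exception {
        super.open();
        openFile = new ArrayList<>();
    }

    @Override
    public void processElement(StreamRecord<String> element) {
        openFile.add(element.getValue()); // append the record to the open file
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // Flush the file, complete the upload (elided), and emit its metadata
        // (here just a made-up path) to the downstream committer. The writer
        // itself keeps no checkpoint state.
        String fileMeta = "s3://bucket/data/ckpt-" + checkpointId + "-" + openFile.size() + ".parquet";
        openFile = new ArrayList<>();
        output.collect(new StreamRecord<>(fileMeta));
    }
}

/** Committer: collects file metadata and commits when the checkpoint completes. */
class SketchCommitter extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<String, Void> {

    private transient List<String> pendingFiles;

    @Override
    public void open() throws Exception {
        super.open();
        pendingFiles = new ArrayList<>();
    }

    @Override
    public void processElement(StreamRecord<String> element) {
        pendingFiles.add(element.getValue());
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // Writers emit file metadata pre-barrier, so every file of this
        // checkpoint cycle has arrived by now. The real committer bundles the
        // files into a single Iceberg manifest file and stores that manifest's
        // metadata in the Flink state backend; both steps are elided here.
        LOG.info("checkpoint {}: would write a manifest for {} data files",
                context.getCheckpointId(), pendingFiles.size());
        pendingFiles = new ArrayList<>();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // Commit the manifest (i.e., the list of data files) to the Iceberg
        // table here; elided in this sketch.
    }
}
```
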
Proposed Changes

In this FLIP we propose to externalize the connector out of the Iceberg repo into an external repo and contribute it to Flink as part of the Externalize Flink Connectors effort.

We are open to making any changes needed to the existing code to use stable and public Flink connector interfaces where applicable.

The Flink community will create a new connector repo, similar to the one for Elasticsearch. The repository name will be:

  • flink-connector-iceberg

When created, it will be at: https://github.com/apache/flink-connector-iceberg

The Iceberg connector supports:

  • Source and sink
  • Usable in both DataStream and Table API/SQL
  • DataStream read/append/overwrite
  • SQL create/alter/drop table, select, insert into, insert overwrite
  • Streaming and batch reads in the Java API
  • Flink’s Python API

See Iceberg Flink for detailed usage instructions. A brief usage sketch follows below.
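
For illustration, here is a minimal, hedged usage sketch patterned on the Iceberg Flink documentation. The warehouse paths, table names, and schema below are hypothetical; FlinkSource, FlinkSink, and TableLoader are the connector's existing DataStream entry points.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // the sink commits on checkpoint completion

        // Hypothetical Hadoop-catalog table locations; other catalogs work as well.
        TableLoader sourceTable = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");
        TableLoader sinkTable = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events_copy");

        // Batch read; streaming(true) switches to continuous incremental reads.
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(sourceTable)
                .streaming(false)
                .build();

        // Append the rows to the sink table; overwrite(true) would replace data instead.
        FlinkSink.forRowData(rows)
                .tableLoader(sinkTable)
                .append();

        env.execute("iceberg-datastream-sketch");
    }
}
```

The same tables are usable from SQL once an Iceberg catalog is registered; again, the catalog name and properties here are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergSqlSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register an Iceberg catalog over a hypothetical Hadoop warehouse path.
        tEnv.executeSql("CREATE CATALOG iceberg_catalog WITH ("
                + "'type'='iceberg', 'catalog-type'='hadoop', "
                + "'warehouse'='hdfs://nn:8020/warehouse')");

        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.db");
        tEnv.executeSql("CREATE TABLE IF NOT EXISTS iceberg_catalog.db.events (id BIGINT, msg STRING)");
        tEnv.executeSql("INSERT INTO iceberg_catalog.db.events VALUES (1, 'hello')").await();
        tEnv.executeSql("SELECT * FROM iceberg_catalog.db.events").print();
    }
}
```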

Compatibility, Deprecation, and Migration Plan

Based on the guidelines of the Flink community, only the latest two Flink minor versions are actively maintained. See Multi-Engine Support#apache-flink for further details. The currently supported versions are:

Flink Version | Lifecycle Stage | Initial Iceberg Support | Latest Iceberg Support
1.11          | End of Life     | 0.9.0                   | 0.12.1
1.12          | End of Life     | 0.12.0                  | 0.13.1
1.13          | Deprecated      | 0.13.0                  | 0.14.1
1.14          | Maintained      | 0.13.0                  | 0.14.1
1.15          | Maintained      | 0.14.0                  | 0.14.1

Test Plan

The Iceberg Flink Connector source and sink are tested with unit tests and integration tests that run Flink in a LocalStreamEnvironment; a minimal sketch of such a test follows.
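
As an illustration of that style of testing, the sketch below runs a job on a local environment inside a JUnit test. The pipeline itself is a stand-in; the real tests exercise the Iceberg source and sink against actual tables.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class LocalEnvironmentSketchTest {

    @Test
    public void testRunsPipelineLocally() throws Exception {
        // createLocalEnvironment() returns a LocalStreamEnvironment that runs
        // the whole job inside the test JVM, with no external cluster.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.enableCheckpointing(100); // the Iceberg sink only commits on checkpoints

        DataStream<Long> source = env.fromSequence(1, 100);

        List<Long> results = source.executeAndCollect(100);
        Assert.assertEquals(100, results.size());
    }
}
```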

Rejected Alternatives

None yet.