...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP aims to contribute the existing Apache Iceberg Flink Connector to Flink.
...
Iceberg supports both Flink’s DataStream API and Table API. Per the Flink community’s guidelines, only the latest two Flink minor versions are actively maintained; see Multi-Engine Support#apache-flink for further details.
Public Interfaces
The Iceberg connector implements the following Flink interfaces for its source and sink.
Source:
See Iceberg Source Design for the detailed design. It is based on FLIP-27 and implements:
- Split Enumerator that runs on the job manager and is responsible for (a) discovering new splits and (b) assigning splits to Readers.
- Split Readers that run in parallel on task managers and are responsible for emitting records by reading splits assigned to them.
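The enumerator/reader split of responsibilities can be sketched with plain JDK classes. This is an illustrative sketch only: class and method names such as FileSplit, SplitEnumerator, and runDemo are hypothetical and are not Flink’s actual FLIP-27 interfaces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative, JDK-only sketch of the FLIP-27 split-based source pattern.
// All names are hypothetical, not Flink's real interfaces.
public class SourceSketch {

    /** A split: one unit of work, e.g. a single Iceberg data file. */
    static class FileSplit {
        final String path;
        FileSplit(String path) { this.path = path; }
    }

    /** Runs on the job manager: (a) discovers new splits, (b) assigns them to readers. */
    static class SplitEnumerator {
        private final Queue<FileSplit> pending = new ArrayDeque<>();

        void discover(List<FileSplit> newSplits) { pending.addAll(newSplits); }

        FileSplit nextSplitFor(int readerId) { return pending.poll(); }
    }

    /** Runs in parallel on task managers: emits records by reading assigned splits. */
    static class SplitReader {
        List<String> read(FileSplit split) {
            // A real reader would decode Parquet/ORC/Avro rows; placeholders here.
            return List.of(split.path + "#record-0", split.path + "#record-1");
        }
    }

    static int runDemo() {
        SplitEnumerator enumerator = new SplitEnumerator();
        enumerator.discover(List.of(new FileSplit("data-00001.parquet"),
                                    new FileSplit("data-00002.parquet")));
        SplitReader reader = new SplitReader();
        List<String> emitted = new ArrayList<>();
        FileSplit split;
        while ((split = enumerator.nextSplitFor(0)) != null) {
            emitted.addAll(reader.read(split));
        }
        return emitted.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // 2 splits x 2 records = 4
    }
}
```

The key design point this mirrors is that split discovery (job manager) is decoupled from record reading (task managers), so the two can scale and fail independently.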
Sink:
See Iceberg Sink Design for the detailed design. The sink consists of two operators, each extending AbstractStreamOperator:
- Writer extends AbstractStreamOperator
- processElement()
- append element/record to a partition file
- prepareSnapshotPreBarrier()
- flush the file and complete the upload to S3
- emit the file metadata (including path) to the downstream committer operator
- Writer itself doesn’t have any checkpoint state
- Committer extends AbstractStreamOperator
- snapshotState()
- Just collect the list of files (metadata including path)
- Note that upstream writers flush and emit file metadata before the barrier, so by the time the committer executes snapshotState(), it has received all files from the upstream writers.
- Create a single manifest file that bundles the list of data files from the current checkpoint cycle, and store the metadata of the new manifest file (e.g. its path) in the Flink state backend.
- notifyCheckpointComplete()
- Commit the manifest file (representing a list of data files)
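The writer/committer protocol above can be sketched JDK-only, to show how file metadata flows from prepareSnapshotPreBarrier() through snapshotState() to notifyCheckpointComplete(). The names echo the design described above but are hypothetical, not real Flink or Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative, JDK-only sketch of the writer/committer checkpoint protocol
// described above. Names are hypothetical, not real Flink/Iceberg APIs.
public class SinkSketch {

    /** Metadata the writer emits downstream after flushing a data file. */
    static class DataFile {
        final String path;
        DataFile(String path) { this.path = path; }
    }

    /** Writer: buffers records, flushes on the pre-barrier hook, keeps no state. */
    static class Writer {
        private final List<String> buffer = new ArrayList<>();

        void processElement(String record) { buffer.add(record); } // append to partition file

        DataFile prepareSnapshotPreBarrier(long checkpointId) {
            // Flush the open file (e.g. finish the S3 upload) and emit its metadata.
            DataFile file = new DataFile("data-cp" + checkpointId + ".parquet");
            buffer.clear(); // the writer itself holds no checkpoint state
            return file;
        }
    }

    /** Committer: collects file metadata, stages a manifest, commits when notified. */
    static class Committer {
        private final List<DataFile> collected = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void processElement(DataFile file) { collected.add(file); }

        void snapshotState(long checkpointId) {
            // All writers flushed before the barrier, so `collected` is complete.
            staged.add("manifest-cp" + checkpointId + " (" + collected.size() + " files)");
            collected.clear(); // manifest metadata would go into the Flink state backend
        }

        void notifyCheckpointComplete(long checkpointId) {
            committed.addAll(staged); // commit the manifests to the Iceberg table
            staged.clear();
        }
    }

    static int runDemo() {
        Writer writer = new Writer();
        Committer committer = new Committer();
        writer.processElement("a");
        writer.processElement("b");
        committer.processElement(writer.prepareSnapshotPreBarrier(1)); // before barrier
        committer.snapshotState(1);                                    // at barrier
        committer.notifyCheckpointComplete(1);                         // checkpoint done
        return committer.committed.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // one committed manifest
    }
}
```

The design choice this mirrors is two-phase commit: data files become durable before the barrier, but they only become visible in the table when the checkpoint is known to have completed.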
Proposed Changes
In this FLIP we propose to move the connector out of the Iceberg repository into its own external repository and contribute it to Flink as part of the Externalize Flink Connectors effort.
We are open to making any changes to the existing code that are needed to use Flink’s stable, public connector interfaces where applicable.
The Iceberg connector supports:
- Source and sink
- Usable in both DataStream and Table API/SQL
- DataStream read/append/overwrite
- SQL create/alter/drop table, select, insert into, insert overwrite
- Streaming or batch read in Java API
- Support for Flink’s Python API
See Iceberg Flink for detailed usage instructions.
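As an illustration of the SQL support listed above, Flink SQL usage looks roughly like the following. The catalog, database, and table names are hypothetical; the catalog properties follow the pattern in the Iceberg Flink documentation.

```sql
-- Hypothetical names; see the Iceberg Flink docs for the full set of catalog options.
CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://metastore:9083'
);

CREATE TABLE iceberg_catalog.db.events (id BIGINT, data STRING);

INSERT INTO iceberg_catalog.db.events VALUES (1, 'a');
INSERT OVERWRITE iceberg_catalog.db.events VALUES (2, 'b');
SELECT * FROM iceberg_catalog.db.events;
```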
Compatibility, Deprecation, and Migration Plan
Per the Flink community’s guidelines, only the latest two Flink minor versions are actively maintained; see Multi-Engine Support#apache-flink for further details. The currently supported versions are:
| Flink Version | Lifecycle Stage | Initial Iceberg Support | Latest Iceberg Support |
| --- | --- | --- | --- |
| 1.11 | End of Life | 0.9.0 | 0.12.1 |
| 1.12 | End of Life | 0.12.0 | 0.13.1 |
| 1.13 | Deprecated | 0.13.0 | 0.14.1 |
| 1.14 | Maintained | 0.13.0 | 0.14.1 |
| 1.15 | Maintained | 0.14.0 | 0.14.1 |
Test Plan
The Iceberg Flink connector source and sink are covered by integration tests that run Flink in a LocalStreamEnvironment.