...

This PIP is co-authored by Ming Li, Guojun Li and Fang Yong.

Motivation

Paimon is a Streaming Lakehouse: Flink streaming and batch jobs can use Paimon tables as data sources and sinks, so Flink can be combined with Paimon to build an entire ETL pipeline. The overall process is as follows.

...

1. Table lineage in Paimon. We want to create lineage between Paimon source and sink tables based on the Flink job name, so that we can manage the entire ETL topology of Paimon tables and Flink jobs.

...

2. Locating and analyzing issues. When the data of a Paimon table is delayed, users can find its upstream tables and jobs and check whether any of them have issues.

3. Data management for streaming and batch ETL. For example, based on the data lineage, users can easily submit debug jobs and compare the results of a debug job with those of the running streaming job.

4. Data correction for streaming ETL. When users need to correct data, they can roll back all tables and jobs to a unified "version" based on the data lineage, and resume the ETL after the correction.

Architecture

We would like to introduce a System Database named "paimon" for each catalog in Paimon; it stores the catalog's properties and its lineage information. Users can create a Paimon catalog with table and data lineage options, and if the source or sink tables of an ETL job are in that catalog, the lineage information will be saved. The overall process is as follows.

...

1. Create a catalog with the options table-lineage and data-lineage. NOTICE: these two options are immutable after the catalog is created.

Code Block
-- Create a catalog with lineage options
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'warehouse path',
    'table-lineage' = 'true',
    'data-lineage' = 'true'
);

USE CATALOG my_catalog;

-- Select the options for the current catalog
SELECT * FROM paimon.catalog_options;
+---------------+-------+
|          name | value |
+---------------+-------+
| table-lineage |  true |
| data-lineage  |  true |
+---------------+-------+
2 rows in set

...

However, managing Paimon table lineage in this way has one issue: the lifecycle management of the table lineage. When an ETL job terminates, its table lineage must currently be deleted manually. We would like to support a customized job status listener in FLIP-314 [1], and we can solve this issue based on FLIP-314 in the future.
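Until FLIP-314-based cleanup is available, stale lineage rows would have to be removed by hand. Below is a minimal sketch of such a cleanup, assuming lineage is stored in system tables named source_job_lineage and sink_job_lineage that accept DELETE statements (both table names are illustrative, not finalized):

Code Block
-- Hypothetical cleanup after the job "myJob" has terminated:
-- remove its rows from the (assumed) lineage system tables
DELETE FROM paimon.source_job_lineage WHERE job = 'myJob';
DELETE FROM paimon.sink_job_lineage WHERE job = 'myJob';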

...

Currently Flink does not support setting a unified version for all sources in a job; we have created an issue for this [2]. Given Flink's existing capabilities, we introduce a job_startup table which saves the snapshot id of each source table when a job starts.

+-------------+-------------+---------------+
| Column Name | Column Type | Example       |
+-------------+-------------+---------------+
| job         | STRING      | "myJob"       |
| database    | STRING      | "myDatabase"  |
| table       | STRING      | "myTableName" |
| snapshot_id | LONG        | 1             |
+-------------+-------------+---------------+
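For example, after a job has started, the snapshots it was pinned to can be read back from the system database. A sketch, assuming job_startup is queryable like any other table in the "paimon" system database:

Code Block
-- Look up which snapshot each source table of "myJob"
-- was pinned to at startup
SELECT `database`, `table`, snapshot_id
FROM paimon.job_startup
WHERE job = 'myJob';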

When submitting a job, we query the data lineage table for the snapshot id of each new source table, based on the existing tables and snapshot ids. When creating the sources for the job, we put the relevant snapshot ids in the configuration and write the corresponding rows to the job_startup table. During the execution of the streaming job, each source then consumes data directly from its recorded snapshot id.
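Conceptually this is equivalent to pinning each source table with Paimon's existing from-snapshot scan options, although the actual wiring would happen internally through the source configuration rather than through user-facing hints. A sketch of the per-table equivalent:

Code Block
-- Conceptual equivalent: start reading myTableName from the
-- snapshot id (here 1) recorded for it in the job_startup table
SELECT * FROM myTableName
/*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;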

...