Discussion thread | |
---|---|
Vote thread | |
Issue | |
Release | - |
This PIP is co-authored by Ming Li, Guojun Li and Fang Yong.
Motivation
Paimon is a streaming lakehouse. Flink streaming and batch jobs can use Paimon tables as data sources and sinks, so Flink combined with Paimon can complete the entire ETL processing. The overall process is as follows.
We would like to manage Flink Streaming & Batch ETL processing in Paimon based on the current ability of Flink, including:
1. Table lineage in Paimon. We want to create lineage between Paimon source and sink tables based on the Flink job name, so that we can build the entire ETL topology for Paimon tables and Flink jobs.
2. Data lineage for tables. A Flink streaming job creates a snapshot for a Paimon table when it completes a checkpoint, so we can create the snapshot lineage between source and sink tables.
Based on the above lineage, we could support the following capabilities in the integrated Flink ETL processing and Paimon tables:
1. Query Paimon table dependencies, which allows users to manage their tables and Flink jobs better.
2. Locate and analyze issues. When the data of a Paimon table is delayed, users can find the upstream tables and jobs and check whether any of them has a problem.
3. Data management for streaming and batch ETL. For example, based on the data lineage, users can easily submit debug jobs and compare the results of the debug job with those of the streaming job.
4. Data correction for streaming ETL. When users need to correct data, they can roll back all tables and jobs to a unified "version" based on the data lineage, and recover the ETL after data correction.
Architecture
Overall
We would like to introduce a system database named "paimon" for each catalog in Paimon, which stores the catalog properties and lineage information. Users can create a Paimon catalog with table and data lineage options, and if the source or sink tables of an ETL job are in the catalog, the lineage information will be saved. The overall process is as follows.
Use Case
1. Create a catalog with options table-lineage and data-lineage. NOTICE: these two options are immutable.
    -- Create a catalog with lineage options
    CREATE CATALOG my_catalog WITH (
        'type' = 'paimon',
        'warehouse' = 'warehouse path',
        'table-lineage' = 'true',
        'data-lineage' = 'true'
    );

    USE CATALOG my_catalog;

    -- Select the options for the current catalog
    SELECT * FROM paimon.catalog_options;
    +---------------+-------+
    | key           | value |
    +---------------+-------+
    | table-lineage | true  |
    | data-lineage  | true  |
    +---------------+-------+
    2 rows in set
2. Create two tables and submit Flink ETL job.
    -- Create a word table
    CREATE TABLE word_table (word STRING);

    -- Create a word count table
    CREATE TABLE word_count_table (
        word STRING PRIMARY KEY NOT ENFORCED,
        cnt BIGINT
    );

    -- Submit a flink job to ingest data from kafka to word_table
    INSERT INTO word_table SELECT FROM ...;

    SET 'pipeline.name'='job1';
    INSERT INTO word_count_table
    SELECT word, count(*) FROM word_table GROUP BY word;
Paimon will create a relationship between word_table and word_count_table with job1. As mentioned above, it includes table lineage and data lineage. For example, Checkpoint 1 in job1 consumes Snapshot 3 of word_table and produces Snapshot 5 in word_count_table, and so on. The information of table and data lineage is shown as follows.
3. Users can debug and validate the data of their tables based on the table and data lineage. The steps are:
a) Users can get the information that "Snapshot 5 of word_table generates Snapshot 7 in word_count_table" from lineage tables.
b) Using the time travel ability of Paimon, users can submit a Flink or Spark batch job that reads Snapshot 5 from word_table and writes the result to a new table word_count_table_snapshot_result whose schema is the same as that of word_count_table.
c) Compare the data in word_count_table_snapshot_result and Snapshot 7 of word_count_table to validate the result of streaming ETL.
    -- Create table word_count_table_snapshot_result from word_count_table
    CREATE TABLE word_count_table_snapshot_result LIKE word_count_table;

    -- Read snapshot 5 from word_table and write data to word_count_table_snapshot_result
    -- in batch mode
    INSERT INTO word_count_table_snapshot_result
    SELECT word, count(*)
    FROM word_table /*+ OPTIONS('scan.snapshot-id'='5') */
    GROUP BY word;

    -- Compare the data in word_count_table_snapshot_result with
    -- snapshot 7 in word_count_table and get the diffs
    SELECT *
    FROM word_count_table_snapshot_result AS L
    FULL OUTER JOIN word_count_table /*+ OPTIONS('scan.snapshot-id'='7') */ AS R
      ON L.word = R.word
    WHERE L.word IS NULL OR R.word IS NULL OR L.cnt != R.cnt;
In addition to the above data validation, users can also debug their jobs. For example, users can modify and submit a new job which reads Snapshot 5 of word_table and produces results in a new table. Then they can compare the data with Snapshot 7 in word_count_table to check whether the new job meets their expectations.
4. Users can also correct data based on table and data lineages. Suppose that users want to correct data for table1 from Snapshot 4, and recompute data for downstream tables.
a) Full Recover
1> Create new table_s1 based on table1 and insert Snapshot 4 of table1 to table_s1, then users can correct data in table_s1
2> According to the table lineage, create an empty downstream table table_s3
3> Submit new ETL job to read full data of Snapshot 4 from table_s1 and Snapshot 6 from table2, then read incremental data
Full recover will read the full data first, then read the incremental data, which may result in a large amount of computation and a lot of time. To avoid this situation, incremental recover can be considered.
b) Incremental Recover
Incremental recover starts a new Flink ETL job from a savepoint. Suppose that, in the above data lineage, Snapshot 4 in table1 is related to Savepoint 5 in the Flink ETL job. The steps of incremental recover are then:
1> Create new table_s1 based on table1 and insert Snapshot 4 of table1 to table_s1, then users can correct data in table_s1
2> Create downstream table table_s3 from table3 and insert all data of Snapshot 2 from table3 to table_s3
3> Submit new Flink ETL job with Savepoint 5 and start to consume incremental data from Snapshot 4 in table_s1 and Snapshot 6 in table2.
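Both recovery procedures start from the same lookup: given the source snapshot to correct, find the barrier that consumed it, then read off the matching snapshots of the other source tables and of the sink table for that barrier. The following is a minimal Python sketch of that lookup, not an implementation of Paimon itself. It assumes lineage rows shaped like the source_snapshot_lineage and sink_snapshot_lineage tables described under Proposed Changes. The job name etl_job, the database name db and all row values are hypothetical and only mirror the Snapshot 4 / Snapshot 6 / Snapshot 2 example above. It further assumes that barrier id 5 corresponds to Savepoint 5.

```python
from typing import NamedTuple


class SnapshotLineage(NamedTuple):
    job: str
    barrier_id: int
    database: str
    table: str
    snapshot_id: int


# Hypothetical rows: barrier 5 of etl_job consumed Snapshot 4 of table1 and
# Snapshot 6 of table2, and produced Snapshot 2 of table3.
SOURCE_ROWS = [
    SnapshotLineage("etl_job", 4, "db", "table1", 3),
    SnapshotLineage("etl_job", 4, "db", "table2", 5),
    SnapshotLineage("etl_job", 5, "db", "table1", 4),
    SnapshotLineage("etl_job", 5, "db", "table2", 6),
]
SINK_ROWS = [
    SnapshotLineage("etl_job", 4, "db", "table3", 1),
    SnapshotLineage("etl_job", 5, "db", "table3", 2),
]


def aligned_version(source_rows, sink_rows, database, table, snapshot_id):
    """Return one plan per (job, barrier) that consumed the given source snapshot.

    Each plan lists the snapshot of every source and sink table recorded
    for that job and barrier.
    """
    plans = []
    for hit in source_rows:
        if (hit.database, hit.table, hit.snapshot_id) != (database, table, snapshot_id):
            continue
        key = (hit.job, hit.barrier_id)
        plans.append({
            "job": hit.job,
            "barrier_id": hit.barrier_id,
            "sources": {f"{r.database}.{r.table}": r.snapshot_id
                        for r in source_rows if (r.job, r.barrier_id) == key},
            "sinks": {f"{r.database}.{r.table}": r.snapshot_id
                      for r in sink_rows if (r.job, r.barrier_id) == key},
        })
    return plans


if __name__ == "__main__":
    for plan in aligned_version(SOURCE_ROWS, SINK_ROWS, "db", "table1", 4):
        print(plan)
```

For full recover only the source snapshots of the plan are needed. Incremental recover additionally uses the sink snapshot to seed table_s3 and the barrier id to choose the savepoint.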
Proposed Changes
System Database
When users add option table-lineage or data-lineage to catalog DDL, Paimon will check and create system database 'paimon' automatically, then create options table, table lineage table and data lineage table.
Table Schema
1. catalog_options
Catalog options are stored in the catalog_options table, which Flink ETL jobs read to get lineage related options when creating sources and sinks. The table has key and value fields, and key is the primary key.
Column Name | Column Type | Example |
---|---|---|
key | STRING | "table-lineage" |
value | STRING | "true" |
2. source_job_lineage
Table source_job_lineage stores the relationship between source table and ETL job. The job, database and table fields form a joint primary key.
Column Name | Column Type | Example |
---|---|---|
job | STRING | "myJob" |
database | STRING | "myDatabase" |
table | STRING | "myTableName" |
3. sink_job_lineage
Table sink_job_lineage stores relationship between sink table and ETL job. The job, database and table fields form a joint primary key.
Column Name | Column Type | Example |
---|---|---|
job | STRING | "myJob" |
database | STRING | "myDatabase" |
table | STRING | "myTableName" |
4. source_snapshot_lineage
Table source_snapshot_lineage stores the data lineage between source and ETL job. It records which snapshot id of the source table is consumed by a specific barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.
Column Name | Column Type | Example |
---|---|---|
job | STRING | "myJob" |
barrier_id | LONG | 1 |
database | STRING | "myDatabase" |
table | STRING | "myTableName" |
snapshot_id | LONG | 1 |
5. sink_snapshot_lineage
Table sink_snapshot_lineage stores the data lineage between ETL job and sink. It records which snapshot id of the sink table is produced by a specific barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.
Column Name | Column Type | Example |
---|---|---|
job | STRING | "myJob" |
barrier_id | LONG | 1 |
database | STRING | "myDatabase" |
table | STRING | "myTableName" |
snapshot_id | LONG | 1 |
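To make the joint primary keys concrete, the sketch below models the two snapshot lineage tables in an in-memory SQLite database through Python's standard sqlite3 module. SQLite is only a stand-in for Paimon here: STRING is mapped to TEXT and LONG to INTEGER, and the helper names create_lineage_db and record are hypothetical. It assumes that recording the same (job, barrier_id, database, table) key a second time should overwrite the earlier row, as a primary-key table would.

```python
import sqlite3

LINEAGE_TABLES = ("source_snapshot_lineage", "sink_snapshot_lineage")

# `database` and `table` are keywords, so they are quoted with backticks,
# as in the queries of this document.
DDL = """
CREATE TABLE {name} (
    job         TEXT    NOT NULL,
    barrier_id  INTEGER NOT NULL,
    `database`  TEXT    NOT NULL,
    `table`     TEXT    NOT NULL,
    snapshot_id INTEGER NOT NULL,
    PRIMARY KEY (job, barrier_id, `database`, `table`)
)
"""


def create_lineage_db() -> sqlite3.Connection:
    """Create both snapshot lineage tables in an in-memory database."""
    conn = sqlite3.connect(":memory:")
    for name in LINEAGE_TABLES:
        conn.execute(DDL.format(name=name))
    return conn


def record(conn, lineage_table, job, barrier_id, database, table, snapshot_id):
    """Upsert one lineage row; a repeated primary key replaces the earlier row."""
    if lineage_table not in LINEAGE_TABLES:
        raise ValueError(f"unknown lineage table: {lineage_table}")
    conn.execute(
        f"INSERT OR REPLACE INTO {lineage_table} VALUES (?, ?, ?, ?, ?)",
        (job, barrier_id, database, table, snapshot_id),
    )


if __name__ == "__main__":
    conn = create_lineage_db()
    record(conn, "source_snapshot_lineage", "myJob", 1, "myDatabase", "myTableName", 1)
    record(conn, "source_snapshot_lineage", "myJob", 1, "myDatabase", "myTableName", 2)
    # Only one row remains for the key, carrying the latest snapshot id.
    print(conn.execute("SELECT * FROM source_snapshot_lineage").fetchall())
```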
Lineage Use Case
1. Query Table Lineage
    -- Query upstream tables for given table
    SELECT S.`database`, S.`table`
    FROM source_job_lineage S
    JOIN sink_job_lineage T ON S.job = T.job
    WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable';

    -- Query downstream tables for given table
    SELECT T.`database`, T.`table`
    FROM source_job_lineage S
    JOIN sink_job_lineage T ON S.job = T.job
    WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable';
2. Query Data Lineage
    -- Query upstream table snapshot for given table and snapshot
    SELECT S.`database`, S.`table`, S.snapshot_id
    FROM source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
      ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable' AND T.snapshot_id = 123;

    -- Query downstream table snapshot for given table and snapshot
    SELECT T.`database`, T.`table`, T.snapshot_id
    FROM source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
      ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable' AND S.snapshot_id = 123;
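The table lineage queries return direct neighbors only. When a pipeline spans several jobs, the whole upstream chain can be collected by repeating the same join until no new table appears. The following is a minimal Python sketch of that traversal over rows shaped like source_job_lineage and sink_job_lineage. The names job0, ods, raw_events and the database name default are hypothetical. Only job1, word_table and word_count_table come from the use case in this document.

```python
from collections import deque

# (job, database, table) rows. Everything except job1, word_table and
# word_count_table is made up for illustration.
SOURCE_JOB_LINEAGE = [
    ("job0", "ods", "raw_events"),
    ("job1", "default", "word_table"),
]
SINK_JOB_LINEAGE = [
    ("job0", "default", "word_table"),
    ("job1", "default", "word_count_table"),
]


def upstream_tables(source_rows, sink_rows, database, table):
    """Return every (database, table) that feeds the given table, at any depth."""
    # Index the rows: sink table -> jobs writing it, job -> tables it reads.
    writers = {}
    for job, db, tbl in sink_rows:
        writers.setdefault((db, tbl), set()).add(job)
    reads = {}
    for job, db, tbl in source_rows:
        reads.setdefault(job, set()).add((db, tbl))

    seen = set()
    queue = deque([(database, table)])
    while queue:
        current = queue.popleft()
        for job in writers.get(current, ()):
            for parent in reads.get(job, ()):
                if parent not in seen:  # also stops on cyclic lineage
                    seen.add(parent)
                    queue.append(parent)
    return seen


if __name__ == "__main__":
    print(sorted(upstream_tables(
        SOURCE_JOB_LINEAGE, SINK_JOB_LINEAGE, "default", "word_count_table")))
```

Swapping the two indexes (source table to reading jobs, job to written tables) gives the downstream direction with the same loop.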
Table Lineage
We introduce a new option table-lineage for Paimon catalog; users can set this option when they create a new catalog. FlinkTableFactory in Paimon will create source and sink for Flink ETL job; it can read pipeline.name from Context and save table lineage information for the ETL job.
However, managing Paimon table lineage in this way has one issue: the lifecycle management of table lineage. When an ETL job terminates, its table lineage has to be deleted manually. Support for a customized job status listener is being proposed in FLIP-314 [1], and we can solve this issue based on FLIP-314.
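Until such a listener exists, the manual cleanup amounts to deleting every row of the terminated job from both table lineage tables. The sketch below shows that step with Python's sqlite3 module as a stand-in for Paimon. The helper names create_job_lineage_db and remove_job_lineage are hypothetical, and how a FLIP-314 listener would invoke such a function is deliberately left out, since its interface is not described here.

```python
import sqlite3

JOB_LINEAGE_TABLES = ("source_job_lineage", "sink_job_lineage")


def create_job_lineage_db() -> sqlite3.Connection:
    """Create both table lineage tables in an in-memory database."""
    conn = sqlite3.connect(":memory:")
    for name in JOB_LINEAGE_TABLES:
        conn.execute(
            f"CREATE TABLE {name} ("
            "job TEXT NOT NULL, `database` TEXT NOT NULL, `table` TEXT NOT NULL, "
            "PRIMARY KEY (job, `database`, `table`))"
        )
    return conn


def remove_job_lineage(conn: sqlite3.Connection, job: str) -> int:
    """Delete all table lineage rows of a terminated job; return rows removed."""
    removed = 0
    with conn:  # commit both deletes together, roll back if one fails
        for name in JOB_LINEAGE_TABLES:
            cursor = conn.execute(f"DELETE FROM {name} WHERE job = ?", (job,))
            removed += cursor.rowcount
    return removed


if __name__ == "__main__":
    conn = create_job_lineage_db()
    conn.execute("INSERT INTO source_job_lineage VALUES ('job1', 'default', 'word_table')")
    conn.execute("INSERT INTO sink_job_lineage VALUES ('job1', 'default', 'word_count_table')")
    print(remove_job_lineage(conn, "job1"))
```

Lineage rows of other jobs that read or write the same tables are untouched, because the delete is keyed on the job name only.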
Data Lineage
Similar to table lineage, we add a new option data-lineage in Paimon catalog for data lineage. One Paimon table may be consumed by multiple Flink ETL jobs, which produce different sink tables.
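Because both snapshot lineage tables are keyed by job and barrier id, one source snapshot can map to a different sink snapshot for each consuming job. The sketch below illustrates this with Python's sqlite3 module as a stand-in for Paimon. The names job2 and word_length_table, the barrier ids and the database name default are hypothetical. Only job1's mapping from Snapshot 3 of word_table to Snapshot 5 of word_count_table comes from the use case in this document.

```python
import sqlite3

SCHEMA = """
CREATE TABLE {name} (
    job TEXT, barrier_id INTEGER, `database` TEXT, `table` TEXT, snapshot_id INTEGER,
    PRIMARY KEY (job, barrier_id, `database`, `table`)
)
"""

# Downstream data lineage query, extended with the job column so that the
# result shows which job produced each sink snapshot.
DOWNSTREAM_QUERY = """
SELECT T.job, T.`database`, T.`table`, T.snapshot_id
FROM source_snapshot_lineage S
JOIN sink_snapshot_lineage T
  ON S.job = T.job AND S.barrier_id = T.barrier_id
WHERE S.`database` = ? AND S.`table` = ? AND S.snapshot_id = ?
ORDER BY T.job
"""


def build_demo_db() -> sqlite3.Connection:
    """Two jobs consume Snapshot 3 of word_table and write different sinks."""
    conn = sqlite3.connect(":memory:")
    for name in ("source_snapshot_lineage", "sink_snapshot_lineage"):
        conn.execute(SCHEMA.format(name=name))
    conn.executemany(
        "INSERT INTO source_snapshot_lineage VALUES (?, ?, ?, ?, ?)",
        [("job1", 1, "default", "word_table", 3),
         ("job2", 7, "default", "word_table", 3)],
    )
    conn.executemany(
        "INSERT INTO sink_snapshot_lineage VALUES (?, ?, ?, ?, ?)",
        [("job1", 1, "default", "word_count_table", 5),
         ("job2", 7, "default", "word_length_table", 12)],
    )
    return conn


def downstream_snapshots(conn, database, table, snapshot_id):
    """Return (job, database, table, snapshot_id) for every downstream snapshot."""
    return conn.execute(DOWNSTREAM_QUERY, (database, table, snapshot_id)).fetchall()


if __name__ == "__main__":
    for row in downstream_snapshots(build_demo_db(), "default", "word_table", 3):
        print(row)
```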
Cross Catalog Lineage
Plan For The Future
[1] FLIP-314: Support Customized Job Lineage Listener