
[draw.io diagram: correct]

a) Full Recovery

1> Create a new table table_s1 based on table1 and insert Snapshot 4 of table1 into table_s1; users can then correct the data in table_s1

2> According to the table lineage, create an empty downstream table table_s3

3> Submit a new ETL job that reads the full data of Snapshot 4 from table_s1 and Snapshot 6 from table2, then continues reading the incremental data
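Assuming Flink SQL with Paimon's time-travel read option (`scan.snapshot-id`), the three steps above could be sketched roughly as follows. The table names follow the example lineage; the join condition in step 3 is a hypothetical placeholder:

```sql
-- Step 1: create the correction table and load Snapshot 4 of table1
CREATE TABLE table_s1 LIKE table1;
INSERT INTO table_s1
SELECT * FROM table1 /*+ OPTIONS('scan.snapshot-id' = '4') */;
-- users then fix the bad records directly in table_s1

-- Step 2: create the empty downstream table according to the lineage
CREATE TABLE table_s3 LIKE table3;

-- Step 3: submit a new ETL job reading table_s1 and Snapshot 6 of table2
INSERT INTO table_s3
SELECT *
FROM table_s1 S1
JOIN table2 /*+ OPTIONS('scan.snapshot-id' = '6') */ T2
    ON S1.id = T2.id;  -- hypothetical join key
```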

[draw.io diagram: full_correction]

Full recovery reads the full data first and then the incremental data, which may require a large amount of computation and time. To avoid this, incremental recovery can be considered.

b) Incremental Recovery

Incremental recovery starts a new Flink ETL job from a savepoint. Suppose that in the above data lineage, Snapshot 4 in table1 corresponds to Savepoint 5 of the Flink ETL job. The steps of incremental recovery are then:

1> Create a new table table_s1 based on table1 and insert Snapshot 4 of table1 into table_s1; users can then correct the data in table_s1

2> Create a downstream table table_s3 based on table3 and insert all data of Snapshot 2 from table3 into table_s3

3> Submit a new Flink ETL job from Savepoint 5 and start consuming the incremental data from Snapshot 4 in table_s1 and Snapshot 6 in table2.
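Restoring from a savepoint happens at job submission. With the Flink CLI, step 3 could look roughly like the following; the savepoint path and job jar are placeholders:

```shell
# Submit the new Flink ETL job in detached mode, restoring its state
# from Savepoint 5. Path and jar name below are hypothetical.
flink run -d \
  -s hdfs:///flink/savepoints/savepoint-5 \
  etl-job.jar
```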

[draw.io diagram: incremental correction]

Proposed Changes

When users add the option table-lineage or data-lineage to the catalog DDL, Paimon will automatically check for and create the system database 'paimon', then create the options table, the table lineage tables and the data lineage tables.

1. catalog_options

Catalog options are stored in the catalog_options table, which Flink ETL jobs read to obtain lineage-related options when creating sources and sinks. The table has key and value fields, and key is the primary key.

| Column Name | Column Type | Example |
|-------------|-------------|---------|
| key | STRING | "table-lineage" |
| value | STRING | "true" |
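A Flink ETL job (or a user) could then look up the lineage switches with an ordinary query against the system database, for example:

```sql
-- Read the table-lineage switch from the system database
SELECT `value` FROM paimon.catalog_options WHERE `key` = 'table-lineage';
```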

2. source_job_lineage

Table source_job_lineage stores the relationship between source tables and ETL jobs. The job, database and table fields form a joint primary key.

| Column Name | Column Type | Example |
|-------------|-------------|---------|
| job | STRING | "myJob" |
| database | STRING | "myDatabase" |
| table | STRING | "myTableName" |

3. sink_job_lineage

Table sink_job_lineage stores the relationship between sink tables and ETL jobs. The job, database and table fields form a joint primary key.

| Column Name | Column Type | Example |
|-------------|-------------|---------|
| job | STRING | "myJob" |
| database | STRING | "myDatabase" |
| table | STRING | "myTableName" |

4. source_snapshot_lineage

Table source_snapshot_lineage stores the data lineage between sources and the ETL job; it records which snapshot id in a source table is consumed by a specific checkpoint barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

| Column Name | Column Type | Example |
|-------------|-------------|---------|
| job | STRING | "myJob" |
| barrier_id | LONG | 1 |
| database | STRING | "myDatabase" |
| table | STRING | "myTableName" |
| snapshot_id | LONG | 1 |

5. sink_snapshot_lineage

Table sink_snapshot_lineage stores the data lineage between the ETL job and sinks; it records which snapshot id in a sink table is produced by a specific checkpoint barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

| Column Name | Column Type | Example |
|-------------|-------------|---------|
| job | STRING | "myJob" |
| barrier_id | LONG | 1 |
| database | STRING | "myDatabase" |
| table | STRING | "myTableName" |
| snapshot_id | LONG | 1 |
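To illustrate how the two snapshot-lineage tables connect source and sink snapshots through (job, barrier_id), here is a small self-contained Python sketch. The rows are made-up examples matching the schema above, not real table access:

```python
# Each tuple mirrors the *_snapshot_lineage schema:
# (job, barrier_id, database, table, snapshot_id)
source_rows = [
    ("myJob", 1, "myDatabase", "table1", 4),
    ("myJob", 1, "myDatabase", "table2", 6),
]
sink_rows = [
    ("myJob", 1, "myDatabase", "table3", 2),
]

def upstream_snapshots(sink_db, sink_table, sink_snapshot):
    """Find the source snapshots that produced a given sink snapshot
    by joining the two tables on (job, barrier_id)."""
    keys = {
        (job, barrier)
        for job, barrier, db, tbl, snap in sink_rows
        if (db, tbl, snap) == (sink_db, sink_table, sink_snapshot)
    }
    return [
        (db, tbl, snap)
        for job, barrier, db, tbl, snap in source_rows
        if (job, barrier) in keys
    ]

print(upstream_snapshots("myDatabase", "table3", 2))
# [('myDatabase', 'table1', 4), ('myDatabase', 'table2', 6)]
```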

1. Query Table Lineage

Code Block
-- Query upstream tables for a given table
SELECT S.`database`, S.`table` FROM
    source_job_lineage S
    JOIN sink_job_lineage T ON S.job = T.job
    WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable';

-- Query downstream tables for a given table
SELECT T.`database`, T.`table` FROM
    source_job_lineage S
    JOIN sink_job_lineage T ON S.job = T.job
    WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable';

2. Query Data Lineage

Code Block
-- Query upstream table snapshots for a given table and snapshot
SELECT S.`database`, S.`table`, S.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable' AND T.snapshot_id = 123;

-- Query downstream table snapshots for a given table and snapshot
SELECT T.`database`, T.`table`, T.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable' AND S.snapshot_id = 123;

We introduce a new option table-lineage for the Paimon catalog; users can set this option when they create a new catalog.
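With Flink SQL, enabling the option at catalog creation could look like the following sketch; the catalog name and warehouse path are placeholders:

```sql
CREATE CATALOG lineage_catalog WITH (
  'type' = 'paimon',
  'warehouse' = 'file:///tmp/paimon',
  'table-lineage' = 'true'
);
```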

FlinkTableFactory in Paimon creates the source and sink for a Flink ETL job; it can read pipeline.name from the Context and save table lineage information for the job.

[draw.io diagram: table lineage]

However, managing Paimon table lineage in this way has one issue: the lifecycle management of the lineage. When an ETL job terminates, its table lineage currently has to be deleted manually. FLIP-314 [1] proposes support for customized job status listeners, and we can solve this issue based on FLIP-314.

Similar to table lineage, we add a new option data-lineage in the Paimon catalog for data lineage. One Paimon table may be consumed by multiple Flink ETL jobs that produce different sink tables.
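Because several jobs may read the same source table, listing the consumers of a table is a simple lookup in source_job_lineage:

```sql
-- All ETL jobs that consume the given source table
SELECT job FROM source_job_lineage
WHERE `database` = 'myDatabase' AND `table` = 'myTable';
```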

[draw.io diagram: data lineage source etl]





[1] FLIP-314: Support Customized Job Lineage Listener