...
```sql
-- Query upstream table snapshot for given table and snapshot
SELECT S.database, S.table, S.snapshot_id
FROM source_snapshot_lineage S
JOIN sink_snapshot_lineage T
  ON S.job = T.job AND S.barrier_id = T.barrier_id
WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable' AND T.snapshot_id = 123;

-- Query downstream table snapshot for given table and snapshot
SELECT T.database, T.table, T.snapshot_id
FROM source_job_lineage S
JOIN sink_job_lineage T
  ON S.job = T.job AND S.barrier_id = T.barrier_id
WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable' AND S.snapshot_id = 123;
```
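The join on job and barrier_id is what ties a sink snapshot back to the source snapshots it was computed from. A minimal sketch of the upstream lookup, using SQLite as a stand-in for the system tables (the sample rows and the job name `etl1` are invented for illustration; this is not the Paimon implementation):

```python
# Sketch: how the snapshot-lineage join answers "which source snapshot
# produced sink snapshot 123?". SQLite stands in for the system tables.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_snapshot_lineage (
    job TEXT, barrier_id INTEGER,
    database TEXT, "table" TEXT, snapshot_id INTEGER);
CREATE TABLE sink_snapshot_lineage (
    job TEXT, barrier_id INTEGER,
    database TEXT, "table" TEXT, snapshot_id INTEGER);

-- Job 'etl1' read snapshot 13 of srcDb.srcTable at barrier 7 ...
INSERT INTO source_snapshot_lineage VALUES ('etl1', 7, 'srcDb', 'srcTable', 13);
-- ... and committed snapshot 123 of myDatabase.myTable at the same barrier.
INSERT INTO sink_snapshot_lineage VALUES ('etl1', 7, 'myDatabase', 'myTable', 123);
""")

# Upstream lookup: join source and sink rows on (job, barrier_id).
rows = conn.execute("""
    SELECT S.database, S."table", S.snapshot_id
    FROM source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
      ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE T.database = 'myDatabase' AND T."table" = 'myTable'
      AND T.snapshot_id = 123
""").fetchall()
print(rows)  # [('srcDb', 'srcTable', 13)]
```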
...
System Database Action
Tables in the system database are read-only; users can query system tables with SQL just like regular tables, but they cannot alter the tables or update the data in them. We provide Flink actions for users to delete data from system tables, as follows:
action | argument | note |
---|---|---|
delete-table-lineage | --job <job-name>: specify the name of the job. | Delete table lineage created by the given job. |
delete-data-lineage | --job <job-name>: specify the name of the job. | Delete data lineage created by the given job. |
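Conceptually, each action is a delete-by-job over the corresponding lineage table. A minimal sketch of what delete-table-lineage would do, again with SQLite standing in for the system tables (table names assumed from the queries elsewhere in this document; the sample rows are invented):

```python
# Sketch of the effect of `delete-table-lineage --job <job-name>`:
# remove every lineage row recorded by the given job.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE source_job_lineage (job TEXT, database TEXT, "table" TEXT)')
conn.executemany("INSERT INTO source_job_lineage VALUES (?, ?, ?)",
                 [("etl1", "db", "t1"), ("etl2", "db", "t2")])

def delete_table_lineage(conn, job):
    # Drop all table-lineage rows created by this job.
    conn.execute("DELETE FROM source_job_lineage WHERE job = ?", (job,))

delete_table_lineage(conn, "etl1")
remaining = [r[0] for r in conn.execute("SELECT job FROM source_job_lineage")]
print(remaining)  # ['etl2']
```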
Table Lineage
We introduce a new option table-lineage for the Paimon catalog; users can set this option when they create a new catalog. FlinkTableFactory in Paimon creates the source and sink for a Flink ETL job; it can read pipeline.name from the Context and save table lineage information for the ETL job.
[draw.io diagram: table lineage]
However, managing Paimon table lineage in this way has one issue: lifecycle management of the table lineage. When an ETL job terminates, its table lineage must be deleted manually. FLIP-314 [1] proposes customized job status listeners; once it is available, we can solve this issue based on FLIP-314.
Data Lineage
Job Submission
Similar to table lineage, we add a new option data-lineage in the Paimon catalog for data lineage. One Paimon table may be consumed by multiple Flink ETL jobs that produce different sink tables.
[draw.io diagram: data lineage]
We would like to align data lineage in the same table for different ETL jobs. For example, when ETL 1 generates Snapshot 9 in Table 5 from Snapshot 13 in Table 2 and Snapshot 12 in Table 3, we want ETL 2 to consume the same Snapshot 13 in Table 2 and Snapshot 12 in Table 3 to produce result Snapshot 15 in Table 6. In this way, we ensure that the result snapshots in Table 5 and Table 6 are consistent, and users can perform queries on Table 5 and Table 6 to get consistent results from Snapshot 9 and Snapshot 15. On the other hand, when users submit a new ETL job to consume Table 5 and Table 6, they can manage the data pipeline from Table 1/2/3/4 with a unified version.
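The alignment above can be illustrated with the snapshot lineage tables: because every job records which source snapshots each barrier consumed, a join through the shared source snapshot finds the corresponding sink snapshots across jobs. A sketch with invented job names and barrier ids, using SQLite as a stand-in:

```python
# Sketch: finding the sink snapshots of two ETL jobs that were both
# derived from Snapshot 13 of Table 2. SQLite stands in for the
# system tables; all rows are invented for illustration.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_snapshot_lineage (
    job TEXT, barrier_id INTEGER, "table" TEXT, snapshot_id INTEGER);
CREATE TABLE sink_snapshot_lineage (
    job TEXT, barrier_id INTEGER, "table" TEXT, snapshot_id INTEGER);
-- ETL 1: Snapshot 13 of Table 2 + Snapshot 12 of Table 3 -> Snapshot 9 of Table 5.
INSERT INTO source_snapshot_lineage VALUES
    ('etl1', 3, 'table2', 13), ('etl1', 3, 'table3', 12);
INSERT INTO sink_snapshot_lineage VALUES ('etl1', 3, 'table5', 9);
-- ETL 2 consumes the same source snapshots -> Snapshot 15 of Table 6.
INSERT INTO source_snapshot_lineage VALUES
    ('etl2', 8, 'table2', 13), ('etl2', 8, 'table3', 12);
INSERT INTO sink_snapshot_lineage VALUES ('etl2', 8, 'table6', 15);
""")

# Which sink snapshots, across all jobs, came from Snapshot 13 of table2?
aligned = conn.execute("""
    SELECT T.job, T."table", T.snapshot_id
    FROM source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
      ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE S."table" = 'table2' AND S.snapshot_id = 13
    ORDER BY T.job
""").fetchall()
print(aligned)  # [('etl1', 'table5', 9), ('etl2', 'table6', 15)]
```

Querying Table 5 at Snapshot 9 together with Table 6 at Snapshot 15 then yields a consistent view of the pipeline.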
...
```sql
SELECT k.snapshot_id FROM catalog2.paimon.sink_job_lineage k
JOIN catalog1.paimon.source_job_lineage s
  ON s.job = k.job AND s.barrier_id = k.barrier_id
WHERE s.job = 'job_name' AND s.snapshot_id = 10
```
Plan For The Future
1. Implement lineage management based on FLIP-314
a) Currently we store table lineage when a job creates its source and sink, based on the pipeline name; with FLIP-314 we can get the source/sink for the job directly in the customized listener.
b) The table lineage should be deleted when job submission fails or the job terminates; we can get the job status and perform the specific operations in the customized listener.
2. Make the number of snapshots that support checkpoints configurable
...