...

Catalog options are stored in the catalog_options table, which Flink ETL jobs read to obtain lineage-related options when creating sources and sinks. The table has key and value fields, and key is the primary key.

Column Name | Column Type | Example
----------- | ----------- | ---------------
key         | STRING      | "table-lineage"
value       | STRING      | "true"

2. source_job_lineage

The source_job_lineage table stores the relationship between source tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | -------------
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"

3. sink_job_lineage

The sink_job_lineage table stores the relationship between sink tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | -------------
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"

4. source_snapshot_lineage

The source_snapshot_lineage table stores the data lineage between sources and ETL jobs: it records which snapshot id in a source table is consumed by a specific barrier id of an ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | -------------
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
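
As an illustrative lookup against this table:

Code Block
-- Which snapshot of each source table did 'myJob' consume at barrier 1?
SELECT `database`, `table`, snapshot_id FROM source_snapshot_lineage
    WHERE job = 'myJob' AND barrier_id = 1;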

5. sink_snapshot_lineage

The sink_snapshot_lineage table stores the data lineage between ETL jobs and sinks: it records which snapshot id in a sink table is produced by a specific barrier id of an ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | -------------
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
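
Together with source_snapshot_lineage, this table supports backtracking from a result snapshot to the inputs that produced it; a minimal sketch of the first step:

Code Block
-- Which barrier (checkpoint) of 'myJob' produced snapshot 1 of the sink table?
SELECT barrier_id FROM sink_snapshot_lineage
    WHERE job = 'myJob' AND `database` = 'myDatabase'
        AND `table` = 'myTableName' AND snapshot_id = 1;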

1. Query Table Lineage

...

However, managing Paimon table lineage in this way has one issue: the lifecycle management of table lineage. When an ETL job terminates, its table lineage must be deleted manually. We would like to support a customized job status listener in FLIP-314 [1], and we can solve this issue based on FLIP-314.

Similar to table lineage, we add a new option data-lineage to the Paimon catalog for data lineage. One Paimon table may be consumed by multiple Flink ETL jobs, each producing a different sink table.

[draw.io diagram: data lineage source etl]

We would like to align data lineage in the same tables across different ETL jobs. For example, when ETL 1 generates Snapshot 9 in Table 5 from Snapshot 13 in Table 2 and Snapshot 12 in Table 3, we want ETL 2 to consume the same Snapshot 13 in Table 2 and Snapshot 12 in Table 3 to produce Snapshot 15 in Table 6. In this way the result snapshots in Table 5 and Table 6 are consistent, and users can query Table 5 and Table 6 to get consistent results from Snapshot 9 and Snapshot 15. On the other hand, when users submit a new ETL job that consumes Table 5 and Table 6, they can manage the data pipeline from Tables 1/2/3/4 with a unified version.

Currently Flink does not support setting a unified version for all sources in a job; we have created an issue for this [2]. Given Flink's existing capabilities, we introduce a job_startup table which saves the snapshot id of each source table when the job starts.

Column Name | Column Type | Example
----------- | ----------- | -------------
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1

When submitting a job, we query the data lineage tables for the new source tables' snapshot ids that correspond to the existing tables and snapshot ids. When creating the sources for the job, we put the relevant snapshot ids in the configuration and write the corresponding rows to the job_startup table. During execution, each streaming source directly consumes data starting from its corresponding snapshot id. A sketch of this submission-time lookup is shown below.
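
Reusing the Table 5 example above (table names and snapshot ids are illustrative): given that Snapshot 9 of table5 was produced by ETL 1, find the source snapshots behind it so the new job's sources can start from matching versions:

Code Block
-- Illustrative alignment lookup: which source snapshots produced snapshot 9
-- of table5? A new job can start its other sources from these versions.
SELECT s.`database`, s.`table`, s.snapshot_id
    FROM sink_snapshot_lineage k
    JOIN source_snapshot_lineage s
    ON s.job = k.job AND s.barrier_id = k.barrier_id
    WHERE k.`table` = 'table5' AND k.snapshot_id = 9;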

In the first step we only support a one-to-one correspondence between source snapshots and ETL job checkpoints; that is, the ETL job triggers a checkpoint when it finishes reading one snapshot from the source. This simplifies runtime management because we do not need to coordinate checkpoints across multiple jobs. We will support more flexible mappings in the future.

To align snapshots and checkpoints for tables and ETL jobs, we introduce an AlignedSnapshotSource for Flink which provides an AlignedSourceReader and an AlignedEnumerator.

1. AlignedSourceReader implements ExternallyInducedSourceReader; its main functions are:

a) Receive checkpoint events.

b) Discover snapshots in the source table.

c) Send the splits of each snapshot to the downstream operator ReadOperator, which reads the data for each split from the table.

d) Check whether the received checkpoint should be triggered, based on the snapshot and splits.

2. AlignedEnumerator does nothing except send a CheckpointEvent to AlignedSourceReader when it receives a checkpoint from the CheckpointCoordinator, notifying the SourceReader to trigger the checkpoint.

3. CheckpointEvent is a special SourceEvent which contains the checkpoint id and is sent to the SourceReader.

Code Block
public class AlignedSnapshotSource implements Source {

    @Override
    public SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
        throws Exception {
        // Create the enumerator that announces checkpoints to the readers.
        return new AlignedEnumerator(enumContext);
    }

    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception {
        return new AlignedSourceReader(readerContext);
    }
}

public class AlignedSourceReader implements ExternallyInducedSourceReader {

    // Splits discovered per snapshot id, filled by the async scan started in start().
    private Map<Long, List<Split>> discoveredSplits;
    // Whether the splits of the current aligned snapshot have been sent downstream.
    private boolean alignedSnapshotSent;
    // Checkpoint id announced by AlignedEnumerator, if any.
    private Optional<Long> receivedCheckpoint;

    @Override
    public void start() {
        // Start an async scan to find new snapshots and add their splits to discoveredSplits.
    }

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        if (!alignedSnapshotSent && !discoveredSplits.isEmpty()) {
            // Send the splits of the current aligned snapshot to the downstream ReadOperator.
            alignedSnapshotSent = true;
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        // Record the checkpoint id carried by the event from AlignedEnumerator.
        if (sourceEvent instanceof CheckpointEvent) {
            receivedCheckpoint = Optional.of(((CheckpointEvent) sourceEvent).getCheckpointId());
        }
    }

    @Override
    public Optional<Long> shouldTriggerCheckpoint() {
        // Trigger only after the aligned snapshot's splits have been sent and a
        // checkpoint has been announced by the enumerator.
        if (alignedSnapshotSent && receivedCheckpoint.isPresent()) {
            alignedSnapshotSent = false;
            Optional<Long> lastReceivedCheckpoint = receivedCheckpoint;
            receivedCheckpoint = Optional.empty();
            return lastReceivedCheckpoint;
        }
        return Optional.empty();
    }
}

public class AlignedEnumerator implements SplitEnumerator {

    private final SplitEnumeratorContext<SourceSplit> context;

    public AlignedEnumerator(SplitEnumeratorContext<SourceSplit> context) {
        this.context = context;
    }

    @Override
    public Void snapshotState(long checkpointId) throws Exception {
        // Ask every reader to trigger this checkpoint; each reader decides when
        // in shouldTriggerCheckpoint().
        CheckpointEvent event = new CheckpointEvent(checkpointId);
        for (int i = 0; i < context.currentParallelism(); i++) {
            context.sendEventToSourceReader(i, event);
        }
        // The enumerator itself has no state to snapshot.
        return null;
    }
}

public class CheckpointEvent implements SourceEvent {

    private final long checkpointId;

    public CheckpointEvent(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    public long getCheckpointId() {
        return checkpointId;
    }
}

The detailed processes are:

1. AlignedSourceReader discovers the latest snapshot in the source table and adds it to the list.

2. When the pollNext method of AlignedSourceReader is called, it checks whether the snapshot belongs to the current checkpoint; if so, it sends all splits of the snapshot to the downstream operator.

3. AlignedEnumerator creates a CheckpointEvent with the checkpoint id and sends it to AlignedSourceReader when its snapshotState method is called.

4. When AlignedSourceReader has received the CheckpointEvent and the splits of the specific snapshot have been sent to the downstream operator, it returns the checkpoint id in shouldTriggerCheckpoint to trigger the checkpoint.

5. After the checkpoint completes, the checkpoint id and snapshot id are stored in the source_snapshot_lineage table.

[draw.io diagram: store source snapshot checkpoint]

Besides source data lineage, we need to save sink data lineage too. We introduce a CommitListener for the Committer interface. The Flink sink notifies the CommitListener to write the checkpoint id and snapshot id to the sink data lineage table when the specific snapshot is committed to the sink table.

Code Block
public interface Committer extends AutoCloseable {
    /** Commits the given committables and notifies {@code commitListener} for each created snapshot. */
    void commit(List<ManifestCommittable> globalCommittables, CommitListener commitListener)
        throws IOException, InterruptedException;
}

public interface CommitListener {
    /** Notifies the listener that the commit with the given {@code identifier} completed. */
    void notifyCommitComplete(long identifier, long snapshotId, Snapshot.CommitKind commitKind);
}
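
Conceptually, each notifyCommitComplete call records one row of sink data lineage; with illustrative values, its effect is equivalent to:

Code Block
-- Conceptual effect of notifyCommitComplete(identifier = 1, snapshotId = 2, ...):
-- barrier 1 of 'myJob' produced snapshot 2 of the sink table. Values are illustrative.
INSERT INTO sink_snapshot_lineage VALUES ('myJob', 1, 'myDatabase', 'myTableName', 2);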

There will be a system database in each catalog to manage table and data lineage. For table and data lineage crossing multiple catalogs, users can join the lineage tables of the different catalogs. For example, suppose there are two catalogs and a Flink ETL job reads data from catalog1 and writes data to catalog2. Users can query the data lineage by joining source_snapshot_lineage in catalog1 with sink_snapshot_lineage in catalog2.

[draw.io diagram: cross catalog]

Code Block
SELECT k.snapshot_id FROM catalog2.paimon.sink_snapshot_lineage k
    JOIN catalog1.paimon.source_snapshot_lineage s
    ON s.job = k.job AND s.barrier_id = k.barrier_id
    WHERE s.job = 'job_name' AND s.snapshot_id = 10;

1. Implement lineage management based on FLIP-314.

2. Make the number of snapshots per checkpoint configurable.

3. Support branches in Paimon tables so that data management does not require copying data.


[1] FLIP-314: Support Customized Job Lineage Listener

[2] https://issues.apache.org/jira/browse/FLINK-32042