This PIP is co-authored by Ming Li, Guojun Li and Fang Yong.

Motivation

Flink streaming and batch jobs can use Paimon tables as data sources and sinks, and Flink combined with Paimon can implement the entire ETL processing.

We would like to manage Flink Streaming & Batch ETL processing in Paimon based on the current abilities of Flink, including:

1. Table lineage in Paimon. We want to create lineage between Paimon source and sink tables based on the Flink job name, so that we can manage the entire ETL topology of Paimon tables and Flink jobs.

2. Data lineage for tables. A Flink streaming job creates a snapshot of a Paimon table when it completes a checkpoint, so we can create snapshot lineage between source and sink tables.

Based on the above lineage, we could support the following capabilities in the integrated Flink ETL processing and Paimon tables:

1. Query Paimon tables dependency, which allows users to manage their tables and Flink jobs better.

2. Locating and analyzing issues. When the data of a Paimon table is delayed, users can find the upstream tables and jobs and check whether there are any issues in them.

3. Data management for streaming and batch ETL. For example, according to the data lineage, users can easily submit debug jobs and compare the results of the debug job with the running job.

4. Data correction for streaming ETL. When users need to correct data, they can roll back all tables and jobs to a unified "version" based on the data lineage, and recover the ETL after the data correction.

Architecture

We introduce a Metadata Database named "paimon" for each catalog in Paimon; it stores the catalog properties and the lineage information. Users can create a Paimon catalog with table and data lineage options, and if the source or sink tables of an ETL job are in that catalog, the lineage information will be saved. The overall process is as follows.

1. Create a catalog with the options table-lineage and data-lineage. NOTICE: these two options are immutable after the catalog is created.

-- Create a catalog with lineage options
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'warehouse path',
    'table-lineage' = 'true',
    'data-lineage' = 'true'
);

USE CATALOG my_catalog;

-- Select the options for the current catalog
SELECT * FROM paimon.catalog_options;
+-----------------------+
|     name      | value |
+-----------------------+
| table-lineage | true  |
+-----------------------+
| data-lineage  | true  |
+-----------------------+
2 rows in set

2. Create two tables and submit the Flink ETL jobs.

-- Create a word table
CREATE TABLE word_table (word STRING);

-- Create a word count table
CREATE TABLE word_count_table (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

-- Submit a Flink job to ingest data from Kafka into word_table
INSERT INTO word_table SELECT ... FROM ...;

SET 'pipeline.name'='job1';
INSERT INTO word_count_table
    SELECT word, count(*) FROM word_table 
    GROUP BY word; 

Paimon will create a relationship between word_table and word_count_table through job1. As mentioned above, this includes both table lineage and data lineage. For example, Checkpoint 1 in job1 consumes Snapshot 3 of word_table and produces Snapshot 5 in word_count_table, and so on.
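For illustration only, the data lineage tables (defined in the Proposed Changes section below) might then contain rows like the following; the database name and the ids are assumed for this example.

-- Data lineage of the source table consumed by job1
SELECT * FROM paimon.source_snapshot_lineage WHERE job = 'job1';
+------+------------+----------+------------+-------------+---------------------+
| job  | barrier_id | database | table      | snapshot_id | create_time         |
+------+------------+----------+------------+-------------+---------------------+
| job1 |          1 | default  | word_table |           3 | 2023-06-05 13:29:01 |
+------+------------+----------+------------+-------------+---------------------+
1 row in set

-- Data lineage of the sink table produced by job1
SELECT * FROM paimon.sink_snapshot_lineage WHERE job = 'job1';
+------+------------+----------+------------------+-------------+---------------------+
| job  | barrier_id | database | table            | snapshot_id | create_time         |
+------+------------+----------+------------------+-------------+---------------------+
| job1 |          1 | default  | word_count_table |           5 | 2023-06-05 13:29:01 |
+------+------------+----------+------------------+-------------+---------------------+
1 row in set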

3. Users can debug and validate their table data based on the table and data lineage. The steps are:

a) Users can get the information that "Snapshot 5 of word_table generates Snapshot 7 in word_count_table" from lineage tables.

b) Using the time travel ability of Paimon, users can submit a Flink or Spark batch job to read Snapshot 5 from word_table and write the result to a new table word_count_table_snapshot_result whose schema is the same as word_count_table.

c) Compare the data in word_count_table_snapshot_result and Snapshot 7 of word_count_table to validate the result of streaming ETL.

-- Create table word_count_table_snapshot_result from word_count_table
CREATE TABLE word_count_table_snapshot_result LIKE word_count_table;

-- Read snapshot 5 from word_table and write data to word_count_table_snapshot_result
-- in batch mode
INSERT INTO word_count_table_snapshot_result
    SELECT word, count(*)
    FROM word_table /*+ OPTIONS('scan.snapshot-id'='5') */
    GROUP BY word;

-- Compare the data in word_count_table_snapshot_result with
-- snapshot 7 in word_count_table and get the diffs
SELECT * FROM word_count_table_snapshot_result AS L
    FULL OUTER JOIN word_count_table /*+ OPTIONS('scan.snapshot-id'='7') */ AS R
    ON L.word=R.word
    WHERE L.word IS NULL OR R.word IS NULL OR L.cnt != R.cnt;

In addition to the above data validation, users can also debug their jobs. For example, users can modify and submit a new job which reads Snapshot 5 of word_table and produces results in a new table. Then they can compare the data with Snapshot 7 of word_count_table to check whether the new job meets their expectations.

4. Users can also correct data based on table and data lineages. Suppose that users want to correct data for table1 from Snapshot 4, and recompute data for downstream tables.

a) Full Recover

1> Create a new table table_s1 based on table1 and insert Snapshot 4 of table1 into table_s1; users can then correct the data in table_s1.

2> According to the table lineage, create an empty downstream table table_s3 based on table3.

3> Submit a new ETL job to read the full data of Snapshot 4 from table_s1 and Snapshot 6 from table2, then read incremental data, as sketched below.
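A minimal SQL sketch of the full-recover steps above, reusing table1/table2/table3 from the example; the ETL logic itself is elided and the snapshot ids are taken from the example lineage.

-- Prepare the corrected data in batch mode
SET 'execution.runtime-mode' = 'batch';

-- 1> Create table_s1 from table1, fill it with Snapshot 4 of table1 and correct the data
CREATE TABLE table_s1 LIKE table1;
INSERT INTO table_s1 SELECT * FROM table1 /*+ OPTIONS('scan.snapshot-id'='4') */;
-- ... apply data corrections to table_s1 ...

-- 2> Create an empty downstream table table_s3 based on table3
CREATE TABLE table_s3 LIKE table3;

-- 3> Submit a new streaming ETL job (same logic as the original job) that first reads
--    the full data of table_s1 and Snapshot 6 of table2, then continues with incremental data
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO table_s3
    SELECT ...  -- original ETL logic over table_s1 and table2
    FROM table_s1 JOIN table2 ON ...;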

Full recover reads the full data first and then the incremental data, which may involve a large amount of computation and take a long time. To avoid this, incremental recover can be considered.

b) Incremental Recover

Incremental recover starts a new Flink ETL job from a savepoint. Suppose that, in the above data lineage, Snapshot 4 in table1 corresponds to Savepoint 5 of the Flink ETL job; then the steps of incremental recover are:

1> Create a new table table_s1 based on table1 and insert Snapshot 4 of table1 into table_s1; users can then correct the data in table_s1.

2> Create a downstream table table_s3 based on table3 and insert all data of Snapshot 2 of table3 into table_s3.

3> Submit a new Flink ETL job from Savepoint 5 and start to consume incremental data from Snapshot 4 in table_s1 and Snapshot 6 in table2, as sketched below.
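A SQL sketch of the incremental-recover steps, assuming Savepoint 5 of the original ETL job corresponds to Snapshot 4 of table1 and Snapshot 2 of table3; the savepoint path and the ETL logic are placeholders.

SET 'execution.runtime-mode' = 'batch';

-- 1> Create table_s1 from table1, fill it with Snapshot 4 of table1 and correct the data
CREATE TABLE table_s1 LIKE table1;
INSERT INTO table_s1 SELECT * FROM table1 /*+ OPTIONS('scan.snapshot-id'='4') */;
-- ... apply data corrections to table_s1 ...

-- 2> Create table_s3 from table3 and fill it with Snapshot 2 of table3
CREATE TABLE table_s3 LIKE table3;
INSERT INTO table_s3 SELECT * FROM table3 /*+ OPTIONS('scan.snapshot-id'='2') */;

-- 3> Restore a new streaming ETL job from Savepoint 5 and continue consuming
--    incremental data from Snapshot 4 of table_s1 and Snapshot 6 of table2
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.savepoint.path' = '/path/to/savepoint-5';
INSERT INTO table_s3
    SELECT ...  -- original ETL logic over table_s1 and table2
    FROM table_s1 JOIN table2 ON ...;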

Proposed Changes

When users add the option table-lineage or data-lineage to the catalog DDL, Paimon will check and create the Metadata Database 'paimon' automatically, then create the options table, the table lineage tables and the data lineage tables.

1. catalog_options

Catalog options are stored in the catalog_options table; Flink ETL jobs read the lineage-related options from it when creating sources and sinks. The table has key and value fields, and key is the primary key.

Column Name | Column Type | Example
key         | STRING      | "table-lineage"
value       | STRING      | "true"

2. source_job_lineage

Table source_job_lineage stores the relationship between source tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
create_time | DATETIME    | "2023-06-05 13:29:01"

3. sink_job_lineage

Table sink_job_lineage stores the relationship between sink tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
create_time | DATETIME    | "2023-06-05 13:29:01"

4. source_snapshot_lineage

Table source_snapshot_lineage stores the data lineage between sources and ETL jobs; it records which snapshot id of the source table is consumed by a specific barrier id (checkpoint) of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
create_time | DATETIME    | "2023-06-05 13:29:01"

5. sink_snapshot_lineage

Table sink_snapshot_lineage stores the data lineage between ETL jobs and sinks; it records which snapshot id of the sink table is produced by a specific barrier id (checkpoint) of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
create_time | DATETIME    | "2023-06-05 13:29:01"

Based on these tables, users can query table and data lineage with SQL, for example:

1. Query Table Lineage

-- Query upstream tables for a given table
SELECT S.`database`, S.`table` FROM
    source_job_lineage S
    JOIN sink_job_lineage T ON S.job=T.job
    WHERE T.`database`='myDatabase' AND T.`table`='myTable';

-- Query downstream tables for a given table
SELECT T.`database`, T.`table` FROM
    source_job_lineage S
    JOIN sink_job_lineage T ON S.job=T.job
    WHERE S.`database`='myDatabase' AND S.`table`='myTable';

2. Query Data Lineage

-- Query the upstream table snapshot for a given table and snapshot
SELECT S.`database`, S.`table`, S.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job=T.job AND S.barrier_id=T.barrier_id
    WHERE T.`database`='myDatabase' AND T.`table`='myTable' AND T.snapshot_id=123;

-- Query the downstream table snapshot for a given table and snapshot
SELECT T.`database`, T.`table`, T.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job=T.job AND S.barrier_id=T.barrier_id
    WHERE S.`database`='myDatabase' AND S.`table`='myTable' AND S.snapshot_id=123;

The tables in the Metadata Database can be divided into two types: the catalog options table and the lineage tables. The catalog options can be stored as json data in the catalog directory, while the requirements for the lineage tables are much more complex:

1. Multiple jobs will concurrently write data to the same table; for example, the source operators of different jobs all write to the source_snapshot_lineage table.

2. One table has multiple writers but no committer operator, which is very different from how data is currently written to Paimon tables.

3. High data visibility requirements: each write should be visible in (near) real time.

Because of these requirements, we cannot use regular Paimon tables for the Metadata Database. Instead, we design a metadata store API for the Metadata Database and introduce jdbc as the default store. Users can configure their jdbc parameters for the Metadata Database in the create catalog statement as follows:

CREATE CATALOG paimon_catalog1 WITH (
    ...  -- other options
    'lineage-meta' = 'jdbc',
    'jdbc.driver' = 'com.mysql.jdbc.Driver',
    'jdbc.url' = 'XXXXX',
    'jdbc.database' = 'paimon_cata1',    -- The default Metadata Database name is `paimon`
    'jdbc.username' = 'XXX',
    'jdbc.password' = 'XXX'
);

The Paimon catalog will create a metadata store based on these options to store and query lineage information. The metadata store and factory classes are as follows.

/** Factory to create the lineage metadata store. */
public interface LineageMetaFactory {
    /* The identifier for the metadata store factory. */
    String identifier();

    /* Create the metadata store from the specific context. */
    LineageMeta create(LineageMetaContext context);

    /** The context used to create the metadata store in the factory. */
    interface LineageMetaContext {
        Options options();
    }
}

/* Metadata store will manage the options, table lineage and data lineage information for the catalog. */
public interface LineageMeta {
	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSourceTableLineage(TableLineageEntity entity);

	/**
	 * Delete the source table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSourceTableLineage(String job);

   /**
	 * Get source table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSinkTableLineage(TableLineageEntity entity);

	/**
	 * Get sink table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for sink table and job lineages
	 */
	Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate);

	/**
	 * Delete the sink table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSinkTableLineage(String job);

   /**
	 * Store the source data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSourceDataLineage(DataLineageEntity entity);

	/**
	 * Get source data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for source data and job lineages
	 */
	Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSinkDataLineage(DataLineageEntity entity);

	/**
	 * Get sink data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for sink data and job lineages
	 */
	Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate);
}

/**
 * Table lineage entity with database, table and job for table source and sink lineage.
 */
public interface TableLineageEntity {
	public String getDatabase();

	public String getTable();

	public String getJob();

    public String getCreateTime();
}

/**
 * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink lineage.
 */
public interface DataLineageEntity extends TableLineageEntity {
	public long getBarrierId();

	public long getSnapshotId();
}

We provide a jdbc metadata store implementation by default, which supports using an external database to store this information.

public class JdbcLineageMetaOptions {
	/**
	 * The jdbc driver class for lineage meta.
	 */
	public static ConfigOption<String> JDBC_DRIVER =
			key("jdbc.driver")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc driver class for lineage meta.");
	/**
	 * The jdbc url for lineage meta.
	 */
	public static ConfigOption<String> JDBC_URL =
			key("jdbc.url")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc url for lineage meta.");

	/**
	 * The jdbc username for lineage meta.
	 */
	public static ConfigOption<String> JDBC_USERNAME =
			key("jdbc.username")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc username for lineage meta.");

	/**
	 * The jdbc password for lineage meta.
	 */
	public static ConfigOption<String> JDBC_PASSWORD =
			key("jdbc.password")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc password for lineage meta.");

	/**
	 * The jdbc database for lineage meta.
	 */
	public static ConfigOption<String> JDBC_DATABASE =
			key("jdbc.database")
					.stringType()
					.defaultValue("paimon")
					.withDescription("The jdbc database for lineage meta and default database is `paimon`");
}

/* Jdbc metadata store factory. */
public class JdbcLineageMetaFactory implements LineageMetaFactory {

	public static final String IDENTIFIER = "jdbc";

	@Override
	public String identifier() {
		return IDENTIFIER;
	}

	@Override
	public LineageMeta create(LineageMetaContext context) {
		// Build a DataSource for the configured database from the jdbc options in the
		// context (createDataSource is a helper omitted here)
		DataSource dataSource = createDataSource(context.options());
		return new JdbcLineageMeta(dataSource);
	}
}

/* Jdbc lineage meta backed by an external database via a DataSource. */
public class JdbcLineageMeta implements LineageMeta {
    private final DataSource dataSource;

    public JdbcLineageMeta(DataSource dataSource) {
        this.dataSource = dataSource;
    }
}

Besides jdbc, users can implement their own metadata store and factory to manage the lineage information in their own external systems.

We can implement ReadOnlyTable for the tables in the Metadata Database, so that users can query these tables like regular Paimon tables in computation engines such as Flink, Spark and Trino.

/** Catalog options table for Paimon. */
public class CatalogOptionsTable implements ReadOnlyTable {
}

/** Source/sink table and job lineage table for Paimon. */
public class TableLineageTable implements ReadOnlyTable {
    private final LineageMeta lineageMeta;
}

/** Source/sink data and job lineage table for Paimon. */
public class DataLineageTable implements ReadOnlyTable {
    private final LineageMeta lineageMeta;
}

Tables in the Metadata Database are read-only; users can query them with SQL just like regular tables, but they cannot alter the metadata tables or update the data in them. We provide Flink actions for users to delete data from the metadata tables as follows:

action               | argument                                       | note
delete-table-lineage | --job <job-name>: specify the name of the job | Delete the table lineage created by the given job.
delete-data-lineage  | --job <job-name>: specify the name of the job | Delete the data lineage created by the given job.

We introduce a new option table-lineage for the Paimon catalog; users can set this option when they create a new catalog.

FlinkTableFactory in Paimon creates the source and sink for a Flink ETL job; it can read pipeline.name from the Context and save table lineage information for the ETL job.

However, managing Paimon table lineage in this way has one issue: the lifecycle management of the table lineage. When the ETL job terminates, the table lineage has to be deleted manually. FLIP-314 [1] proposes customized job status listeners, so we can solve this issue based on FLIP-314 in the future.

Similar to table lineage, we add a new option data-lineage in the Paimon catalog for data lineage. One Paimon table may be consumed by multiple Flink ETL jobs that produce different sink tables.

We would like to align the data lineage of the same source tables across different ETL jobs. For example, when ETL 1 generates Snapshot 9 in Table 5 from Snapshot 13 in Table 2 and Snapshot 12 in Table 3, we want ETL 2 to consume the same Snapshot 13 in Table 2 and Snapshot 12 in Table 3 to produce Snapshot 15 in Table 6. In this way, the result snapshots in Table 5 and Table 6 are consistent, and users can query Table 5 and Table 6 to get consistent results from Snapshot 9 and Snapshot 15. On the other hand, when users submit a new ETL job that consumes Table 5 and Table 6, they can manage the data pipeline from tables 1/2/3/4 with a unified version.

Currently Flink does not support setting a unified version for all sources in a job; we have created an issue for this [2]. Based on the existing abilities of Flink, we introduce a job_startup table which saves the snapshot id of each source table for a job when it starts.

Column Name | Column Type | Example
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1

When submitting the job, based on the existing tables and snapshot ids, we will query the corresponding snapshot id of each new source table from the data lineage tables. When creating the sources for the job, we will put the relevant snapshot ids in the configuration and write the corresponding rows to the job_startup table. During the execution of the streaming job, each source directly consumes data based on its corresponding snapshot id.
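As an illustration (a sketch over the lineage tables above, reusing the ETL 1 / ETL 2 example; not a fixed part of the proposal), the aligned snapshot of a second source table could be looked up like this when the job is submitted:

-- Find the snapshot of table6 that was produced from the same upstream snapshots
-- as Snapshot 9 of table5
SELECT DISTINCT T2.`database`, T2.`table`, T2.snapshot_id FROM
    sink_snapshot_lineage T1
    JOIN source_snapshot_lineage S1
        ON T1.job = S1.job AND T1.barrier_id = S1.barrier_id
    JOIN source_snapshot_lineage S2
        ON S1.`database` = S2.`database` AND S1.`table` = S2.`table`
            AND S1.snapshot_id = S2.snapshot_id
    JOIN sink_snapshot_lineage T2
        ON S2.job = T2.job AND S2.barrier_id = T2.barrier_id
    WHERE T1.`database` = 'myDatabase' AND T1.`table` = 'table5' AND T1.snapshot_id = 9
        AND T2.`table` = 'table6';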

As a first step we only support a one-to-one correspondence between snapshots in the source and checkpoints in the ETL job, which means the ETL job triggers a checkpoint when it finishes reading one snapshot from the source. This helps us simplify runtime management without the need to coordinate checkpoints across multiple jobs. We will relax this restriction in the future.

To align snapshots and checkpoints between the table and the ETL job, we introduce an AlignedSnapshotSource for Flink which provides AlignedSourceReader and AlignedEnumerator.

1. AlignedSourceReader implements ExternallyInducedSourceReader; its main functions are:

a) Receive checkpoint events.

b) Discover snapshots in the source table.

c) Send the splits of a snapshot to the downstream ReadOperator, which reads the data of each split from the table.

d) Check whether the received checkpoint should be triggered based on the snapshot and splits.

2. AlignedEnumerator does nothing except send a CheckpointEvent to AlignedSourceReader when it receives a checkpoint from the CheckpointCoordinator, notifying the source reader to trigger the checkpoint.

3. CheckpointEvent is a special SourceEvent which contains the checkpoint id and is sent to the SourceReader.

public class AlignedSnapshotSource implements Source {

    @Override
    public SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
        throws Exception {
        // Create the enumerator which forwards checkpoint events to the source readers
        return new AlignedEnumerator(enumContext);
    }
    
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception {
        return new AlignedSourceReader(readerContext);
    }
}

public class AlignedSourceReader implements ExternallyInducedSourceReader {
    
    private Map<Long, List<Split>> discoveredSplits;
    private boolean alignedSnapshotSent;
    private Optional<Long> receivedCheckpoint;
    
    @Override
    public void start() {
        // start async scan to find new snapshot and add to discoveredSplits
    }

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        if (!alignedSnapshotSent && !discoveredSplits.isEmpty()) {
            // send the splits of the current aligned snapshot to the downstream operator
            alignedSnapshotSent = true;
        }
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        // handle checkpoint Events
        receivedCheckpoint = Optional.of(((CheckpointEvent)sourceEvent).checkpointID);
    }
    
    
    @Override
    public Optional<Long> shouldTriggerCheckpoint() {
        if (alignedSnapshotSent && receivedCheckpoint.isPresent()) {
            alignedSnapshotSent = false;
            Optional<Long> lastReceivedCheckpoint = receivedCheckpoint;
            receivedCheckpoint = Optional.empty();
            return lastReceivedCheckpoint;
        }
        return Optional.empty();
    }
}

public class AlignedEnumerator implements SplitEnumerator {

    private final SplitEnumeratorContext<SourceSplit> context;

    @Override
    public Void snapshotState(long checkpointId) throws Exception {
        // Send a CheckpointEvent with the checkpoint id to all source readers
        CheckpointEvent event = new CheckpointEvent(checkpointId);
        for (int i = 0; i < context.currentParallelism(); i++) {
            context.sendEventToSourceReader(i, event);
        }
        return null;
    }
}

public class CheckpointEvent implements SourceEvent {
    private final long checkpointId;
    public CheckpointEvent(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

The detailed process is:

1. AlignedSourceReader discovers the latest snapshot of the source table and adds it to its snapshot list.

2. When the pollNext method of AlignedSourceReader is called, it checks whether the snapshot belongs to the current checkpoint; if yes, it sends all splits of the snapshot to the downstream operator.

3. AlignedEnumerator creates a CheckpointEvent with the checkpoint id and sends it to AlignedSourceReader when its snapshotState method is called.

4. When AlignedSourceReader has received the CheckpointEvent and the splits of the specific snapshot have been sent to the downstream operator, it returns the checkpoint id in shouldTriggerCheckpoint to trigger the checkpoint.

5. After the checkpoint is completed, the job stores the checkpoint id and snapshot id in the source_snapshot_lineage table.

 

Besides source data lineage, we need to save sink data lineage too. We introduce a CommitListener for the Committer interface. The Flink sink notifies the CommitListener to write the checkpoint id and snapshot id to the sink data lineage table when the specific snapshot is committed to the sink table.

public interface Committer extends AutoCloseable {
    void commit(List<ManifestCommittable> globalCommittables, CommitListener commitListener)
        throws IOException, InterruptedException;
}

public interface CommitListener {
    /** Notifies the listener that the commit with the given {@code identifier} completed. */
    void notifyCommitComplete(long identifier, long snapshotId, Snapshot.CommitKind commitKind);
}

There will be a Metadata Database in each catalog to manage table and data lineage. For table and data lineage across multiple catalogs, users can join the lineage tables of the different catalogs. For example, suppose there are two catalogs and a Flink ETL job reads data from catalog1 and writes data to catalog2. Users can query the data lineage from source_snapshot_lineage in catalog1 and sink_snapshot_lineage in catalog2.

SELECT k.snapshot_id FROM catalog2.paimon.sink_snapshot_lineage k
    JOIN catalog1.paimon.source_snapshot_lineage s
    ON s.job=k.job AND s.barrier_id=k.barrier_id
    WHERE s.job='job_name' AND s.snapshot_id=10;

Future Work

1. Implement lineage management based on FLIP-314.

    a) Currently we store table lineage when the job creates its source and sink, based on the pipeline name; with FLIP-314 we can get the source/sink of the job directly in the customized listener.

    b) The table lineage should be deleted when job submission fails or the job terminates; with FLIP-314 we can get the job status and perform the specific operations.

2. Support configuring the number of snapshots consumed per checkpoint.

3. Support branches in Paimon tables so that data management does not require copying data.


[1] FLIP-314: Support Customized Job Lineage Listener

[2] https://issues.apache.org/jira/browse/FLINK-32042



