...

...

Architecture

We introduce a System Metadata Database named "paimon" for each catalog in Paimon; it stores the catalog properties and the lineage information. Users can create a Paimon catalog with table and data lineage options, and if the source or sink tables of an ETL job are in that catalog, the lineage information will be saved. The overall process is as follows.

...

draw.io Diagram: incremental correction

Proposed Changes

...

When users add the option table-lineage or data-lineage to the catalog DDL, Paimon will check for and automatically create the system Metadata Database 'paimon', then create the options table, the table lineage tables and the data lineage tables.
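For example, a catalog DDL enabling both options might look like the following sketch (the catalog name and warehouse path are placeholders; the option values are assumptions based on the option names above):

```sql
CREATE CATALOG lineage_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:///tmp/paimon',  -- placeholder path
    'table-lineage' = 'true',
    'data-lineage' = 'true'
);
```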

...

Table source_job_lineage stores the relationship between source tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name  | Column Type | Example
job          | STRING      | "myJob"
database     | STRING      | "myDatabase"
table        | STRING      | "myTableName"
create_time  | DATETIME    | "2023-06-05 13:29:01"

3. sink_job_lineage

Table sink_job_lineage stores the relationship between sink tables and ETL jobs. The job, database and table fields form a joint primary key.

Column Name  | Column Type | Example
job          | STRING      | "myJob"
database     | STRING      | "myDatabase"
table        | STRING      | "myTableName"
create_time  | DATETIME    | "2023-06-05 13:29:01"

4. source_snapshot_lineage

Table source_snapshot_lineage stores the data lineage between sources and ETL jobs: it records which snapshot id in a source table is consumed by a specific barrier id of an ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name  | Column Type | Example
job          | STRING      | "myJob"
barrier_id   | LONG        | 1
database     | STRING      | "myDatabase"
table        | STRING      | "myTableName"
snapshot_id  | LONG        | 1
create_time  | DATETIME    | "2023-06-05 13:29:01"
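For example, to look up which snapshot of a source table a job consumed at a given checkpoint barrier (the filter values are the example values from the table above):

```sql
-- Which snapshot of myDatabase.myTableName did myJob consume at barrier 1?
SELECT snapshot_id
FROM source_snapshot_lineage
WHERE job = 'myJob' AND barrier_id = 1
  AND `database` = 'myDatabase' AND `table` = 'myTableName';
```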

5. sink_snapshot_lineage

Table sink_snapshot_lineage stores the data lineage between ETL jobs and sinks: it records which snapshot id in a sink table is produced by a specific barrier id of an ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name  | Column Type | Example
job          | STRING      | "myJob"
barrier_id   | LONG        | 1
database     | STRING      | "myDatabase"
table        | STRING      | "myTableName"
snapshot_id  | LONG        | 1
create_time  | DATETIME    | "2023-06-05 13:29:01"

1. Query Table Lineage

...

Code Block
-- Query upstream table snapshots for a given table and snapshot
SELECT S.`database`, S.`table`, S.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable' AND T.snapshot_id = 123;

-- Query downstream table snapshots for a given table and snapshot
SELECT T.`database`, T.`table`, T.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job = T.job AND S.barrier_id = T.barrier_id
    WHERE S.`database` = 'myDatabase' AND S.`table` = 'myTable' AND S.snapshot_id = 123;

...

1. Concurrently Writing In Lineage Tables

There will be multiple jobs concurrently writing records to the data lineage tables in the System Database. To avoid conflicts, the data lineage tables such as source_snapshot_lineage and sink_snapshot_lineage use the job name as a partition field, so the source reader operator and commit operator of each Flink ETL job write data to different partitions independently.

draw.io Diagram: data lineage partition

2. Snapshots In Lineage Tables

3. Actions For Lineage Tables

The Metadata Database tables can be divided into two types: the catalog options table and the lineage information tables. For the catalog options table, we can store JSON-format data in the catalog directory, while the lineage tables have much more complex requirements:

1. Multiple jobs will concurrently write data to the same table; for example, the source operators of all jobs write data to the source_snapshot_lineage table.

2. One table has multiple writers but no committer operator, which is very different from the current data writing in Paimon tables.

3. High data visibility requirements: each data write should become visible in (near) real time.

Given these requirements, we cannot use regular Paimon tables for the Metadata Database. We design a metadata store API for the Metadata Database and introduce jdbc as the default store. Users can configure their jdbc parameters for the Metadata Database in the create catalog statement as follows.

Code Block
CREATE CATALOG paimon_catalog1 WITH (
    ... -- other options
    'lineage-meta' = 'jdbc',
    'jdbc.driver' = 'com.mysql.jdbc.Driver',
    'jdbc.url' = 'XXXXX',
    'jdbc.database' = 'paimon_cata1',    -- The default Metadata Database name is `paimon`
    'jdbc.username' = 'XXX',
    'jdbc.password' = 'XXX'
);

The Paimon catalog will create a metadata store based on these options to store and retrieve lineage information. The metadata store and factory classes are as follows.

Code Block
/** Factory to create metadata store. */
public interface LineageMetaFactory {
    /* The identifier for the metadata store factory. */
	String identifier();

    /* Create metadata store from specific context. */
    LineageMeta create(LineageMetaContext context);

    /** The context used to create metadata store in the factory. */
	public interface LineageMetaContext {
	    Options options();
    }
}

/* Metadata store will manage the options, table lineage and data lineage information for the catalog. */
public interface LineageMeta {
	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSourceTableLineage(TableLineageEntity entity);

	/**
	 * Delete the source table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSourceTableLineage(String job);

   /**
	 * Get source table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSinkTableLineage(TableLineageEntity entity);

	/**
	 * Get sink table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for sink table and job lineages
	 */
	Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate);

	/**
	 * Delete the sink table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSinkTableLineage(String job);

	/**
	 * Store the source data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSourceDataLineage(DataLineageEntity entity);

	/**
	 * Get source data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for source data and job lineages
	 */
	Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSinkDataLineage(DataLineageEntity entity);

	/**
	 * Get sink data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for sink data and job lineages
	 */
	Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate);
}

/**
 * Table lineage entity with database, table and job for table source and sink lineage.
 */
public interface TableLineageEntity {
	public String getDatabase();

	public String getTable();

	public String getJob();

    public String getCreateTime();
}

/**
 * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink lineage.
 */
public interface DataLineageEntity extends TableLineageEntity {
	public long getBarrierId();

	public long getSnapshotId();
}
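As an illustration of how these entities might be implemented, here is a minimal immutable value class (the class name SimpleDataLineageEntity is hypothetical and not part of the proposal; the interfaces are repeated so the sketch is self-contained):

```java
// Interfaces as defined in the proposal above.
interface TableLineageEntity {
    String getDatabase();
    String getTable();
    String getJob();
    String getCreateTime();
}

interface DataLineageEntity extends TableLineageEntity {
    long getBarrierId();
    long getSnapshotId();
}

// Hypothetical immutable implementation, for illustration only.
class SimpleDataLineageEntity implements DataLineageEntity {
    private final String database;
    private final String table;
    private final String job;
    private final String createTime;
    private final long barrierId;
    private final long snapshotId;

    SimpleDataLineageEntity(String database, String table, String job,
                            String createTime, long barrierId, long snapshotId) {
        this.database = database;
        this.table = table;
        this.job = job;
        this.createTime = createTime;
        this.barrierId = barrierId;
        this.snapshotId = snapshotId;
    }

    @Override public String getDatabase() { return database; }
    @Override public String getTable() { return table; }
    @Override public String getJob() { return job; }
    @Override public String getCreateTime() { return createTime; }
    @Override public long getBarrierId() { return barrierId; }
    @Override public long getSnapshotId() { return snapshotId; }
}
```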

We provide a default jdbc metadata store implementation, which supports using an external database to store this information.

Code Block
public class JdbcLineageMetaOptions {
	/**
	 * The jdbc driver class for lineage meta.
	 */
	public static ConfigOption<String> JDBC_DRIVER =
			key("jdbc.driver")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc driver class for lineage meta.");
	/**
	 * The jdbc url for lineage meta.
	 */
	public static ConfigOption<String> JDBC_URL =
			key("jdbc.url")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc url for lineage meta.");

	/**
	 * The jdbc username for lineage meta.
	 */
	public static ConfigOption<String> JDBC_USERNAME =
			key("jdbc.username")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc username for lineage meta.");

	/**
	 * The jdbc password for lineage meta.
	 */
	public static ConfigOption<String> JDBC_PASSWORD =
			key("jdbc.password")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc password for lineage meta.");

	/**
	 * The jdbc database for lineage meta.
	 */
	public static ConfigOption<String> JDBC_DATABASE =
			key("jdbc.database")
					.stringType()
					.defaultValue("paimon")
					.withDescription("The jdbc database for lineage meta. The default database is `paimon`.");
}

/* Jdbc metadata store factory. */
public class JdbcLineageMetaFactory implements LineageMetaFactory {

	public static final String IDENTIFIER = "jdbc";

	@Override
	public String identifier() {
		return IDENTIFIER;
	}

	@Override
	public LineageMeta create(LineageMetaContext context) {
		// Build a DataSource from the jdbc options in the context
		// (createDataSource is a helper sketched here, not a final API).
		DataSource dataSource = createDataSource(context.options());
		return new JdbcLineageMeta(dataSource);
	}
}

/* Jdbc lineage meta. */
public class JdbcLineageMeta implements LineageMeta {
    private DataSource dataSource;
}
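For reference, the jdbc store could create the snapshot lineage tables with DDL along these lines. This is a sketch in MySQL syntax; the column lengths and the type mapping (STRING to VARCHAR, LONG to BIGINT, DATETIME to TIMESTAMP) are assumptions, while the columns and joint primary key follow the table definitions above:

```sql
CREATE TABLE source_snapshot_lineage (
    job         VARCHAR(128) NOT NULL,
    barrier_id  BIGINT       NOT NULL,
    `database`  VARCHAR(128) NOT NULL,
    `table`     VARCHAR(128) NOT NULL,
    snapshot_id BIGINT       NOT NULL,
    create_time TIMESTAMP    NOT NULL,
    PRIMARY KEY (job, barrier_id, `database`, `table`)
);
```

The table sink_snapshot_lineage would use the same schema and key.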

Besides jdbc, users can implement their own metadata store and factory to manage the lineage information in their own external systems.

We can implement ReadOnlyTable for the tables in the Metadata Database; users can then query these tables like regular Paimon tables in computation engines such as Flink, Spark and Trino.

Code Block
/** Catalog options table for Paimon. */
public class CatalogOptionsTable implements ReadOnlyTable {
}

/** Source/sink table and job lineage table for Paimon. */
public class TableLineageTable implements ReadOnlyTable {
    private final LineageMeta lineageMeta;
}

/** Source/sink data and job lineage table for Paimon. */
public class DataLineageTable implements ReadOnlyTable {
    private final LineageMeta lineageMeta;
}

Tables in the Metadata Database are read-only: users can query them with SQL just like regular tables, but cannot alter them or update their data. We provide Flink actions for users to delete data from these tables as follows.

action               | argument                                    | note
delete-table-lineage | --job <job-name>: specify the name of the job | Delete table lineage created by the given job
delete-data-lineage  | --job <job-name>: specify the name of the job | Delete data lineage created by the given job

...

There will be a system Metadata Database in each catalog to manage table and data lineage. For table and data lineage crossing multiple catalogs, users can join the lineage tables of the different catalogs. For example, given two catalogs where a Flink ETL job reads data from catalog1 and writes data to catalog2, users can query data lineage from source_snapshot_lineage in catalog1 and sink_snapshot_lineage in catalog2.
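In Flink SQL, such a cross-catalog lineage query could look like the following sketch (assuming the Metadata Database `paimon` in each catalog can be addressed with fully qualified catalog.database.table names):

```sql
-- Which source snapshots in catalog1 produced snapshot 123 of a sink table in catalog2?
SELECT S.`database`, S.`table`, S.snapshot_id
FROM catalog1.paimon.source_snapshot_lineage S
JOIN catalog2.paimon.sink_snapshot_lineage T
  ON S.job = T.job AND S.barrier_id = T.barrier_id
WHERE T.`database` = 'myDatabase' AND T.`table` = 'myTable' AND T.snapshot_id = 123;
```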

...