...

Architecture

We introduce a System Metadata Database named "paimon" for each catalog in Paimon. It stores the catalog properties and the lineage information. Users can create a Paimon catalog with table and data lineage options; if the source or sink tables of an ETL job are in the catalog, the lineage information will be saved. The overall process is as follows.

...

[draw.io diagram: incremental correction]

Proposed Changes

...

When users add the option table-lineage or data-lineage to the catalog DDL, Paimon will check for the Metadata Database 'paimon' and create it automatically, then create the options table, the table lineage tables and the data lineage tables.

...

Table source_job_lineage stores the relationship between a source table and an ETL job. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | ---------------------
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
create_time | DATETIME    | "2023-06-05 13:29:01"

3. sink_job_lineage

Table sink_job_lineage stores the relationship between a sink table and an ETL job. The job, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | ---------------------
job         | STRING      | "myJob"
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
create_time | DATETIME    | "2023-06-05 13:29:01"

4. source_snapshot_lineage

Table source_snapshot_lineage stores the data lineage between a source table and an ETL job. It records which snapshot id of the source table is consumed by a specific barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | ---------------------
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
create_time | DATETIME    | "2023-06-05 13:29:01"

5. sink_snapshot_lineage

Table sink_snapshot_lineage stores the data lineage between an ETL job and a sink table. It records which snapshot id of the sink table is produced by a specific barrier id of the ETL job. The job, barrier_id, database and table fields form a joint primary key.

Column Name | Column Type | Example
----------- | ----------- | ---------------------
job         | STRING      | "myJob"
barrier_id  | LONG        | 1
database    | STRING      | "myDatabase"
table       | STRING      | "myTableName"
snapshot_id | LONG        | 1
create_time | DATETIME    | "2023-06-05 13:29:01"

1. Query Table Lineage

...

Code Block
-- Query upstream table snapshot for given table and snapshot
SELECT S.`database`, S.`table`, S.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job=T.job AND S.barrier_id=T.barrier_id
    WHERE T.`database`='myDatabase' AND T.`table`='myTable' AND T.snapshot_id=123;

-- Query downstream table snapshot for given table and snapshot
-- (barrier_id and snapshot_id exist only in the snapshot lineage tables)
SELECT T.`database`, T.`table`, T.snapshot_id FROM
    source_snapshot_lineage S
    JOIN sink_snapshot_lineage T
    ON S.job=T.job AND S.barrier_id=T.barrier_id
    WHERE S.`database`='myDatabase' AND S.`table`='myTable' AND S.snapshot_id=123;
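
The upstream query can also be read as a plain join over two lists of rows. The following is a minimal in-memory sketch in Java, meant only to illustrate the join condition on job and barrier_id. The names LineageJoinSketch, SnapshotLineage and upstream are hypothetical and are not part of Paimon; create_time is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory illustration of the upstream snapshot query; not a Paimon API. */
class LineageJoinSketch {

    /** One row of source_snapshot_lineage or sink_snapshot_lineage (create_time omitted). */
    static final class SnapshotLineage {
        final String job;
        final long barrierId;
        final String database;
        final String table;
        final long snapshotId;

        SnapshotLineage(String job, long barrierId, String database, String table, long snapshotId) {
            this.job = job;
            this.barrierId = barrierId;
            this.database = database;
            this.table = table;
            this.snapshotId = snapshotId;
        }
    }

    /** Returns the source rows whose (job, barrier_id) produced the given sink snapshot. */
    static List<SnapshotLineage> upstream(
            List<SnapshotLineage> sources,
            List<SnapshotLineage> sinks,
            String database,
            String table,
            long snapshotId) {
        List<SnapshotLineage> result = new ArrayList<>();
        for (SnapshotLineage sink : sinks) {
            // Filter the sink side first, as the WHERE clause does.
            boolean matchesSink = sink.database.equals(database)
                    && sink.table.equals(table)
                    && sink.snapshotId == snapshotId;
            if (!matchesSink) {
                continue;
            }
            // Join on job and barrier_id, as the ON clause does.
            for (SnapshotLineage source : sources) {
                if (source.job.equals(sink.job) && source.barrierId == sink.barrierId) {
                    result.add(source);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<SnapshotLineage> sources = Arrays.asList(
                new SnapshotLineage("myJob", 1L, "myDatabase", "mySource", 10L),
                new SnapshotLineage("myJob", 2L, "myDatabase", "mySource", 11L));
        List<SnapshotLineage> sinks = Arrays.asList(
                new SnapshotLineage("myJob", 1L, "myDatabase", "myTable", 123L));
        for (SnapshotLineage s : upstream(sources, sinks, "myDatabase", "myTable", 123L)) {
            System.out.println(s.database + "." + s.table + " snapshot " + s.snapshotId);
        }
    }
}
```

The downstream direction is symmetric: filter the source rows first, then collect the sink rows with the same job and barrier_id.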

...

The tables in the System Metadata Database can be divided into two types: the catalog options table and the lineage information tables. For the catalog options table, we can store JSON-format data in the catalog directory, while for the lineage tables the requirements are much more complex, as follows:

1. Multiple jobs will concurrently write data to the same table; for example, the source operators will write data to the source_snapshot_lineage table.

2. One table has multiple writers but no committer operator, which is very different from how data is currently written to Paimon tables.

3. Data visibility requirements are high: each data write should basically be visible in real time.

Given these requirements, we cannot use regular Paimon tables for the Metadata Database. We design a metadata store API for the Metadata Database and introduce jdbc as the default metadata store. Users can configure the jdbc parameters for the Metadata Database in the create catalog statement as follows:

Code Block
CREATE CATALOG paimon_catalog1 WITH (
    ... -- other options
    'metadata.store' = 'jdbc',
    'metadata.jdbc.driver' = 'com.mysql.jdbc.Driver',
    'metadata.jdbc.url' = 'XXXXX',
    'metadata.jdbc.database' = 'paimon_cata1',    -- The default Metadata Database name is `paimon`
    'metadata.jdbc.username' = 'XXX',
    'metadata.jdbc.password' = 'XXX'
);
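
As a rough illustration of how these options might be read, the sketch below resolves them from a plain map. It assumes that the driver, url, username and password options are required because they have no default value, and that the database falls back to `paimon`. The class JdbcOptionsSketch and its methods are hypothetical and are not part of Paimon.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that resolves the jdbc metadata store options; not a Paimon API. */
class JdbcOptionsSketch {

    static final String DEFAULT_DATABASE = "paimon";

    /** Returns the configured database name, or the default `paimon` if none is set. */
    static String database(Map<String, String> options) {
        return options.getOrDefault("metadata.jdbc.database", DEFAULT_DATABASE);
    }

    /** Returns an option that is assumed to be required, failing fast if it is missing. */
    static String required(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("metadata.store", "jdbc");
        options.put("metadata.jdbc.driver", "com.mysql.jdbc.Driver");
        options.put("metadata.jdbc.database", "paimon_cata1");

        System.out.println("driver   = " + required(options, "metadata.jdbc.driver"));
        System.out.println("database = " + database(options));
    }
}
```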

...

Code Block
/** Factory to create metadata store. */
public interface MetadataStoreFactory {
	/** The identifier for metadata store factory. */
	String identifier();

	/** Create metadata store from specific context. */
	MetadataStore create(MetadataStoreContext context);

	/** The context used to create metadata store in the factory. */
	interface MetadataStoreContext {
		Options options();
	}
}

/* Metadata store will manage the options, table lineage and data lineage information for the catalog. */
public interface MetadataStore {
	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSourceTableLineage(TableLineageEntity entity);

	/**
	 * Delete the source table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSourceTableLineage(String job);

	/**
	 * Get source table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSinkTableLineage(TableLineageEntity entity);

	/**
	 * Get sink table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for sink table and job lineages
	 */
	Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate);

	/**
	 * Delete the sink table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSinkTableLineage(String job);

	/**
	 * Store the source data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSourceDataLineage(DataLineageEntity entity);

	/**
	 * Get source data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for source data and job lineages
	 */
	Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSinkDataLineage(DataLineageEntity entity);

	/**
	 * Get sink data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for sink data and job lineages
	 */
	Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate);
}

/**
 * Table lineage entity with database, table and job for table source and sink lineage.
 */
public interface TableLineageEntity {
	public String getDatabase();

	public String getTable();

	public String getJob();

    public String getCreateTime();
}

/**
 * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink lineage.
 */
public interface DataLineageEntity extends TableLineageEntity {
	public long getBarrierId();

	public long getSnapshotId();
}
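
Since the job, database and table fields form a joint primary key, one reading of the store and delete methods is that storing the same key twice overwrites the earlier row, and deleting by job removes every row of that job. The sketch below shows that reading with an in-memory map. It is a simplified, hypothetical stand-in: InMemoryTableLineageSketch and Row are not Paimon classes, and the Predicate-based query methods are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory table lineage store keyed by (job, database, table); not a Paimon API. */
class InMemoryTableLineageSketch {

    /** Simplified stand-in for TableLineageEntity. */
    static final class Row {
        final String job;
        final String database;
        final String table;
        final String createTime;

        Row(String job, String database, String table, String createTime) {
            this.job = job;
            this.database = database;
            this.table = table;
            this.createTime = createTime;
        }
    }

    // The list key models the joint primary key; List provides value-based equals and hashCode.
    private final Map<List<String>, Row> rows = new LinkedHashMap<>();

    /** Inserts the row, or overwrites the existing row with the same primary key. */
    synchronized void store(Row row) {
        rows.put(Arrays.asList(row.job, row.database, row.table), row);
    }

    /** Removes every row that belongs to the given job. */
    synchronized void deleteByJob(String job) {
        rows.values().removeIf(row -> row.job.equals(job));
    }

    /** Returns a copy of all rows in insertion order. */
    synchronized List<Row> list() {
        return new ArrayList<>(rows.values());
    }

    public static void main(String[] args) {
        InMemoryTableLineageSketch store = new InMemoryTableLineageSketch();
        store.store(new Row("myJob", "myDatabase", "myTableName", "2023-06-05 13:29:01"));
        store.store(new Row("myJob", "myDatabase", "myTableName", "2023-06-05 14:00:00"));
        System.out.println("rows after two stores of the same key: " + store.list().size());
        store.deleteByJob("myJob");
        System.out.println("rows after delete: " + store.list().size());
    }
}
```

A jdbc-backed store could get the same overwrite behavior from a primary key constraint on the lineage table, which also helps when several jobs write to the same table concurrently.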

By default, we implement the jdbc metadata store, which supports using external databases to store this information.

Code Block

public class JdbcMetadataStoreOptions {
	/**
	 * The jdbc driver class for metadata store.
	 */
	public static final ConfigOption<String> METADATA_JDBC_DRIVER =
			key("metadata.jdbc.driver")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc driver class for metadata store.");

	/**
	 * The jdbc url for metadata store.
	 */
	public static final ConfigOption<String> METADATA_JDBC_URL =
			key("metadata.jdbc.url")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc url for metadata store.");

	/**
	 * The jdbc username for metadata store.
	 */
	public static final ConfigOption<String> METADATA_JDBC_USERNAME =
			key("metadata.jdbc.username")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc username for metadata store.");

	/**
	 * The jdbc password for metadata store.
	 */
	public static final ConfigOption<String> METADATA_JDBC_PASSWORD =
			key("metadata.jdbc.password")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc password for metadata store.");

	/**
	 * The jdbc database for metadata store.
	 */
	public static final ConfigOption<String> METADATA_JDBC_DATABASE =
			key("metadata.jdbc.database")
					.stringType()
					.defaultValue("paimon")
					.withDescription("The jdbc database for metadata store; the default database is `paimon`.");
}

/* Jdbc metadata store factory. */
public class JdbcMetadataStoreFactory implements MetadataStoreFactory {

	public static final String IDENTIFIER = "jdbc";

	@Override
	public String identifier() {
		return IDENTIFIER;
	}

	@Override
	public MetadataStore create(MetadataStoreContext context) {
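		// Create the DataSource from the jdbc options in the context. createDataSource is a
		// placeholder: this proposal does not specify how the DataSource is built.
		DataSource dataSource = createDataSource(context.options());
		return new JdbcMetadataStore(dataSource);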
	}
}

/* Jdbc metadata store. */
public class JdbcMetadataStore implements MetadataStore {
    private DataSource dataSource;
}

Besides jdbc, users can implement their own metadata store and factory to manage the lineage information in their own external system.
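
To make the role of identifier() concrete, the sketch below picks a factory by matching its identifier against the 'metadata.store' option. How Paimon actually discovers factories is not described here, so the explicit list of candidates is an assumption for illustration. MetadataStoreLookupSketch and StoreFactory are hypothetical names, and StoreFactory is a reduced stand-in for MetadataStoreFactory.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical lookup of a metadata store factory by identifier; not a Paimon API. */
class MetadataStoreLookupSketch {

    /** Reduced stand-in for MetadataStoreFactory that keeps only the identifier. */
    interface StoreFactory {
        String identifier();
    }

    /** Returns the factory whose identifier equals the 'metadata.store' option. */
    static StoreFactory find(List<StoreFactory> factories, Map<String, String> options) {
        String identifier = options.get("metadata.store");
        if (identifier == null) {
            throw new IllegalArgumentException("Option 'metadata.store' is not set");
        }
        for (StoreFactory factory : factories) {
            if (factory.identifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalArgumentException(
                "No metadata store factory found for identifier: " + identifier);
    }

    public static void main(String[] args) {
        StoreFactory jdbc = () -> "jdbc";
        StoreFactory custom = () -> "my-store"; // a user-provided factory with its own identifier

        Map<String, String> options = new HashMap<>();
        options.put("metadata.store", "my-store");

        StoreFactory chosen = find(Arrays.asList(jdbc, custom), options);
        System.out.println("chosen factory: " + chosen.identifier());
    }
}
```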

...

We can implement ReadOnlyTable for the tables in the System Metadata Database, so that users can query data from these tables like regular Paimon tables in computation engines such as Flink, Spark and Trino.

Code Block
/** Catalog options table for Paimon. */
public class CatalogOptionsTable implements ReadOnlyTable {
}

/** Source/sink table and job lineage table for Paimon. */
public class TableLineageTable implements ReadOnlyTable {
    private final MetadataStore metadataStore;
}

/** Source/sink data and job lineage table for Paimon. */
public class DataLineageTable implements ReadOnlyTable {
    private final MetadataStore metadataStore; 
}

Tables in the Metadata Database are read-only, and users can query the system metadata tables with SQL just like regular tables. However, users cannot alter the system metadata tables or update the data in them. We provide Flink actions for users to delete data from the system metadata tables as follows:

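action               | argument                                   | note
-------------------- | ------------------------------------------ | ----------------------------------------------
delete-table-lineage | --job <job-name>: specify name of the job. | delete table lineage created by the given job.
delete-data-lineage  | --job <job-name>: specify name of job.     | delete data lineage created by the given job.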

...

There will be a Metadata Database in each catalog to manage table and data lineage. For table and data lineage that crosses multiple catalogs, users can join the lineage tables of the different catalogs. For example, suppose there are two catalogs, and a Flink ETL job reads data from catalog1 and writes data to catalog2. Users can then query the data lineage from source_snapshot_lineage in catalog1 and sink_snapshot_lineage in catalog2.

...