...

1. Concurrently Writing To Lineage Tables

There will be multiple jobs concurrently writing records to the data lineage tables in the System Database. To avoid conflicts, the data lineage tables such as source_snapshot_lineage and sink_snapshot_lineage use the job name as a partition field, so the source reader operator and commit operator of each Flink ETL job write data to different partitions independently.
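The conflict-avoidance idea can be sketched with a simple in-memory analogy: because the partition key is the job name, writers for different jobs never touch the same partition. This is an illustration only, not Paimon's actual writer code, and the class name is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative sketch: each job writes lineage records only to its own partition. */
public class JobPartitionedWriter {
    // The partition field is the job name, so writers for different jobs
    // always append to disjoint partitions and cannot conflict.
    private final Map<String, List<String>> partitions = new ConcurrentHashMap<>();

    public void write(String jobName, String record) {
        partitions.computeIfAbsent(jobName, k -> new CopyOnWriteArrayList<>()).add(record);
    }

    public List<String> partition(String jobName) {
        return partitions.getOrDefault(jobName, List.of());
    }
}
```

Two jobs writing concurrently each see only their own partition, which is why no committer-style coordination is needed between them.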

2. Snapshots In Lineage Tables

3. Actions For Lineage Tables

The tables in the System Database can be divided into two types: the catalog options table and the lineage information tables. For the catalog options table, we can store JSON-format data in the catalog directory; for the lineage tables, the requirements are more complex:

1. Multiple jobs will concurrently write data to the same table; for example, the source operators of all jobs will write data to the source_snapshot_lineage table.

2. One table has multiple writers but no committer operator, which is very different from the current write path of Paimon tables.

3. High data visibility requirements, with the basic requirement that each data write is visible in real time.

Given these requirements, we cannot use regular Paimon tables for the system database. Instead, we design a system database API and introduce JDBC as the default metadata store for the system database. Users can configure the JDBC parameters for the system database in the CREATE CATALOG statement as follows:

Code Block
CREATE CATALOG paimon_catalog1 WITH (
    ... -- other options
    'metadata.store' = 'jdbc',
    'metadata.jdbc.driver' = 'com.mysql.jdbc.Driver',
    'metadata.jdbc.url' = 'XXXXX',
    'metadata.jdbc.database' = 'paimon_cata1',    -- The default system database name is `paimon`
    'metadata.jdbc.username' = 'XXX',
    'metadata.jdbc.password' = 'XXX'
);

The Paimon catalog will create the metadata store based on these metadata options, and use it to store and retrieve lineage information. The metadata store and factory classes are as follows.

Code Block
/** Factory to create metadata store. */
public interface MetadataStoreFactory {
    /** The identifier for the metadata store factory. */
	String identifier();

    /** Create a metadata store from the specific context. */
	MetadataStore create(MetadataStoreContext context);

    /** The context used to create a metadata store in the factory. */
	interface MetadataStoreContext {
	    Options options();
    }
}

/** Metadata store will manage the options, table lineage and data lineage information for the catalog. */
public interface MetadataStore {
	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSourceTableLineage(TableLineageEntity entity);

	/**
	 * Get source table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSinkTableLineage(TableLineageEntity entity);

	/**
	 * Get sink table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for sink table and job lineages
	 */
	Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the source data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSourceDataLineage(DataLineageEntity entity);

	/**
	 * Get source data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for source data and job lineages
	 */
	Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink data and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSinkDataLineage(DataLineageEntity entity);

	/**
	 * Get sink data and job lineages.
	 *
	 * @param predicate the predicate for the data lineages
	 * @return the iterator for sink data and job lineages
	 */
	Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate);
}

/**
 * Table lineage entity with database, table and job for table source and sink lineage.
 */
public interface TableLineageEntity {
	public String getDatabase();

	public String getTable();

	public String getJob();
}

/**
 * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink lineage.
 */
public interface DataLineageEntity extends TableLineageEntity {
	public long getBarrierId();

	public long getSnapshotId();
}
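A straightforward way to carry lineage records between operators and the metadata store is a plain immutable value class implementing these interfaces. The following is an illustrative sketch (the class name `DataLineageEntityImpl` is hypothetical and not part of the proposed API; the interfaces are repeated for self-containment):

```java
/** Trimmed copies of the entity interfaces from the proposal, for self-containment. */
interface TableLineageEntity {
    String getDatabase();
    String getTable();
    String getJob();
}

interface DataLineageEntity extends TableLineageEntity {
    long getBarrierId();
    long getSnapshotId();
}

/** Illustrative immutable implementation of the data lineage entity. */
public class DataLineageEntityImpl implements DataLineageEntity {
    private final String database;
    private final String table;
    private final String job;
    private final long barrierId;
    private final long snapshotId;

    public DataLineageEntityImpl(
            String database, String table, String job, long barrierId, long snapshotId) {
        this.database = database;
        this.table = table;
        this.job = job;
        this.barrierId = barrierId;
        this.snapshotId = snapshotId;
    }

    @Override public String getDatabase() { return database; }
    @Override public String getTable() { return table; }
    @Override public String getJob() { return job; }
    @Override public long getBarrierId() { return barrierId; }
    @Override public long getSnapshotId() { return snapshotId; }
}
```

Immutability is a natural choice here because lineage records are written once by an operator and never mutated afterwards.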

We provide the JDBC metadata store as the default implementation, which supports using an external database to store this information.

Code Block

public class JdbcMetadataStoreOptions {
	/**
	 * The jdbc driver class for metadata store.
	 */
	public static ConfigOption<String> METADATA_JDBC_DRIVER =
			key("metadata.jdbc.driver")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc driver class for metadata store.");
	/**
	 * The jdbc url for metadata store.
	 */
	public static ConfigOption<String> METADATA_JDBC_URL =
			key("metadata.jdbc.url")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc url for metadata store.");

	/**
	 * The jdbc username for metadata store.
	 */
	public static ConfigOption<String> METADATA_JDBC_USERNAME =
			key("metadata.jdbc.username")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc username for metadata store.");

	/**
	 * The jdbc password for metadata store.
	 */
	public static ConfigOption<String> METADATA_JDBC_PASSWORD =
			key("metadata.jdbc.password")
					.stringType()
					.noDefaultValue()
					.withDescription("The jdbc password for metadata store.");

	/**
	 * The jdbc database for metadata store.
	 */
	public static ConfigOption<String> METADATA_JDBC_DATABASE =
			key("metadata.jdbc.database")
					.stringType()
					.defaultValue("paimon")
					.withDescription("The jdbc database for metadata store and the default database is `paimon`");
}

/** Jdbc metadata store factory. */
public class JdbcMetadataStoreFactory implements MetadataStoreFactory {

	public static final String IDENTIFIER = "jdbc";

	@Override
	public String identifier() {
		return IDENTIFIER;
	}

	@Override
	public MetadataStore create(MetadataStoreContext context) {
        // Create a DataSource from the jdbc options in the context
		return new JdbcMetadataStore(/* the DataSource of the database */);
	}
}
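Selecting a store implementation from the `'metadata.store'` option then reduces to looking up a factory by its identifier. The sketch below simplifies this to a plain map lookup (Paimon would more likely use service discovery; the interfaces are trimmed and the class names are illustrative):

```java
import java.util.Map;

/** Illustrative factory lookup keyed by the `metadata.store` option value. */
public class MetadataStoreLoader {
    interface MetadataStore {}

    interface MetadataStoreFactory {
        String identifier();
        MetadataStore create();
    }

    static class JdbcMetadataStore implements MetadataStore {}

    static class JdbcMetadataStoreFactory implements MetadataStoreFactory {
        @Override public String identifier() { return "jdbc"; }
        @Override public MetadataStore create() { return new JdbcMetadataStore(); }
    }

    // Registered factories, keyed by identifier; a real catalog could
    // discover these via ServiceLoader instead of a hard-coded map.
    private static final Map<String, MetadataStoreFactory> FACTORIES =
            Map.of("jdbc", new JdbcMetadataStoreFactory());

    public static MetadataStore load(String identifier) {
        MetadataStoreFactory factory = FACTORIES.get(identifier);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown metadata store: " + identifier);
        }
        return factory.create();
    }
}
```

With this shape, `'metadata.store' = 'jdbc'` in the CREATE CATALOG statement maps directly to the `"jdbc"` identifier of `JdbcMetadataStoreFactory`.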

Besides JDBC, users can implement their own metadata store and factory to manage lineage information in their own external systems.
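As a minimal sketch of such a custom implementation, the following in-memory store covers only the source-table methods, and substitutes `java.util.function.Predicate` for Paimon's own `Predicate` to keep the example self-contained; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/**
 * Minimal in-memory metadata store sketch (source-table methods only).
 * Paimon's Predicate is replaced by java.util.function.Predicate for brevity.
 */
public class InMemoryMetadataStore {
    /** Trimmed copy of the table lineage entity interface, for self-containment. */
    public interface TableLineageEntity {
        String getDatabase();
        String getTable();
        String getJob();
    }

    private final List<TableLineageEntity> sourceTableLineages = new ArrayList<>();

    /** Store one source table and job lineage record; writes are immediately visible. */
    public synchronized void storeSourceTableLineage(TableLineageEntity entity) {
        sourceTableLineages.add(entity);
    }

    /** Return lineage records matching the predicate; a null predicate matches all. */
    public synchronized Iterator<TableLineageEntity> sourceTableLineages(
            Predicate<TableLineageEntity> predicate) {
        List<TableLineageEntity> result = new ArrayList<>();
        for (TableLineageEntity entity : sourceTableLineages) {
            if (predicate == null || predicate.test(entity)) {
                result.add(entity);
            }
        }
        return result.iterator();
    }
}
```

The synchronized methods reflect requirement 1 above (multiple concurrent writers) and the immediate visibility of each write reflects requirement 3; a production store would delegate both to the external system instead.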

We can implement ReadOnlyTable for the tables in the System Database; users can then query these tables like regular Paimon tables in computation engines such as Flink, Spark and Trino.

Code Block
/** Catalog options table for Paimon. */
public class CatalogOptionsTable implements ReadOnlyTable {
}

/** Source/sink table and job lineage table for Paimon. */
public class TableLineageTable implements ReadOnlyTable {
    private final MetadataStore metadataStore;
}

/** Source/sink data and job lineage table for Paimon. */
public class DataLineageTable implements ReadOnlyTable {
    private final MetadataStore metadataStore; 
}

Tables in the system database are read-only. Users can query data from system tables with SQL just like regular tables, but they cannot alter the system tables or update the data in them. We provide Flink actions for users to delete data from system tables as follows:

...