THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
CREATE CATALOG paimon_catalog1 WITH ( ... // other options 'metadata.storelineage-meta' = 'jdbc', 'metadata.jdbc.driver' = 'com.mysql.jdbc.Driver', 'metadata.jdbc.url' = 'XXXXX', 'metadata.jdbc.database' = 'paimon_cata1', // The default Metadata Database name is `paimon` 'metadata.jdbc.username' = 'XXX', 'metadata.jdbc.password' = 'XXX' ); |
...
Code Block |
---|
/** Factor to create metadata store. */ public interface MetadataStoreFactoryLineageMetaFactory { /* The identifier for metadata store factor. */ String identifier(); /* Create metadata store from specific context. */ MetadataStore LineageMeta create(MetadataStoreContextLineageMetaContext context); /** The context used to create metadata store in the factory. */ public interface MetadataStoreContextLineageMetaContext { Options options(); } } /* Metadata store will manage the options, table lineage and data lineage information for the catalog. */ public interface MetadataStoreLineageMeta { /** * Store the source table and job lineage. * * @param entity the table lineage entity */ void storeSourceTableLineage(TableLineageEntity entity); /** * Delete the source table lineage for given job. * * @param job the job for table lineage */ void deleteSourceTableLineage(String job); /** * Get source table and job lineages. * * @param predicate the predicate for the table lineages * @return the iterator for source table and job lineages */ Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate); /** * Store the sink table and job lineage. * * @param entity the table lineage entity */ void storeSinkTableLineage(TableLineageEntity entity); /** * Get sink table and job lineages. * * @param predicate the predicate for the table lineages * @return the iterator for source table and job lineages */ Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate); /** * Delete the sink table lineage for given job. * * @param job the job for table lineage */ void deleteSinkTableLineage(String job); /** * Store the source table and job lineage. * * @param entity the data lineage entity */ void storeSourceDataLineage(DataLineageEntity entity); /** * Get source data and job lineages. * * @param predicate the predicate for the table lineages * @return the iterator for source table and job lineages */ Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate); /** * Store the source table and job lineage. * * @param entity the data lineage entity */ void storeSinkDataLineage(DataLineageEntity entity); /** * Get sink data and job lineages. * * @param predicate the predicate for the table lineages * @return the iterator for source table and job lineages */ Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate); } /** * Table lineage entity with database, table and job for table source and sink lineage. */ public interface TableLineageEntity { public String getDatabase(); public String getTable(); public String getJob(); public String getCreateTime(); } /** * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink lineage. */ public interface DataLineageEntity extends TableLineageEntity { public long getBarrierId(); public long getSnapshotId(); } |
We default to implementing the jdbc metadata store, which supports the use of external databases to store this information.
Code Block |
---|
public class JdbcMetadataStoreOptionsJdbcLineageMetaOptions { /** * The jdbc driver class for metadatalineage storemeta. */ public static ConfigOption<String> METADATA_JDBC_DRIVER = key("metadata.jdbc.driver") .stringType() .noDefaultValue() .withDescription("The jdbc driver class for metadatalineage storemeta."); /** * The jdbc url for metadatalineage storemeta. */ public static ConfigOption<String> METADATA_JDBC_URL = key("metadata.jdbc.url") .stringType() .noDefaultValue() .withDescription("The jdbc url for metadatalineage storemeta."); /** * The jdbc url for metadatalineage storemeta. */ public static ConfigOption<String> METADATA_JDBC_USERNAME = key("metadata.jdbc.username") .stringType() .noDefaultValue() .withDescription("The jdbc username for metadatalineage storemeta."); /** * The jdbc url for metadatalineage storemeta. */ public static ConfigOption<String> METADATA_JDBC_PASSWORD = key("metadata.jdbc.password") .stringType() .noDefaultValue() .withDescription("The jdbc password for metadatalineage storemeta."); /** * The jdbc url for metadatalineage storemeta. */ public static ConfigOption<String> METADATA_JDBC_DATABASE = key("metadata.jdbc.database") .stringType() .defaultValue("paimon") .withDescription("The jdbc database for metadatalineage storemeta and default database is `paimon`"); } /* Jdbc metadata store factory. */ public class JdbcMetadataStoreFactoryJdbcLineageMetaFactory implements MetadataStoreFactoryLineageMetaFactory { public static final String IDENTIFIER = "jdbc"; @Override public String identifier() { return IDENTIFIER; } @Override public MetadataStoreLineageMeta create(MetadataStoreContextLineageMetaContext context) { // Create DataSource from the context return new JdbcMetadataStoreLineageMeta(`The DataSource of database`); } } /* Jdbc metadatalineage storemeta. */ public class JdbcMetadataStoreJdbcLineageMeta implements MetadataStoreLineageMeta { private DataSource dataSource; } |
...