Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
CREATE CATALOG paimon_catalog1 WITH (
    ... // other options
    'metadata.storelineage-meta' = 'jdbc',    // NOTE(review): this key looks fused by the diff view (`store` + `lineage-meta`) - confirm the intended option key
    'metadata.jdbc.driver' = 'com.mysql.jdbc.Driver',
    'metadata.jdbc.url' = 'XXXXX',
    'metadata.jdbc.database' = 'paimon_cata1',    // The metadata database name defaults to `paimon` when this option is not set
    'metadata.jdbc.username' = 'XXX',
    'metadata.jdbc.password' = 'XXX'
);

...

Code Block
/** Factor to create metadata store. */
public interface MetadataStoreFactoryLineageMetaFactory {
    /* The identifier for metadata store factor. */
	String identifier();

    /* Create metadata store from specific context. */
	MetadataStore    LineageMeta create(MetadataStoreContextLineageMetaContext context);

    /** The context used to create metadata store in the factory. */
	public interface MetadataStoreContextLineageMetaContext {
	    Options options();
    }
}

/* Metadata store will manage the options, table lineage and data lineage information for the catalog. */
public interface MetadataStoreLineageMeta {
	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSourceTableLineage(TableLineageEntity entity);

	/**
	 * Delete the source table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSourceTableLineage(String job);

   /**
	 * Get source table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate);

	/**
	 * Store the sink table and job lineage.
	 *
	 * @param entity the table lineage entity
	 */
	void storeSinkTableLineage(TableLineageEntity entity);

	/**
	 * Get sink table and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate);

	/**
	 * Delete the sink table lineage for given job.
	 *
	 * @param job the job for table lineage
	 */
	void deleteSinkTableLineage(String job);

   /**
	 * Store the source table and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSourceDataLineage(DataLineageEntity entity);

	/**
	 * Get source data and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate);

	/**
	 * Store the source table and job lineage.
	 *
	 * @param entity the data lineage entity
	 */
	void storeSinkDataLineage(DataLineageEntity entity);

	/**
	 * Get sink data and job lineages.
	 *
	 * @param predicate the predicate for the table lineages
	 * @return the iterator for source table and job lineages
	 */
	Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate);
}

/**
 * Table lineage entity carrying the database, table and job of a table source or sink lineage.
 */
public interface TableLineageEntity {
    /**
     * Returns the database of this lineage entity.
     *
     * @return the database
     */
    String getDatabase();

    /**
     * Returns the table of this lineage entity.
     *
     * @return the table
     */
    String getTable();

    /**
     * Returns the job of this lineage entity.
     *
     * @return the job
     */
    String getJob();

    /**
     * Returns the create time of this lineage entity as a string.
     *
     * @return the create time
     */
    String getCreateTime();
}

/**
 * Data lineage entity that adds a barrier id and a snapshot id to the table lineage, used for
 * table source and sink lineage.
 */
public interface DataLineageEntity extends TableLineageEntity {
    /**
     * Returns the barrier id of this data lineage entity.
     *
     * @return the barrier id
     */
    long getBarrierId();

    /**
     * Returns the snapshot id of this data lineage entity.
     *
     * @return the snapshot id
     */
    long getSnapshotId();
}

By default, we provide a JDBC implementation of the metadata store, which allows an external database to be used to store this information.

Code Block
/** Options of the jdbc lineage meta. */
public class JdbcMetadataStoreOptionsJdbcLineageMetaOptions {
    /** The jdbc driver class for lineage meta. */
    public static final ConfigOption<String> METADATA_JDBC_DRIVER =
            key("metadata.jdbc.driver")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The jdbc driver class for lineage meta.");

    /** The jdbc url for lineage meta. */
    public static final ConfigOption<String> METADATA_JDBC_URL =
            key("metadata.jdbc.url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The jdbc url for lineage meta.");

    /** The jdbc username for lineage meta. */
    public static final ConfigOption<String> METADATA_JDBC_USERNAME =
            key("metadata.jdbc.username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The jdbc username for lineage meta.");

    /** The jdbc password for lineage meta. */
    public static final ConfigOption<String> METADATA_JDBC_PASSWORD =
            key("metadata.jdbc.password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The jdbc password for lineage meta.");

    /** The jdbc database for lineage meta; the default database is {@code paimon}. */
    public static final ConfigOption<String> METADATA_JDBC_DATABASE =
            key("metadata.jdbc.database")
                    .stringType()
                    .defaultValue("paimon")
                    .withDescription("The jdbc database for lineage meta and default database is `paimon`");
}

/** Jdbc metadata store factory. */
public class JdbcMetadataStoreFactoryJdbcLineageMetaFactory
        implements MetadataStoreFactoryLineageMetaFactory {

    /** The identifier returned by {@link #identifier()}. */
    public static final String IDENTIFIER = "jdbc";

    /**
     * Returns the identifier of this factory.
     *
     * @return {@link #IDENTIFIER}
     */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Creates the jdbc metadata store from the given context.
     *
     * @param context the context the store is created from
     * @return the created jdbc metadata store
     */
    @Override
    public MetadataStoreLineageMeta create(MetadataStoreContextLineageMetaContext context) {
        // Create DataSource from the context.
        // NOTE: the backtick-quoted argument below is a placeholder of this design sketch,
        // not Java; it stands for the DataSource created from the context.
        return new JdbcMetadataStoreJdbcLineageMeta(`The DataSource of database`);
    }
}

/*
 * Jdbc lineage meta.
 *
 * NOTE(review): this is only a sketch - no constructor and none of the methods of the
 * implemented interface are shown here.
 */
public class JdbcMetadataStoreJdbcLineageMeta implements MetadataStoreLineageMeta {
    // Data source held by this store; presumably the database it reads and writes - TODO confirm.
    private DataSource dataSource;
}

...