...
/**
 * Job lineage is built according to StreamGraph. Users can get sources, sinks
 * and relationships from lineage.
 */
@PublicEvolving
public interface LineageGraph {
    /* Source lineage vertex list. */
    List<SourceLineageVertex> sources();

    /* Sink lineage vertex list. */
    List<LineageVertex> sinks();

    /* Lineage edges from sources to sinks. */
    List<LineageEdge> relations();
}

/** Lineage vertex represents the connectors in lineage graph, including source and sink. */
@PublicEvolving
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector. */
    List<LineageDataset> datasets();
}

/** Lineage dataset represents the source or sink in the job. */
@PublicEvolving
public interface LineageDataset {
    /* Name for this particular dataset. */
    String name();

    /* Unique name for this dataset's storage, for example, url for jdbc connector and location for lakehouse connector. */
    String namespace();

    /* Facets for the lineage vertex to describe the particular information of dataset, such as schema and config. */
    Map<String, LineageDatasetFacet> facets();
}

/** Facet interface for dataset. */
@PublicEvolving
public interface LineageDatasetFacet {
    /** Name for the facet which will be used as key in facets of LineageDataset. */
    String name();
}

/** Builtin config facet for dataset. */
@PublicEvolving
public interface DatasetConfigFacet extends LineageDatasetFacet {
    Map<String, String> config();
}

/** Field for schema in dataset. */
@PublicEvolving
public interface DatasetSchemaField<T> {
    /** The name of the field. */
    String name();

    /** The type of the field. */
    T type();
}

/** Builtin schema facet for dataset. */
@PublicEvolving
public interface DatasetSchemaFacet extends LineageDatasetFacet {
    <T> Map<String, DatasetSchemaField<T>> fields();
}

/** Lineage vertex for source which has boundedness. */
@PublicEvolving
public interface SourceLineageVertex extends LineageVertex {
    /**
     * The boundedness for the source connector. Users can get the boundedness for each source in
     * the lineage and determine the job execution mode with RuntimeExecutionMode.
     */
    Boundedness boundedness();
}

/** Lineage edge from sources to sink. */
@PublicEvolving
public interface LineageEdge {
    LineageVertex source();
    LineageVertex sink();
}
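For illustration, a listener could traverse the graph above as in the following sketch. It only uses the interfaces defined here; the facet key "config" is a hypothetical example, not a key this FLIP defines.

// Hypothetical sketch of consuming the LineageGraph API above.
// The facet key "config" is an assumption for illustration only.
void printLineage(LineageGraph graph) {
    for (LineageEdge edge : graph.relations()) {
        for (LineageDataset dataset : edge.source().datasets()) {
            System.out.printf("source dataset %s in %s%n", dataset.name(), dataset.namespace());
            LineageDatasetFacet configFacet = dataset.facets().get("config");
            if (configFacet instanceof DatasetConfigFacet) {
                ((DatasetConfigFacet) configFacet).config()
                        .forEach((k, v) -> System.out.printf("  config %s=%s%n", k, v));
            }
        }
    }
}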
...
For Table/SQL jobs, Flink creates table lineage according to the source and sink tables. Table lineage also contains column lineage, which describes the dependencies between source and sink columns. Flink builds these lineages from the job plan, so the whole process is transparent to users. CatalogContext in TableLineageDataset is defined in FLIP-294 [1]; it identifies the context for a physical table in different catalogs.
/** Basic table lineage dataset which has catalog context and table in it. */
public interface TableLineageDataset extends LineageDataset {
    /* The catalog context of the table lineage dataset. */
    public CatalogContext catalogContext();

    /* The table of the lineage dataset. */
    public CatalogBaseTable table();

    /* Database name and table name for the table lineage dataset. */
    public ObjectPath objectPath();
}

/** Schema field for sql/table jobs which has `LogicalType` for field type. */
public interface TableSchemaField extends DatasetSchemaField<LogicalType> {
}

/** Source lineage vertex for table. */
@PublicEvolving
public interface TableSourceLineageVertex extends TableLineageVertex, SourceLineageVertex {
}

/** Sink lineage vertex for table. */
@PublicEvolving
public interface TableSinkLineageVertex extends TableLineageVertex {
    /* Modify type, INSERT/UPDATE/DELETE statement. Listeners can identify different sinks by the
     * type to determine whether the sink should be added to the lineage. */
    ModifyType modifyType();
}

/* Table lineage edge from source table to sink table. */
@PublicEvolving
public interface TableLineageEdge extends LineageEdge {
    /* Table column lineage edges from source columns to sink columns. */
    List<TableColumnLineageEdge> columnEdges();
}

/* Column lineage from source table columns to each sink table column; one sink column may be
 * aggregated from multiple tables and columns. */
@PublicEvolving
public interface TableColumnLineageEdge {
    /** The source dataset. */
    LineageDataset source();

    /**
     * Columns from one source table of {@link LineageEdge} to the sink column. Each sink column
     * may be computed from multiple columns of the source, for example, an avg operator from two
     * columns in the source.
     */
    List<String> sourceColumns();

    /* Sink table column. */
    String sinkColumn();
}

/** The existing `ModifyType` should be marked as `PublicEvolving` and users can get it from the table sink lineage vertex. */
@PublicEvolving
public enum ModifyType {
}
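To show how the column-level information composes, a listener might flatten a table lineage edge as in the sketch below. It is only an illustration against the interfaces above; the helper method is hypothetical.

// Hypothetical sketch: flatten the column lineage of a TableLineageEdge into readable strings.
List<String> describeColumnLineage(TableLineageEdge edge) {
    List<String> lines = new ArrayList<>();
    for (TableColumnLineageEdge columnEdge : edge.columnEdges()) {
        // One sink column may be computed from several source columns, e.g. an avg of two columns.
        lines.add(String.format(
                "%s.%s -> %s",
                columnEdge.source().name(),
                columnEdge.sourceColumns(),
                columnEdge.sinkColumn()));
    }
    return lines;
}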
...
/** User defined vector source and sink lineage vertex. */
public class KafkaVectorLineageVertex implements LineageVertex {
    private final int capacity;
    private final String valueType;

    public KafkaVectorLineageVertex(int capacity, String valueType) {
        this.capacity = capacity;
        this.valueType = valueType;
    }

    /* The capacity of source lineage. */
    public int capacity() {
        return capacity;
    }

    /* The value type in the vector. */
    public String valueType() {
        return valueType;
    }

    @Override
    public List<LineageDataset> datasets() {
        // Datasets for the Kafka topic are omitted in this example.
        return Collections.emptyList();
    }
}

// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}

// Create kafka sink class with lineage vertex
public class KafkaVectorSink extends FlinkKafkaProducerBase implements LineageVertexProvider {
    int capacity;
    String valueType;

    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}

/* Users can use the vector source/sink lineages in a DataStream job. */
StreamExecutionEnvironment env = ...;
KafkaVectorSource source = new KafkaVectorSource();
KafkaVectorSink sink = new KafkaVectorSink();
env.fromSource(source)
    .map(...).keyBy(..).reduce(..)...
    .addSink(sink);
env.addLineageEdges(/* Build lineage edge from source and sink vertex */);
env.execute();
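The example above assumes the LineageVertexProvider interface this FLIP introduces for connectors. A sketch of that contract is shown here for readability; the exact method name may differ in the final API.

/** Connectors implement this interface to expose a custom lineage vertex (sketch). */
@PublicEvolving
public interface LineageVertexProvider {
    LineageVertex lineageVertex();
}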
...
// Datahub lineage listener factory.
public class DatahubLineageListenerFactory implements JobStatusChangedListenerFactory {
    private static final String DATAHUB_REST_ADDRESS = "datahub.rest.url";

    @Override
    public JobStatusChangedListener createListener(Context context) {
        Map<String, String> config = context.getConfiguration().toMap();
        String url = checkNotNull(config.get(DATAHUB_REST_ADDRESS));
        return new DatahubLineageListener(url);
    }
}

// Datahub lineage listener for paimon tables.
public class DatahubLineageListener implements JobStatusChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubLineageListener.class);
    private final String url;

    public DatahubLineageListener(String url) {
        this.url = url;
    }

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            JobCreatedEvent createdEvent = (JobCreatedEvent) event;
            LineageGraph jobLineage = createdEvent.lineage();
            for (LineageEdge relation : jobLineage.relations()) {
                LineageVertex source = relation.source();
                LineageVertex sink = relation.sink();
                checkArgument(
                        sink instanceof TableLineageVertex,
                        String.format(
                                "Only support table sink lineage vertex: %s",
                                sink.getClass().getName()));
                TableLineageVertex tableSink = (TableLineageVertex) sink;
                String sinkPhysicalPath = getPhysicalPath(tableSink);
                ObjectIdentifier sinkIdentifier = tableSink.identifier();

                checkArgument(
                        source instanceof TableLineageVertex,
                        String.format(
                                "Only support table source lineage vertex: %s",
                                source.getClass().getName()));
                TableLineageVertex tableSource = (TableLineageVertex) source;
                // Get physical table path for paimon catalog.
                String sourcePhysicalPath = getPhysicalPath(tableSource);
                ObjectIdentifier sourceIdentifier = tableSource.identifier();
                createRelation(
                        createdEvent.jobId(),
                        createdEvent.jobName(),
                        sourcePhysicalPath,
                        sourceIdentifier,
                        sinkPhysicalPath,
                        sinkIdentifier);
            }
        } else if (event instanceof JobExecutionStatusEvent) {
            JobExecutionStatusEvent executionStatusEvent = (JobExecutionStatusEvent) event;
            if (executionStatusEvent.newStatus().isGloballyTerminalState()) {
                deleteRelation(
                        executionStatusEvent.jobId(),
                        executionStatusEvent.jobName(),
                        executionStatusEvent.exception());
            }
        } else {
            LOG.error("Receive unsupported job status changed event: {}", event.getClass().getName());
        }
    }

    private String getPhysicalPath(TableLineageVertex lineageVertex) {
        CatalogContext catalogContext =
                ((TableLineageDataset) lineageVertex.datasets().get(0)).catalogContext();
        String catalogIdentifier =
                catalogContext
                        .getFactoryIdentifier()
                        .orElseThrow(
                                () ->
                                        new UnsupportedOperationException(
                                                "Only support catalog table connector."));
        checkArgument(
                catalogIdentifier.equals("paimon"),
                "Only support paimon connector for lineage");
        return checkNotNull(catalogContext.getConfiguration().toMap().get("path"));
    }

    private void createRelation(
            JobID jobId,
            String jobName,
            String sourcePhysicalPath,
            ObjectIdentifier sourceIdentifier,
            String sinkPhysicalPath,
            ObjectIdentifier sinkIdentifier) {
        // Create dataset from physical path and identifier for source and sink,
        // then create relation from source to sink for the given job.
    }

    private void deleteRelation(JobID jobId, String jobName, @Nullable Throwable exception) {
        // Delete relation created by the given job.
    }
}
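To wire the factory into a job, the listener factory is loaded from configuration. Assuming the option key `execution.job-status-changed-listeners` introduced by this FLIP, registration could look like the sketch below; the Datahub URL is a placeholder.

// Hedged sketch: register the listener factory through configuration.
// The option key is assumed from this FLIP; the URL value is a placeholder.
Configuration conf = new Configuration();
conf.setString("execution.job-status-changed-listeners",
        DatahubLineageListenerFactory.class.getName());
conf.setString("datahub.rest.url", "http://localhost:8080");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);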
...