...

Code Block
/**
 * Job lineage is built from the StreamGraph. Users can get sources, sinks and their relationships from the lineage.
 */
@PublicEvolving
public interface LineageGraph {
    /* Source lineage vertex list. */
    List<SourceLineageVertex> sources();

    /* Sink lineage vertex list. */
    List<LineageVertex> sinks();

    /* Lineage edges from sources to sinks. */
    List<LineageEdge> relations();
}

/** Lineage vertex represents the connectors in lineage graph, including source and sink. */
@PublicEvolving
public interface LineageVertex {
    /* List of input (for source) or output (for sink) datasets interacted with by the connector */
    List<LineageDataset> datasets();
}

/** Lineage dataset represents the source or sink in the job. */
@PublicEvolving
public interface LineageDataset {
    /* Name for this particular dataset. */
    String name;
    /* Unique name for this dataset's storage, for example, the URL for a JDBC connector or the location for a lakehouse connector. */
    String namespace;
    /* Facets of the dataset, describing particular information such as schema and config. */
    Map<String, LineageDatasetFacet> facets;
}

/** Facet interface for dataset. */
@PublicEvolving
public interface LineageDatasetFacet {
    /** Name for the facet which will be used as key in facets of LineageDataset. */
    String name();
}

/** Builtin config facet for dataset. */
@PublicEvolving
public interface DatasetConfigFacet extends LineageDatasetFacet {
    Map<String, String> config();
}

/** Field for schema in dataset. */
@PublicEvolving
public interface DatasetSchemaField<T> {
    /** The name of the field. */
    String name();
    /** The type of the field. */
    T type();
}

/** Builtin schema facet for dataset. */
@PublicEvolving
public interface DatasetSchemaFacet extends LineageDatasetFacet {
    <T> Map<String, DatasetSchemaField<T>> fields();
}

/** Lineage vertex for source which has boundedness. */
@PublicEvolving
public interface SourceLineageVertex extends LineageVertex {
    /**
     * The boundedness of the source connector. Users can get the boundedness of each source in
     * the lineage and determine the job execution mode with RuntimeExecutionMode.
     */
    Boundedness boundedness();
}

/** Lineage edge from source to sink. */
@PublicEvolving
public interface LineageEdge {
    LineageVertex source();
    LineageVertex sink();
}
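
For illustration, the sketch below shows how a consumer might walk this graph. It is not part of the proposed API: `lineageGraph` is assumed to come from a `JobCreatedEvent` (see the Datahub example later), and the field-style accesses on `LineageDataset` follow its definition above.

Code Block
// Hypothetical traversal of a LineageGraph; names are illustrative only.
for (LineageEdge edge : lineageGraph.relations()) {
    for (LineageDataset dataset : edge.source().datasets()) {
        System.out.printf("source dataset %s in %s%n", dataset.name, dataset.namespace);
    }
    for (LineageDataset dataset : edge.sink().datasets()) {
        System.out.printf("sink dataset %s in %s%n", dataset.name, dataset.namespace);
    }
}

// The boundedness of each source can drive the RuntimeExecutionMode decision above.
for (SourceLineageVertex source : lineageGraph.sources()) {
    System.out.println(source.boundedness());
}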

...

For Table/SQL jobs, Flink creates table lineage according to the source and sink tables. Table lineage contains column lineage, which describes the dependencies between source and sink columns. Flink derives these lineages from the job plan, so the entire process is transparent to users. CatalogContext in TableLineageDataset is defined in FLIP-294 [1], which identifies the context of the physical table in different catalogs.

Code Block
/** Basic table lineage dataset which has catalog context and table in it. */
@PublicEvolving
public interface TableLineageDataset extends LineageDataset {
    /* The catalog context of the table lineage dataset. */
    public CatalogContext catalogContext();

    /* The table of the lineage dataset. */
    public CatalogBaseTable table();

    /* Database name and table name for the table lineage dataset. */
    public ObjectPath objectPath();
}

/** Schema field for Table/SQL jobs, which uses `LogicalType` as the field type. */
@PublicEvolving
public interface TableSchemaField extends DatasetSchemaField<LogicalType> {
}

/** Source lineage vertex for table. */
@PublicEvolving
public interface TableSourceLineageVertex extends TableLineageVertex, SourceLineageVertex {
}

/** Sink lineage vertex for table. */
@PublicEvolving
public interface TableSinkLineageVertex extends TableLineageVertex {
    /* Modify type of the sink, i.e. INSERT/UPDATE/DELETE statement. Listeners can identify different sinks by this type and determine whether the sink should be added to the lineage. */
    ModifyType modifyType();
}

/** Table lineage edge from source table to sink table. */
@PublicEvolving
public interface TableLineageEdge extends LineageEdge {
    /* Table column lineage edges from source columns to sink columns. */
    List<TableColumnLineageEdge> columnEdges();
}

/** Column lineage from source table columns to each sink table column. One sink column may be computed from multiple source tables and columns. */
@PublicEvolving
public interface TableColumnLineageEdge {
    /** The source dataset of this column lineage. */
    LineageDataset source();

    /**
     * Columns from one source dataset of {@link LineageEdge} that produce the sink column. Each
     * sink column may be computed from multiple source columns, for example, an avg operator
     * over two columns in the source.
     */
    List<String> sourceColumns();

    /* Sink table column. */
    String sinkColumn();
}

/** The existing `ModifyType` should be marked as `PublicEvolving` so that users can get it from the table sink lineage vertex. */
@PublicEvolving
public enum ModifyType {
}
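
Putting the column-level interfaces together, a consumer could enumerate column dependencies as in this sketch; it assumes `edge` is a `TableLineageEdge` taken from the relations of the lineage graph and is illustrative rather than part of the proposal.

Code Block
// Hypothetical traversal of column lineage for one table edge.
for (TableColumnLineageEdge columnEdge : edge.columnEdges()) {
    String sourceColumns = String.join(", ", columnEdge.sourceColumns());
    System.out.printf(
            "sink column %s <- [%s] from source dataset %s%n",
            columnEdge.sinkColumn(), sourceColumns, columnEdge.source().name);
}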

...

Code Block
/** User defined vector source and sink lineage vertex. */
public class KafkaVectorLineageVertex implements LineageVertex {
    private final int capacity;
    private final String valueType;

    public KafkaVectorLineageVertex(int capacity, String valueType) {
        this.capacity = capacity;
        this.valueType = valueType;
    }

    /* The capacity of the vector source/sink. */
    public int capacity() { return capacity; }

    /* The value type in the vector. */
    public String valueType() { return valueType; }

    @Override
    public List<LineageDataset> datasets() { return Collections.emptyList(); }
}

// Create kafka source class with lineage vertex
public class KafkaVectorSource extends KafkaSource implements LineageVertexProvider {
    int capacity;
    String valueType;

    @Override
    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}

// Create kafka sink class with lineage vertex
public class KafkaVectorSink extends FlinkKafkaProducerBase implements LineageVertexProvider {
    int capacity;
    String valueType;

    @Override
    public LineageVertex lineageVertex() {
        return new KafkaVectorLineageVertex(capacity, valueType);
    }
}
 
/* Users can use the vector source/sink lineages in a DataStream job. */
StreamExecutionEnvironment env = ...;
KafkaVectorSource source = new KafkaVectorSource();
KafkaVectorSink sink = new KafkaVectorSink();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "vector-source")
	.map(...).keyBy(..).reduce(..)...
	.sinkTo(sink);
env.addLineageEdges(/* Build lineage edge from source and sink vertex */);
env.execute();
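
`addLineageEdges` accepts any `LineageEdge` implementation. A minimal, hypothetical edge class (the name and shape are illustrative, not part of this proposal) that could back the placeholder above:

Code Block
/* Hypothetical edge implementation for the addLineageEdges call above. */
public class VectorLineageEdge implements LineageEdge {
    private final LineageVertex source;
    private final LineageVertex sink;

    public VectorLineageEdge(LineageVertex source, LineageVertex sink) {
        this.source = source;
        this.sink = sink;
    }

    @Override
    public LineageVertex source() { return source; }

    @Override
    public LineageVertex sink() { return sink; }
}

With such a class, the placeholder call above could be written as `env.addLineageEdges(new VectorLineageEdge(source.lineageVertex(), sink.lineageVertex()))`.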

...

Code Block
// Datahub lineage listener factory.
public class DatahubLineageListenerFactory implements JobStatusChangedListenerFactory {
    private static final String DATAHUB_REST_ADDRESS = "datahub.rest.url";

    @Override
    public JobStatusChangedListener createListener(Context context) {
        Map<String, String> config = context.getConfiguration().toMap();
        String url = checkNotNull(config.get(DATAHUB_REST_ADDRESS));
        return new DatahubLineageListener(url);
    }
}

// Datahub lineage listener for paimon tables.
public class DatahubLineageListener implements JobStatusChangedListener {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubLineageListener.class);
    private final String url;

    public DatahubLineageListener(String url) {
        this.url = url;
    }

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            JobCreatedEvent createdEvent = (JobCreatedEvent) event;
            LineageGraph jobLineage = createdEvent.lineage();
            for (LineageEdge relation : jobLineage.relations()) {
                LineageVertex source = relation.source();
                LineageVertex sink = relation.sink();
                checkArgument(
                        sink instanceof TableLineageVertex,
                        String.format(
                                "Only support table sink lineage vertex: %s",
                                sink.getClass().getName()));
                TableLineageVertex tableSink = (TableLineageVertex) sink;
                String sinkPhysicalPath = getPhysicalPath(tableSink);
                ObjectIdentifier sinkIdentifier = tableSink.identifier();

                checkArgument(
                        source instanceof TableLineageVertex,
                        String.format(
                                "Only support table source lineage vertex: %s",
                                source.getClass().getName()));
                TableLineageVertex tableSource = (TableLineageVertex) source;

                // Get physical table path for paimon catalog.
                String sourcePhysicalPath = getPhysicalPath(tableSource);
                ObjectIdentifier sourceIdentifier = tableSource.identifier();
                createRelation(
                            createdEvent.jobId(),
                            createdEvent.jobName(),
                            sourcePhysicalPath,
                            sourceIdentifier,
                            sinkPhysicalPath,
                            sinkIdentifier);
            }
        } else if (event instanceof JobExecutionStatusEvent) {
            JobExecutionStatusEvent executionStatusEvent = (JobExecutionStatusEvent) event;
            if (executionStatusEvent.newStatus().isGloballyTerminalState()) {
                deleteRelation(executionStatusEvent.jobId(), executionStatusEvent.jobName(), executionStatusEvent.exception());
            }
        } else {
            LOG.error(
                    "Received unsupported job status changed event: {}",
                    event.getClass().getName());
        }
    }

    private String getPhysicalPath(TableLineageVertex lineageVertex) {
        // The datasets of a table lineage vertex are table lineage datasets.
        TableLineageDataset dataset = (TableLineageDataset) lineageVertex.datasets().get(0);
        CatalogContext catalogContext = dataset.catalogContext();
        String catalogIdentifier = catalogContext
                .getFactoryIdentifier()
                .orElseThrow(() ->
                        new UnsupportedOperationException(
                                "Only support catalog table connector."));
        checkArgument(
                catalogIdentifier.equals("paimon"),
                "Only support paimon connector for lineage");
        return checkNotNull(catalogContext
                .getConfiguration()
                .toMap()
                .get("path"));
    }

    private void createRelation(
            JobID jobId,
            String jobName,
            String sourcePhysicalPath,
            ObjectIdentifier sourceIdentifier,
            String sinkPhysicalPath,
            ObjectIdentifier sinkIdentifier) {
        // Create dataset from physical path and identifier for source and sink, then create relation from source to sink for given job
    }

    private void deleteRelation(JobID jobId, String jobName, @Nullable Throwable exception) {
        // Delete relation created by given job
    }
}
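
For the listener to actually receive events, `DatahubLineageListenerFactory` must be discoverable by the Flink runtime, e.g. registered as a `JobStatusChangedListenerFactory` service entry and enabled through the job status changed listener configuration proposed in this FLIP, with `datahub.rest.url` set in the Flink configuration so that `createListener` can read it from `context.getConfiguration()`.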

...