Status

Current state: "Under Discussion"

Discussion thread: To be added

JIRA: https://issues.apache.org/jira/browse/FLINK-22915

Released: 1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. a linear sequence) of Estimators/Transformers, where each Estimator/Transformer has exactly one input and one output.

The following use-cases are not supported yet, and we would like to address them with the changes proposed in this FLIP.


1) Express an Estimator/Transformer that has multiple inputs/outputs.

For example, some graph embedding algorithms take two tables as inputs, which represent the nodes and the edges of the graph respectively. This logic can be expressed as an Estimator with 2 input tables.

Similarly, a workflow may need to split 1 table into 2 tables and use them for training and validation respectively. This logic can be expressed as a Transformer with 1 input table and 2 output tables.

2) Compose a directed acyclic graph (DAG) of Estimators/Transformers into a single Estimator/Transformer.

For example, the workflow may involve the join of 2 tables, where each table could be generated by a chain of Estimator/Transformer. The entire workflow is therefore a DAG of Estimator/Transformer.

3) Online learning where a long-running instance of Transformer needs to be updated with the latest model data generated by another long-running instance of Estimator.

In this scenario, we need to allow the Estimator to run on a different machine than the Transformer, so that the Estimator can consume sufficient computation resources in a cluster while the Transformer can be deployed on edge devices.
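As a rough illustration of use-case 1, the sketch below shows a stage with one input and two outputs. It does not use any Flink class: `Rows`, `MultiTransformer` and `HeadTailSplitter` are hypothetical stand-ins (a "table" is just an in-memory list of strings), chosen only to show the varargs-in, array-out shape that a multi-output stage needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitSketch {

    /** Hypothetical stand-in for a Flink Table: an in-memory list of rows. */
    static final class Rows {
        final List<String> values;

        Rows(List<String> values) {
            this.values = values;
        }
    }

    /** Hypothetical interface: any number of input tables in, an array of tables out. */
    interface MultiTransformer {
        Rows[] transform(Rows... inputs);
    }

    /** One input, two outputs: the first trainCount rows for training, the rest for validation. */
    static final class HeadTailSplitter implements MultiTransformer {
        private final int trainCount;

        HeadTailSplitter(int trainCount) {
            this.trainCount = trainCount;
        }

        @Override
        public Rows[] transform(Rows... inputs) {
            if (inputs.length != 1) {
                throw new IllegalArgumentException("expected exactly 1 input, got " + inputs.length);
            }
            List<String> all = inputs[0].values;
            // Clamp the cut point so that it always lies within the input.
            int cut = Math.max(0, Math.min(trainCount, all.size()));
            Rows train = new Rows(new ArrayList<>(all.subList(0, cut)));
            Rows validation = new Rows(new ArrayList<>(all.subList(cut, all.size())));
            return new Rows[] {train, validation};
        }
    }

    public static void main(String[] args) {
        Rows input = new Rows(Arrays.asList("a", "b", "c", "d"));
        Rows[] outputs = new HeadTailSplitter(3).transform(input);
        System.out.println("train=" + outputs[0].values + " validation=" + outputs[1].values);
    }
}
```

A real implementation would operate on Flink tables and would more likely split by a random or keyed criterion; the fixed head/tail cut is used here only to keep the example deterministic.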


In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:

4) The Model interface does not provide any added value (given that we already have Transformer). The added class hierarchy complexity is not justified.

5) The fit/transform API requires users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instance passed to fit/transform.

6) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

7) There is no API provided by the Estimator/Transformer interface to validate the schema consistency of a Pipeline. Users have to instantiate Tables (with I/O logic) and run fit/transform to find out whether the stages in the Pipeline are compatible with each other.
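To make item 7 concrete, here is a minimal sketch of checking stage compatibility from schemas alone, without instantiating any table or running any I/O. The names `SchemaStage`, `addColumn` and `validate` are hypothetical. A schema is reduced to an ordered set of column names and each stage has a single input schema, which is a simplification of the list-of-TableSchema form proposed in this FLIP.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SchemaCheckSketch {

    /** Hypothetical stage view: derives the output schema or throws on a mismatch. */
    interface SchemaStage {
        Set<String> transformSchema(Set<String> input);
    }

    /** A stage that requires inputCol to exist and appends outputCol to the schema. */
    static SchemaStage addColumn(String inputCol, String outputCol) {
        return input -> {
            if (!input.contains(inputCol)) {
                throw new IllegalArgumentException("missing required column: " + inputCol);
            }
            Set<String> output = new LinkedHashSet<>(input);
            output.add(outputCol);
            return output;
        };
    }

    /** Threads the schema through the stages in order; no data is read or produced. */
    static Set<String> validate(List<SchemaStage> stages, Set<String> inputSchema) {
        Set<String> current = inputSchema;
        for (SchemaStage stage : stages) {
            current = stage.transformSchema(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<SchemaStage> stages =
                Arrays.asList(addColumn("text", "tokens"), addColumn("tokens", "features"));
        Set<String> input = new LinkedHashSet<>(Arrays.asList("text"));
        System.out.println(validate(stages, input));
    }
}
```

With the two stages swapped, `validate` fails on the first stage because the `tokens` column does not exist yet, which is the kind of error that would otherwise surface only after running fit/transform.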


Public Interfaces

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the final APIs of the classes updated by this FLIP, followed by a detailed explanation of the changes we have made to the Flink ML APIs.


The following code block shows the interface of Stage, Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.

@PublicEvolving
public interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {

    /**
     * This method checks the compatibility between input schemas, stage's parameters and stage's
     * logic. It should raise an exception if there is any mismatch, e.g. the number of input
     * schemas is wrong, or if a required field is missing from a schema.
     *
     * <p>If there is no mismatch, the method derives and returns the output schemas from the input
     * schemas.
     *
     * <p>Note that the output schemas of a given Estimator instance should equal the output schemas
     * of the Transformer instance fitted by this Estimator instance, provided that the same list of
     * input schemas is used as input to the fit/transform methods respectively.
     *
     * @param schemas the list of schemas of the input tables.
     * @return the list of schemas of the output tables.
     */
    TableSchema[] transformSchemas(TableSchema... schemas);

    /** Skipped */
    default String toJson() {...}

    /** Skipped */
    default void loadJson(String json) {...}
}


@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends Stage<T> {

    /**
     * Applies the Transformer on the given input tables, and returns the result tables.
     *
     * @param inputs a list of tables
     * @return a list of tables
     */
    Table[] transform(Table... inputs);

    /**
     * Uses the given list of tables to update internal states. This can be useful for e.g. online
     * learning where an Estimator fits an infinite stream of training samples and streams the model
     * diff data to this Transformer.
     *
     * <p>This method may be called at most once.
     *
     * @param inputs a list of tables
     */
    default void setStateStreams(Table... inputs) {
        throw new UnsupportedOperationException("this method is not implemented");
    }

    /**
     * Gets a list of tables representing changes of internal states of this Transformer. These
     * tables might come from the Estimator that instantiated this Transformer.
     *
     * @return a list of tables
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}


@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {

    /**
     * Trains on the given inputs and produces a Transformer. If this Estimator may be used to
     * compose a Pipeline, the transform method of the returned Transformer should be able to accept
     * a list of tables of the same length and schemas as the fit method of this Estimator.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineModel fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}


@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {

    public PipelineModel(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}
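The interfaces above do not spell out how Pipeline::fit chains its stages. The sketch below shows one plausible reading, modeled on the usual fit-then-transform chaining: each Estimator is fitted on the current tables, and the resulting Transformer is applied to produce the input of the next stage, up to the last Estimator. All types here are hypothetical stand-ins rather than the Flink interfaces (a "table" is a `double[]` column), and `scale` and `meanCenterer` are made-up stages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PipelineFitSketch {

    // Hypothetical stand-ins for the proposed interfaces; a "table" is a double[] column.
    interface Stage {}

    interface Transformer extends Stage {
        double[][] transform(double[]... inputs);
    }

    interface Estimator extends Stage {
        Transformer fit(double[]... inputs);
    }

    /** Transformer that multiplies every value of its single input by a constant factor. */
    static Transformer scale(double factor) {
        return inputs -> new double[][] {Arrays.stream(inputs[0]).map(v -> v * factor).toArray()};
    }

    /** Estimator that learns the mean of its input; the fitted Transformer subtracts it. */
    static Estimator meanCenterer() {
        return inputs -> {
            double mean = Arrays.stream(inputs[0]).average().orElse(0.0);
            return tables ->
                    new double[][] {Arrays.stream(tables[0]).map(v -> v - mean).toArray()};
        };
    }

    /** Fits the stages in order and returns the Transformers that would form the model. */
    static List<Transformer> fit(List<Stage> stages, double[]... inputs) {
        int lastEstimator = -1;
        for (int i = 0; i < stages.size(); i++) {
            if (stages.get(i) instanceof Estimator) {
                lastEstimator = i;
            }
        }
        List<Transformer> fitted = new ArrayList<>();
        double[][] current = inputs;
        for (int i = 0; i < stages.size(); i++) {
            Stage stage = stages.get(i);
            Transformer transformer =
                    stage instanceof Estimator
                            ? ((Estimator) stage).fit(current)
                            : (Transformer) stage;
            fitted.add(transformer);
            // Stages after the last Estimator need no training data, so stop transforming there.
            if (i < lastEstimator) {
                current = transformer.transform(current);
            }
        }
        return fitted;
    }

    /** Applies the fitted Transformers in order, as a pipeline model would. */
    static double[][] transform(List<Transformer> model, double[]... inputs) {
        double[][] current = inputs;
        for (Transformer transformer : model) {
            current = transformer.transform(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Stage> stages = Arrays.asList(scale(2.0), meanCenterer());
        double[] data = {1.0, 2.0, 3.0};
        List<Transformer> model = fit(stages, data);
        System.out.println(Arrays.toString(transform(model, data)[0])); // prints [-2.0, 0.0, 2.0]
    }
}
```

The example also shows why the Javadoc of Estimator::fit asks the returned Transformer to accept the same inputs as fit: the fitted Transformer is applied to the very tables it was trained on in order to feed the next stage.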



The following code block shows the interface of Graph, GraphModel and GraphBuilder proposed by this FLIP.


/**
 * A Graph acts as an Estimator. It consists of a DAG of stages, each of which is either an
 * Estimator or Transformer.
 */
@PublicEvolving
public final class Graph implements Estimator<Graph, GraphModel> {
    public Graph(...) {...}

    @Override
    public GraphModel fit(Table... inputs) {...}

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        return schemas;
    }

    /** Skipped a few methods, including the implementations of some Estimator APIs. */
}


/** A GraphBuilder helps connect Stage instances into a Graph or GraphModel. */
@PublicEvolving
public final class GraphBuilder {
    private int maxOutputLength = 20;

    public GraphBuilder() {

    }

    /**
     * Specifies the upper bound (could be loose) of the number of output tables that can be returned by the
     * Transformer::getStateStreams and Transformer::transform methods, for any stage involved in this Graph.
     *
     * The default upper bound is 20.
     */
    public GraphBuilder setMaxOutputLength(int maxOutputLength) {
        this.maxOutputLength = maxOutputLength;
        return this;
    }

    /**
     * Creates a TableId associated with this GraphBuilder. It can be used to specify the passing of
     * tables between stages, as well as the input/output tables of the Graph/GraphModel generated by
     * this builder.
     */
    public TableId createTableId() {
        return new TableId();
    }

    /**
     * The Graph::fit and GraphModel::transform should invoke the fit/transform of the corresponding stage with the
     * corresponding inputs.
     *
     * Returns a list of TableIds, which represents outputs of the Transformer::transform invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId... inputs) {
        return new TableId[maxOutputLength];
    }

    /**
     * The GraphModel::setStateStreams should invoke the setStateStreams of the corresponding stage with the
     * corresponding inputs.
     */
    void setStateStreams(Stage<?> stage, TableId... inputs) {}


    /**
     * The GraphModel::getStateStreams should invoke the getStateStreams of the corresponding stage.
     *
     * Returns a list of TableIds, which represents outputs of the getStateStreams invocation.
     */
    TableId[] getStateStreams(Stage<?> stage) {
        return new TableId[maxOutputLength];
    }

    /**
     * Returns a Graph instance with the following API specification:
     * - Graph::fit should take inputs and return a GraphModel with the following specification.
     * - GraphModel::transform should take inputs and return outputs.
     * - GraphModel::setStateStreams should take inputStates.
     * - GraphModel::getStateStreams should return outputStates.
     *
     * The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in
     * the order specified by the DAG of stages.
     */
    Graph build(
            TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {
        return new Graph();
    }

    /**
     * Returns a GraphModel instance with the following API specification:
     * - GraphModel::transform should take inputs and return outputs.
     * - GraphModel::setStateStreams should take inputStates.
     * - GraphModel::getStateStreams should return outputStates.
     *
     * The transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in
     * the order specified by the DAG of stages.
     *
     * This method throws an exception if any stage of this graph is an Estimator.
     */
    GraphModel buildModel(
            TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {
        return new GraphModel();
    }

    static class TableId {}
}
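The listing above only stubs out the builder, so the following sketch illustrates how TableId handles could wire stages into a DAG and how such a DAG could then be evaluated. It is not the proposed implementation: `MiniGraphBuilder`, its `run` method and the example stages are hypothetical, a "table" is a `double[]` column, only Transformers are supported, and execution relies on the assumption that stages are registered in a valid topological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

final class GraphBuilderSketch {

    /** Hypothetical stand-in for a Transformer; a "table" is just a double[] column. */
    interface Transformer {
        double[][] transform(double[]... inputs);
    }

    /** Opaque handle for a table passed between stages; compared by identity. */
    static final class TableId {}

    /** One recorded invocation: the stage plus its input and output handles. */
    private static final class Node {
        final Transformer stage;
        final TableId[] inputs;
        final TableId[] outputs;

        Node(Transformer stage, TableId[] inputs, TableId[] outputs) {
            this.stage = stage;
            this.inputs = inputs;
            this.outputs = outputs;
        }
    }

    /** Hypothetical, much-reduced counterpart of the proposed GraphBuilder. */
    static final class MiniGraphBuilder {
        private final int maxOutputLength;
        private final List<Node> nodes = new ArrayList<>();

        MiniGraphBuilder(int maxOutputLength) {
            this.maxOutputLength = maxOutputLength;
        }

        TableId createTableId() {
            return new TableId();
        }

        /** Records the invocation and returns maxOutputLength fresh output handles. */
        TableId[] getOutputs(Transformer stage, TableId... inputs) {
            TableId[] outputs = new TableId[maxOutputLength];
            Arrays.setAll(outputs, i -> new TableId());
            nodes.add(new Node(stage, inputs, outputs));
            return outputs;
        }

        /** Runs the nodes in insertion order and returns the tables bound to graphOutputs. */
        double[][] run(TableId[] graphInputs, TableId[] graphOutputs, double[]... tables) {
            Map<TableId, double[]> values = new HashMap<>();
            for (int i = 0; i < graphInputs.length; i++) {
                values.put(graphInputs[i], tables[i]);
            }
            for (Node node : nodes) {
                double[][] in = new double[node.inputs.length][];
                Arrays.setAll(in, i -> values.get(node.inputs[i]));
                double[][] out = node.stage.transform(in);
                if (out.length > node.outputs.length) {
                    throw new IllegalStateException("stage returned too many tables");
                }
                for (int i = 0; i < out.length; i++) {
                    values.put(node.outputs[i], out[i]);
                }
            }
            double[][] result = new double[graphOutputs.length][];
            Arrays.setAll(result, i -> values.get(graphOutputs[i]));
            return result;
        }
    }

    /** Two independent branches (left * 2 and -right) joined by an element-wise sum. */
    static double[][] runExample() {
        Transformer doubler = in -> new double[][] {Arrays.stream(in[0]).map(v -> v * 2).toArray()};
        Transformer negate = in -> new double[][] {Arrays.stream(in[0]).map(v -> -v).toArray()};
        Transformer sum = in -> new double[][] {
            IntStream.range(0, in[0].length).mapToDouble(i -> in[0][i] + in[1][i]).toArray()
        };
        MiniGraphBuilder builder = new MiniGraphBuilder(2);
        TableId left = builder.createTableId();
        TableId right = builder.createTableId();
        TableId scaledLeft = builder.getOutputs(doubler, left)[0];
        TableId negatedRight = builder.getOutputs(negate, right)[0];
        TableId joined = builder.getOutputs(sum, scaledLeft, negatedRight)[0];
        return builder.run(new TableId[] {left, right}, new TableId[] {joined},
                new double[] {1.0, 2.0}, new double[] {10.0, 20.0});
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runExample()[0])); // prints [-8.0, -16.0]
    }
}
```

Handing out `maxOutputLength` handles per stage mirrors the upper-bound idea of setMaxOutputLength: the builder cannot know how many tables a stage returns until the stage runs, so it reserves handles in advance and binds only those that the stage actually produces.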






Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP are backward incompatible with the existing APIs. We propose to change the APIs directly, without a deprecation period, and to manually migrate the existing open source projects that use the existing Flink ML API to the proposed APIs.

Note that there is no implementation of Estimator/Transformer (excluding test-only implementations) in the existing Flink codebase. Therefore, no effort is needed to migrate the existing Flink codebase.

To the best of our knowledge, the only open source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with the Alink developers to migrate the existing code to the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementations to the Flink ML library codebase as much as possible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

To be added
