You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state"Under Discussion"

Discussion thread: To be added

JIRA: https://issues.apache.org/jira/browse/FLINK-22915

Released: 1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. linear sequence) of Estimator/Transformer, and each Estimator/Transformer has one input and one output.

The following use-cases are not supported yet. And we would like to address these use-cases with the changes proposed in this FLIP.


1) Express an Estimator/Transformer that has multiple inputs/outputs.

For example, some graph embedding algorithms need to take two tables as inputs. These two tables represent nodes and edges of the graph respectively. This logic can be expressed as an Estimator with 2 input tables.

And some workflow may need to split 1 table into 2 tables, and use these tables for training and validation respectively. This logic can be expressed by a Transformer with 1 input table and 2 output tables.

2) Compose a directed-acyclic-graph Estimator/Transformer into an Estimator/Transformer.

For example, the workflow may involve the join of 2 tables, where each table could be generated by a chain of Estimator/Transformer. The entire workflow is therefore a DAG of Estimator/Transformer.

3) Online learning where a long-running instance Transformer needs to be updated by the latest model data generated by another long-running instance of Estimator.

In this scenario, we need to allow the Estimator to be run on a different machine than the Transformer. So that Estimator could consume sufficient computation resource in a cluster while the Transformer could be deployed on edge devices.


In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:

4) The Model interface does not provide any added value (given that we already have Transformer). The added class hierarchy complexity is not justified.

5) fit/transform API requires users to explicitly provide the TableEnvironment, where the TableEnvironment could be retrieved from the Table instance given to the fit/transform.

6) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

7) There is no API provided by the Estimator/Transformer interface to validate the schema consistency of a Pipeline. Users would have to instantiate Tables (with I/O logics) and run fit/transform to know whether the stages in the Pipeline are compatible with each other.


Public Interfaces

This FLIP proposes quite a few changes and addition to the existing Flink ML APIs. We first describe the final APIs of the classes updated by this FLIP, followed by the detailed explanation of changes we have made to the Flink ML APIs.

The following code block shows the interface of Stage, Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.

@PublicEvolving
interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {
    /**
     * This method checks the compatibility between input schemas, stage's parameters and stage's
     * logic. It should raise an exception if there is any mismatch, e.g. the number of input
     * schemas is wrong, or if a required field is missing from a schema.
     *
     * <p>If there is no mismatch, the method derives and returns the output schemas from the input
     * schemas.
     *
     * <p>Note that the output schemas of a given Estimator instance should equal the output schemas
     * of the Transformer instance fitted by this Estimator instance, suppose the same list of input
     * schemas are used as inputs to the fit/transform methods respectively.
     *
     * @param schemas the list of schemas of the input tables.
     * @return the list of schemas of the output tables.
     */
    TableSchema[] transformSchemas(TableSchema... schemas);

    /** Skipped */
    default String toJson() {...}

    /** Skipped */
    default void loadJson(String json) {...}
}

@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends Stage<T> {

    /**
     * Applies the Transformer on the given input tables, and returns the result tables.
     *
     * @param inputs a list of tables
     * @return a list of tables
     */
    Table[] transform(Table... inputs);

    /**
     * Uses the given list of tables to update internal states. This can be useful for e.g. online
     * learning where an Estimator fits an infinite stream of training samples and streams the model
     * diff data to this Transformer.
     *
     * <p>This method may be called at most once.
     *
     * @param inputs a list of tables
     */
    default void setStateStreams(Table... inputs) {
        throw new UnsupportedOperationException("this method is not implemented");
    }

    /**
     * Gets a list of tables representing changes of internal states of this Transformer. These
     * tables might come from the Estimator that instantiated this Transformer.
     *
     * @return a list of tables
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}

@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {

    /**
     * Trains on the given inputs and produces a Transformer. If this Estimator may be used to
     * compose a Pipeline, the transform method of the returned Transformer should be able to accept
     * a list of tables of the same length and schemas as the fit method of this Estimator.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineModel fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {

    public PipelineModel(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}


The following code block shows the interface of Graph, GraphModel and GraphBuilder that we propose to add.

/**
 * A Graph acts as an Estimator. It consists of a DAG of stages, each of which is either an
 * Estimator or Transformer.
 */
@PublicEvolving
public final class Graph implements Estimator<Graph, GraphModel> {
    public Graph(...) {...}

    @Override
    public GraphModel fit(Table... inputs) {...}

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        return schemas;
    }

    /** Skipped a few methods, including the implementations of some Estimator APIs. */
}

/** A GraphModel acts as a Transformer. It consists of a DAG of Transformers. */
@PublicEvolving
public final class GraphModel implements Transformer<GraphModel> {
    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

/** A GraphBuilder helps connect Stage instances into a Graph or GraphModel. */
@PublicEvolving
public final class GraphBuilder {
    /**
     * Specifies the upper bound (could be loose) of the number of output tables that can be
     * returned by the Transformer::getStateStreams and Transformer::transform methods, for any
     * stage involved in this Graph.
     *
     * <p>The default upper bound is 20.
     */
    public GraphBuilder setMaxOutputLength(int maxOutputLength) {...}

    /**
     * Creates a TableId associated with this GraphBuilder. It can be used to specify the passing of
     * tables between stages, as well as the input/output tables of the Graph/GraphModel generated
     * by this builder.
     */
    public TableId createTableId() {...}

    /**
     * The Graph::fit and GraphModel::transform should invoke the fit/transform of the corresponding
     * stage with the corresponding inputs.
     *
     * <p>Returns a list of TableIds, which represents outputs of the Transformer::transform
     * invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::setStateStreams should invoke the setStateStreams of the corresponding stage
     * with the corresponding inputs.
     */
    void setStateStreams(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::getStateStreams should invoke the getStateStreams of the corresponding stage.
     *
     * <p>Returns a list of TableIds, which represents outputs of the getStateStreams invocation.
     */
    TableId[] getStateStreams(Stage<?> stage) {...}

    /**
     * Returns a Graph instance which the following API specification: - Graph::fit should take
     * inputs and returns a GraphModel with the following specification. - GraphModel::transform
     * should take inputs and returns outputs. - GraphModel::setStateStreams should take
     * inputStates. - GraphModel::getStateStreams should return outputStates.
     *
     * <p>The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal
     * stages in the order specified by the DAG of stages.
     */
    Graph build(] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a GraphModel instance which the following API specification: - GraphModel::transform
     * should take inputs and returns outputs. - GraphModel::setStateStreams should take
     * inputStates. - GraphModel::getStateStreams should return outputStates.
     *
     * <p>The transform/setStateStreams/getStateStreams should invoke the APIs of the internal
     * stages in the order specified by the DAG of stages.
     *
     * <p>This method throws exception if any stage of this graph is an Estimator.
     */
    GraphModel buildModel(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    // The TableId is necessary to pass the inputs/outputs of various API calls across the
    // Graph/GraphModel stags.
    static class TableId {}

}


Here we list the changes and addition to the Flink ML API:

1) Removed TableEnvironment from the parameter list of fit/transform APIs.

2) Added PipelineModel and let Pipeline implement only the Estimator. Pipeline is no longer a Transformer.

3) Removed Pipeline::appendStage from the Pipeline class.

4) Updated Transformer/Estimator to take list of tables as inputs and return list of tables as output.

5) Removed the Model interface.

6) Renamed PipelineStage to Stage and add the PublicEvolving tag to the Stage interface.

7) Added transformSchemas()  to the Stage interface.

This is needed to validate the compatibility of input schemas with a given Estimator/Transformer instance.

8) Added setStateStreams and getStateStreams to the Transformer interface.

9) Added Graph, GraphModel and GraphBuilder.


Example Usage

In this section, we provide an example of using GraphBuilder to construct a DAG of Estimator/Transformer.

Suppose we have the following Transformer and Estimator classes:

  • TransformerA whose transform(...) takes 1 input table and has 1 output table.
  • TransformerB whose transform(...) takes 2 input tables and has 1 output table.
  • EstimatorB whose fit(...) takes 2 input tables and returns an instance of TransformerB.

And we want to compose an Estimator (e.g. Graph) as the following DAG:

TODO: add a graph


Here is the code snippet to complete this goal


GraphBuilder builder = new GraphBuilder();

// Creates nodes
Stage<?> stage1 = new TransformerA();
Stage<?> stage2 = new TransformerA();
Stage<?> stage3 = new EstimatorB();
// Creates inputs and inputStates
TableId input1 = builder.createTableId();
TableId input2 = builder.createTableId();
TableId inputState1 = builder.createTableId();
// Feeds input states to nodes.
builder.setStateStreams(stage1, inputState1);
// Feeds inputs to nodes and gets outputs.
TableId output1 = builder.getOutputs(stage1, input1)[0];
TableId output2 = builder.getOutputs(stage2, input2)[0];
TableId output3 = builder.getOutputs(stage3, output1, output2)[0];
// Gets output states
TableId outputState1 = builder.getStateStreams(stage3)[0];

// Specifies the ordered lists of inputs, outputs, input states and output states that will
// be used as the inputs/outputs of the corresponding Graph and GraphModel APIs.
TableId[] inputs = new TableId[] {input1, input2};
TableId[] outputs = new TableId[] {output3};
TableId[] inputStates = new TableId[] {inputState1};
TableId[] outputStates = new TableId[] {outputState1};
// Generates the Graph instance.
Graph graph = builder.build(inputs, outputs, inputStates, outputStates);

// Use the Graph instance as an Estimator.
GraphModel model = graph.fit(...);
Table[] results = model.transform(...);



Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP is backward incompatible with the existing APIs. We propose to change the APIs directly without deprecation period. And we will manually migrate the existing open source projects which use the existing Flink ML API to use the proposed APIs.

Note that there is no implementation of Estimator/Transformer (excluding test-only implementations) in the existing Flink codebase. So the effort to migrate the existing Flink codebase is zero.

To our best knowledge, the only open source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with Alink developers to migrate the existing code to use the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementation to the Flink ML library codebase as much as possible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives


To be added

  • No labels