Status

Current state: "Under Discussion"

Discussion thread: To be added

JIRA: <To be added>

Released: 1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Note: Some of the proposed modifications and use cases are the same as those in FLIP-173. To keep this document self-contained, they are copied here.

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. a linear sequence) of Estimators/Transformers, where each Estimator/Transformer has one input and one output.

The following use-cases are not supported yet, and we would like to address them with the changes proposed in this FLIP.


1) Express the training/prediction process that takes multiple inputs/outputs.

For example, some graph embedding algorithms need to take two input tables for training. In Metapath2vec, these two tables are node labels (schema: nodeId, nodeLabel) and edges (nodeId, nodeId, edgeWeight). 

Moreover, the FpGrowth algorithm produces two outputs, i.e., the association rules and the frequent item sets.

The existing Transformer/Estimator API cannot be used to describe these cases since it assumes a single input and a single output.


2) Express the machine learning algorithms that take inputs with different schemas during the training and prediction process.

For example, the Word2vec algorithm uses a list of words (i.e. a sentence) as the input schema for training, whereas the fitted model should be able to use a single word (instead of a list of words) as the input schema.

Also, the Metapath2vec algorithm is expected to take 2 tables as input for training and 1 table as input for inference. The input schemas also differ between the training and prediction processes.

Though we may still express some of them (e.g., Word2vec) as an Estimator and a Transformer, adding them to a Pipeline is bound to cause a bug: a Pipeline feeds the same upstream data to an Estimator and to the Transformer it fits, so both must consume exactly the same data, yet the current API does not stop users from composing such a Pipeline.


3) Existing ML APIs support not only concrete algorithms through estimators/transformers, but also related ML tasks such as evaluation and statistics. By convention, these tasks are not implemented as transformers or estimators.

For example, given a trained model and a validation dataset, we want to make predictions on the validation dataset and evaluate how good the model is (e.g., AUC value, F1-score).

There is also some computation logic that is covered by neither training nor prediction, for example the Spearman Correlation.

In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:


4) The fit/transform API requires users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instance given to fit/transform.


5) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

Background

Note: Readers who are familiar with the existing Estimator/Transformer/Pipeline APIs can skip this section.

The design proposed in this doc is built on top of the design proposed in FLIP-39: Flink ML pipeline and ML libs, which in turn is motivated by the Estimator/Transformer/Pipeline design proposed in Scikit-learn. Please feel free to read FLIP-39 and the Scikit-learn paper for more detail. In this section, we explain the key background information needed to understand the motivation and benefits of the proposed design, and define some terminology.

1) What is the motivation of defining the Estimator and the Transformer interfaces? How are these interfaces related to machine learning?

We expect most classic machine learning algorithms to have training logic and inference logic. The training logic of the algorithm reads data (e.g. labeled data) and updates its variables. The inference logic of the algorithm makes predictions (e.g. labels) for unseen data.

The inference logic of this algorithm could be represented as a subclass of Transformer (say TransformerA). The Transformer subclass has a transform() method, which reads data (as Table) and outputs data (as Table).

The training logic of this algorithm could be represented as a subclass of Estimator (say EstimatorA). The Estimator subclass has a fit() method, which reads data (as Table) and outputs an instance of Transformer (e.g. TransformerA in this case). The TransformerA instance could be constructed using variables from the EstimatorA instance.

Then, in order to do training and prediction using this algorithm, a user could write the following code:

Estimator estimator = new EstimatorA(...);

Table prediction_result = estimator.fit(training_data).transform(inference_data);


2) How does Pipeline work? What is the motivation of the Pipeline class?

The Pipeline is created from a linear chain of Estimator/Transformer (referred to as stage below). And it is used as an Estimator (since it implements Estimator). 

The execution of Pipeline::fit is a bit more complicated than it looks, since the output of an Estimator stage is not used directly as the input of the next stage in the Pipeline.

We highlight the key execution detail of Pipeline::fit below:

  • The input of Pipeline::fit is given to the first stage in the Pipeline.
  • If a stage is Transformer, the given input is used to invoke Transformer::transform(...), whose output is used as the input of the next stage.
  • If a stage is Estimator, the given input is used to invoke Estimator::fit(...), which produces a Transformer instance. The same input is then given to the Transformer::transform of the fitted instance, whose output is used as the input of the next stage.
  • Eventually, every Estimator stage of this Pipeline produces a Transformer. These fitted Transformers are combined with the original Transformer stages of the Pipeline into a linear chain of Transformers, which is composed into a single Transformer instance.

The motivation of the Pipeline class is to enable users to do the following:
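
The execution steps listed above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not the actual Pipeline implementation: a List<Double> stands in for Flink's Table, and the Stage/Transformer/Estimator interfaces, MeanCenterer and Doubler are hypothetical, reduced versions written for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class PipelineFitSketch {

    // Reduced stand-ins for the real interfaces; a "table" is a List<Double>.
    interface Stage {}

    interface Transformer extends Stage {
        List<Double> transform(List<Double> input);
    }

    interface Estimator extends Stage {
        Transformer fit(List<Double> input);
    }

    /** Hypothetical estimator: learns the mean and returns a centering transformer. */
    static final class MeanCenterer implements Estimator {
        @Override
        public Transformer fit(List<Double> input) {
            double mean = input.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
            return in -> in.stream().map(v -> v - mean).collect(Collectors.toList());
        }
    }

    /** Hypothetical stateless transformer: doubles every value. */
    static final class Doubler implements Transformer {
        @Override
        public List<Double> transform(List<Double> input) {
            return input.stream().map(v -> v * 2).collect(Collectors.toList());
        }
    }

    /** Follows the steps above and returns the fitted linear chain of transformers. */
    static List<Transformer> fit(List<Stage> stages, List<Double> input) {
        List<Transformer> fitted = new ArrayList<>();
        List<Double> current = input;
        for (Stage stage : stages) {
            // An Estimator is fitted on the current input; a Transformer is used as-is.
            Transformer t = (stage instanceof Estimator)
                    ? ((Estimator) stage).fit(current)
                    : (Transformer) stage;
            fitted.add(t);
            // The same input is transformed to produce the input of the next stage.
            current = t.transform(current);
        }
        return fitted;
    }

    /** Applies the fitted chain, which plays the role of the composed Transformer. */
    static List<Double> transform(List<Transformer> chain, List<Double> input) {
        List<Double> current = input;
        for (Transformer t : chain) {
            current = t.transform(current);
        }
        return current;
    }

    static List<Double> demo() {
        List<Stage> stages = List.of(new Doubler(), new MeanCenterer());
        List<Transformer> model = fit(stages, List.of(1.0, 2.0, 3.0));
        return transform(model, List.of(4.0));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4.0]
    }
}
```

In this sketch the training data is doubled to [2.0, 4.0, 6.0] before MeanCenterer is fitted, so the learned mean is 4.0. For simplicity the loop also transforms the data after the last stage, which a real implementation could skip.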

  • Compose an Estimator from a linear chain of Estimator/Transformer
  • Use the Transformer instance fitted by this linear chain of Estimator/Transformer without additionally defining this Transformer instance (which should itself be a linear chain of Transformer) again.

It is important to make the following observation: if we don't provide the Pipeline class, users can still accomplish the same use-cases targeted by Pipeline by explicitly writing the training logic and inference logic separately using Estimator/Transformer APIs. But users would have to construct this chain of Estimator/Transformer twice (for training and inference respectively).

Design Principles

Multiple choices exist to address the use-cases targeted by this design doc. In the following, we explain the design principles followed by the proposed design, so that the design choices are easier to understand.


1) When some use cases can only be supported by modifying well-recognized APIs and breaking the user experience, we prefer not to change the semantics or function signatures of the existing APIs.

As a result of this philosophy, to support complex use cases (e.g., Word2vec, MetaPath2vec) we do not modify the existing estimator/transformer/pipeline APIs. Rather, we propose a simple and easy-to-understand abstraction, AlgoOperator, which abstracts machine learning computation logic that takes multiple tables as input and outputs multiple tables. Users can thus still enjoy the benefits of transformer/estimator/pipeline, since none of them is broken.


2) The proposed API should be able to cover all machine learning semantics while being easy to understand and highly usable.

Following this philosophy, we believe the Flink ML API should cover not only training and prediction, but also evaluation, computation of statistics, and so on. We therefore propose AlgoOperator, an abstraction of machine learning computation logic that can describe all machine learning algorithms.

To enable high usability, we wrap the computation logic in chaining methods, so that a machine learning job can be constructed efficiently and the code is more readable.

Public Interfaces

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of interfaces and classes after making the proposed changes.

API additions and changes

Here we list the additions and the changes to the Flink ML API.


1) Removed TableEnvironment from the parameter list of fit/transform APIs.

This change simplifies the usage of fit/transform APIs.


2) Added PipelineModel and let Pipeline implement only the Estimator. Pipeline is no longer a Transformer.

This change makes the experience of using Pipeline consistent with the experience of using Estimator/Transformer, where a class is either an Estimator or a Transformer.


3) Added the AlgoOperator abstraction.

AlgoOperator is essentially an encapsulation of computation logic, including training, prediction, evaluation and other operations on tables (e.g., Spearman Correlation). It takes multiple input tables and produces multiple output tables.

In detail, (1) AlgoOperator exposes linkFrom() and link() functions to machine learning users for constructing the computation logic into a machine learning job; (2) AlgoOperator exposes compute() function to machine learning developers for implementing the detailed logic.

For each algorithm, developers need to implement a train operator and a prediction operator. Taking Kmeans as an example, developers need to implement a KmeansTrainOp and a KmeansPredictOp to describe the training and prediction logic.

The input and output of a train operator are the training data and the model data, respectively. The inputs of a prediction operator are the model data and the data used for prediction. The output of a prediction operator is the prediction results.
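
As a rough illustration of this train/predict split, the sketch below implements a one-dimensional, two-cluster Kmeans. It rests on several assumptions that are not part of this FLIP: Op is a heavily reduced stand-in for AlgoOperator, a double[] column stands in for a Table, SourceOp mimics TableSourceOp, and the initialization (minimum and maximum value) and the fixed iteration count are arbitrary choices made for brevity.

```java
import java.util.Arrays;

final class KmeansOpsSketch {

    /** Reduced stand-in for AlgoOperator; a "table" is a double[] column. */
    abstract static class Op {
        double[] output;

        final Op linkFrom(Op... inputs) {
            double[][] tables = new double[inputs.length][];
            for (int i = 0; i < inputs.length; i++) {
                tables[i] = inputs[i].output;
            }
            output = compute(tables)[0];
            return this;
        }

        abstract double[][] compute(double[]... inputs);
    }

    /** Wraps existing data as an operator, in the spirit of TableSourceOp. */
    static final class SourceOp extends Op {
        SourceOp(double... data) {
            output = data;
        }

        @Override
        double[][] compute(double[]... inputs) {
            throw new UnsupportedOperationException("a source has no inputs");
        }
    }

    /** Train operator: training data in, model data (two centroids) out. */
    static final class KmeansTrainOp extends Op {
        @Override
        double[][] compute(double[]... inputs) {
            double[] data = inputs[0];
            double c0 = Arrays.stream(data).min().getAsDouble();
            double c1 = Arrays.stream(data).max().getAsDouble();
            for (int iter = 0; iter < 10; iter++) {
                double sum0 = 0, sum1 = 0;
                int n0 = 0, n1 = 0;
                for (double x : data) {
                    if (Math.abs(x - c0) <= Math.abs(x - c1)) {
                        sum0 += x;
                        n0++;
                    } else {
                        sum1 += x;
                        n1++;
                    }
                }
                if (n0 > 0) c0 = sum0 / n0;
                if (n1 > 0) c1 = sum1 / n1;
            }
            return new double[][] {{c0, c1}};
        }
    }

    /** Predict operator: model data and prediction data in, cluster ids out. */
    static final class KmeansPredictOp extends Op {
        @Override
        double[][] compute(double[]... inputs) {
            double[] model = inputs[0];
            double[] data = inputs[1];
            double[] ids = new double[data.length];
            for (int i = 0; i < data.length; i++) {
                ids[i] = Math.abs(data[i] - model[0]) <= Math.abs(data[i] - model[1]) ? 0 : 1;
            }
            return new double[][] {ids};
        }
    }

    static double[] demo() {
        Op model = new KmeansTrainOp().linkFrom(new SourceOp(1.0, 2.0, 9.0, 10.0));
        return new KmeansPredictOp().linkFrom(model, new SourceOp(0.0, 11.0)).output;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [0.0, 1.0]
    }
}
```

The prediction operator receives the model as its first input and the data to score as its second, matching the input order described above.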


Interfaces and classes after the proposed API changes

The following code block shows the Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.


@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends Stage<T> {
 
    /**
     * Applies the transformer on the input table, and returns the result table.
     *
     * @param input the table to be transformed
     * @return the transformed table
     */
    Table transform(Table input);
}
 
@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {
 
    /**
     * Train and produce a {@link Model} which fits the records in the given {@link Table}.
     *
     * @param input the table with records to train the Model.
     * @return a model trained to fit on the given Table.
     */
    M fit(Table input);
}
 
@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {
 
    public Pipeline(List<Stage<?>> stages) {...}
 
    @Override
    public PipelineModel fit(Table input) {...}
 
    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}
 
@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {
 
    public PipelineModel(List<Transformer<?>> transformers) {...}
 
    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}


The following code block shows the abstract class of AlgoOperator.

public abstract class AlgoOperator<T extends AlgoOperator<T>> implements WithParams<T> {
    // the first output table
    protected Table output = null;
    // the side output tables
    private Table[] sideOutputs = null;
    // the params
    private Params params = null;

    /**
     * Gets the first output table.
     *
     * @return the first output table
     */
    public final Table getOutputTable() {
        if (null == this.output) {
            throw new RuntimeException(
                "There is no output. Please call this AlgoOperator's 'link' or a related method first, or this "
                    + "AlgoOperator has no output.");
        } else {
            return this.output;
        }
    }

    /**
     * Gets the Params of this algorithm.
     *
     * @return the Params of this algorithm
     */
    public final Params getParams() {
        return params;
    }

    /**
     * Gets the side outputs of this operator.
     *
     * @return the side outputs wrapped as AlgoOperators, or null if there are none
     */
    public final AlgoOperator<?>[] getSideOutputs() {
        if (null == sideOutputs) {
            return null;
        }

        TableSourceOp[] sideOutputOps = new TableSourceOp[sideOutputs.length];
        for (int i = 0; i < sideOutputs.length; i++) {
            sideOutputOps[i] = new TableSourceOp(sideOutputs[i]);
        }
        return sideOutputOps;
    }

    /**
     * Link from other AlgoOperators. This function encapsulates the computation logic of this AlgoOperator.
     * Note that: Only the first output table of each input will be used in the computation logic.
     *
     * @param inputs the input AlgoOperators
     * @return return this AlgoOperator
     */
    public final T linkFrom(AlgoOperator<?>... inputs) {
        Table[] inputTables = new Table[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            inputTables[i] = inputs[i].getOutputTable();
        }
        Table[] outputTables = compute(inputTables);
        if (null != outputTables && outputTables.length > 0) {
            this.output = outputTables[0];
        }
        if (null != outputTables && outputTables.length > 1) {
            this.sideOutputs = Arrays.copyOfRange(outputTables, 1, outputTables.length);
        }
        return (T) this;
    }

    /**
     * Link to another operator.
     *
     * @param next the AlgoOperator to link to.
     * @return the "next" operator
     */
    public final <S extends AlgoOperator<?>> AlgoOperator<?> link(S next) {
        next.linkFrom(this);
        return next;
    }

    /**
     * Applies the computation logic on the input tables.
     *
     * @param inputs the input tables
     * @return the output tables
     */
    @Internal
    public abstract Table[] compute(Table... inputs);

}
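
To make the split between the first output and the side outputs more concrete, here is a small runnable sketch. It is an illustration under assumptions rather than the proposed class itself: Op is a reduced stand-in for AlgoOperator, a List<String> stands in for a Table, and SourceOp, SplitByLengthOp and UpperCaseOp are hypothetical operators invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class SideOutputSketch {

    /** Reduced stand-in for AlgoOperator; a "table" is a List<String> of rows. */
    abstract static class Op {
        List<String> output;
        List<List<String>> sideOutputs = new ArrayList<>();

        final Op linkFrom(Op... inputs) {
            List<List<String>> tables = new ArrayList<>();
            for (Op in : inputs) {
                tables.add(in.output); // only the first output of each input is consumed
            }
            List<List<String>> results = compute(tables);
            output = results.get(0);
            sideOutputs = results.subList(1, results.size());
            return this;
        }

        final Op link(Op next) {
            return next.linkFrom(this);
        }

        abstract List<List<String>> compute(List<List<String>> inputs);
    }

    /** Wraps existing rows as an operator, in the spirit of TableSourceOp. */
    static final class SourceOp extends Op {
        SourceOp(List<String> rows) {
            output = rows;
        }

        @Override
        List<List<String>> compute(List<List<String>> inputs) {
            throw new UnsupportedOperationException("a source has no inputs");
        }
    }

    /** Hypothetical operator with two outputs: short words first, long words as side output. */
    static final class SplitByLengthOp extends Op {
        @Override
        List<List<String>> compute(List<List<String>> inputs) {
            List<String> shortWords =
                    inputs.get(0).stream().filter(w -> w.length() <= 2).collect(Collectors.toList());
            List<String> longWords =
                    inputs.get(0).stream().filter(w -> w.length() > 2).collect(Collectors.toList());
            return List.of(shortWords, longWords);
        }
    }

    /** Hypothetical single-output operator. */
    static final class UpperCaseOp extends Op {
        @Override
        List<List<String>> compute(List<List<String>> inputs) {
            List<String> upper =
                    inputs.get(0).stream().map(String::toUpperCase).collect(Collectors.toList());
            return List.of(upper);
        }
    }

    static Op split() {
        return new SplitByLengthOp().linkFrom(new SourceOp(List.of("a", "bb", "cccc", "ddddd")));
    }

    public static void main(String[] args) {
        Op split = split();
        Op upper = split.link(new UpperCaseOp());
        System.out.println(upper.output);             // prints [A, BB]
        System.out.println(split.sideOutputs.get(0)); // prints [cccc, ddddd]
    }
}
```

Only the first output of SplitByLengthOp flows into UpperCaseOp through link(). The long words stay available as a side output and would have to be wrapped and linked explicitly to be consumed downstream.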


Example Usage

In this section we provide example code snippets to demonstrate how the APIs proposed in this FLIP can be used to address the use-cases in the motivation section.

Express algorithms that take more than one input and use input with different schemas during the training and prediction process.

Suppose we want to implement the Metapath2vec algorithm, which takes two input tables for training, i.e., the edges table (with schema <nodeId, nodeId, edgeWeight>) and the nodeLabels table (with schema <nodeId, nodeLabel>), and outputs one embedding vector for each node. During inference, the input is a nodeIds table (with schema <nodeId>), and the output is the embedding of every node in the nodeIds table.

With the AlgoOperator abstraction, we decouple training and prediction into two operators, namely MetaPath2vecTrainOp and MetaPath2vecPredictOp.

Below are the code snippets.


// User code
TableSourceOp edges, nodeLabels; // used for training, can be constructed from Table
TableSourceOp nodeIds; // used for prediction, can be constructed from Table
AlgoOperator<?> modelOp = new MetaPath2vecTrainOp()
      .linkFrom(edges, nodeLabels);
AlgoOperator<?> result = new MetaPath2vecPredictOp()
      .linkFrom(modelOp, nodeIds);

// Developer code
public class MetaPath2vecTrainOp extends AlgoOperator<MetaPath2vecTrainOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // inputs[0]: edges, inputs[1]: nodeLabels
        // do the computation logic here and return the graph embedding model
    }
}

public class MetaPath2vecPredictOp extends AlgoOperator<MetaPath2vecPredictOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // inputs[0]: model data, inputs[1]: nodeIds
        // do the computation logic here and return the embeddings of the given nodes
    }
}

Express the algorithms that are used/covered by machine learning libraries but usually not implemented as a transformer/estimator.

Since AlgoOperator abstracts the computation logic as taking a list of tables and outputting a list of tables, it is easy to use AlgoOperator to implement Evaluation logic and some other machine learning algorithms.

Here are the code snippets that address this use-case by using the proposed APIs:

// User code:
TableSourceOp input;
AlgoOperator<?> auc = new EvaluationAUCOp().linkFrom(input);

// Developer code:
public class EvaluationAUCOp extends AlgoOperator<EvaluationAUCOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // do the computation logic
    }
}
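
The body of compute() is intentionally left open above. As a hedged illustration of the kind of logic such an evaluation operator might contain, the sketch below computes the AUC of a binary classifier on plain arrays. It assumes the input provides a prediction score and a true label per row; the AucSketch class and its auc method are hypothetical names, and the quadratic pairwise definition is chosen for readability rather than as a suggestion for a distributed implementation.

```java
final class AucSketch {

    /**
     * AUC as the fraction of (positive, negative) pairs in which the positive
     * example has the higher score; tied scores count as one half.
     */
    static double auc(double[] scores, boolean[] labels) {
        if (scores.length != labels.length) {
            throw new IllegalArgumentException("scores and labels must have the same length");
        }
        double correct = 0;
        long pairs = 0;
        for (int i = 0; i < scores.length; i++) {
            if (!labels[i]) {
                continue; // i ranges over positive examples only
            }
            for (int j = 0; j < scores.length; j++) {
                if (labels[j]) {
                    continue; // j ranges over negative examples only
                }
                pairs++;
                if (scores[i] > scores[j]) {
                    correct += 1.0;
                } else if (scores[i] == scores[j]) {
                    correct += 0.5;
                }
            }
        }
        if (pairs == 0) {
            throw new IllegalArgumentException("need at least one positive and one negative label");
        }
        return correct / pairs;
    }

    public static void main(String[] args) {
        double[] scores = {0.9, 0.8, 0.3, 0.1};
        boolean[] labels = {true, false, true, false};
        System.out.println(auc(scores, labels)); // prints 0.75
    }
}
```

In the example, three of the four positive/negative pairs are ranked correctly (only the pair 0.3 versus 0.8 is not), which gives 0.75.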



Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP are backward incompatible with the existing APIs. We propose to change the APIs directly without a deprecation period, and we will manually migrate the existing open source projects that use the existing Flink ML API to the proposed APIs.

Note that there is no implementation of Estimator/Transformer (excluding test-only implementations) in the existing Flink codebase. So no work is needed to migrate the existing Flink codebase.

To the best of our knowledge, the only open source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with Alink developers to migrate the existing code to use the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementation to the Flink ML library codebase as much as possible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

There are no rejected alternatives to be listed here yet.