Status

Current state: "Under Discussion"

...

Note: Part of the proposed modifications and use-cases are the same as those in FLIP-173. To keep this FLIP self-contained, we copy them here.

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. a linear sequence) of Estimators/Transformers, where each Estimator/Transformer has one input and one output.

...

5) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

Background

Note: Readers who are familiar with the existing Estimator/Transformer/Pipeline APIs can skip this section.

...

It is important to make the following observation: if we don't provide the Pipeline class, users can still accomplish the same use-cases targeted by Pipeline by explicitly writing the training logic and the inference logic separately using the Estimator/Transformer APIs. However, users would then have to construct the chain of Estimators/Transformers twice (once for training and once for inference).

Design Principles

Multiple choices exist to address the use-cases targeted by this design doc. In the following, we explain the design principles behind the proposed design, to make the design choices easier to follow.

...

To enable high usability, we wrap the computation logic with chaining methods, so that machine learning jobs can be constructed efficiently and the resulting code is easier to read.
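
For example, an evaluation job could be expressed as a single chain. The following is only a sketch; TableSourceOp and EvaluationAUCOp are the operators used in the examples later in this document, and inputTable is assumed to be an existing Table.

Code Block
languagejava
// Sketch of the chaining style: wrap an input Table and link operators in sequence.
TableSourceOp input = new TableSourceOp(inputTable);
Table aucResult = input
    .link(new EvaluationAUCOp())
    .getOutputTable();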

Public Interfaces

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of interfaces and classes after making the proposed changes.

API additions and changes

Here we list the additions and the changes to the Flink ML API.

...

For each algorithm, developers need to implement a train operator and a prediction operator. Take Kmeans as an example: developers need to implement a KmeansTrainOp and a KmeansPredictOp to describe the training and prediction logic, respectively. The input and output of a train operator are the training data and the model data, respectively. The inputs of a prediction operator are the model data and the data to run prediction on, and its output is the prediction results.
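
As a rough sketch of this pattern (following the AlgoOperator class shown later in this document; the method bodies are placeholders, and trainData/predictData are assumed to be existing AlgoOperators such as TableSourceOps):

Code Block
languagejava
// Hypothetical sketch of a train/prediction operator pair for Kmeans.
public class KmeansTrainOp extends AlgoOperator<KmeansTrainOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // inputs[0] is the training data; the returned table holds the model data.
        // ... training logic ...
    }
}

public class KmeansPredictOp extends AlgoOperator<KmeansPredictOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // inputs[0] is the model data, inputs[1] is the data to run prediction on;
        // the returned table holds the prediction results.
        // ... prediction logic ...
    }
}

// Usage (sketch):
AlgoOperator modelOp = new KmeansTrainOp().linkFrom(trainData);
AlgoOperator result = new KmeansPredictOp().linkFrom(modelOp, predictData);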

Interfaces and classes after the proposed API changes

The following code block shows the PipelineStage, Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.

...

Code Block
languagejava
public abstract class AlgoOperator<T extends AlgoOperator<T>> implements WithParams<T> {
    // the first output table
    protected Table output = null;
    // the side output tables
    private Table[] sideOutputs = null;
    // the params
    private Params params = null;

    /**
     * Get the output table.
     *
     * @return the first output table of this AlgoOperator
     */
    public final Table getOutputTable() {
        if (null == this.output) {
            throw new RuntimeException(
                "There is no output. Please call 'link' or 'linkFrom' on this AlgoOperator first, "
                    + "or this AlgoOperator has no output.");
        } else {
            return this.output;
        }
    }

    /**
     * Get the Params of this algorithm.
     *
     * @return the Params of this AlgoOperator
     */
    public final Params getParams() {
        return params;
    }

    /**
     * Get the side outputs of this operator.
     *
     * @return the side output tables wrapped as AlgoOperators, or null if there are no side outputs
     */
    public final AlgoOperator<?>[] getSideOutputs() {
        if (null == sideOutputs) {
            return null;
        }

        TableSourceOp[] sideOutputOps = new TableSourceOp[sideOutputs.length];
        for (int i = 0; i < sideOutputs.length; i++) {
            sideOutputOps[i] = new TableSourceOp(sideOutputs[i]);
        }
        return sideOutputOps;
    }

    /**
     * Link from other AlgoOperators. This method encapsulates the computation logic of this AlgoOperator.
     * Note that only the first output table of each input is used in the computation logic.
     *
     * @param inputs the input AlgoOperators
     * @return this AlgoOperator
     */
    public final T linkFrom(AlgoOperator<?>... inputs) {
        Table[] inputTables = new Table[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            inputTables[i] = inputs[i].getOutputTable();
        }
        Table[] outputTables = compute(inputTables);
        if (null != outputTables && outputTables.length > 0) {
            this.output = outputTables[0];
        }
        if (null != outputTables && outputTables.length > 1) {
            this.sideOutputs = Arrays.copyOfRange(outputTables, 1, outputTables.length);
        }
        return (T) this;
    }

    /**
     * Link to another operator.
     *
     * @param next the AlgoOperator to link to
     * @return the next AlgoOperator
     */
    public final <S extends AlgoOperator<?>> AlgoOperator<?> link(S next) {
        next.linkFrom(this);
        return next;
    }

    /**
     * Applies the computation logic on the input tables.
     *
     * @param inputs the input tables
     * @return the output tables
     */
    @Internal
    public abstract Table[] compute(Table... inputs);

}
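
The example code in this FLIP also uses a TableSourceOp, which wraps an existing Table as an AlgoOperator so that it can be linked into a chain. Its implementation is not spelled out above; the following is only a minimal sketch of the assumed shape.

Code Block
languagejava
// Minimal sketch (assumed shape, not part of the proposed code listing above).
public class TableSourceOp extends AlgoOperator<TableSourceOp> {

    public TableSourceOp(Table table) {
        // Expose the wrapped table as the first (and only) output of this operator.
        this.output = table;
    }

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // A table source has no upstream inputs; it only exposes the wrapped table.
        return new Table[] {getOutputTable()};
    }
}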


Example Usage

In this section we provide example code snippets to demonstrate how the APIs proposed in this FLIP can be used to address the use-cases described in the motivation section.

Express algorithms that take more than one input and use inputs with different schemas during training and prediction.

Suppose we want to implement a Metapath2vec algorithm, which takes two input tables for training, i.e., the edges table (with schema <nodeId, nodeId, edgeWeight>) and the nodeLabels table (with schema <nodeId, nodeLabel>), and outputs the embedding vectors of all nodes. During inference, the input is a nodeIds table (with schema <nodeId>), and the output is the embeddings of all vertices in the nodeIds table.

...

Code Block
languagejava
// User code

TableSourceOp edges, nodeLabels; // used for training, can be constructed from Table
TableSourceOp nodeIds; // used for prediction, can be constructed from Table
AlgoOperator modelOp = new MetaPath2VecTrainOp()
      .linkFrom(edges, nodeLabels);
AlgoOperator result = new MetaPath2VecPredictOp()
      .linkFrom(modelOp, nodeIds);

// Developer code

public class MetaPath2VecTrainOp extends AlgoOperator<MetaPath2VecTrainOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // do the computation logic
    }
}

public class MetaPath2VecPredictOp extends AlgoOperator<MetaPath2VecPredictOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // do the computation logic
    }
}

Express algorithms that are covered by machine learning libraries but usually not implemented as a Transformer/Estimator.

Since AlgoOperator abstracts the computation logic as taking a list of tables and outputting a list of tables, it is easy to use AlgoOperator to implement evaluation logic and other machine learning algorithms.

...

Code Block
languagejava
// User code:

TableSourceOp input;
AlgoOperator auc = new EvaluationAUCOp().linkFrom(input);

// Developer Code:

public class EvaluationAUCOp extends AlgoOperator<EvaluationAUCOp> {

    @Internal
    @Override
    public Table[] compute(Table... inputs) {
        // do the computation logic
    }
}



Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP are backward incompatible with the existing APIs. We propose to change the APIs directly without a deprecation period, and we will manually migrate the existing open-source projects that use the existing Flink ML API to the proposed APIs.

...

To the best of our knowledge, the only open-source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with the Alink developers to migrate the existing code to the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementations to the Flink ML library codebase as much as possible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

There are no rejected alternatives to be listed here yet.

...