...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



NOTE: we have decided to replace this option with the design proposed here. This design doc will be deleted later.

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. linear sequence) of Estimator/Transformer, and each Estimator/Transformer has one input and one output.

...

For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to take two tables as inputs. These two tables represent the node labels and the edges of the graph respectively. This logic can be expressed as an Estimator with 2 input tables.
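
As a rough illustration of the two-table case, the sketch below fits a hypothetical Estimator on a node-label table and an edge table. It deliberately does not use Flink: `Table`, `Transformer` and `Estimator` are simplified local stand-ins, and `GraphEmbeddingEstimator` is an invented name, not the MetaPath2Vec implementation. Training itself is omitted.

```java
import java.util.List;

/** Sketch only: none of these types are Flink classes. */
final class TwoInputEstimatorSketch {

    /** Hypothetical stand-in for a table: a name plus its column names. */
    static final class Table {
        final String name;
        final List<String> columns;

        Table(String name, List<String> columns) {
            this.name = name;
            this.columns = columns;
        }
    }

    /** Simplified stand-in for a fitted model that maps tables to tables. */
    interface Transformer {
        Table[] transform(Table... inputs);
    }

    /** Simplified stand-in for an Estimator that accepts any number of input tables. */
    interface Estimator<M extends Transformer> {
        M fit(Table... inputs);
    }

    /** Invented example: trains on a node-label table and an edge table. */
    static final class GraphEmbeddingEstimator implements Estimator<Transformer> {
        @Override
        public Transformer fit(Table... inputs) {
            if (inputs.length != 2) {
                throw new IllegalArgumentException(
                        "expected [nodeLabels, edges] but got " + inputs.length + " table(s)");
            }
            Table nodeLabels = inputs[0];
            Table edges = inputs[1];
            // Real training is omitted; the model only remembers which tables it was fitted on.
            String modelName = "embedding(" + nodeLabels.name + "," + edges.name + ")";
            return toEmbed -> new Table[] {new Table(modelName, List.of("node", "vector"))};
        }
    }

    /** Fits on two tables, applies the fitted model to one table, returns the output table name. */
    static String demo() {
        Table nodeLabels = new Table("nodes", List.of("node", "label"));
        Table edges = new Table("edges", List.of("src", "dst"));
        Transformer model = new GraphEmbeddingEstimator().fit(nodeLabels, edges);
        return model.transform(nodeLabels)[0].name;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only the shape of the call: a varargs `fit` lets the Estimator receive both tables at once, which a single-input `fit` cannot express.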

...

2) Express a generic machine learning function that has multiple inputs/outputs and make sure the class used for this purpose does not have any historical association with Estimator/Model concepts.

For example, some machine learning-related algorithms, such as model evaluation and the computation of some graph embedding algorithms, take multiple tables as inputs and produce multiple tables as outputs, where the output tables can be used to decide the embedding for new inputs. Statistics (e.g., ChiSquareTest) and graph algorithms (e.g., PageRank) also take tables as input and produce tables as output. Although the Transformer API can express these algorithms, some machine learning practitioners are concerned that Transformer is historically associated with the Estimator/Model/Feature Engineering concepts, so it feels unnatural to use Transformer for these use-cases.

3) Express an Estimator/Transformer pair whose input schemas are different, and still be able to compose an Estimator from a linear chain (and additionally DAG) of Estimator/Transformer.

For example, the Word2Vec algorithm uses a list of words (i.e. a sentence) as the input schema for training, whereas the fitted algorithm should be able to use a single word (instead of a list of words) as the input schema.

Also, the GraphEmbedding algorithm is expected to take 2 tables as input for training, and take 1 table as input for inference. The input schemas are different between the corresponding Estimator and Transformer.

The existing Pipeline API cannot be used to compose such an Estimator with other Transformers/Estimators, because the Pipeline expects a given stage to use the same inputs for both fitting and transforming.
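
The schema mismatch can be made concrete with a small sketch. It assumes a hypothetical `Table` stand-in that models only a schema (column name to type name), and invented `WordVectorEstimator` / `WordVectorModel` classes; none of these are Flink classes, and the type names are plain strings chosen for illustration.

```java
import java.util.Map;

/** Sketch only: none of these types are Flink classes. */
final class TrainInferSchemaSketch {

    /** Hypothetical stand-in for a table: only its schema (column name -> type name) is modeled. */
    static final class Table {
        final Map<String, String> schema;

        Table(Map<String, String> schema) {
            this.schema = schema;
        }
    }

    /** Throws if the table lacks the given column with the given type. */
    static void requireColumn(Table table, String column, String type) {
        if (!type.equals(table.schema.get(column))) {
            throw new IllegalArgumentException("missing column " + column + " of type " + type);
        }
    }

    /** Invented fitted model: inference expects one word per row. */
    static final class WordVectorModel {
        Table transform(Table input) {
            requireColumn(input, "word", "STRING");
            return new Table(Map.of("word", "STRING", "vector", "ARRAY<DOUBLE>"));
        }
    }

    /** Invented Estimator: training expects one sentence (list of words) per row. */
    static final class WordVectorEstimator {
        WordVectorModel fit(Table input) {
            requireColumn(input, "sentence", "ARRAY<STRING>");
            return new WordVectorModel(); // real training omitted
        }
    }

    /** Returns true if the fitted model rejects a table that has the training schema. */
    static boolean modelRejectsTrainingSchema() {
        Table sentences = new Table(Map.of("sentence", "ARRAY<STRING>"));
        WordVectorModel model = new WordVectorEstimator().fit(sentences);
        try {
            model.transform(sentences);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("model rejects training schema: " + modelRejectsTrainingSchema());
    }
}
```

A composition scheme that feeds one and the same table to a stage's `fit` and to its fitted model's `transform` would hit exactly this rejection.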


4) Online learning where a long-running instance of Transformer needs to be updated by the latest model data generated by another long-running instance of Estimator.

In this scenario, we need to allow the Estimator to run on a different machine than the Transformer, so that the Estimator can consume sufficient computation resources in a cluster while the Transformer can be deployed on edge devices.

5) Provide APIs to allow Estimator/Transformer to be efficiently saved/loaded even if the state (e.g. model data) of the Estimator/Transformer is more than tens of GBs.

...

In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:
6) The Model interface (which currently simply extends the Transformer interface without adding any extra logic) does not provide any added value. The added class hierarchy complexity is not justified.

7) The fit/transform API requires users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instance given to fit/transform.

8) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

9) There is no API provided by the Estimator/Transformer interface to validate the schema consistency of a Pipeline. Users would have to instantiate Tables (with I/O logic) and run fit/transform to know whether the stages in the Pipeline are compatible with each other.

Background

Note: Readers who are familiar with the existing Estimator/Transformer/Pipeline APIs can skip this section.

...

7) Removed Pipeline::appendStage from the Pipeline class. This change makes the concept of Pipeline consistent with that of Graph/GraphBuilder. Neither Graph nor Pipeline provides an API to construct itself.

8) Removed the Model interface and renamed PipelineModel to PipelineTransformer.

...

9) Renamed PipelineStage to Stage and added the PublicEvolving tag to the Stage interface. This change is reasonable because we will now compose Graph (not just Pipeline) using this class.

Interfaces and classes after the proposed API changes

...

Code Block (language: java)
/**
 * Base class for a stage in a Pipeline or Graph. The interface is only a concept, and does not have any actual
 * functionality. Its subclasses could be Estimator, Transformer or AlgoOperator. No other classes should inherit this
 * interface directly.
 *
 * <p>Each stage has parameters and requires a public empty constructor for restoration.
 *
 * @param <T> The class type of the Stage implementation itself.
 * @see WithParams
 */
@PublicEvolving
public interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {
    /**
     * Saves this stage to the given path.
     */
    void save(String path);

    /**
     * Loads this stage from the given path.
     */
    void load(String path);
}

/**
 * An AlgoOperator is a Stage that takes a list of tables as inputs and produces a list of
 * tables as results. It can be used to encode a generic multi-input multi-output machine learning function.
 *
 * @param <T> The class type of the AlgoOperator implementation itself.
 */
@PublicEvolving
public interface AlgoOperator<T extends AlgoOperator<T>> extends Stage<T> {
    /**
     * Applies the AlgoOperator on the given input tables, and returns the result tables.
     *
     * @param inputs a list of tables
     * @return a list of tables
     */
    Table[] transform(Table... inputs);
}

/**
 * A Transformer is an AlgoOperator with additional support for state streams, which could be set by the Estimator that fitted
 * this Transformer. Unlike AlgoOperator, a Transformer is typically associated with an Estimator.
 *
 * @param <T> The class type of the Transformer implementation itself.
 */
@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends AlgoOperator<T> {
    /**
     * Uses the given list of tables to update internal states. This can be useful for e.g. online
     * learning where an Estimator fits an infinite stream of training samples and streams the model
     * diff data to this Transformer.
     *
     * <p>This method may be called at most once.
     *
     * @param inputs a list of tables
     */
    default void setStateStreams(Table... inputs) {
        throw new UnsupportedOperationException("this method is not implemented");
    }

    /**
     * Gets a list of tables representing changes of internal states of this Transformer. These
     * tables might come from the Estimator that instantiated this Transformer.
     *
     * @return a list of tables
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}

/**
 * An Estimator is a Stage that takes a list of tables as inputs and produces a Transformer.
 *
 * @param <E> class type of the Estimator implementation itself.
 * @param <M> class type of the Transformer this Estimator produces.
 */
@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {
    /**
     * Trains on the given inputs and produces a Transformer.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

/**
 * A Pipeline acts as an Estimator. It consists of an ordered list of stages, each of which could be
 * an Estimator, Transformer or AlgoOperator.
 */
@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineTransformer> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineTransformer fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

/**
 * A PipelineTransformer acts as a Transformer. It consists of an ordered list of Transformers or AlgoOperators.
 */
@PublicEvolving
public final class PipelineTransformer implements Transformer<PipelineTransformer> {

    public PipelineTransformer(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

...
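
To make the relationship between Pipeline, Estimator, Transformer and AlgoOperator more tangible, here is a minimal sketch of how fitting an ordered list of stages might chain them. The chaining rule used here (fit each Estimator on the output of the stages before it, then pass its fitted Transformer's output on) is an assumption, not something the interfaces above prescribe. All types are simplified local stand-ins without generics, parameters or save/load, and `fitAll`, `transformAll` and `appendTag` are invented helper names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: simplified stand-ins, not the Flink ML interfaces. */
final class PipelineFitSketch {

    /** Hypothetical stand-in for a table: a description of how it was derived. */
    static final class Table {
        final String description;

        Table(String description) {
            this.description = description;
        }
    }

    interface Stage {}

    interface AlgoOperator extends Stage {
        Table[] transform(Table... inputs);
    }

    interface Transformer extends AlgoOperator {}

    interface Estimator extends Stage {
        Transformer fit(Table... inputs);
    }

    /** Fits stages in order; each stage sees the output of the stages before it. */
    static List<AlgoOperator> fitAll(List<Stage> stages, Table... inputs) {
        List<AlgoOperator> fitted = new ArrayList<>();
        Table[] current = inputs;
        for (Stage stage : stages) {
            AlgoOperator operator =
                    stage instanceof Estimator
                            ? ((Estimator) stage).fit(current)
                            : (AlgoOperator) stage;
            fitted.add(operator);
            current = operator.transform(current);
        }
        return fitted;
    }

    /** Applies the fitted operators in order. */
    static Table[] transformAll(List<AlgoOperator> operators, Table... inputs) {
        Table[] current = inputs;
        for (AlgoOperator operator : operators) {
            current = operator.transform(current);
        }
        return current;
    }

    /** Helper: a Transformer that appends a tag to every input table's description. */
    static Transformer appendTag(String tag) {
        return inputs -> {
            Table[] outputs = new Table[inputs.length];
            for (int i = 0; i < inputs.length; i++) {
                outputs[i] = new Table(inputs[i].description + "|" + tag);
            }
            return outputs;
        };
    }

    /** Fits [scaler, trainer] on a "train" table, then transforms a "test" table. */
    static String demo() {
        Estimator trainer = inputs -> appendTag("predicted[fit on " + inputs[0].description + "]");
        List<Stage> stages = List.of(appendTag("scaled"), trainer);
        List<AlgoOperator> fitted = fitAll(stages, new Table("train"));
        return transformAll(fitted, new Table("test"))[0].description;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "test|scaled|predicted[fit on train|scaled]"
    }
}
```

Note that this linear rule passes the same tables to a stage's `fit` and to the fitted Transformer's `transform`, so it does not cover stages whose training and inference inputs differ.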