...

And some workflows may need to split 1 table into 2 tables and use these tables for training and validation respectively. This logic can be expressed by a Transformer with 1 input table and 2 output tables.

2) Express a generic machine learning function that has multiple inputs/outputs, and make sure the class used for this purpose does not have any historical association with the Estimator/Model concepts.

For example, some graph embedding algorithms take multiple tables as inputs and produce multiple tables as outputs, where the output tables can be used to determine the embeddings of new inputs. Though the Transformer API can express this algorithm, some machine learning practitioners are concerned that Transformer is historically associated with the Estimator/Model concepts, which makes it feel unnatural to use Transformer for this use-case.

3) Express an Estimator/Transformer pair whose input schemas are different, and still be able to compose an Estimator from a linear chain (and additionally DAG) of Estimator/Transformer.

For example, the Word2Vec algorithm uses a list of words (i.e. a sentence) as the input schema for training, and the fitted algorithm should be able to use a single word (instead of a list of words) as the input schema.

Also, the GraphEmbedding algorithm is expected to take 2 tables as input for training and 1 table as input for inference. The input schemas differ between the corresponding Estimator and Transformer.

The existing Pipeline API cannot be used to compose such an Estimator with other Transformers/Estimators, because it expects each stage in the Pipeline to take the same inputs for fit and transform.

4) Compose a directed acyclic graph (i.e. DAG) of Estimator/Transformer into an Estimator/Transformer.

For example, the workflow may involve the join of 2 tables, where each table could be generated by a chain of Estimator/Transformer. The entire workflow is therefore a DAG of Estimator/Transformer.

5) Online learning, where a long-running Transformer instance needs to be updated with the latest model data generated by another long-running Estimator instance.

In this scenario, the Estimator must be able to run on a different machine than the Transformer, so that the Estimator can consume sufficient computation resources in a cluster while the Transformer is deployed on edge devices.

6) Provide APIs to allow an Estimator/Transformer to be efficiently saved/loaded even if its state (e.g. model data) is tens of GBs or more.
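The online-learning flow above can be sketched without Flink. The sketch assumes a `java.util.concurrent.BlockingQueue` stands in for the unbounded model-data table that the setStateStreams method, proposed later in this FLIP, would pass to the Transformer. `OnlineMeanEstimator` and `CenteringTransformer` are hypothetical classes. The Estimator emits an updated model (a running mean) after each training batch, and the Transformer always applies the latest model it has received. In a real deployment the two sides would run on different machines, but here they share one JVM.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical Transformer: subtracts the latest received mean from each input value. */
final class CenteringTransformer {
    private BlockingQueue<Double> modelStream;
    private double currentMean = 0.0;

    /** Analogous to Transformer::setStateStreams; may be called at most once. */
    void setStateStreams(BlockingQueue<Double> modelStream) {
        if (this.modelStream != null) {
            throw new IllegalStateException("setStateStreams may be called at most once");
        }
        this.modelStream = modelStream;
    }

    double[] transform(double[] values) {
        // Drain pending model updates and keep only the most recent one.
        Double update;
        while (modelStream != null && (update = modelStream.poll()) != null) {
            currentMean = update;
        }
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i] - currentMean;
        }
        return out;
    }
}

/** Hypothetical Estimator: fits a running mean and streams every new model version. */
final class OnlineMeanEstimator {
    private final BlockingQueue<Double> modelStream = new LinkedBlockingQueue<>();
    private double sum = 0.0;
    private long count = 0;

    /** The stream of model updates that a deployed Transformer would ingest. */
    BlockingQueue<Double> modelStream() {
        return modelStream;
    }

    /** Consumes one batch of the (conceptually unbounded) training stream and emits the new model. */
    void fitBatch(List<Double> batch) {
        for (double v : batch) {
            sum += v;
            count++;
        }
        if (count > 0) {
            modelStream.offer(sum / count);
        }
    }
}
```

The Transformer never blocks on the stream. It serves requests with whatever model is current and picks up newer versions as they arrive.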

...

In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:
7) The Model interface (which currently simply extends the Transformer interface without adding any extra logic) does not provide any added value, so the class hierarchy complexity it adds is not justified.

8) The fit/transform APIs require users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instances given to fit/transform.

9) A Pipeline is both a Transformer and an Estimator. The experience of using a Pipeline is therefore different from the experience of using an Estimator (with the needFit API).

10) There is no API provided by the Estimator/Transformer interfaces to validate the schema consistency of a Pipeline. Users have to instantiate Tables (with I/O logic) and run fit/transform to find out whether the stages in the Pipeline are compatible with each other.
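The missing schema-validation API noted above can be addressed by letting each stage derive its output schemas from its input schemas. A whole chain can then be checked before any Table is instantiated. This is a minimal sketch. It uses a hypothetical `TableSchema` stand-in (an ordered list of column names) instead of Flink's class, plus the hypothetical types `SchemaStage`, `AddColumn` and `PipelineValidator`. Only the idea of a transformSchemas method comes from this FLIP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a table schema: an ordered list of column names. */
final class TableSchema {
    final List<String> columns;

    TableSchema(String... columns) {
        this.columns = Arrays.asList(columns);
    }
}

/** Hypothetical minimal stage that only exposes schema derivation. */
interface SchemaStage {
    TableSchema[] transformSchemas(TableSchema... schemas);
}

/** Hypothetical stage: needs 1 input containing `requiredColumn`, appends `outputColumn`. */
final class AddColumn implements SchemaStage {
    private final String requiredColumn;
    private final String outputColumn;

    AddColumn(String requiredColumn, String outputColumn) {
        this.requiredColumn = requiredColumn;
        this.outputColumn = outputColumn;
    }

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        if (schemas.length != 1) {
            throw new IllegalArgumentException("expected 1 input schema, got " + schemas.length);
        }
        if (!schemas[0].columns.contains(requiredColumn)) {
            throw new IllegalArgumentException("missing required column: " + requiredColumn);
        }
        List<String> out = new ArrayList<>(schemas[0].columns);
        out.add(outputColumn);
        return new TableSchema[] {new TableSchema(out.toArray(new String[0]))};
    }
}

final class PipelineValidator {
    /** Folds the schemas through the stages in order; throws at the first incompatible stage. */
    static TableSchema[] validate(List<SchemaStage> stages, TableSchema... inputs) {
        TableSchema[] current = inputs;
        for (SchemaStage stage : stages) {
            current = stage.transformSchemas(current);
        }
        return current;
    }
}
```

No data is read, so a misconfigured chain fails fast with a message that names the missing column.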

...

The following changes are the most important changes proposed by this doc:

1) Added the MLFunc interface, which has the same API as the existing Transformer (i.e. it has the transform method).

This change addresses the need to encode a generic multi-input multi-output machine learning function.

2) Updated Transformer/Estimator to take a list of tables as inputs; Transformer also returns a list of tables as outputs.

This change addresses the use-cases described in the motivation section, e.g. a graph embedding Estimator needs to take 2 tables as inputs.

3) Added Graph, GraphModel and GraphBuilder.

This change addresses the use-cases described in the motivation section, where we need to compose an Estimator from a DAG of Estimator/Transformer. Note that Graph/GraphBuilder supports Estimators whose input schemas differ from those of their fitted Transformers.

4) Added transformSchemas() to the Stage interface.

This is needed to validate the compatibility of input schemas with a given Estimator/Transformer instance.

5) Added setStateStreams and getStateStreams to the Transformer interface.

This change addresses the use-cases described in the motivation section, where a running Transformer needs to ingest the model state streams emitted by an Estimator, which could be running on a different machine.

6) Removed the methods PipelineStage::toJson and PipelineStage::loadJson, and added the methods save(...) and load(...) to the Stage interface.
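Unlike toJson/loadJson, which materialize the whole stage as one String, a path-based save/load lets a stage stream large model data directly to files. This is a minimal sketch built around a hypothetical `ScalerModel`. Its state is one parameter plus a `double[]` of model data, stored as a directory that holds a `params.properties` file and a binary `data.bin` file. This layout is illustrative and is not prescribed by the FLIP.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical stage: a small parameter map plus potentially large model data. */
final class ScalerModel {
    String outputCol = "output";
    double[] scales = new double[0];

    /** Writes params and model data under the given directory (created if absent). */
    void save(String path) {
        try {
            Path dir = Paths.get(path);
            Files.createDirectories(dir);
            Properties params = new Properties();
            params.setProperty("outputCol", outputCol);
            try (Writer w = Files.newBufferedWriter(dir.resolve("params.properties"))) {
                params.store(w, null);
            }
            // Model data is streamed value by value instead of being built into one big String.
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(dir.resolve("data.bin"))))) {
                out.writeInt(scales.length);
                for (double s : scales) {
                    out.writeDouble(s);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Restores params and model data from the given directory into this instance. */
    void load(String path) {
        try {
            Path dir = Paths.get(path);
            Properties params = new Properties();
            try (Reader r = Files.newBufferedReader(dir.resolve("params.properties"))) {
                params.load(r);
            }
            outputCol = params.getProperty("outputCol");
            try (DataInputStream in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(dir.resolve("data.bin"))))) {
                double[] loaded = new double[in.readInt()];
                for (int i = 0; i < loaded.length; i++) {
                    loaded[i] = in.readDouble();
                }
                scales = loaded;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Separating small parameters from bulk model data keeps the parameter file human-readable, while memory use for the data stays bounded by the stream buffer rather than by a full String copy.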


The following changes are relatively minor:

7) Removed TableEnvironment from the parameter list of fit/transform APIs.

This change simplifies the usage of fit/transform APIs.

8) Added PipelineModel and let Pipeline implement only the Estimator. Pipeline is no longer a Transformer.

This change makes the experience of using Pipeline consistent with the experience of using Estimator/Transformer, where a class is either an Estimator or a Transformer.

9) Removed Pipeline::appendStage from the Pipeline class.

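This change makes the concept of Pipeline consistent with that of Graph/GraphBuilder. Neither Graph nor Pipeline provides APIs to incrementally construct itself: a Pipeline receives its stages through its constructor, and a Graph is built by a GraphBuilder.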

10) Removed the Model interface.

This change simplifies the class hierarchy by removing a redundant interface. It follows the philosophy of only adding complexity when there is an explicit use-case for it.

11) Renamed PipelineStage to Stage and added the PublicEvolving annotation to the Stage interface.
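With Pipeline being only an Estimator, Pipeline::fit visits the stages in order. Each Estimator is fitted on the current tables and replaced by its fitted Transformer, and the tables are passed through each resulting MLFunc/Transformer to feed the next stage. The collected MLFuncs form the PipelineModel. This is a minimal sketch. It uses simplified, non-generic stand-ins for Stage, MLFunc, Transformer and Estimator, and models each table as a `List<Double>`. The interfaces in this FLIP use Flink `Table`s and self-referential generics instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the FLIP's interfaces; a table is a List<Double>.
interface Stage {}

interface MLFunc extends Stage {
    /** Maps a list of input tables to a list of output tables. */
    List<List<Double>> transform(List<List<Double>> inputs);
}

interface Transformer extends MLFunc {}

interface Estimator extends Stage {
    Transformer fit(List<List<Double>> inputs);
}

/** A Transformer made of an ordered list of MLFuncs (Transformers included). */
final class PipelineModel implements Transformer {
    final List<MLFunc> stages;

    PipelineModel(List<MLFunc> stages) {
        this.stages = stages;
    }

    @Override
    public List<List<Double>> transform(List<List<Double>> inputs) {
        List<List<Double>> tables = inputs;
        for (MLFunc f : stages) {
            tables = f.transform(tables);
        }
        return tables;
    }
}

/** Only an Estimator: fitting it yields a PipelineModel. */
final class Pipeline implements Estimator {
    private final List<Stage> stages;

    Pipeline(List<Stage> stages) {
        this.stages = stages;
    }

    @Override
    public PipelineModel fit(List<List<Double>> inputs) {
        List<MLFunc> fitted = new ArrayList<>();
        List<List<Double>> tables = inputs;
        for (int i = 0; i < stages.size(); i++) {
            Stage stage = stages.get(i);
            MLFunc func;
            if (stage instanceof Estimator) {
                func = ((Estimator) stage).fit(tables);
            } else if (stage instanceof MLFunc) {
                func = (MLFunc) stage;
            } else {
                throw new IllegalArgumentException("unsupported stage: " + stage);
            }
            fitted.add(func);
            // The last stage's output is not needed to fit anything else, so it is skipped.
            if (i < stages.size() - 1) {
                tables = func.transform(tables);
            }
        }
        return new PipelineModel(fitted);
    }
}
```

Because fit returns a PipelineModel that contains no Estimators, callers never need a needFit-style check. Fitting always yields a Transformer.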

...

The following code block shows the interfaces of Stage, MLFunc, Transformer, Estimator, Pipeline and PipelineModel after making the changes listed above.


Code Block
languagejava
/**
 * Base class for a stage in a Pipeline or Graph. The interface is only a concept, and does not have any actual
 * functionality. Its subclasses could be Estimator, Transformer or MLFunc. No other classes should inherit this
 * interface directly.
 *
 * <p>Each stage has parameters, and requires a public empty constructor for restoration.
 *
 * @param <T> The class type of the Stage implementation itself.
 * @see WithParams
 */
@PublicEvolving
public interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {
    /**
     * This method checks the compatibility between input schemas, stage's parameters and stage's
     * logic. It should raise an exception if there is any mismatch, e.g. the number of input
     * schemas is wrong, or if a required field is missing from a schema.
     *
     * <p>If there is no mismatch, the method derives and returns the output schemas from the input
     * schemas.
     *
     * <p>Note that the output schemas of a given Estimator instance should equal the output schemas
     * of the Transformer instance fitted by this Estimator instance, provided that the same list of
     * input schemas is used as inputs to the fit/transform methods respectively.
     *
     * <p>For any Estimator instance added in a Pipeline, the transform method of the Transformer returned by this
     * Estimator instance should be able to accept a list of tables of the same length and schemas as the fit method of
     * this Estimator instance.
     *
     * @param schemas the list of schemas of the input tables.
     * @return the list of schemas of the output tables.
     */
    TableSchema[] transformSchemas(TableSchema... schemas);

    /** Saves this stage to the given path. */
    void save(String path);

    /** Loads this stage from the given path. */
    void load(String path);
}

/**
 * An MLFunc is a Stage that takes a list of tables as inputs and produces a list of tables as
 * results. It can be used to encode a generic multi-input multi-output machine learning function.
 *
 * @param <T> The class type of the MLFunc implementation itself.
 */
@PublicEvolving
public interface MLFunc<T extends MLFunc<T>> extends Stage<T> {
    /**
     * Applies the MLFunc on the given input tables, and returns the result tables.
     *
     * @param inputs a list of tables
     * @return a list of tables
     */
    Table[] transform(Table... inputs);
}

/**
 * A Transformer is an MLFunc with additional support for state streams, which could be set by the
 * Estimator that fitted this Transformer. Unlike MLFunc, a Transformer is typically associated
 * with an Estimator.
 *
 * @param <T> The class type of the Transformer implementation itself.
 */
@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends MLFunc<T> {
    /**
     * Uses the given list of tables to update internal states. This can be useful for e.g. online
     * learning where an Estimator fits an infinite stream of training samples and streams the model
     * diff data to this Transformer.
     *
     * <p>This method may be called at most once.
     *
     * @param inputs a list of tables
     */
    default void setStateStreams(Table... inputs) {
        throw new UnsupportedOperationException("this method is not implemented");
    }

    /**
     * Gets a list of tables representing changes of internal states of this Transformer. These
     * tables might come from the Estimator that instantiated this Transformer.
     *
     * @return a list of tables
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}

/**
 * An Estimator is a Stage that takes a list of tables as inputs and produces a Transformer.
 *
 * @param <E> class type of the Estimator implementation itself.
 * @param <M> class type of the Transformer this Estimator produces.
 */
@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {
    /**
     * Trains on the given inputs and produces a Transformer.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

/**
 * A Pipeline acts as an Estimator. It consists of an ordered list of stages, each of which could be
 * an Estimator, Transformer or MLFunc.
 */
@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineModel fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

/**
 * A PipelineModel acts as a Transformer. It consists of an ordered list of Transformers or MLFuncs.
 */
@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {

    public PipelineModel(List<MLFunc<?>> stages) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}
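The Word2Vec use-case in the motivation section needs an Estimator whose fit inputs have a different schema from the transform inputs of its fitted Transformer. As a toy illustration (not Word2Vec itself), consider a hypothetical `VocabEstimator`. It is fitted on sentences (each row a list of words) and produces a `VocabTransformer` that maps single words to integer ids. Plain Java lists stand in for Flink `Table`s, and neither class implements the interfaces above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical fitted model: input rows are single words, output rows are ids (-1 if unknown). */
final class VocabTransformer {
    private final Map<String, Integer> vocab;

    VocabTransformer(Map<String, Integer> vocab) {
        this.vocab = vocab;
    }

    List<Integer> transform(List<String> words) {
        List<Integer> ids = new ArrayList<>();
        for (String w : words) {
            ids.add(vocab.getOrDefault(w, -1));
        }
        return ids;
    }
}

/** Hypothetical estimator: input rows are sentences, i.e. lists of words. */
final class VocabEstimator {
    VocabTransformer fit(List<List<String>> sentences) {
        Map<String, Integer> vocab = new LinkedHashMap<>();
        for (List<String> sentence : sentences) {
            for (String w : sentence) {
                // Ids are assigned in order of first appearance.
                vocab.putIfAbsent(w, vocab.size());
            }
        }
        return new VocabTransformer(vocab);
    }
}
```

Such a stage fits the GraphBuilder::getOutputs overload that takes separate estimatorInputs and transformerInputs. It does not fit a Pipeline, which expects the fitted Transformer to accept tables with the same schemas as fit.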

...

Code Block
languagejava
/**
 * A Graph acts as an Estimator. A Graph consists of a DAG of stages, each of which could be
 * an Estimator, Transformer or MLFunc. When `Graph::fit` is called, the stages are executed in a
 * topologically-sorted order. If a stage is an Estimator, its `Estimator::fit` method will be
 * called on the input tables (from the input edges) to fit a model. Then the model, which is a
 * Transformer, will be used to transform the input tables to produce output tables to the output
 * edges. If a stage is a Transformer or MLFunc, its `MLFunc::transform` method will be called on the
 * input tables to produce output tables to the output edges. The fitted model from a Graph is a
 * GraphModel, which consists of fitted models and transformers corresponding to the Graph's
 * stages.
 */
@PublicEvolving
public final class Graph implements Estimator<Graph, GraphModel> {
    public Graph(...) {...}

    @Override
    public GraphModel fit(Table... inputs) {...}

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        return schemas;
    }

    /** Skipped a few methods, including the implementations of some Estimator APIs. */
}

/**
 * A GraphModel acts as a Transformer. A GraphModel consists of a DAG of Transformers or MLFuncs. When
 * `GraphModel::transform` is called, the stages are executed in a topologically-sorted order. When
 * a stage is executed, its `MLFunc::transform` method will be called on the input tables (from
 * the input edges) to produce output tables to the output edges.
 */
public final class GraphModel implements Transformer<GraphModel> {
    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

/**
 * A GraphBuilder provides APIs to build Graph and GraphModel from a DAG of Estimator, Transformer and MLFunc instances.
 */
@PublicEvolving
public final class GraphBuilder {
    /**
     * Specifies the upper bound (could be loose) of the number of output tables that can be
     * returned by the Transformer::getStateStreams and Transformer::transform methods, for any
     * stage involved in this Graph.
     *
     * <p>The default upper bound is 20.
     */
    public GraphBuilder setMaxOutputLength(int maxOutputLength) {...}

    /**
     * Creates a TableId associated with this GraphBuilder. It can be used to specify the passing of
     * tables between stages, as well as the input/output tables of the Graph/GraphModel generated
     * by this builder.
     */
    public TableId createTableId() {...}

    /**
     * If the stage is an Estimator, both its fit method and the transform method of its fitted Transformer would be
     * invoked with the given inputs when the graph runs.
     *
     * <p>If this stage is a Transformer or MLFunc, its transform method would be invoked with the given
     * inputs when the graph runs.
     *
     * <p>Returns a list of TableIds, which represent the outputs of the transform invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId... inputs) {...}

    /**
     * If this stage is an Estimator, its fit method would be invoked with estimatorInputs, and the transform method
     * of its fitted Transformer would be invoked with transformerInputs, when the graph runs.
     *
     * <p>This method throws an exception if the stage is a Transformer or MLFunc.
     *
     * <p>This method is useful when the stage is an Estimator AND the Estimator::fit needs to take a different list of
     * Tables from the Transformer::transform of the fitted Transformer.
     *
     * <p>Returns a list of TableIds, which represent the outputs of the Transformer::transform invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId[] estimatorInputs, TableId[] transformerInputs) {...}

    /**
     * The GraphModel::setStateStreams should invoke the setStateStreams of the corresponding stage
     * with the corresponding inputs.
     */
    void setStateStreams(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::getStateStreams should invoke the getStateStreams of the corresponding stage.
     *
     * <p>Returns a list of TableIds, which represents outputs of the getStateStreams invocation.
     */
    TableId[] getStateStreams(Stage<?> stage) {...}

    /**
     * Returns a Graph instance with the following API specification:
     *
     * 1) Graph::fit should take inputs and return a GraphModel with the following specification.
     *
     * 2) GraphModel::transform should take inputs and return outputs.
     *
     * 3) GraphModel::setStateStreams should take inputStates.
     *
     * 4) GraphModel::getStateStreams should return outputStates.
     *
     * The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in the order specified by the DAG of stages.
     */
    Graph build(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a Graph instance with the following API specification:
     *
     * 1) Graph::fit should take estimatorInputs and return a GraphModel with the following specification.
     *
     * 2) GraphModel::transform should take transformerInputs and return outputs.
     *
     * 3) GraphModel::setStateStreams should take inputStates.
     *
     * 4) GraphModel::getStateStreams should return outputStates.
     *
     * The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in the order specified by the DAG of stages.
     *
     * This method is useful when the Graph::fit needs to take a different list of Tables from the GraphModel::transform of the fitted GraphModel.
     */
    Graph build(TableId[] estimatorInputs, TableId[] transformerInputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a GraphModel instance with the following API specification:
     *
     * 1) GraphModel::transform should take inputs and return outputs.
     *
     * 2) GraphModel::setStateStreams should take inputStates.
     *
     * 3) GraphModel::getStateStreams should return outputStates.
     *
     * The transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in the order specified by the DAG of stages.
     *
     * This method throws an exception if any stage of this graph is an Estimator.
     */
    GraphModel buildModel(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    // The TableId is necessary to pass the inputs/outputs of various API calls across the
    // Graph/GraphModel stages.
    static class TableId {}

}
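The Graph and GraphModel javadocs state that stages run in topologically-sorted order. A minimal sketch of computing that order with Kahn's algorithm follows. It uses a hypothetical `StageDag` class where each node is a stage name and each edge records that one stage's output table feeds another stage. It also detects cycles, which a GraphBuilder would have to reject. The example DAG mirrors the join use-case from the motivation section: two chains feeding a join stage.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical DAG of stage names; not part of the proposed API. */
final class StageDag {
    // Insertion-ordered so that ties are broken by the order in which stages were added.
    private final Map<String, List<String>> successors = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    void addStage(String stage) {
        successors.putIfAbsent(stage, new ArrayList<>());
        inDegree.putIfAbsent(stage, 0);
    }

    /** Records that an output table of `from` is an input table of `to`. */
    void addEdge(String from, String to) {
        addStage(from);
        addStage(to);
        successors.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    /** Kahn's algorithm: every stage appears after all stages that produce its inputs. */
    List<String> topologicalOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String stage = ready.poll();
            order.add(stage);
            for (String next : successors.get(stage)) {
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != remaining.size()) {
            throw new IllegalStateException("the stages contain a cycle");
        }
        return order;
    }
}
```

For the join example, the edges scalerA → encoderA, scalerB → encoderB, encoderA → join, encoderB → join and join → classifier yield the order scalerA, scalerB, encoderA, encoderB, join, classifier.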

...