...

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of the interfaces and classes after the proposed changes are made.

...

API additions and changes


Here we list the additions and the changes to the Flink ML API.

1) Removed TableEnvironment from the parameter list of fit/transform APIs.

This change simplifies the usage of fit/transform APIs.

2) Added PipelineModel and made Pipeline implement only the Estimator interface. Pipeline is no longer a Transformer.

This change makes the experience of using Pipeline consistent with the experience of using Estimator/Transformer, where a class is either an Estimator or a Transformer.
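To illustrate the Estimator/Transformer split, here is a minimal sketch of how Pipeline::fit could produce a PipelineModel. It assumes the usual Spark-ML-style semantics (each Estimator is fitted on the output of the preceding stages, and every stage is represented by a Transformer in the resulting model), which this FLIP does not spell out. All Toy* types are hypothetical stand-ins rather than Flink ML classes, and a "table" is simplified to a List<Double>.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Stage/Transformer/Estimator; a "table" is just a List<Double>.
interface ToyStage {}

interface ToyTransformer extends ToyStage {
    List<Double> transform(List<Double> input);
}

interface ToyEstimator extends ToyStage {
    ToyTransformer fit(List<Double> input);
}

/** Subtracts a fixed offset from every value. */
final class OffsetTransformer implements ToyTransformer {
    private final double offset;

    OffsetTransformer(double offset) {
        this.offset = offset;
    }

    @Override
    public List<Double> transform(List<Double> input) {
        List<Double> out = new ArrayList<>();
        for (double v : input) out.add(v - offset);
        return out;
    }
}

/** Learns the mean of its input and produces a transformer that subtracts it. */
final class MeanCenterEstimator implements ToyEstimator {
    @Override
    public ToyTransformer fit(List<Double> input) {
        double sum = 0;
        for (double v : input) sum += v;
        return new OffsetTransformer(input.isEmpty() ? 0 : sum / input.size());
    }
}

/** Transformer-only chain, playing the role of PipelineModel. */
final class ToyPipelineModel implements ToyTransformer {
    private final List<ToyTransformer> transformers;

    ToyPipelineModel(List<ToyTransformer> transformers) {
        this.transformers = transformers;
    }

    @Override
    public List<Double> transform(List<Double> input) {
        List<Double> current = input;
        for (ToyTransformer t : transformers) current = t.transform(current);
        return current;
    }
}

/** Estimator-only chain, playing the role of Pipeline: fit() always yields a ToyPipelineModel. */
final class ToyPipeline implements ToyEstimator {
    private final List<ToyStage> stages;

    ToyPipeline(List<ToyStage> stages) {
        this.stages = stages;
    }

    @Override
    public ToyPipelineModel fit(List<Double> input) {
        List<ToyTransformer> fitted = new ArrayList<>();
        List<Double> current = input;
        for (int i = 0; i < stages.size(); i++) {
            ToyStage stage = stages.get(i);
            // Estimators are fitted on the current data; Transformers are used as-is.
            ToyTransformer t = stage instanceof ToyEstimator
                    ? ((ToyEstimator) stage).fit(current)
                    : (ToyTransformer) stage;
            fitted.add(t);
            // Transform only if a later stage still needs the intermediate data.
            if (i < stages.size() - 1) current = t.transform(current);
        }
        return new ToyPipelineModel(fitted);
    }
}
```

Because ToyPipeline is only an Estimator, the only way to apply it to data is to call fit() and use the returned model, which is the usage pattern the proposed Pipeline/PipelineModel split aims for.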

3) Removed Pipeline::appendStage from the Pipeline class.

This change makes the concept of Pipeline consistent with that of Graph/GraphBuilder: neither Graph nor Pipeline provides an API to construct itself incrementally. A Pipeline is created from its list of stages, and a Graph is created by a GraphBuilder.

4) Removed the Model interface.

This change simplifies the class hierarchy by removing a redundant interface. It follows the philosophy of only adding complexity when there is an explicit use case for it.

5) Renamed PipelineStage to Stage and added the PublicEvolving annotation to the Stage interface.

The new name is more accurate because this interface is now used to compose a Graph, not just a Pipeline.

6) Added transformSchemas() to the Stage interface.

This is needed to validate the compatibility of input schemas with a given Estimator/Transformer instance, and to derive the schemas of its output tables.
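A minimal sketch of the kind of check transformSchemas() is meant to perform, following the two failure cases named in its Javadoc (wrong number of input schemas, missing required field). ToySchema, ToyPredictor and the column names are hypothetical; a real stage would operate on TableSchema and read the column names from its own parameters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for TableSchema: an ordered list of column names. */
final class ToySchema {
    final List<String> columns;

    ToySchema(String... columns) {
        this.columns = Arrays.asList(columns);
    }
}

/**
 * Hypothetical single-input stage that reads the column named by featuresCol and appends a
 * column named by predictionCol.
 */
final class ToyPredictor {
    private final String featuresCol;
    private final String predictionCol;

    ToyPredictor(String featuresCol, String predictionCol) {
        this.featuresCol = featuresCol;
        this.predictionCol = predictionCol;
    }

    ToySchema[] transformSchemas(ToySchema... schemas) {
        // Mismatch 1: wrong number of input schemas.
        if (schemas.length != 1) {
            throw new IllegalArgumentException("Expected 1 input schema, got " + schemas.length);
        }
        // Mismatch 2: a required field is missing.
        List<String> columns = schemas[0].columns;
        if (!columns.contains(featuresCol)) {
            throw new IllegalArgumentException("Missing required column: " + featuresCol);
        }
        // No mismatch: derive the output schema from the input schema.
        List<String> out = new ArrayList<>(columns);
        out.add(predictionCol);
        return new ToySchema[] {new ToySchema(out.toArray(new String[0]))};
    }
}
```

In the proposed API, an Estimator and the Transformer it fits are expected to return the same output schemas for the same input schemas, so a check like this can be performed without fitting the Estimator first.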

7) Updated Transformer/Estimator to take a list of tables as inputs, and Transformer to return a list of tables as outputs.

This change addresses the use-cases described in the motivation section, e.g. a graph embedding Estimator needs to take 2 tables as inputs.

8) Added setStateStreams and getStateStreams to the Transformer interface.

This change addresses the use-cases described in the motivation section, where a running Transformer needs to ingest the model state streams emitted by an Estimator, which could be running on a different machine.
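A minimal sketch of the setStateStreams/getStateStreams contract under simplifying assumptions: the state stream is a bounded List<Double> of successive model versions rather than an unbounded Table, and the model always applies the latest version it has received. ToyOnlineEstimator and ToyOnlineModel are hypothetical. In the test, a second ToyOnlineModel stands in for a Transformer on another machine that only sees the updates exposed through getStateStreams().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model whose only state is an offset. Its "state stream" is simplified to a bounded
 * list of successive offsets; a real Transformer would receive unbounded Tables instead.
 */
final class ToyOnlineModel {
    private List<Double> stateStream = Collections.emptyList();
    private boolean stateStreamsSet = false;

    /** Mirrors the "may be called at most once" contract of Transformer::setStateStreams. */
    void setStateStreams(List<Double> modelUpdates) {
        if (stateStreamsSet) {
            throw new IllegalStateException("setStateStreams may be called at most once");
        }
        stateStreamsSet = true;
        stateStream = modelUpdates;
    }

    /** Exposes the model updates so that another instance can consume them. */
    List<Double> getStateStreams() {
        return stateStream;
    }

    /** Applies the most recent model version received so far. */
    List<Double> transform(List<Double> input) {
        double offset = stateStream.isEmpty() ? 0.0 : stateStream.get(stateStream.size() - 1);
        List<Double> out = new ArrayList<>();
        for (double v : input) out.add(v - offset);
        return out;
    }
}

/** Hypothetical online estimator that emits the running mean after each training sample. */
final class ToyOnlineEstimator {
    ToyOnlineModel fit(List<Double> samples) {
        List<Double> updates = new ArrayList<>();
        double sum = 0;
        for (int i = 0; i < samples.size(); i++) {
            sum += samples.get(i);
            updates.add(sum / (i + 1));
        }
        ToyOnlineModel model = new ToyOnlineModel();
        model.setStateStreams(updates);
        return model;
    }
}
```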

9) Added Graph, GraphModel and GraphBuilder.

This change addresses the use-cases described in the motivation section, where we need to compose an Estimator from a DAG of Estimator/Transformer.

Interfaces and classes after the proposed API changes

The following code block shows the interface of Stage, Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.

Code Block
languagejava
@PublicEvolving
interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {
    /**
     * This method checks the compatibility between input schemas, stage's parameters and stage's
     * logic. It should raise an exception if there is any mismatch, e.g. the number of input
     * schemas is wrong, or if a required field is missing from a schema.
     *
     * <p>If there is no mismatch, the method derives and returns the output schemas from the input
     * schemas.
     *
     * <p>Note that the output schemas of a given Estimator instance should equal the output schemas
     * of the Transformer instance fitted by this Estimator instance, provided that the same list of
     * input schemas is used as inputs to the fit/transform methods respectively.
     *
     * @param schemas the list of schemas of the input tables.
     * @return the list of schemas of the output tables.
     */
    TableSchema[] transformSchemas(TableSchema... schemas);

    /** Skipped */
    default String toJson() {...}

    /** Skipped */
    default void loadJson(String json) {...}
}

@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends Stage<T> {

    /**
     * Applies the Transformer on the given input tables, and returns the result tables.
     *
     * @param inputs a list of tables
     * @return a list of tables
     */
    Table[] transform(Table... inputs);

    /**
     * Uses the given list of tables to update internal states. This can be useful for e.g. online
     * learning where an Estimator fits an infinite stream of training samples and streams the model
     * diff data to this Transformer.
     *
     * <p>This method may be called at most once.
     *
     * @param inputs a list of tables
     */
    default void setStateStreams(Table... inputs) {
        throw new UnsupportedOperationException("this method is not implemented");
    }

    /**
     * Gets a list of tables representing changes of internal states of this Transformer. These
     * tables might come from the Estimator that instantiated this Transformer.
     *
     * @return a list of tables
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}

@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {

    /**
     * Trains on the given inputs and produces a Transformer. If this Estimator may be used to
     * compose a Pipeline, the transform method of the returned Transformer should be able to accept
     * a list of tables of the same length and schemas as the fit method of this Estimator.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineModel fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {

    public PipelineModel(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

The following code block shows the interface of Graph, GraphModel and GraphBuilder that we propose to add.

Code Block
languagejava
/**
 * A Graph acts as an Estimator. It consists of a DAG of stages, each of which is either an
 * Estimator or Transformer.
 */
@PublicEvolving
public final class Graph implements Estimator<Graph, GraphModel> {
    public Graph(...) {...}

    @Override
    public GraphModel fit(Table... inputs) {...}

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        return schemas;
    }

    /** Skipped a few methods, including the implementations of some Estimator APIs. */
}

/** A GraphModel acts as a Transformer. It consists of a DAG of Transformers. */
@PublicEvolving
public final class GraphModel implements Transformer<GraphModel> {
    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

/** A GraphBuilder helps connect Stage instances into a Graph or GraphModel. */
@PublicEvolving
public final class GraphBuilder {
    /**
     * Specifies the upper bound (could be loose) of the number of output tables that can be
     * returned by the Transformer::getStateStreams and Transformer::transform methods, for any
     * stage involved in this Graph.
     *
     * <p>The default upper bound is 20.
     */
    public GraphBuilder setMaxOutputLength(int maxOutputLength) {...}

    /**
     * Creates a TableId associated with this GraphBuilder. It can be used to specify the passing of
     * tables between stages, as well as the input/output tables of the Graph/GraphModel generated
     * by this builder.
     */
    public TableId createTableId() {...}

    /**
     * The Graph::fit and GraphModel::transform should invoke the fit/transform of the corresponding
     * stage with the corresponding inputs.
     *
     * <p>Returns a list of TableIds, which represent the outputs of the Transformer::transform
     * invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::setStateStreams should invoke the setStateStreams of the corresponding stage
     * with the corresponding inputs.
     */
    void setStateStreams(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::getStateStreams should invoke the getStateStreams of the corresponding stage.
     *
     * <p>Returns a list of TableIds, which represent the outputs of the getStateStreams invocation.
     */
    TableId[] getStateStreams(Stage<?> stage) {...}

    /**
     * Returns a Graph instance with the following API specification:
     *
     * <ul>
     *   <li>Graph::fit should take inputs and return a GraphModel with the following specification:
     *       <ul>
     *         <li>GraphModel::transform should take inputs and return outputs.
     *         <li>GraphModel::setStateStreams should take inputStates.
     *         <li>GraphModel::getStateStreams should return outputStates.
     *       </ul>
     * </ul>
     *
     * <p>The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal
     * stages in the order specified by the DAG of stages.
     */
    Graph build(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a GraphModel instance with the following API specification:
     *
     * <ul>
     *   <li>GraphModel::transform should take inputs and return outputs.
     *   <li>GraphModel::setStateStreams should take inputStates.
     *   <li>GraphModel::getStateStreams should return outputStates.
     * </ul>
     *
     * <p>The transform/setStateStreams/getStateStreams should invoke the APIs of the internal
     * stages in the order specified by the DAG of stages.
     *
     * <p>This method throws an exception if any stage of this graph is an Estimator.
     */
    GraphModel buildModel(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    // The TableId is necessary to pass the inputs/outputs of various API calls across the
    // Graph/GraphModel stages.
    static class TableId {}
}


Example Usage

In this section we provide example code snippets to demonstrate how the APIs proposed in this FLIP can be used to address the use-cases in the motivation section.

...

Composing an Estimator from a DAG of Estimator/Transformer

Suppose we have the following Transformer and Estimator classes:

...

  • The method takes 2 input tables. The 1st input table is given to a TransformerA instance, and the 2nd input table is given to another TransformerA instance.
  • An EstimatorB instance fits the output tables of these two TransformerA instances and generates a new TransformerB instance.
  • Returns a GraphModel instance which contains 2 TransformerA instances and 1 TransformerB instance, connected using the same DAG as shown above.


Here is a code snippet that addresses this use-case using the proposed APIs:

Code Block
languagejava
GraphBuilder builder = new GraphBuilder();

// Creates nodes
Stage<?> stage1 = new TransformerA();
Stage<?> stage2 = new TransformerA();
Stage<?> stage3 = new EstimatorB();
// Creates the TableIds of the graph inputs (this graph has no input states).
TableId input1 = builder.createTableId();
TableId input2 = builder.createTableId();
// Feeds inputs to nodes and gets outputs.
TableId output1 = builder.getOutputs(stage1, input1)[0];
TableId output2 = builder.getOutputs(stage2, input2)[0];
TableId output3 = builder.getOutputs(stage3, output1, output2)[0];

// Specifies the ordered lists of inputs, outputs, input states and output states that will
// be used as the inputs/outputs of the corresponding Graph and GraphModel APIs.
TableId[] inputs = new TableId[] {input1, input2};
TableId[] outputs = new TableId[] {output3};
// Generates the Graph instance.
Graph graph = builder.build(inputs, outputs, new TableId[]{}, new TableId[]{});

// Use the Graph instance as an Estimator.
GraphModel model = graph.fit(...);
Table[] results = model.transform(...);
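To make the DAG semantics concrete, here is a minimal, single-process sketch of how a GraphBuilder could record getOutputs() calls and replay them in Graph::fit and in the resulting model. All Mini* types are hypothetical simplifications (a table is a List<Double>, state streams are omitted, and the constructor argument plays the role of setMaxOutputLength). The sketch also suggests why an output upper bound is useful: getOutputs() has to hand out TableIds before the stage's actual number of outputs is known.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the proposed API: a "table" is a List<Double>.
interface MiniStage {}

interface MiniTransformer extends MiniStage {
    List<List<Double>> transform(List<List<Double>> inputs);
}

interface MiniEstimator extends MiniStage {
    MiniTransformer fit(List<List<Double>> inputs);
}

/** Opaque handle for a table passed between stages, playing the role of GraphBuilder.TableId. */
final class MiniTableId {}

/** One recorded getOutputs() call: a stage plus the TableIds it reads and writes. */
final class MiniNode {
    final MiniStage stage;
    final MiniTableId[] inputs;
    final MiniTableId[] outputs;

    MiniNode(MiniStage stage, MiniTableId[] inputs, MiniTableId[] outputs) {
        this.stage = stage;
        this.inputs = inputs;
        this.outputs = outputs;
    }
}

final class MiniGraphBuilder {
    private final List<MiniNode> nodes = new ArrayList<>();
    private final int maxOutputLength;

    MiniGraphBuilder(int maxOutputLength) {
        this.maxOutputLength = maxOutputLength;
    }

    MiniTableId createTableId() {
        return new MiniTableId();
    }

    // A stage's real number of outputs is unknown until it runs, so hand out an upper bound of ids.
    MiniTableId[] getOutputs(MiniStage stage, MiniTableId... inputs) {
        MiniTableId[] outputs = new MiniTableId[maxOutputLength];
        for (int i = 0; i < outputs.length; i++) outputs[i] = createTableId();
        nodes.add(new MiniNode(stage, inputs, outputs));
        return outputs;
    }

    // Returns the "Graph". Nodes are kept in getOutputs() order; since a node can only read ids
    // that existed when it was recorded, that order is already a topological order of the DAG.
    MiniEstimator build(MiniTableId[] inputs, MiniTableId[] outputs) {
        List<MiniNode> dag = new ArrayList<>(nodes);
        return inputTables -> {
            Map<MiniTableId, List<Double>> tables = bind(inputs, inputTables);
            List<MiniNode> fitted = new ArrayList<>();
            for (MiniNode node : dag) {
                List<List<Double>> nodeInputs = lookup(tables, node.inputs);
                // Estimators are fitted first; every node is then run as a Transformer.
                MiniTransformer t = node.stage instanceof MiniEstimator
                        ? ((MiniEstimator) node.stage).fit(nodeInputs)
                        : (MiniTransformer) node.stage;
                tables.putAll(bind(node.outputs, t.transform(nodeInputs)));
                fitted.add(new MiniNode(t, node.inputs, node.outputs));
            }
            // The returned "GraphModel" replays the same DAG using only the fitted Transformers.
            return modelInputs -> {
                Map<MiniTableId, List<Double>> modelTables = bind(inputs, modelInputs);
                for (MiniNode node : fitted) {
                    MiniTransformer t = (MiniTransformer) node.stage;
                    modelTables.putAll(bind(node.outputs, t.transform(lookup(modelTables, node.inputs))));
                }
                return lookup(modelTables, outputs);
            };
        };
    }

    // Binds the i-th table to the i-th id; surplus pre-allocated ids simply stay unbound.
    private static Map<MiniTableId, List<Double>> bind(MiniTableId[] ids, List<List<Double>> tables) {
        Map<MiniTableId, List<Double>> map = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) map.put(ids[i], tables.get(i));
        return map;
    }

    private static List<List<Double>> lookup(Map<MiniTableId, List<Double>> tables, MiniTableId[] ids) {
        List<List<Double>> result = new ArrayList<>();
        for (MiniTableId id : ids) result.add(tables.get(id));
        return result;
    }
}
```

Because the recording order is already a topological order, this sketch needs no explicit sort; a real implementation would also have to route state streams and handle stages with more than one output.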

...

Online learning by using Transformer and Estimator running on different machines




Compatibility, Deprecation, and Migration Plan

...