Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state"Under Discussion"

...

JIRA: https://issues.apache.org/jira/browse/FLINK-22915

Released: 1.1415

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Table of Contents


NOTE: we have decided to replace this option with the design proposed here. This design doc will be deleted later.

Motivation and Use-cases

The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. linear sequence) of Estimator/Transformer, and each Estimator/Transformer has one input and one output.

...

For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to take two tables as inputs. These two tables represent nodes labels and edges of the graph respectively. This logic can be expressed as an Estimator with 2 input tables.

And some workflow may need to split 1 table into 2 tables, and use these tables for training and validation respectively. This logic can be expressed by a Transformer with 1 input table and 2 output tables.

2) Express an Estimator/Transformer pair whose input schemas are different, and still be able to compose an Estimator from a linear chain (and additionally DAG) of Estimator/Transformer.

For example, the Word2Vec algorithm uses a list of words (i.e. sentence) as the input schema for training. And the fitted algorithm should be able to use a word (instead of a list of words) as the input schema. 

Also, the GraphEmbedding algorithm is expected to take 2 tables as input for training, and take 1 table as input for inference. The input schemas are different between the corresponding Estimator and Transformer.

The existing Pipeline API can not be used to compose such an Estimator with other Transformer/Estimator because it is expected to use the same inputs for the same stage in the Pipeline.

3) Compose a directed-acyclic-graph (i.e DAG) Estimator/Transformer into an Estimator/Transformer.

For example, the workflow may involve the join of 2 tables, where each table could be generated by a chain of Estimator/Transformer. The entire workflow is therefore a DAG of Estimator/Transformer.

4) Online learning where a long-running instance Transformer needs to be updated by the latest model data generated by another long-running instance of Estimator.

a generic machine learning function that has multiple inputs/outputs and make sure the class used for this purpose does not have any historical association with Estimator/Model concepts.

For example, some machine learning-related algorithms like model evaluation, computation of some statistics (e.g., ChiSquareTest), graph algorithms (e.g., PageRank) also take tables as input and produce tables as output. Though the functionality of Transformer API

can express this algorithm, some machine learning practitioners have the concern that Transformer is historically associated with the Estimator/Model/Feature Engineering concepts such that it feels weird to use Transformer for these use-cases.  

3) Online learning where a long-running instance Transformer needs to be updated by the latest model data generated by another long-running instance of Estimator.

In this scenario, we In this scenario, we need to allow the Estimator to be run on a different machine than the Transformer. So that Estimator could consume sufficient computation resource in a cluster while the Transformer could be deployed on edge devices.In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:
5) The Model interface (which currently simply extends the Transformer interface without adding any extra logic) does not provide any added value. The added class hierarchy complexity is not justified.

6) fit/transform API requires users to explicitly provide the TableEnvironment, where the TableEnvironment could be retrieved from the Table instance given to the fit/transform.

7) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

8) There is no API provided by the Estimator/Transformer interface to validate the schema consistency of a Pipeline. Users would have to instantiate Tables (with I/O logics) and run fit/transform to know whether the stages in the Pipeline are compatible with each other.

Background

4) Provide APIs to allow Estimator/Transformer to be efficiently saved/loaded even if state (e.g. model data) of Estimator/Transformer is more than 10s of GBs.

The existing PipelineStage::toJson basically requires developer of Transformer/Estimator to serialize all model data into an in-memory string, which could be very inefficient (or practically impossible) if the model data is very large (e.g 10s of GBs).


In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:

5) The Model interface (which currently simply extends the Transformer interface without adding any extra logic) does not provide any added value. The added class hierarchy complexity is not justified.

6) fit/transform API requires users to explicitly provide the TableEnvironment, where the TableEnvironment could be retrieved from the Table instance given to the fit/transform.

7) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

Background

Note: Note: Readers who are familiar with the existing Estimator/Transformer/Pipeline APIs can skip this section.

...

It is important to make the following observation: if we don't provide the Pipeline class, users can still accomplish the same use-cases targeted by Pipeline by explicitly writing the training logic and inference logic separately using Estimator/Transformer APIs. But users would have to construct this chain of Estimator/Transformer twice (for training and inference respectively).

Design Principles

Multiple choices exist to address the use-cases targeted by this design doc. In the following, we explain the design principles followed by the proposed design, to hopefully make the understanding of the design choices more intuitive.

1) When the new use-case can be supported by just extending the arity of an existing API, we prefer to extend the arity of this API instead of adding a new class.

As a result of this philosophy, in order to support algorithms which can have multiple inputs and multiple outputs, we choose to extend Transformer::transform and Estimator::fit to take multiple Tables as inputs. And we also extend Transformer::transform to return multiple Tables as outputs.

An alternative solution is to add new classes, e.g. MultiInputTransformer, which has a transform(...) method that takes multiple input Tables and return multiple output Tables. In comparison to the proposed approach, this approach increases the number of classes that users have to deal with.

2) As much as possible, the API design should allow users to address the new use-case while still enjoying the existing benefits.

As described in the Background Section, the existing Pipeline class allows users to compose an Estimator from a linear chain of Estimator/Transformer, without requiring users to specify this linear chain twice. We consider this to be one of the most important feature provided by the existing Scikit-learn/Spark/Flink ML API.

As a result of this philosophy, we believe it is important/intuitive to provide similar benefit as the existing Pipeline class, while allowing users to compose Estimator from DAG of Estimator/Transformer. Therefore, we propose to add the Graph/GraphModel/GraphBuilder classes to provide the following capabilities:

  • Allow users to compose an Estimator from a DAG of Estimator/Transformer, without requiring users to specify this DAG twice

Public Interfaces

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of interfaces and classes after making the proposed changes.

API additions and changes

Here we list the additions and the changes to the Flink ML API.

1) Removed TableEnvironment from the parameter list of fit/transform APIs.

Public Interfaces

This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of interfaces and classes after making the proposed changes.

API additions and changes

Here we list the additions and the changes to the Flink ML API.

The following changes are the most important changes proposed by this doc:

1) Added the AlgoOperator class. AlgoOperator class has the same interface as the existing Transformer (i.e. has the transform method).

This change address the need to encode a generic multi-input multi-output machine learning function. 

2) Updated Transformer/Estimator to take list of tables as inputs and return list of tables as output.

This change addresses the use-cases described in the motivation section, e.g. a graph embedding Estimator needs to take 2 tables as inputs.

3) Added setStateStreams and getStateStreams to the Transformer interface.

This change addresses the use-cases described in the motivation section, where a running Transformer needs to ingest the model state streams emitted by a Estimator, which could be running on a different machine.

4) Removed the methods PipelineStage::toJson and PipelineStage::loadJson. Add methods save(...) and load(...) to the Stage interface.


The following changes are relatively minor:

5) Removed TableEnvironment from the parameter list of fit/transform APIs.

This This change simplifies the usage of fit/transform APIs.

26) Added PipelineModel pipelineTransformer and let Pipeline implement only the Estimator. Pipeline is no longer a Transformer.

This change makes the experience of using Pipeline consistent with the experience of using Estimator/Transformer, where a class is either an Estimator or a Transformer.

37) Removed Pipeline::appendStage from the Pipeline class.

This change makes the concept of Pipeline consistent with that of Graph/GraphBuilder. Neither Graph nor Pipeline provides the API to construct themselves.

48) Removed the Model interface. And renamed PipelineModel to PipelineTransformer.

This change simplifies the class hierarchy by removing a redundant class. It follows the philosophy of only adding complexity when we have explicit use-case for it.

59) Renamed PipelineStage to Stage and add the PublicEvolving tag to the Stage interface.

This change is reasonable because we will now compose Graph (not just Pipeline) using this class.

6) Added transformSchemas()  to the Stage interface.

This is needed to validate the compatibility of input schemas with a given Estimator/Transformer instance.

7) Updated Transformer/Estimator to take list of tables as inputs and return list of tables as output.

This change addresses the use-cases described in the motivation section, e.g. a graph embedding Estimator needs to take 2 tables as inputs.

8) Added setStateStreams and getStateStreams to the Transformer interface.

This change addresses the use-cases described in the motivation section, where a running Transformer needs to ingest the model state streams emitted by a Estimator, which could be running on a different machine.

9) Added Graph, GraphModel and GraphBuilder.

This change addresses the use-cases described in the motivation section, where we need to compose an Estimator from a DAG of Estimator/Transformer. Note that the Graph/GraphBuilder supports Estimator class whose input schemas are different from its fitted Transformer.

Interfaces and classes after the proposed API changes

The following code block shows the interface of Stage, Transformer, Estimator, Pipeline and PipelineModel after the proposed changes.

Interfaces and classes after the proposed API changes

The following code block shows the interface of Stage, Transformer, Estimator, Pipeline and pipelineTransformer after making the changes listed above.

Code Block
languagejava
/**
 * Base class for a stage in a Pipeline. The interface is only a concept, and does not have any actual
 * functionality. Its subclasses could be Estimator, Transformer or AlgoOperator. No other classes should inherit this
 * interface directly.
 *
 * <p>Each stage is with parameters, and requires a public empty constructor for restoration.
 *
 * @param <T> The class type of the Stage implementation itself.
 * @see WithParams
 */
Code Block
languagejava
@PublicEvolving
interface Stage<T extends Stage<T>> extends WithParams<T>, Serializable {
    /**
     * Saves Thisthis methodstage checksto the compatibility between input schemas, stage's parameters and stage'sgiven path.
     */
    void * logic. It should raise an exception if there is any mismatch, e.g. the number of inputsave(String path);

    /**
     * Loads this stage from the given path.
     */
 schemas  is wrong, or if a required field is missing from a schema.
     *
     * <p>If there is no mismatch, the method derives and returns the output schemas from the input
     * schemas.
     *
     * <p>Note that the output schemas of a given Estimator instance should equal the output schemas
     * of the Transformer instance fitted by this Estimator instance, suppose the same list of input
     * schemas are used as inputs to the fit/transform methods respectivelyvoid load(String path);
}

/**
 * A AlgoOperator is a Stage that takes a list of tables as inputs and produces a list of
 * tables as results. It can be used to encode a generic multi-input multi-output machine learning function.
 *
 * @param <T> The class type of the AlgoOperator implementation itself.
 */
@PublicEvolving
public interface AlgoOperator<T extends AlgoOperator<T>> extends Stage<T> {
    /**
     * Applies the AlgoOperator on the given input tables, and returns the result tables.
     *
     * @param schemasinputs thea list of schemas of the input tables.tables
     * @return thea list of schemas of the output tables.
     */
    TableSchemaTable[] transformSchemastransform(TableSchemaTable... schemasinputs);
}
    
/** Skipped
 */
 A Transformer is defaulta StringAlgoOperator toJson() {...}

    /** Skipped */
    default void loadJson(String json) {...}
}

with additional support for state streams, which could be set by the Estimator that fitted
 * this Transformer. Unlike AlgoOperator, a Transformer is typically associated with an Estimator.
 *
 * @param <T> The class type of the Transformer implementation itself.
 */
@PublicEvolving
public interface Transformer<T extends Transformer<T>> extends Stage<T>AlgoOperator<T> {

    /**
     * AppliesUses the Transformergiven onlist theof giventables inputto tables,update andinternal returnsstates. theThis resultcan tables.
be useful for   *e.g. online
     * @paramlearning inputswhere aan listEstimator offits tables
an infinite stream of training *samples @returnand astreams listthe of tablesmodel
     */
 diff data to Table[] transform(Table... inputs);

this Transformer.
     /**
     * Uses<p>This themethod givenmay listbe ofcalled tablesat to update internal states. This can be useful for e.g. onlinemost once.
     *
     * learning@param whereinputs ana Estimator fits an infinite stream list of trainingtables
 samples and streams the model*/
    default * diff data to this Transformer.void setStateStreams(Table... inputs) {
     *
   throw new * <p>This method may be called at most once.UnsupportedOperationException("this method is not implemented");
    }

     /**
     * @paramGets inputs a list of tables
 representing changes of internal */
states of this Transformer. defaultThese
 void setStateStreams(Table... inputs) {
 * tables might come from the Estimator throwthat newinstantiated UnsupportedOperationException("this methodTransformer.
 is not implemented");
    }

    /**
     * Gets@return a list of tables representing changes of internal states of this Transformer. These
     */
 tables might come fromdefault the Estimator that instantiated this Transformer.Table[] getStateStreams() {
     *
   throw new * @return a list of tablesUnsupportedOperationException("this method is not implemented");
     */
    default Table[] getStateStreams() {
        throw new UnsupportedOperationException("this method is not implemented");
    }
}

@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {

    /**
     * Trains on the given inputs and produces a Transformer. If this Estimator may be used to
     * compose a Pipeline, the transform method of the returned Transformer should be able to accept
     * a list of tables of the same length and schemas as the fit method of this Estimator.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, PipelineModel> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public PipelineModel fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

@PublicEvolving
public final class PipelineModel implements Transformer<PipelineModel> {

    public PipelineModel(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

The following code block shows the interface of Graph, GraphModel and GraphBuilder that we propose to add.

Code Block
languagejava
/**
 * A Graph acts as an Estimator. It consists of a DAG of stages, each of which is either an
 * Estimator or Transformer.
 */
@PublicEvolving
public final class Graph implements Estimator<Graph, GraphModel> {
    public Graph(...) {...}

    @Override
    public GraphModel fit(Table... inputs) {...}

    @Override
    public TableSchema[] transformSchemas(TableSchema... schemas) {
        return schemas;
    }

    /** Skipped a few methods, including the implementations of some Estimator APIs. */
}

/** A GraphModel acts as a Transformer. It consists of a DAG of Transformers. */
@PublicEvolving
public final class GraphModel implements Transformer<GraphModel> {
    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}

/** A GraphBuilder helps connect Stage instances into a Graph or GraphModel. */
@PublicEvolving
public final class GraphBuilder {
    /**
     * Specifies the upper bound (could be loose) of the number of output tables that can be
     * returned by the Transformer::getStateStreams and Transformer::transform methods, for any
     * stage involved in this Graph.
     *
     * <p>The default upper bound is 20.
     */
    public GraphBuilder setMaxOutputLength(int maxOutputLength) {...}

    /**
     * Creates a TableId associated with this GraphBuilder. It can be used to specify the passing of
     * tables between stages, as well as the input/output tables of the Graph/GraphModel generated
     * by this builder.
     */
    public TableId createTableId() {...}

    /**
     * If the stage is an Estimator, both its fit method and the transform method of its fitted Transformer would be
     * invoked with the given inputs when the graph runs.
     *
     * If this stage is a Transformer, its transform method would be invoked with the given inputs when the graph runs.
     *
     * Returns a list of TableIds, which represents outputs of the Transformer::transform invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId... inputs) {...}

    /**
     * If this stage is an Estimator, its fit method would be invoked with estimatorInputs, and the transform method
     * of its fitted Transformer would be invoked with transformerInputs, when the graph runs.
     *
     * This method throws Exception if the stage is a Transformer.
     *
     * This method is useful when the state is an Estimator AND the Estimator::fit needs to take a different list of
     * Tables from the Transformer::transform of the fitted Transformer.
     *
     * Returns a list of TableIds, which represents outputs of the Transformer::transform invocation.
     */
    public TableId[] getOutputs(Stage<?> stage, TableId[] estimatorInputs, TableId[] transformerInputs) {...}

    /**
     * The GraphModel::setStateStreams should invoke the setStateStreams of the corresponding stage
     * with the corresponding inputs.
     */
    void setStateStreams(Stage<?> stage, TableId... inputs) {...}

    /**
     * The GraphModel::getStateStreams should invoke the getStateStreams of the corresponding stage.
     *
     * <p>Returns a list of TableIds, which represents outputs of the getStateStreams invocation.
     */
    TableId[] getStateStreams(Stage<?> stage) {...}

    /**
     * Returns a Graph instance which the following API specification:
     *
     * 1) Graph::fit should take inputs and returns a GraphModel with the following specification.
     *
     * 2) GraphModel::transform should take inputs and return outputs.
     *
     * 3) GraphModel::setStateStreams should take inputStates.
     *
     * 4) GraphModel::getStateStreams should return outputStates.
     *
     * The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in the order specified by the DAG of stages.
     */
    Graph build(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a Graph instance which the following API specification:
     *
     * 1) Graph::fit should take estimatorInputs and returns a GraphModel with the following specification.
     *
     * 2) GraphModel::transform should take transformerInputs and return outputs.
     *
     * 3) GraphModel::setStateStreams should take inputStates.
     *
     * 4) GraphModel::getStateStreams should return outputStates.
     *
     * The fit/transform/setStateStreams/getStateStreams should invoke the APIs of the internal stages in the order specified by the DAG of stages.
     *
     * This method is useful when the Graph::fit needs to take a different list of Tables from the GraphModel::transform of the fitted GraphModel.
     */
    Graph build(TableId[] estimatorInputs, TableId[] transformerInputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    /**
     * Returns a GraphModel instance which the following API specification:
     *
     * 1) GraphModel::transform should take inputs and returns outputs.
     *
     * 2) GraphModel::setStateStreams should take inputStates.
     *
     * 3) GraphModel::getStateStreams should return outputStates.
     *
     * The transform/setStateStreams/getStateStreams should invoke the APIs of the internal  stages in the order specified by the DAG of stages.
     *
     * This method throws Exception if any stage of this graph is an Estimator.
     */
    GraphModel buildModel(TableId[] inputs, TableId[] outputs, TableId[] inputStates, TableId[] outputStates) {...}

    // The TableId is necessary to pass the inputs/outputs of various API calls across the
    // Graph/GraphModel stags.
    static class TableId {}

}

Example Usage

In this section we provide examples code snippets to demonstrate how we can use the APIs proposed in this FLIP to address the use-cases in the motivation section.

Composing an Estimator from a DAG of Estimator/Transformer

Suppose we have the following Transformer and Estimator classes:

  • TransformerA whose transform(...) takes 1 input table and has 1 output table.
  • TransformerB whose transform(...) takes 2 input tables and has 1 output table.
  • EstimatorB whose fit(...) takes 2 input tables and returns an instance of TransformerB.

And we want to compose an Estimator (e.g. Graph) from the following DAG of Transformer/Estimator.

Image Removed

The resulting Graph::fit is expected to have the following behavior:

  • The method takes 2 input tables. The 1st input table is given to a TransformerA instance. And the 2nd input table is given to another TransformerA instance.
  • An EstimatorB instance fits the output tables of these two TransformerA instances and generates a new TransformerB instance.
  • Returns a GraphModel instance which contains 2 TransformerA instance and 1 TransformerB instance, connected using the same DAG as shown above.

...

}
}

/**
 * An Estimator is a Stage that takes a list of tables as inputs and produces a Transformer.
 *
 * @param <E> class type of the Estimator implementation itself.
 * @param <M> class type of the Transformer this Estimator produces.
 */
@PublicEvolving
public interface Estimator<E extends Estimator<E, M>, M extends Transformer<M>> extends Stage<E> {
    /**
     * Trains on the given inputs and produces a Transformer.
     *
     * @param inputs a list of tables
     * @return a Transformer
     */
    M fit(Table... inputs);
}

/**
 * A Pipeline acts as an Estimator. It consists of an ordered list of stages, each of which could be
 * an Estimator, Transformer or AlgoOperator.
 */
@PublicEvolving
public final class Pipeline implements Estimator<Pipeline, pipelineTransformer> {

    public Pipeline(List<Stage<?>> stages) {...}

    @Override
    public pipelineTransformer fit(Table... inputs) {...}

    /** Skipped a few methods, including the implementations of the Estimator APIs. */
}

/**
 * A pipelineTransformer acts as a Transformer. It consists of an ordered list of Transformers or AlgoOperators.
 */
@PublicEvolving
public final class pipelineTransformer implements Transformer<pipelineTransformer> {

    public pipelineTransformer(List<Transformer<?>> transformers) {...}

    /** Skipped a few methods, including the implementations of the Transformer APIs. */
}


Example Usage

In this section we provide examples code snippets to demonstrate how we can use the APIs proposed in this FLIP to address the use-cases in the motivation section.

Online learning by running Transformer and Estimator concurrently on different machines

Here is an online learning scenario:

  • We have an infinite stream of tagged data that can be used for training.
  • We have an algorithm that can be trained using this infinite stream of data. This algorithm (with its latest states/parameters) can be used to do inference. And the accuracy of the algorithm increases with the increasing amount of training data it has seen.
  • We would like to train this algorithm using the given data stream on clusterA. And uses this algorithm with the update-to-date states/parameters to do inference on 10 different web servers.

In order to address this use-case, we can write the training and inference logics of this algorithm into an EstimatorA class and a TransformerA class with the following API behaviors:

  • EstimatorA::fit takes a table as input and returns an instance of TransformerA. Before this method returns this transformerA, it calls transformerA.setStateStreams(state_table), where the state_table represents the stream of algorithm parameters changes produced by EstimatorA.
  • TransformerA::setStateStreams(...) takes a table as input. Its implementation reads the data from this table to continuously update its algorithm parameters.
  • TransformerA::getStateStreams(...) returns the same table instance that has been provided via TransformerA::setStateStreams(...).
  • TransformerA::transform takes a table as input and returns a table. The returned table represents the inference results.


Here are the code snippets that address this use-case by using the proposed APIs.

First run the following code on clusterA:

Code Block
languagejava
void runTrainingOnClusterA(...) {
  // Creates the training stream from a Kafka topic.
  Table training_stream = ...;

  Estimator estimator = new EstimatorA(...);
  Transformer transformer = estimator.fit(training_stream);
  Table state_stream = transformer.getStateStreams()[0];

  // Writes the state_stream to a Kafka topicA.
  state_stream.sinkTable(...);
  // Saves transformer's state/metadata to a remote path.
  transformer.save(remote_path);

  // Executes the operators generated by the Estimator::fit(...), which reads from training_stream and writes to state_stream.
  env.execute()
}


Then run the following code on each web server:

Code Block
languagejava
void runInferenceOnWebServer(...) {
  // Creates the state stream from Kafka topicA which is written by the above code snippet. 
  Table state_stream = ...;
  // Creates the input stream that needs inference.
  Table input_stream = ...;

  Transformer transformer = new Transformer(...);
  transformer.load(remote_path);
  transformer.setStateStreams(new Table[]{state_stream});
  Table output_stream = transformer.transform(input_stream);

  // Do something with the output_stream.

  // Executes the operators generated by the Transformer::transform(...), which reads from state_stream to update its parameters. 
  // It also does inference on input_stream and produces results to the output_stream.
  env.execute()
}
Code Block
languagejava
GraphBuilder builder = new GraphBuilder();

// Creates nodes
Stage<?> stage1 = new TransformerA();
Stage<?> stage2 = new TransformerA();
Stage<?> stage3 = new EstimatorB();
// Creates inputs and inputStates
TableId input1 = builder.createTableId();
TableId input2 = builder.createTableId();
// Feeds inputs to nodes and gets outputs.
TableId output1 = builder.getOutputs(stage1, input1)[0];
TableId output2 = builder.getOutputs(stage2, input2)[0];
TableId output3 = builder.getOutputs(stage3, output1, output2)[0];

// Specifies the ordered lists of inputs, outputs, input states and output states that will
// be used as the inputs/outputs of the corresponding Graph and GraphModel APIs.
TableId[] inputs = new TableId[] {input1, input2};
TableId[] outputs = new TableId[] {output3};

// Generates the Graph instance.
Graph graph = builder.build(inputs, outputs, new TableId[]{}, new TableId[]{});
// The fit method takes 2 tables which are mapped to input1 and input2.
GraphModel model = graph.fit(...);
// The transform method takes 2 tables which are mapped to input1 and input2.
Table[] results = model.transform(...);

Online learning by running Transformer and Estimator concurrently on different machines

Here is an online learning scenario:

  • We have an infinite stream of tagged data that can be used for training.
  • We have an algorithm that can be trained using this infinite stream of data. This algorithm (with its latest states/parameters) can be used to do inference. And the accuracy of the algorithm increases with the increasing amount of training data it has seen.
  • We would like to train this algorithm using the given data stream on clusterA. And uses this algorithm with the update-to-date states/parameters to do inference on 10 different web servers.

In order to address this use-case, we can write the training and inference logics of this algorithm into an EstimatorA class and a TransformerA class with the following API behaviors:

  • EstimatorA::fit takes a table as input and returns an instance of TransformerA. Before this method returns this transformerA, it calls transformerA.setStateStreams(state_table), where the state_table represents the stream of algorithm parameters changes produced by EstimatorA.
  • TransformerA::setStateStreams(...) takes a table as input. Its implementation reads the data from this table to continuously update its algorithm parameters.
  • TransformerA::getStateStreams(...) returns the same table instance that has been provided via TransformerA::setStateStreams(...).
  • TransformerA::transform takes a table as input and returns a table. The returned table represents the inference results.

...

First run the following code on clusterA:

Code Block
languagejava
void runTrainingOnClusterA(...) {
  // Creates the training stream from a Kafka topic.
  Table training_stream = ...;

  Estimator estimator = new EstimatorA(...);
  Transformer transformer = estimator.fit(training_stream);
  Table state_stream = transformer.getStateStreams()[0];
  String transformer_json = transformer.toJson();

  // Writes the state_stream to a Kafka topicA.
  // Writes transformer_json to a remote file.

  // Executes the operators generated by the Estimator::fit(...), which reads from training_stream and writes to state_stream.
  env.execute()
}

Then run the following code on each web server:

Code Block
languagejava
void runInferenceOnWebServer(...) {

  // Reads the transformer_json from the same remote file written by the above code snippet.
  String transformer_json = ...;
  // Creates the state stream from Kafka topicA which is written by the above code snippet. 
  Table state_stream = ...;
  // Creates the input stream that needs inference.
  Table input_stream = ...;

  Transformer transformer = new Transformer(...);
  transformer.loadJson(transformer_json);
  transformer.setStateStreams(new Table[]{state_stream});
  Table output_stream = transformer.transform(input_stream);

  // Do something with the output_stream.

  // Executes the operators generated by the Transformer::transform(...), which reads from state_stream to update its parameters. It also does inference on input_stream and produces results to the output_stream.
  env.execute()
}

Compose an Estimator from a chain of Estimator/Transformer whose input schemas are different from its fitted Transformer 

Suppose we have the following Estimator and Transformer classes where an Estimator's input schemas could be different from the input schema of its fitted Transformer:

  • TransformerA whose transform(...) takes 1 input table and has 1 output table.
  • EstimatorA whose fit(...) takes 2 input tables and returns an instance of TransformerA.
  • TransformerB whose transform(...) takes 1 input table and has 1 output table.

And we want to compose an Estimator (e.g. Graph) from the following DAG of Transformer/Estimator.

Image Removed

The resulting Graph::fit is expected to have the following behavior:

  • The method takes 2 input tables. Both tables are given to EstimatorA::fit.
  • EstimatorA fits the input tables and generates a TransformerA instance. The TransformerA instance takes 1 table input, which is different from the 2 tables given to the EstimatorA.
  • Returns a GraphModel instance which contains a TransformerA instance and a TransformerB instance, which are connected as a chain.

Notes:

  • The fitted GraphModel takes only 1 table as input whereas the Graph takes 2 tables as inputs.
  • The proposed APIs also support composing an Estimator from a DAG of Estimator/Transformer whose input schemas are different from its fitted Transformer. 

Here is the code snippet that addresses this use-case by using the proposed APIs:

Code Block
languagejava
GraphBuilder builder = new GraphBuilder();

// Creates nodes
Stage<?> stage1 = new EstimatorA();
Stage<?> stage2 = new TransformerB();
// Creates inputs
TableId estimatorInput1 = builder.createTableId();
TableId estimatorInput2 = builder.createTableId();
TableId transformerInput1 = builder.createTableId();

// Feeds inputs to nodes and gets outputs.
TableId output1 = builder.getOutputs(stage1, new TableId[] {estimatorInput1, estimatorInput2}, new TableId[] {transformerInput1})[0];
TableId output2 = builder.getOutputs(stage2, output1)[0];

// Specifies the ordered lists of estimator inputs, transformer inputs, outputs, input states and output states
// that will be used as the inputs/outputs of the corresponding Graph and GraphModel APIs.
TableId[] estimatorInputs = new TableId[] {estimatorInput1, estimatorInput2};
TableId[] transformerInputs = new TableId[] {transformerInput1};
TableId[] outputs = new TableId[] {output2};
TableId[] inputStates = new TableId[] {};
TableId[] outputStates = new TableId[] {};

// Generates the Graph instance.
Graph graph = builder.build(estimatorInputs, transformerInputs, outputs, inputStates, outputStates);
// The fit method takes 2 tables which are mapped to estimatorInput1 and estimatorInput2.
GraphModel model = graph.fit(...);
// The transform method takes 1 table which is mapped to transformerInput1.
Table[] results = model.transform(...);


Compatibility, Deprecation, and Migration Plan

The changes proposed in this FLIP is backward incompatible with the existing APIs. We propose to change the APIs directly without deprecation period. And we will manually migrate the existing open source projects which use the existing Flink ML API to use the proposed APIs.

...

To our best knowledge, the only open source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with Alink developers to migrate the existing code to use the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementation to the Flink ML library codebase as much as possible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

There is no rejected alternatives to be listed here yet.

...