...

Some workflows may also need to split 1 table into 2 tables and use them for training and validation respectively. This logic can be expressed by a Transformer with 1 input table and 2 output tables.
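As a rough illustration of the multi-output case, the sketch below models a table as a plain list of rows and splits it into a training table and a validation table. `SimpleTable`, `MultiOutputTransformer`, and `SplitTransformer` are hypothetical stand-ins invented for this example; they are not Flink ML classes, and the prefix-based split is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a table: just an ordered list of rows.
final class SimpleTable {
    final List<String> rows;

    SimpleTable(List<String> rows) {
        this.rows = new ArrayList<>(rows);
    }
}

// Hypothetical interface: a Transformer that may consume and produce several tables.
interface MultiOutputTransformer {
    SimpleTable[] transform(SimpleTable... inputs);
}

// Splits 1 input table into 2 output tables: [0] for training, [1] for validation.
final class SplitTransformer implements MultiOutputTransformer {
    private final double trainFraction;

    SplitTransformer(double trainFraction) {
        if (trainFraction < 0.0 || trainFraction > 1.0) {
            throw new IllegalArgumentException("trainFraction must be in [0, 1]");
        }
        this.trainFraction = trainFraction;
    }

    @Override
    public SimpleTable[] transform(SimpleTable... inputs) {
        if (inputs.length != 1) {
            throw new IllegalArgumentException("expected exactly 1 input table");
        }
        List<String> rows = inputs[0].rows;
        // Deterministic prefix split; a real splitter would likely sample randomly.
        int cut = (int) Math.floor(rows.size() * trainFraction);
        SimpleTable train = new SimpleTable(rows.subList(0, cut));
        SimpleTable validation = new SimpleTable(rows.subList(cut, rows.size()));
        return new SimpleTable[] {train, validation};
    }
}
```

With 4 input rows and a `trainFraction` of 0.75, the first output table holds 3 rows and the second holds the remaining row.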

2) Express an Estimator/Transformer pair whose input schemas are different, and still be able to compose an Estimator from a linear chain (and additionally a DAG) of Estimator/Transformer.

For example, the Word2Vec algorithm uses a list of words (i.e. a sentence) as the input schema for training, whereas the fitted algorithm should be able to use a single word (instead of a list of words) as the input schema.

Also, the GraphEmbedding algorithm is expected to take 2 tables as input for training, and take 1 table as input for inference. The input schemas therefore also differ between the corresponding Estimator and Transformer.

The existing Pipeline API cannot be used to compose such an Estimator with other Transformer/Estimator instances, because it expects a stage in the Pipeline to use the same inputs for both fitting and transformation.
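The schema mismatch described above can be sketched with separate type parameters for the training input and the inference input. Everything here is an assumption made for illustration: `SketchEstimator`, `SketchTransformer`, and `ToyWordEmbeddingEstimator` are hypothetical names, and the "embedding" is a toy stand-in (corpus frequency and word length), not the Word2Vec algorithm.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interfaces with separate type parameters for the training
// input (FIT_IN) and the inference input (IN); not the Flink ML API.
interface SketchTransformer<IN, OUT> {
    OUT transform(IN input);
}

interface SketchEstimator<FIT_IN, IN, OUT> {
    SketchTransformer<IN, OUT> fit(FIT_IN trainingData);
}

// Toy stand-in for Word2Vec: it "embeds" a word as {corpus frequency, word length}
// instead of learning real vectors. Only the schema difference matters here.
final class ToyWordEmbeddingEstimator
        implements SketchEstimator<List<List<String>>, String, double[]> {

    @Override
    public SketchTransformer<String, double[]> fit(List<List<String>> sentences) {
        // Training consumes sentences, i.e. lists of words.
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> sentence : sentences) {
            for (String word : sentence) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        // The fitted Transformer takes a single word, not a sentence.
        return word -> {
            int count = counts.getOrDefault(word, 0);
            return new double[] {count, word.length()};
        };
    }
}
```

A pipeline that feeds the same input to `fit` and `transform` cannot type-check against this pair, which is the limitation the paragraph above points out.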


3) Compose a directed acyclic graph (DAG) of Estimator/Transformer into an Estimator/Transformer.

For example, the workflow may involve the join of 2 tables, where each table could be generated by a chain of Estimator/Transformer. The entire workflow is therefore a DAG of Estimator/Transformer.
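A minimal sketch of such a join-shaped workflow follows, assuming a table can be modeled as a map from key to integer value. `DagNode` and `DagRunner` are hypothetical helpers written for this example and say nothing about how the proposed API represents a graph; the join semantics (inner join on key, summing both values) are likewise an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical DAG node: an operation plus the upstream nodes it reads from.
final class DagNode {
    final String name;
    final List<DagNode> inputs;
    final Function<List<Map<String, Integer>>, Map<String, Integer>> op;

    private DagNode(String name, List<DagNode> inputs,
            Function<List<Map<String, Integer>>, Map<String, Integer>> op) {
        this.name = name;
        this.inputs = inputs;
        this.op = op;
    }

    // A source node has no inputs and simply emits a fixed table.
    static DagNode source(String name, Map<String, Integer> table) {
        return new DagNode(name, Collections.emptyList(), ins -> table);
    }

    // A single-input stage that rewrites every value, like one link in a chain.
    static DagNode map(String name, DagNode input, Function<Integer, Integer> fn) {
        return new DagNode(name, Collections.singletonList(input), ins -> {
            Map<String, Integer> out = new TreeMap<>();
            ins.get(0).forEach((key, value) -> out.put(key, fn.apply(value)));
            return out;
        });
    }

    // A two-input stage: inner join on key, summing the values of both sides.
    static DagNode join(String name, DagNode left, DagNode right) {
        return new DagNode(name, Arrays.asList(left, right), ins -> {
            Map<String, Integer> out = new TreeMap<>();
            ins.get(0).forEach((key, value) -> {
                Integer other = ins.get(1).get(key);
                if (other != null) {
                    out.put(key, value + other);
                }
            });
            return out;
        });
    }
}

final class DagRunner {
    static Map<String, Integer> run(DagNode root) {
        return run(root, new IdentityHashMap<>());
    }

    // Evaluates the inputs of a node before the node itself, caching results
    // so that a node shared by several downstream nodes runs only once.
    private static Map<String, Integer> run(
            DagNode node, Map<DagNode, Map<String, Integer>> cache) {
        Map<String, Integer> cached = cache.get(node);
        if (cached != null) {
            return cached;
        }
        List<Map<String, Integer>> ins = new ArrayList<>();
        for (DagNode input : node.inputs) {
            ins.add(run(input, cache));
        }
        Map<String, Integer> out = node.op.apply(ins);
        cache.put(node, out);
        return out;
    }
}
```

Because the join node has two upstream chains, the workflow cannot be flattened into a single linear list of stages, which is why a linear Pipeline is not enough here.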

4) Online learning, where a long-running instance of Transformer needs to be updated by the latest model data generated by another long-running instance of Estimator.

...

In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:

5) The Model interface does not provide any added value (given that we already have Transformer). The added class hierarchy complexity is not justified.

6) The fit/transform API requires users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instance given to fit/transform.

7) A Pipeline is both a Transformer and an Estimator. The experience of using Pipeline is therefore different from the experience of using Estimator (with the needFit API).

8) There is no API provided by the Estimator/Transformer interface to validate the schema consistency of a Pipeline. Users would have to instantiate Tables (with I/O logic) and run fit/transform to know whether the stages in the Pipeline are compatible with each other.
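One way to picture the missing capability is a stage contract that derives an output schema from an input schema without reading any data. The sketch below is only an illustration under that assumption: `SchemaAwareStage`, `RequireAndAddColumn`, and `SchemaValidator` are hypothetical names, and schemas are modeled as a map from column name to type name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stage contract: derive the output schema from the input schema
// without touching any data. Schemas are modeled as column name -> type name.
interface SchemaAwareStage {
    Map<String, String> transformSchema(Map<String, String> inputSchema);
}

// Example stage that requires one typed input column and appends one output column.
final class RequireAndAddColumn implements SchemaAwareStage {
    private final String requiredColumn;
    private final String requiredType;
    private final String addedColumn;
    private final String addedType;

    RequireAndAddColumn(String requiredColumn, String requiredType,
            String addedColumn, String addedType) {
        this.requiredColumn = requiredColumn;
        this.requiredType = requiredType;
        this.addedColumn = addedColumn;
        this.addedType = addedType;
    }

    @Override
    public Map<String, String> transformSchema(Map<String, String> inputSchema) {
        if (!requiredType.equals(inputSchema.get(requiredColumn))) {
            throw new IllegalArgumentException(
                    "missing column " + requiredColumn + " of type " + requiredType);
        }
        Map<String, String> out = new LinkedHashMap<>(inputSchema);
        out.put(addedColumn, addedType);
        return out;
    }
}

final class SchemaValidator {
    // Threads the schema through every stage in order; an incompatible stage
    // fails here, before any Table is instantiated or any job is run.
    static Map<String, String> validate(
            List<SchemaAwareStage> stages, Map<String, String> inputSchema) {
        Map<String, String> schema = inputSchema;
        for (SchemaAwareStage stage : stages) {
            schema = stage.transformSchema(schema);
        }
        return schema;
    }
}
```

With such a contract, placing a stage that needs a `tokens` column before the stage that produces it would be rejected during validation rather than at fit/transform time.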

...

This change addresses the use-cases described in the motivation section, where we need to compose an Estimator from a DAG of Estimator/Transformer. Note that the Graph/GraphBuilder supports an Estimator class whose input schemas are different from those of its fitted Transformer.

Interfaces and classes after the proposed API changes

...

```java
void runInferenceOnWebServer(...) {

  // Reads the transformer_json from the same remote file written by the above code snippet.
  String transformer_json = ...;
  // Creates the state stream from Kafka topicA, which is written by the above code snippet.
  Table state_stream = ...;
  // Creates the input stream that needs inference.
  Table input_stream = ...;

  Transformer transformer = new Transformer(...);
  transformer.loadJson(transformer_json);
  transformer.setStateStreams(new Table[]{state_stream});
  Table output_stream = transformer.transform(input_stream);

  // Do something with the output_stream.

  // Executes the operators generated by Transformer::transform(...), which read from
  // state_stream to update the Transformer's parameters. The same operators also run
  // inference on input_stream and produce results to output_stream.
  env.execute();
}
```

Compose an Estimator whose input schemas are different from its fitted Transformer 

To be added.


Compatibility, Deprecation, and Migration Plan

...