...

Online learning by using Transformer and Estimator running on different machines

Here is an online learning scenario:

  • We have an infinite stream of tagged data that can be used for training.
  • We have an algorithm that can be trained on this infinite stream of data. The algorithm, with its latest states/parameters, can be used for inference, and its accuracy increases with the amount of training data it has seen.
  • We would like to train this algorithm on the given data stream on clusterA, and use the algorithm with its up-to-date states/parameters to do inference on 10 different web servers.
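
To make the scenario concrete, here is a minimal sketch of an algorithm that trains on one tagged sample at a time and can serve predictions from its latest parameters at any point. The proposal does not prescribe a particular algorithm. The class OnlineLinearModel, its methods, the learning rate, and the synthetic data (labels from y = 2x + 1) are all hypothetical and chosen only for illustration.

```java
import java.util.Random;

/** Hypothetical online learner: fits y = weight * x + bias with one SGD step per sample. */
public class OnlineLinearModel {
    private double weight = 0.0;
    private double bias = 0.0;
    private final double learningRate;

    public OnlineLinearModel(double learningRate) {
        this.learningRate = learningRate;
    }

    /** Inference using the latest parameters. */
    public double predict(double x) {
        return weight * x + bias;
    }

    /** Training step: consumes a single tagged sample from the stream. */
    public void update(double x, double label) {
        double error = predict(x) - label;
        weight -= learningRate * error * x;
        bias -= learningRate * error;
    }

    /** Snapshot of the current state, i.e. what would be shipped to the inference side. */
    public double[] getState() {
        return new double[] {weight, bias};
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        OnlineLinearModel model = new OnlineLinearModel(0.1);
        for (int i = 1; i <= 2000; i++) {
            double x = random.nextDouble();      // feature in [0, 1)
            model.update(x, 2.0 * x + 1.0);      // synthetic label, assumed for this sketch
            if (i == 10 || i == 2000) {
                // The prediction at x = 0.5 should move toward 2.0 as more data is seen.
                System.out.printf("after %d samples: predict(0.5) = %.4f%n", i, model.predict(0.5));
            }
        }
    }
}
```

In the scenario above, update would run on clusterA, while predict would run on the web servers against the parameters returned by getState.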

To address this use-case, we can put the training and inference logic of this algorithm into an EstimatorA class and a TransformerA class with the following API behaviors:

  • EstimatorA::fit takes a table as input and returns an instance of TransformerA. Before this method returns transformerA, it calls transformerA.setStateStreams(state_table), where state_table represents the stream of algorithm parameter changes produced by EstimatorA.
  • TransformerA::setStateStreams(...) takes a table as input. Its implementation reads the data from this table to continuously update its algorithm parameters.
  • TransformerA::getStateStreams(...) returns the same table instance that has been provided via TransformerA::setStateStreams(...).
  • TransformerA::transform takes a table as input and returns a table. The returned table represents the inference results.
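
The contract described by these bullets can be sketched in a few lines. This is a simplified, in-memory illustration, not the proposed implementation: the Table class here is a hypothetical stand-in (a bounded list of rows) rather than Flink's Table, the one-parameter model and its learning rate are assumptions, and state is replayed eagerly instead of being consumed continuously from an unbounded stream. The array-based signatures follow the usage in the code snippets in this section.

```java
import java.util.ArrayList;
import java.util.List;

public class StateStreamSketch {
    /** Hypothetical stand-in for a table: an ordered, bounded list of rows. */
    public static class Table {
        public final List<double[]> rows = new ArrayList<>();

        public Table add(double... row) {
            rows.add(row);
            return this;
        }
    }

    public static class TransformerA {
        private Table[] stateStreams = new Table[0];
        private double weight = 0.0;

        public void setStateStreams(Table[] stateStreams) {
            this.stateStreams = stateStreams;
        }

        /** Returns the same instances that were passed to setStateStreams. */
        public Table[] getStateStreams() {
            return stateStreams;
        }

        /** Applies every parameter change seen so far, then scores each input row. */
        public Table transform(Table input) {
            for (double[] stateRow : stateStreams[0].rows) {
                weight = stateRow[0];                 // the latest state row wins
            }
            Table output = new Table();
            for (double[] row : input.rows) {
                output.add(row[0], weight * row[0]);  // {feature, prediction}
            }
            return output;
        }
    }

    public static class EstimatorA {
        /** Trains on rows of {feature, label} and emits one state row per update. */
        public TransformerA fit(Table training) {
            Table stateTable = new Table();
            double weight = 0.0;
            for (double[] row : training.rows) {
                double error = weight * row[0] - row[1];
                weight -= 0.5 * error * row[0];       // assumed learning rate of 0.5
                stateTable.add(weight);
            }
            TransformerA transformer = new TransformerA();
            transformer.setStateStreams(new Table[] {stateTable});
            return transformer;
        }
    }

    public static void main(String[] args) {
        Table training = new Table().add(1.0, 2.0).add(1.0, 2.0).add(1.0, 2.0);
        TransformerA transformer = new EstimatorA().fit(training);
        Table output = transformer.transform(new Table().add(2.0));
        System.out.println("prediction for 2.0: " + output.rows.get(0)[1]);
    }
}
```

Because the transformer depends only on the state table it is handed, the same class works whether that table comes directly from EstimatorA::fit or is rebuilt elsewhere from a persisted copy of the parameter changes.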


Here are the code snippets that address this use-case by using the proposed APIs.

First run the following code on clusterA:

Code Block
languagejava
void runTrainingOnClusterA(...) {
  // Creates the training stream from a Kafka topic.
  Table training_stream = ...;

  Estimator estimator = new EstimatorA(...);
  Transformer transformer = estimator.fit(training_stream);
  Table state_stream = transformer.getStateStreams()[0];
  String transformer_json = transformer.toJson();

  // Writes the state_stream to a Kafka topicA.
  // Writes transformer_json to a remote file.

  // Executes the operators generated by Estimator::fit(...), which read from
  // training_stream and write to state_stream.
  env.execute();
}


Then run the following code on each web server:

Code Block
languagejava
void runInferenceOnWebServer(...) {

  // Reads the transformer_json from the same remote file written by the above code snippet.
  String transformer_json = ...;
  // Creates the state stream from Kafka topicA which is written by the above code snippet. 
  Table state_stream = ...;
  // Creates the input stream that needs inference.
  Table input_stream = ...;

  Transformer transformer = new TransformerA(...);
  transformer.loadJson(transformer_json);
  transformer.setStateStreams(new Table[]{state_stream});
  Table output_stream = transformer.transform(input_stream);

  // Do something with the output_stream.

  // Executes the operators generated by Transformer::transform(...), which read from
  // state_stream to update the transformer's parameters, run inference on input_stream,
  // and write the results to output_stream.
  env.execute();
}


Compatibility, Deprecation, and Migration Plan

...

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

There are no rejected alternatives to list here yet.