...
Online learning using a Transformer and an Estimator running on different machines
Here is an online learning scenario:
- We have an infinite stream of labeled data that can be used for training.
- We have an algorithm that can be trained continuously on this infinite data stream. The algorithm (with its latest state/parameters) can be used to do inference, and its accuracy increases with the amount of training data it has seen.
- We would like to train this algorithm on clusterA using the given data stream, and use the algorithm with up-to-date state/parameters to do inference on 10 different web servers.
In order to address this use-case, we can write the training and inference logic of this algorithm into an EstimatorA class and a TransformerA class with the following API behaviors:
- EstimatorA::fit takes a table as input and returns an instance of TransformerA. Before returning this transformerA, fit calls transformerA.setStateStreams(state_table), where state_table represents the stream of algorithm parameter changes produced by EstimatorA.
- TransformerA::setStateStreams(...) takes an array of tables as input. Its implementation reads data from these tables to continuously update the algorithm parameters.
- TransformerA::getStateStreams(...) returns the same table instances that were provided via TransformerA::setStateStreams(...).
- TransformerA::transform takes a table as input and returns a table. The returned table represents the inference results.
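The contract above can be sketched as plain Java with a toy in-memory "table". The sketch below is illustrative only: `StateTable` stands in for a Flink Table carrying parameter changes, and the trivial running-mean "algorithm" is an assumption chosen just to show how fit() wires its state table into the transformer it returns.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy in-memory analogue of the proposed API, for illustration only.
public class OnlineLearningSketch {

    // Stand-in for a table: an append-only list of parameter updates.
    static class StateTable {
        final List<Double> updates = new CopyOnWriteArrayList<>();
    }

    // TransformerA: does inference with the latest parameter it has seen.
    static class TransformerA {
        private StateTable[] stateStreams = new StateTable[0];

        // Takes an array of tables; reads them to update parameters.
        void setStateStreams(StateTable[] states) { this.stateStreams = states; }

        // Returns the same table instances provided via setStateStreams.
        StateTable[] getStateStreams() { return stateStreams; }

        // "Inference": scale the input by the latest parameter.
        double transform(double input) {
            List<Double> u = stateStreams[0].updates;
            double param = u.isEmpty() ? 0.0 : u.get(u.size() - 1);
            return param * input;
        }
    }

    // EstimatorA: "trains" by appending parameter changes to a state table,
    // and wires that table into the transformer before returning it.
    static class EstimatorA {
        final StateTable stateTable = new StateTable();

        TransformerA fit(List<Double> trainingData) {
            // Trivial "algorithm": the parameter is the running mean.
            double sum = 0;
            for (int i = 0; i < trainingData.size(); i++) {
                sum += trainingData.get(i);
                stateTable.updates.add(sum / (i + 1)); // stream of parameter changes
            }
            TransformerA t = new TransformerA();
            t.setStateStreams(new StateTable[]{stateTable}); // as required by the contract
            return t;
        }
    }

    public static void main(String[] args) {
        EstimatorA estimator = new EstimatorA();
        TransformerA transformer = estimator.fit(List.of(1.0, 2.0, 3.0));
        // getStateStreams returns the very table that fit() passed in.
        System.out.println(transformer.getStateStreams()[0] == estimator.stateTable); // true
        System.out.println(transformer.transform(10.0)); // latest param = mean(1,2,3) = 2.0
    }
}
```

Note how getStateStreams() hands back the same table instance, which is what lets the caller on clusterA export the state stream to Kafka after fit() returns.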
Here are code snippets that address this use-case using the proposed APIs.
First, run the following code on clusterA:
```java
void runTrainingOnClusterA(...) {
    // Creates the training stream from a Kafka topic.
    Table training_stream = ...;
    Estimator estimator = new EstimatorA(...);
    Transformer transformer = estimator.fit(training_stream);
    Table state_stream = transformer.getStateStreams()[0];
    String transformer_json = transformer.toJson();
    // Writes the state_stream to Kafka topicA.
    // Writes transformer_json to a remote file.
    // Executes the operators generated by Estimator::fit(...), which read
    // from training_stream and write to state_stream.
    env.execute();
}
```
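For concreteness, the "stream of algorithm parameter changes" written to topicA could be modeled as simple key/value records. The record layout below (name, value, version) and the CSV encoding are assumptions for illustration only, not part of the proposal.

```java
import java.util.Locale;

// Hypothetical wire format for one parameter change on the state stream.
public class ParamChange {
    final String name;    // parameter identifier, e.g. "bias"
    final double value;   // new value after this training step
    final long version;   // monotonically increasing, so consumers can drop stale updates

    ParamChange(String name, double value, long version) {
        this.name = name;
        this.value = value;
        this.version = version;
    }

    // Encode as a CSV line, e.g. for a Kafka record value.
    String encode() {
        return String.format(Locale.ROOT, "%s,%f,%d", name, value, version);
    }

    // Decode a CSV line back into a parameter change.
    static ParamChange decode(String line) {
        String[] parts = line.split(",");
        return new ParamChange(parts[0], Double.parseDouble(parts[1]), Long.parseLong(parts[2]));
    }

    public static void main(String[] args) {
        ParamChange c = new ParamChange("bias", 0.25, 42L);
        ParamChange back = ParamChange.decode(c.encode());
        System.out.println(back.name + " " + back.value + " " + back.version); // bias 0.25 42
    }
}
```

A version (or offset) field of this kind would let a web server that reconnects to topicA skip parameter changes it has already applied.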
Then run the following code on each web server:
```java
void runInferenceOnWebServer(...) {
    // Reads the transformer_json from the same remote file written by the above code snippet.
    String transformer_json = ...;
    // Creates the state stream from Kafka topicA, which is written by the above code snippet.
    Table state_stream = ...;
    // Creates the input stream that needs inference.
    Table input_stream = ...;
    Transformer transformer = new TransformerA(...);
    transformer.loadJson(transformer_json);
    transformer.setStateStreams(new Table[]{state_stream});
    Table output_stream = transformer.transform(input_stream);
    // Do something with the output_stream.
    // Executes the operators generated by Transformer::transform(...), which
    // read from state_stream to update the algorithm parameters, do inference
    // on input_stream, and produce results to output_stream.
    env.execute();
}
```
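The key behavior on the web server is that parameter updates from the state stream are applied concurrently with inference. A minimal sketch of that behavior, using a daemon thread and an AtomicReference in place of the operators that Transformer::transform(...) would generate (all names here are illustrative, not the proposal's API):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of a web server's transformer: one thread consumes the state
// stream and swaps in new parameters; request threads read the latest value.
public class StateStreamConsumer {
    // Latest model parameter (a single double for simplicity).
    static final AtomicReference<Double> param = new AtomicReference<>(0.0);
    // Stand-in for the Kafka-backed state stream from topicA.
    static final BlockingQueue<Double> stateStream = new ArrayBlockingQueue<>(16);

    // Inference with whatever parameters are current, as transform() would do.
    static double infer(double input) {
        return param.get() * input;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread updater = new Thread(() -> {
            try {
                while (true) {
                    param.set(stateStream.take()); // apply each parameter change
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        updater.setDaemon(true);
        updater.start();

        stateStream.put(2.0);           // a parameter change arrives from topicA
        Thread.sleep(100);              // let the updater apply it
        System.out.println(infer(5.0)); // inference now uses the new parameter
    }
}
```

This is why the accuracy on the 10 web servers keeps improving without redeploying the transformer: the long-running job keeps reading state_stream while serving inference on input_stream.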
Compatibility, Deprecation, and Migration Plan
...
We will provide unit tests to validate the proposed changes.
Rejected Alternatives
There are no rejected alternatives to be listed here yet.