...
Code Block |
---|
void RemoteRunner::run(StreamApplication app) {
  StreamGraph streamGraph = createStreamGraph(app);
  ProcessorGraph processorGraph = executionPlanner.plan(streamGraph);
  streamsManager.createStreams(processorGraph.getIntermediateStreams());
  processorGraph.getProcessorNodes().forEach() { processor ->
    new JobRunner().run(processor.generateConfig());
  }
} |
RemoteApplicationRunner.stop() is similar, replacing JobRunner.run() with JobRunner.stop().
Note: this depends on SAMZA-1089 that adds stop() and status() in JobRunner.
How this works in real deployment
1. User configures app.application.runner.class to be RemoteApplicationRunner.
2. Deploy the application to the RM.
3. Execute run-app.sh and stop-app.sh to start/stop the application. In the scripts, ApplicationRunnerMain.main() is invoked to run RemoteApplicationRunner (based on the config in step 1).
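The runner selection in step 1 can be sketched as a small reflective lookup, roughly what ApplicationRunnerMain.main() would do. The config key follows the text above; the default class name and helper names are illustrative assumptions, not the actual Samza API:

```java
// Hedged sketch of runner resolution inside ApplicationRunnerMain.main().
// The config key comes from step 1 above; the fallback value is an assumption.
import java.util.Map;

public class RunnerResolver {
    // Reads the runner class name from config, falling back to a local runner.
    public static String resolveRunnerClass(Map<String, String> config) {
        return config.getOrDefault("app.application.runner.class",
                "org.apache.samza.runtime.LocalApplicationRunner");
    }

    // Instantiates the configured runner reflectively (assumes a no-arg constructor).
    public static Object createRunner(Map<String, String> config) throws Exception {
        return Class.forName(resolveRunnerClass(config))
                .getDeclaredConstructor().newInstance();
    }
}
```

With this shape, the same run-app.sh/stop-app.sh scripts can drive either runner purely through config.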
2. LocalApplicationRunner
When the LocalApplicationRunner executes the ProcessorGraph, each ProcessorNode is executed by a set of local StreamProcessor(s). For now we only create one StreamProcessor per ProcessorNode, because we only support single-stage applications. LocalApplicationRunner will start and stop the StreamProcessor(s) by invoking StreamProcessor.start() and StreamProcessor.stop(). LocalApplicationRunner.start() is a blocking call, so it can run in the main thread (see 2.1 below).
The following pseudocode shows the start of LocalApplicationRunner:
Code Block |
---|
void LocalApplicationRunner::start(StreamApplication app) {
StreamGraph streamGraph = createStreamGraph(app);
ProcessorGraph processorGraph = executionPlanner.plan(streamGraph);
if (processorGraph has intermediate streams) {
startLeaderElection(() -> onBecomeLeader() {
streamsManager.createStreams(processorGraph.getIntermediateStreams());
processLatch.release();
})
processLatch.wait(); // blocked until the latch is released
}
List<StreamProcessor> streamProcessors = new ArrayList<>();
processorGraph.getProcessors().forEach() { processor ->
StreamProcessor streamProcessor = new StreamProcessor(processorId, processor.generateConfig());
streamProcessor.start();
streamProcessors.add(streamProcessor);
}
streamProcessors.forEach(sp -> {
wait for processor complete;
})
} |
LocalApplicationRunner.stop() is similar, but without the leader-election phase, and replaces StreamProcessor.start() with StreamProcessor.stop().
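The leader-election handshake in the pseudocode above (only the leader creates intermediate streams, everyone else blocks on the latch) can be sketched in plain Java with a CountDownLatch. Class and method names here are illustrative, not Samza APIs:

```java
// Minimal sketch of the "leader creates streams, followers wait" handshake.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchDemo {
    private final CountDownLatch processLatch = new CountDownLatch(1);
    private final AtomicBoolean streamsCreated = new AtomicBoolean(false);

    // Runs on whichever processor wins leader election; stands in for
    // streamsManager.createStreams(...) followed by releasing the latch.
    public void onBecomeLeader() {
        streamsCreated.set(true);
        processLatch.countDown();
    }

    // Every processor blocks here until the leader has created the streams.
    public void awaitStreams() throws InterruptedException {
        processLatch.await();
    }

    public boolean isStreamsCreated() {
        return streamsCreated.get();
    }
}
```

The latch guarantees that no StreamProcessor is started before the intermediate streams exist, regardless of which processor becomes leader.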
There are two ways of using LocalApplicationRunner, and they have different deployment models. (Note: it is not entirely clear whether running LocalApplicationRunner in the main thread should be called standalone and running it in a thread pool embedded.)
2.1 Run LocalRunner in main thread
This means the user runs a full-fledged Samza application on their own cluster.
How this works in real deployment
1. User configures Job.application.runner to be LocalApplicationRunner.
2. Deploy the application to all hosts in the user's cluster.
3. Run run-app.sh on each node to start the whole application, and run stop-app.sh on each node to stop it.
Note the scripts are the same in both Yarn and this deployment model.
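For this deployment model the runner choice is carried entirely by the job config; a minimal, illustrative fragment (the key name follows the steps above, the class name is an assumption):

```
# Illustrative config for running LocalApplicationRunner in the main thread.
# Key name follows the deployment steps above; the class name is an assumption.
Job.application.runner=org.apache.samza.runtime.LocalApplicationRunner
```

Because only this config differs, the same run-app.sh/stop-app.sh entry points work unchanged across the Yarn and local deployment models.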
2.2 Run LocalRunner in threadpool
In this use case, the user wants to embed Samza stream processing inside their own application. We need to provide a wrapper (let's call it LocalRunnerWrapper) to run the LocalApplicationRunner in a thread pool. To start Samza processing, the user invokes LocalRunnerWrapper.start(); to stop it, the user invokes LocalRunnerWrapper.stop(). Note that LocalRunnerWrapper.start() is nonblocking. This wrapper can be hooked up with the OffSpring lifecycle management.
The following pseudocode shows the wrapper:
Code Block |
---|
class LocalRunnerWrapper {
  private ApplicationRunner localRunner;
  private ExecutorService executorService;

  void start() {
    executorService.submit(() -> {
      localRunner.start();
    });
  }

  void stop() {
    localRunner.stop();
    executorService.shutdown();
  }
} |
TODO: figure out how error propagation works with this lifecycle hook.
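One possible direction for the error-propagation TODO is to have the wrapper keep the Future returned by submit(), so a failure inside the runner thread can be rethrown to the caller. This is a sketch under assumed names, not the actual Samza API:

```java
// Sketch: propagating runner failures to the caller via the submit() Future.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureWrapper {
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private Future<?> runnerFuture;

    // task stands in for localRunner::start in the wrapper above.
    public void start(Runnable task) {
        runnerFuture = executorService.submit(task);
    }

    // Blocks until the runner finishes, rethrowing any exception it raised.
    public void awaitAndPropagate() throws Exception {
        try {
            runnerFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) throw (Exception) cause;
            throw e;
        } finally {
            executorService.shutdown();
        }
    }
}
```

The lifecycle hook would call awaitAndPropagate() (or poll Future.isDone()) to surface failures, rather than letting them die silently in the pool thread.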
3. TestApplicationRunner
Compatibility, Deprecation, and Migration Plan
...