...

 

Code Block
void RemoteRunner::run(StreamApplication app) {
  StreamGraph streamGraph = createStreamGraph(app);
  ProcessorGraph processorGraph = executionPlanner.plan(streamGraph);
  // Create the intermediate streams before any job is submitted.
  streamsManager.createStreams(processorGraph.getIntermediateStreams());
  // Submit one job per processor node, each with its own generated config.
  streamGraph.getProcessorNodes().forEach(processor -> {
    new JobRunner().run(processor.generateConfig());
  });
}

RemoteApplicationRunner.stop() is similar, except that it calls JobRunner.stop() instead of JobRunner.run().

Note: this depends on SAMZA-1089, which adds stop() and status() to JobRunner.


How this works in a real deployment

  1. The user sets app.application.runner.class to RemoteRunner.

  2. The user deploys the application to the RM (resource manager).

  3. The user executes run-app.sh and stop-app.sh to start and stop the application. These scripts invoke ApplicationRunnerMain.main(), which runs RemoteApplicationRunner (based on the config in step 1).
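The config-driven selection in the steps above can be sketched as follows. This is a minimal illustration, not the actual ApplicationRunnerMain: SketchRunner, SketchRemoteRunner, and RunnerMainSketch are hypothetical names, and loading the runner by reflection from the configured class name is an assumption about how the lookup could work. Only the config key comes from step 1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the runner abstraction described in this document.
interface SketchRunner {
  String run(String appName);
  String stop(String appName);
}

// Hypothetical stand-in for RemoteRunner; a real runner would submit jobs to the cluster.
class SketchRemoteRunner implements SketchRunner {
  public String run(String appName) { return "submitted " + appName; }
  public String stop(String appName) { return "stopped " + appName; }
}

class RunnerMainSketch {
  // Config key taken from step 1 above.
  static final String RUNNER_CLASS_KEY = "app.application.runner.class";

  // Instantiates whichever runner class the config names (assumed: via reflection).
  static SketchRunner fromConfig(Map<String, String> config) {
    String className = config.get(RUNNER_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + RUNNER_CLASS_KEY);
    }
    try {
      return (SketchRunner) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create runner " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(RUNNER_CLASS_KEY, SketchRemoteRunner.class.getName());
    SketchRunner runner = fromConfig(config);
    System.out.println(runner.run("my-app"));  // the path run-app.sh would take
    System.out.println(runner.stop("my-app")); // the path stop-app.sh would take
  }
}
```

Because the scripts only ever see the configured class name, switching to a different runner would be a config change rather than a script change.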

2. LocalApplicationRunner

When the LocalApplicationRunner executes the ProcessorGraph, each ProcessorNode is executed by a set of local StreamProcessors. For now we create only one StreamProcessor per ProcessorNode, because we support only single-stage applications. LocalApplicationRunner starts and stops the StreamProcessors by invoking StreamProcessor.start() and StreamProcessor.stop(). LocalApplicationRunner.start() is a blocking call, so it can be run in the main thread (see 2.1 below).


The following pseudocode shows the start of LocalApplicationRunner:

Code Block
void LocalApplicationRunner::start(StreamApplication app) {
  StreamGraph streamGraph = createStreamGraph(app);
  ProcessorGraph processorGraph = executionPlanner.plan(streamGraph);
  if (processorGraph has intermediate streams) {
    // Only the elected leader creates the intermediate streams.
    startLeaderElection(() -> { // invoked on becoming leader
      streamsManager.createStreams(processorGraph.getIntermediateStreams());
      processLatch.release();
    });

    processLatch.wait(); // blocked until the latch is released
  }

  List<StreamProcessor> streamProcessors = new ArrayList<>();
  streamGraph.getProcessors().forEach(processor -> {
    StreamProcessor streamProcessor = new StreamProcessor(processorId, processor.generateConfig());
    streamProcessor.start();
    streamProcessors.add(streamProcessor);
  });

  streamProcessors.forEach(sp -> {
    // wait for the processor to complete
  });
}

LocalRunner.stop() is similar, but it skips the leader election phase and calls StreamProcessor.stop() instead of StreamProcessor.start().
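The latch-gated start sequence in the pseudocode can be sketched as a small runnable program. This is an illustration under stated assumptions, not the proposed implementation: LocalStartSketch and its stubbed startLeaderElection() are hypothetical, processors are modeled as plain threads, and a java.util.concurrent.CountDownLatch stands in for processLatch. A CountDownLatch only works within one JVM, so a multi-host deployment would need a distributed equivalent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class LocalStartSketch {
  // Records the order of events so the flow can be inspected afterwards.
  final List<String> events = Collections.synchronizedList(new ArrayList<String>());

  // Hypothetical single-process stand-in for leader election: this process
  // always wins, and the callback runs on a separate thread.
  void startLeaderElection(Runnable onBecomeLeader) {
    new Thread(onBecomeLeader, "leader-election").start();
  }

  // Blocking call, mirroring LocalApplicationRunner.start() in the pseudocode.
  void start(List<String> intermediateStreams, List<String> processorIds) {
    try {
      if (!intermediateStreams.isEmpty()) {
        CountDownLatch streamsReady = new CountDownLatch(1);
        startLeaderElection(() -> {
          events.add("created streams " + intermediateStreams);
          streamsReady.countDown(); // release the latch
        });
        streamsReady.await(); // blocked until the latch is released
      }

      List<Thread> processors = new ArrayList<>();
      for (String id : processorIds) {
        Thread t = new Thread(() -> events.add("processor " + id + " done"));
        events.add("started processor " + id);
        t.start();
        processors.add(t);
      }
      for (Thread t : processors) {
        t.join(); // wait for each processor to complete
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while starting", e);
    }
  }

  public static void main(String[] args) {
    LocalStartSketch runner = new LocalStartSketch();
    runner.start(Arrays.asList("s1"), Arrays.asList("p0"));
    runner.events.forEach(System.out::println);
  }
}
```

The latch guarantees that no processor starts before the intermediate streams exist, which is the ordering the pseudocode relies on.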


There are two ways of using LocalRunner, and they have different deployment models. (Note: I am not sure whether we should call running LocalRunner in the main thread "standalone" and running LocalRunner in a thread pool "embedded".)

2.1 Run LocalRunner in main thread

This means the user runs a full-fledged Samza application on their own cluster.

How this works in a real deployment

  • The user sets Job.application.runner to LocalRunner.

  • The user deploys the application to all hosts in their cluster.

  • The user runs run-app.sh on each node to start the whole application, and stop-app.sh on each node to stop it.

Note that the scripts are the same for both YARN and this deployment model.

2.2 Run LocalRunner in threadpool

In this use case the user wants to embed Samza stream processing inside their own application. We need to provide a wrapper (let's call it LocalRunnerWrapper) that runs the LocalRunner in a thread pool. To start Samza processing, the user invokes LocalRunnerWrapper.start(). To stop Samza processing, the user invokes LocalRunnerWrapper.stop(). Note that LocalRunnerWrapper.start() is non-blocking. This wrapper can be hooked up to the OffSpring lifecycle management.


The following pseudocode shows the wrapper:

Code Block
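class LocalRunnerWrapper {
   private ApplicationRunner localRunner;
   private ExecutorService executorService;

   // Non-blocking: the blocking localRunner.start() runs on a pool thread.
   void start() {
      executorService.submit(() -> {
         localRunner.start();
      });
   }

   void stop() {
      localRunner.stop();
   }
}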
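The non-blocking behavior of the wrapper can be checked with a small runnable sketch. BlockingRunnerStub and WrapperSketch are hypothetical names, and the stub merely blocks in start() until stop() is called. The single-thread executor and the five-second shutdown wait are assumptions made for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for LocalRunner: start() blocks until stop() is called.
class BlockingRunnerStub {
  private final CountDownLatch stopped = new CountDownLatch(1);

  void start() throws InterruptedException {
    stopped.await();
  }

  void stop() {
    stopped.countDown();
  }
}

class WrapperSketch {
  private final BlockingRunnerStub localRunner = new BlockingRunnerStub();
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
  private Future<?> running;

  // Returns immediately; the blocking start() runs on the executor thread.
  void start() {
    running = executorService.submit(() -> {
      localRunner.start();
      return null; // Callable form, so start() may throw checked exceptions
    });
  }

  // Unblocks the runner, then waits (up to an assumed 5 s) for the task to finish.
  boolean stop() {
    localRunner.stop();
    executorService.shutdown();
    try {
      return executorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isRunning() {
    return running != null && !running.isDone();
  }

  public static void main(String[] args) {
    WrapperSketch wrapper = new WrapperSketch();
    wrapper.start(); // does not block the caller
    System.out.println("running: " + wrapper.isRunning());
    System.out.println("stopped cleanly: " + wrapper.stop());
  }
}
```

The sketch keeps the Future returned by submit(). Since Future.get() rethrows a task failure wrapped in an ExecutionException, that handle is one possible place to observe errors from the background run; whether it suits the lifecycle hook is left open here.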

TODO: figure out how error propagation works in this lifecycle hook.

3. TestApplicationRunner

 

Compatibility, Deprecation, and Migration Plan

...