
Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-1130

Released: 

Problem

Motivation

 

Proposed Changes

ApplicationRunner is the layer between the high-level fluent API and the physical execution. It is responsible for planning the execution, materializing the streams and configs, and running the physical execution. There are three main components involved in running an application:

  • ProcessorGraph - the graph for execution. This is different from the StreamGraph, which is the logical representation that describes the stream operations; the ProcessorGraph describes how the StreamGraph will be executed. A ProcessorNode is one stage of the execution and executes a sub-graph of the StreamGraph. ProcessorNodes are connected by StreamEdges, and each StreamEdge represents a physical stream in the system. The input streams are called sources, the output streams are called sinks, and the internal streams, which are created and managed by Samza, are called intermediate streams.

  • ExecutionPlanner - the planner for execution. The planner generates the ProcessorGraph to execute the StreamGraph. Note: currently the StreamGraph is executed in a single stage, so there is only one node in the ProcessorGraph; we have not yet built any logic for cutting the StreamGraph into multiple stages. Based on the StreamGraph and ProcessorGraph, the planner determines the partition counts for the intermediate streams. The algorithm enforces the following rules:

    • All input topics to the join should have the same number of partitions.

    • The partition count of the remaining intermediate topics can be configured through job.intermediate.stream.partitions. Without an explicit user config, the partition count is decided by the following rule:

      Partition_count = MAX(MAX(input stream partitions), MAX(output stream partitions))

After calculating the partitions, the planner generates the intermediate streams as planned, along with the configs for each ProcessorNode.

  • ApplicationRunner - the runner for the execution. Depending on the execution environment, there are three kinds of runners:

    • RemoteApplicationRunner - RemoteRunner submits the application for remote execution. It targets clusters such as YARN. RemoteRunner is expected to run at a single entry point (the RM in YARN) and to exit once the application has been submitted.

    • LocalApplicationRunner - LocalRunner executes the application locally. In this mode the LocalRunner runs on every node that the application is deployed to. LocalRunner.start() blocks until the user stops the runner or an exception occurs.

    • TestApplicationRunner - TestRunner executes the application in a testing environment. Input can be Java collections or files, and planning and physical intermediate streams might not be needed.
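The default partition rule above can be sketched in Java as follows. This is an illustrative sketch only: the class and method names are hypothetical, and modeling an explicit job.intermediate.stream.partitions value as an `OptionalInt` that overrides the computed default is an assumption based on the description, not the planner's actual code. The join rule (all join inputs share one partition count) is shown as a separate check.

```java
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;

public class IntermediatePartitions {

  /**
   * Hypothetical helper: picks the partition count for an intermediate stream.
   * An explicit job.intermediate.stream.partitions value wins; otherwise the
   * default is MAX(MAX(input stream partitions), MAX(output stream partitions)).
   */
  static int partitionCount(OptionalInt configured,
                            List<Integer> inputPartitions,
                            List<Integer> outputPartitions) {
    if (configured.isPresent()) {
      return configured.getAsInt();
    }
    int maxIn = inputPartitions.isEmpty() ? 0 : Collections.max(inputPartitions);
    int maxOut = outputPartitions.isEmpty() ? 0 : Collections.max(outputPartitions);
    return Math.max(maxIn, maxOut);
  }

  /** Hypothetical check for the join rule: all join inputs must have the same partition count. */
  static boolean joinInputsCompatible(List<Integer> joinInputPartitions) {
    return joinInputPartitions.stream().distinct().count() <= 1;
  }

  public static void main(String[] args) {
    // Inputs with 8 and 16 partitions, output with 32: the default is 32.
    System.out.println(partitionCount(OptionalInt.empty(), List.of(8, 16), List.of(32)));
    // An explicit config value overrides the computed default.
    System.out.println(partitionCount(OptionalInt.of(4), List.of(8, 16), List.of(32)));
    System.out.println(joinInputsCompatible(List.of(8, 8)));
  }
}
```

Taking the maximum over both sides means an intermediate stream never has fewer partitions than the widest stream around it, so it does not become the parallelism bottleneck of the stage.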

Public Interfaces

Code Block
/**
 * Interface to be implemented by the physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface ApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}
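The javadoc above requires a single-`Config` constructor so that `ApplicationRunner#fromConfig(Config)` can instantiate a runner reflectively. The following self-contained sketch illustrates that pattern under stated assumptions: `Config` is modeled as a plain string map, and the config key `app.runner.class` is hypothetical; Samza's real `Config` class and key names may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class RunnerFactorySketch {

  /** Stand-in for Samza's Config: a simple string-to-string map (assumption). */
  public static class Config extends HashMap<String, String> {
    public Config(Map<String, String> m) { super(m); }
  }

  /** Hypothetical config key naming the runner implementation class. */
  static final String RUNNER_CLASS_KEY = "app.runner.class";

  public interface ApplicationRunner {
    String name();
  }

  /** Example implementation: note the required single-Config constructor. */
  public static class LocalRunner implements ApplicationRunner {
    private final Config config;
    public LocalRunner(Config config) { this.config = config; }
    public String name() { return "local"; }
  }

  /** Mirrors the fromConfig(Config) idea: load the class, then call its Config constructor. */
  static ApplicationRunner fromConfig(Config config) throws ReflectiveOperationException {
    String className = config.get(RUNNER_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + RUNNER_CLASS_KEY);
    }
    Class<?> clazz = Class.forName(className);
    return (ApplicationRunner) clazz.getConstructor(Config.class).newInstance(config);
  }

  public static void main(String[] args) throws Exception {
    Config config = new Config(Map.of(RUNNER_CLASS_KEY, LocalRunner.class.getName()));
    System.out.println(fromConfig(config).name()); // local
  }
}
```

Requiring one fixed constructor shape is what lets a generic entry point pick the runner from configuration alone, without compile-time knowledge of the implementation class.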

Implementation and Test Plan

1. RemoteApplicationRunner

With the introduction of the fluent API (SAMZA-1073) and standalone Samza (SAMZA-1063), we have the following new scenarios:

  1. A Samza application may include multiple stages in a DAG, each corresponding to a separate Samza job (SAMZA-1041)
  2. Samza jobs may be user programs executed in standalone mode, which introduces yet another way of starting the JVM process for a Samza job (we already have ThreadJobFactory, ProcessJobFactory, and YarnJobFactory to start a Samza job).

Motivation

We want a simplified solution for running the new Samza application in various execution environments. The solution should allow users to run a Samza application written with either the fluent API or the task-level API, either in a local standalone environment or on a cluster executor (e.g. YARN). In addition, the solution should support easy local testing by providing an abstraction layer that offers file- or Java-collection-based input/output streams and key-value stores without requiring changes to user code.

Proposed Changes

In the proposed change, we introduce a new programming API layer, ApplicationRunner, between the user program and the physical execution engine to achieve the following goals:

  • Hide the actual job deployment to the various physical environments from the user
  • Provide a swappable RuntimeEnvironment within ApplicationRunner that allows changing the implementation of inputs/outputs and stores without changing the user code
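The second goal, a swappable runtime environment, can be pictured with the following sketch. The `RuntimeEnvironment` interface, the collection-backed `CollectionEnvironment`, and `runApp` are hypothetical names chosen for illustration, not Samza APIs: the user logic reads and writes only through the environment the runner supplies, so a test environment backed by Java collections can stand in for a production one without touching user code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

public class RuntimeEnvironmentSketch {

  /** Hypothetical abstraction the runner provides for input and output streams. */
  interface RuntimeEnvironment {
    String poll();            // next input message, or null when none is available
    void send(String message);
  }

  /** Test environment backed by in-memory Java collections; a production one would implement the same interface. */
  static final class CollectionEnvironment implements RuntimeEnvironment {
    final Deque<String> input;
    final List<String> output = new ArrayList<>();

    CollectionEnvironment(List<String> messages) {
      this.input = new ArrayDeque<>(messages);
    }

    public String poll() { return input.poll(); }
    public void send(String message) { output.add(message); }
  }

  /** User logic: knows nothing about where messages come from or go to. */
  static void runApp(RuntimeEnvironment env, UnaryOperator<String> transform) {
    String message;
    while ((message = env.poll()) != null) {
      env.send(transform.apply(message));
    }
  }

  public static void main(String[] args) {
    CollectionEnvironment testEnv = new CollectionEnvironment(List.of("a", "b"));
    runApp(testEnv, s -> s.toUpperCase(Locale.ROOT));
    System.out.println(testEnv.output); // [A, B]
  }
}
```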

Overview of ApplicationRunner

An overview of ApplicationRunner is shown below:

[Figure: Overview of ApplicationRunner]

  • ExecutionPlanner - the planner generates the execution plan from the user-defined StreamGraph, including all the job configs, the intermediate streams to create, and a JSON representation of the JobGraph. A JobGraph consists of JobNodes and StreamEdges: each JobNode represents a physical Samza job to run, and each StreamEdge represents an intermediate stream between jobs.

  • StreamManager - responsible for creating the intermediate streams before deploying the actual job(s).

  • JobRunner - the runner that deploys the jobs in the JobGraph. It launches a physical job for each JobNode in the JobGraph.

The runner behavior differs by execution environment:

    • LocalJobRunner launches a StreamProcessor for each JobNode within the local JVM. In this mode the LocalJobRunner runs on every host that the application is deployed to. It is also used to launch jobs in the test environment.

    • RemoteJobRunner submits a Samza job for remote execution for each JobNode. It targets clusters such as YARN.
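To make the JobGraph terminology concrete, here is a small sketch that classifies the streams of a graph into sources, sinks, and intermediate streams. The `JobNode` class and the `streamsOfKind` method are hypothetical illustrations, not Samza classes: a stream that is consumed but never produced by any node is a source, one that is produced but never consumed is a sink, and one that is both produced and consumed inside the graph is an intermediate stream that the StreamManager would need to create.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class JobGraphSketch {

  /** Hypothetical JobNode: a physical job with the stream IDs it reads and writes. */
  static final class JobNode {
    final String name;
    final Set<String> inputs;
    final Set<String> outputs;

    JobNode(String name, Set<String> inputs, Set<String> outputs) {
      this.name = name;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  enum StreamKind { SOURCE, SINK, INTERMEDIATE }

  /** Returns the stream IDs of the given kind, sorted for stable output. */
  static Set<String> streamsOfKind(List<JobNode> nodes, StreamKind kind) {
    Set<String> consumed = new HashSet<>();
    Set<String> produced = new HashSet<>();
    for (JobNode node : nodes) {
      consumed.addAll(node.inputs);
      produced.addAll(node.outputs);
    }
    Set<String> all = new HashSet<>(consumed);
    all.addAll(produced);
    Set<String> result = new TreeSet<>();
    for (String stream : all) {
      boolean in = consumed.contains(stream);
      boolean out = produced.contains(stream);
      StreamKind k = (in && out) ? StreamKind.INTERMEDIATE : (in ? StreamKind.SOURCE : StreamKind.SINK);
      if (k == kind) {
        result.add(stream);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Two-stage repartition: job-1 repartitions page views, job-2 counts them.
    List<JobNode> graph = List.of(
        new JobNode("job-1", Set.of("myinput"), Set.of("repartitioned")),
        new JobNode("job-2", Set.of("repartitioned"), Set.of("myoutput")));
    System.out.println(streamsOfKind(graph, StreamKind.SOURCE));       // [myinput]
    System.out.println(streamsOfKind(graph, StreamKind.INTERMEDIATE)); // [repartitioned]
    System.out.println(streamsOfKind(graph, StreamKind.SINK));         // [myoutput]
  }
}
```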

ApplicationRunners in Different Execution Environments

Based on the JobRunner and runtime environment used to execute the JobGraph, we categorize ApplicationRunners as follows:

  • LocalApplicationRunner: deploys and launches jobs in the local JVM process. This is the public programming API used in the main() method of a user-defined application.

  • RemoteApplicationRunner: submits the Samza job(s) to a remote cluster executor (such as YARN). Not a public programming API; only used and implemented by service providers to deploy Samza jobs to a remote cluster.

  • TestApplicationRunner: uses the same LocalJobRunner to launch jobs locally, with a test runtime environment that provides file/collection-based inputs and outputs. A non-production public programming API.

[Figure: ApplicationRunners in different execution environments]


LocalApplicationRunner

When the LocalApplicationRunner executes the JobGraph, each JobNode is executed by a LocalJobRunner. LocalJobRunner will start and stop a StreamProcessor by invoking the StreamProcessor.start() and StreamProcessor.stop() functions. LocalApplicationRunner.run() is a blocking call, so it can be used to run in the main thread (see Table 1 below).

 

Code Block
/**
* Example code to implement window-based counter with a re-partition stage
*/
public class PageViewCounterExample implements StreamApplication {

 @Override public void init(StreamGraph graph, Config config) {
   MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream("myinput");
   MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream("myoutput");

   pageViewEvents.
       partitionBy(m -> m.getMessage().memberId).
       window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
           setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
           setAccumulationMode(AccumulationMode.DISCARDING)).
       map(MyStreamOutput::new).
       sendTo(pageViewPerMemberCounters);

 }

 public static void main(String[] args) throws Exception {
   CommandLine cmdLine = new CommandLine();
   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
   ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
   localRunner.run(new PageViewCounterExample());
 }
}

 

Table 1: Repartition counter example using new fluent API

RemoteApplicationRunner

When the RemoteApplicationRunner executes the JobGraph, each JobNode is executed as a remote job (for now only single-node JobGraphs are supported, so only one remote job will be created). The jobs are submitted through RemoteJobRunner, which is the runner for a single remote job. RemoteJobRunner is cluster agnostic, meaning it should be able to submit jobs to different types of clusters based on configs (currently only YARN is supported). The following pseudocode shows the start of RemoteRunner:

...

Samza provides an ApplicationRunnerMain class that uses RemoteApplicationRunner, as shown below:

Code Block
public static void main(String[] args) throws Exception {
  ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
  OptionSet options = cmdLine.parser().parse(args);
  Config orgConfig = cmdLine.loadConfig(options);
  Config config = Util.rewriteConfig(orgConfig);
  ApplicationRunnerOperation op = cmdLine.getOperation(options);
  AppConfig appConfig = new AppConfig(config);

  if (appConfig.getAppClass() != null) {
    // A StreamApplication is configured: drive it through RemoteApplicationRunner
    RemoteApplicationRunner runner = new RemoteApplicationRunner(config);
    StreamApplication app =
        (StreamApplication) Class.forName(appConfig.getAppClass()).newInstance();
    switch (op) {
      case RUN:
        runner.start(app);
        break;
      case KILL:
        runner.stop(app);
        break;
      case STATUS:
        System.out.println(runner.status(app));
        break;
      default:
        throw new IllegalArgumentException("Unrecognized operation: " + op);
    }
  } else {
    // No application class configured: fall back to the existing JobRunner
    JobRunner$.MODULE$.main(args);
  }
}

Table 2: Samza ApplicationRunnerMain using RemoteApplicationRunner to start a user application

Note: this depends on SAMZA-1089, which adds stop() and status() to JobRunner.

How this works in real deployment

  1. The user sets app.application.runner.class to RemoteRunner.

  2. Deploy the application to the RM.

  3. Execute run-app.sh and stop-app.sh to start/stop the application. In the scripts, ApplicationRunnerMain.main() is invoked to run RemoteApplicationRunner (based on the config in step 1).

2. LocalApplicationRunner

When the LocalApplicationRunner executes the ProcessorGraph, each ProcessorNode is executed by a set of local StreamProcessor(s). For now we only create one StreamProcessor per ProcessorNode, because we only support single-stage applications. LocalApplicationRunner starts and stops the StreamProcessor(s) by invoking the StreamProcessor.start() and StreamProcessor.stop() functions. LocalApplicationRunner.start() is a blocking call, so it can be used to run in the main thread (see Table 1).

LocalRunner.stop() is similar, but it has no leader election phase and replaces StreamProcessor.start() with stop().

There are two ways of using LocalRunner, and they have different deployment models (Note: I am not sure whether we should call running LocalRunner in the main thread "standalone" and running it in a thread pool "embedded"):

2.1 Run LocalRunner in main thread

This means the user runs a full-fledged Samza application on their own cluster.

How this works in real deployment

  • The user sets Job.application.runner to LocalRunner.

  • Deploy the application to all user cluster hosts.

  • Run run-app.sh on each node to start the whole application. Run stop-app.sh on each node to stop it.

Note that the scripts are the same in both YARN and this deployment model.

2.2 Run LocalRunner in threadpool

In this use case, the user wants to embed Samza stream processing inside their application. We need to provide a wrapper (let's call it LocalRunnerWrapper) to run the LocalRunner in a thread pool. To start Samza processing, the user invokes LocalRunnerWrapper.start(); to stop it, the user invokes LocalRunnerWrapper.stop(). Note that LocalRunnerWrapper.start() is non-blocking. This wrapper can be hooked up with the OffSpring lifecycle management.

The following pseudocode shows the wrapper:

Code Block
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LocalRunnerWrapper {
   private final ExecutorService executorService = Executors.newSingleThreadExecutor();
   private final ApplicationRunner localRunner;

   LocalRunnerWrapper(ApplicationRunner localRunner) {
      this.localRunner = localRunner;
   }

   // Non-blocking: the blocking localRunner.start() runs on the executor thread
   void start() {
      executorService.submit(() -> localRunner.start());
   }

   // Stopping the runner unblocks start(); then release the executor thread
   void stop() {
      localRunner.stop();
      executorService.shutdown();
   }
}

TestApplicationRunner

TestApplicationRunner also executes the JobGraph using LocalJobRunner, except that it uses a TestRuntimeEnvironment internally to enable input/output streams backed by files and Java collections. Note that, like LocalApplicationRunner, TestApplicationRunner is blocking. For bounded input streams (data collections or files), the system consumers emit an end-of-stream message at the end, and the runner stops afterwards, which allows the user to validate the processing results. An example of user test code follows:

Code Block
public class TestPageViewCountExample {
  private PageViewCounterExample userApp = new PageViewCounterExample();

  Queue<Entry<String, PageViewCounterExample.PageViewEvent>> inputMessages = new LinkedBlockingQueue<>();
  BlockingQueue<Entry<String, PageViewCounterExample.PageViewCount>> outgoingMessages = new LinkedBlockingQueue<>();

  TestApplicationRunner testRunner;

  @Before
  public void setup() {
    // preparation for config, input, output, and local store
    Config config = mock(Config.class);
    testRunner = new TestApplicationRunner(new AppConfig(config));
    testRunner.addStream("pageViewEventStream", inputMessages);
    testRunner.addStream("pageViewEventPerMemberStream", outgoingMessages);

    inputMessages.offer(new Entry<>("my-page-id", new PageViewCounterExample.PageViewEvent("my-page-id", "my-member-id", 1234556L)));

    testRunner.start(userApp);
  }

  @After
  public void shutdown() {
    testRunner.stop(userApp);
  }

  @Test
  public void test() throws InterruptedException {
    // take() blocks until an output message is available (poll() could return null)
    Entry<String, PageViewCounterExample.PageViewCount> countEvent = outgoingMessages.take();
    // validate the test results
    String wndKey = getWindowKey("my-member-id", 1234556L);
    assertEquals(wndKey, countEvent.getKey());
    assertEquals(1, countEvent.getValue().count);
  }

  // 10-second tumbling window key: member ID plus the window start time in ms
  private String getWindowKey(String s, long timestamp) {
    return String.format("%s-%d", s, (timestamp / 10000) * 10000);
  }
}

Table 3: User test code using TestApplicationRunner
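The end-of-stream behavior described for TestApplicationRunner can be illustrated with a simplified, single-threaded sketch. Everything here is hypothetical: a collection-backed "consumer" appends an end-of-stream marker after its last message, and a processing loop handles messages until it sees that marker, then returns so the caller can validate the output. Samza's real system consumers, message envelopes, and runner lifecycle are more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class BoundedInputSketch {

  /** Hypothetical message wrapper; END_OF_STREAM marks the end of a bounded input. */
  static final class Message<T> {
    static final Message<?> END_OF_STREAM = new Message<>(null);
    final T payload;
    Message(T payload) { this.payload = payload; }
  }

  /** Hypothetical collection-backed consumer: emits every element, then end-of-stream. */
  static <T> List<Message<?>> consume(List<T> collection) {
    List<Message<?>> envelopes = new ArrayList<>();
    for (T item : collection) {
      envelopes.add(new Message<>(item));
    }
    envelopes.add(Message.END_OF_STREAM);
    return envelopes;
  }

  /** Counts messages per key until end-of-stream, then returns (the "runner stops"). */
  static Map<String, Integer> runUntilEndOfStream(List<Message<?>> envelopes,
                                                  Function<Object, String> keyFn) {
    Map<String, Integer> counts = new TreeMap<>();
    for (Message<?> m : envelopes) {
      if (m == Message.END_OF_STREAM) {
        break; // bounded input exhausted: stop so the test can validate results
      }
      counts.merge(keyFn.apply(m.payload), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> memberIds = List.of("member-a", "member-b", "member-a");
    Map<String, Integer> counts = runUntilEndOfStream(consume(memberIds), Object::toString);
    System.out.println(counts); // {member-a=2, member-b=1}
  }
}
```

Because the loop returns on its own once the marker arrives, a test needs no timeouts or polling to know that processing is complete.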

 

Public Interfaces

LocalApplicationRunner

Code Block
/**
 * Interface to be implemented by the physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}


TestApplicationRunner

Code Block
/**
 * Interface to be implemented by the physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
	/**
	 * Method to be invoked to add an input or output stream backed by an in-memory queue
	 * @param streamId  the logical identifier for a stream
	 * @param messages  the queue of messages in the stream
	 */
	<K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);

	/**
	 * Method to be invoked to add a KV-store with the given entries
	 * @param storeId  the logical identifier for a KV-store
	 * @param storeEntries  the map containing all KV-store entries
	 */
	<K, V> void addStore(String storeId, Map<K, V> storeEntries);

}

Non-public API

RemoteApplicationRunner

Code Block
/**
 * Interface to be implemented by the physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void start(StreamApplication app);

	/**
	 * Method to be invoked to stop the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void stop(StreamApplication app);

	/**
	 * Method to be invoked to get the status of the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 * @return  the status of the application
	 */
	ApplicationStatus status(StreamApplication app);

}

Implementation and Test Plan

The following pseudocode shows the run method of LocalApplicationRunner:

 

 

LocalApplicationRunner.run() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    startLeaderElection(() -> onBecomeLeader() {
      streamManager.createStreams();
      releaseLatch();
    });
    waitForLatch(); // blocks until the leader releases the latch
  }

  List<LocalJobRunner> jobs = new ArrayList<>();
  plan.getJobConfigs().forEach(jobConfig -> {
    LocalJobRunner jobRunner = new LocalJobRunner();
    jobRunner.start(jobConfig);
    jobs.add(jobRunner);
  });

  jobs.forEach(job -> {
    wait for the job to complete;
  });
}
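The coordination step in the pseudocode above (only the leader creates the intermediate streams, and every processor waits for that before starting its jobs) can be sketched within a single JVM using `java.util.concurrent`. This is a simplification under stated assumptions: the hypothetical `StreamSetupCoordinator` uses an `AtomicBoolean` compare-and-set as a stand-in for real distributed leader election (for example via ZooKeeper), and a `CountDownLatch` plays the role of the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamSetupCoordinator {
  private final AtomicBoolean leaderElected = new AtomicBoolean(false);
  private final CountDownLatch streamsCreated = new CountDownLatch(1);
  final AtomicInteger createStreamsCalls = new AtomicInteger();

  /** Stand-in for streamManager.createStreams(). */
  private void createStreams() {
    createStreamsCalls.incrementAndGet();
  }

  /** Called by every processor before starting its jobs. */
  void awaitIntermediateStreams() throws InterruptedException {
    // Stand-in for leader election: exactly one caller wins the compare-and-set.
    if (leaderElected.compareAndSet(false, true)) {
      createStreams();            // onBecomeLeader()
      streamsCreated.countDown(); // releaseLatch()
    }
    streamsCreated.await();       // waitForLatch(): blocks until the leader is done
  }

  public static void main(String[] args) throws Exception {
    StreamSetupCoordinator coordinator = new StreamSetupCoordinator();
    ExecutorService processors = Executors.newFixedThreadPool(4);
    AtomicInteger started = new AtomicInteger();
    for (int i = 0; i < 4; i++) {
      processors.submit(() -> {
        coordinator.awaitIntermediateStreams();
        started.incrementAndGet(); // "start jobs" only after the streams exist
        return null;
      });
    }
    processors.shutdown();
    processors.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(coordinator.createStreamsCalls.get() + " " + started.get()); // 1 4
  }
}
```

The latch guarantees that no processor starts consuming from an intermediate stream before it exists, while the single winner of the election ensures the streams are created only once.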

 

 

To provide a unified way to run a user program on a physical host, the user's main() method will be the entry point that starts an actual JVM process on the target host to run the Samza processor.

 

How this works in standalone deployment

  • Configure app.class = <user-app-name>

  • Deploy the application to standalone hosts.

  • Run run-local-app.sh on each node to start/stop the local application.

 

The following pseudocode shows the start of RemoteApplicationRunner:

 

 

RemoteApplicationRunner.start() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    streamManager.createStreams();
  }

  plan.getJobConfigs().forEach(jobConfig -> {
    RemoteJobRunner jobRunner = new RemoteJobRunner();
    jobRunner.start(jobConfig);
  });
}

 

 

RemoteApplicationRunner.stop() is similar, replacing RemoteJobRunner.start() with RemoteJobRunner.stop().
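Since stop() is described as the same traversal over the plan's job configs with a different per-job call, one way to express this without duplicating the loop is to pass the per-job action in. The `RemoteJobRunner` interface and `forEachJob` helper below are hypothetical, job configs are represented as plain strings for brevity, and a single runner instance is shared across jobs only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class RemoteRunnerSketch {

  /** Hypothetical single-job runner, as in the pseudocode above. */
  interface RemoteJobRunner {
    void start(String jobConfig);
    void stop(String jobConfig);
  }

  /** Applies the same per-job action to every job config in the plan. */
  static void forEachJob(List<String> jobConfigs, RemoteJobRunner runner,
                         BiConsumer<RemoteJobRunner, String> action) {
    for (String jobConfig : jobConfigs) {
      action.accept(runner, jobConfig);
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    RemoteJobRunner runner = new RemoteJobRunner() {
      public void start(String jobConfig) { log.add("start " + jobConfig); }
      public void stop(String jobConfig) { log.add("stop " + jobConfig); }
    };
    List<String> plan = List.of("job-1");
    forEachJob(plan, runner, RemoteJobRunner::start); // used by start()
    forEachJob(plan, runner, RemoteJobRunner::stop);  // used by stop(): same loop, different action
    System.out.println(log); // [start job-1, stop job-1]
  }
}
```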

 

How this works in YARN deployment

  • Configure app.class = <user-app-name> and task.execute = run-local-app.sh

  • Deploy the application to a single deployment gateway (the RM in YARN).

  • Run run-app.sh on the deployment gateway to start the whole application. Run stop-app.sh on the same node to stop it.

  • When the cluster executor actually launches the Samza container processes, it runs run-local-app.sh on the target host, exactly like in the standalone deployment.

 

Note that the last step, starting the JVM process, is exactly the same in YARN as in the standalone environment. Hence, YARN and standalone deployments provide exactly the same local runtime environment.

...

3. TestApplicationRunner

...


Compatibility, Deprecation, and Migration Plan

...