
Status

Current state: [ UNDER DISCUSSION | ACCEPTED | REJECTED ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-TBD

Released: 

Problem


Motivation

 

Proposed Changes

ApplicationRunner is the layer between the high-level fluent API and physical execution. It is responsible for planning the execution, materializing the streams and configs, and running the physical execution. Three main components are involved in running an application:

  • ProcessorGraph - the graph for execution. It differs from the StreamGraph, which is the logical representation of the stream operations: the ProcessorGraph describes how the StreamGraph will be executed. A ProcessorNode is one stage of the execution and executes a sub-graph of the StreamGraph. ProcessorNodes are connected by StreamEdges, and each StreamEdge represents a physical stream in the system. Input streams are called sources, output streams are called sinks, and internal streams are called intermediate streams, which are created and managed by Samza.

  • ExecutionPlanner - the planner for execution. The planner generates the ProcessorGraph used to execute the StreamGraph. Note: currently the StreamGraph is executed in a single stage, so the ProcessorGraph contains only one node; there is no logic yet for cutting the StreamGraph into multiple stages. Based on the StreamGraph and ProcessorGraph, the planner determines the partition counts of the intermediate streams. The algorithm enforces the following rules:

    • All input topics to a join must have the same number of partitions.

    • The partition count of the remaining intermediate topics can be set with job.intermediate.stream.partitions. Without an explicit user config, the partition count is computed as:

      Partition_count = MAX(MAX(input stream partitions), MAX(output stream partitions))

After the partitions are calculated, the planner generates the intermediate streams as planned, along with the configs for each ProcessorNode.
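The default partition rule above can be sketched as follows. This is a minimal illustration under stated assumptions, not the planner's actual code: the class name IntermediatePartitionSketch, the method partitionCount, the plain Map used as config, and the treatment of an empty stream list as 0 are all hypothetical. Only the key job.intermediate.stream.partitions and the MAX formula come from the text, and the join constraint is not modeled here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the default partition rule; not part of Samza.
class IntermediatePartitionSketch {
  static final String PARTITIONS_KEY = "job.intermediate.stream.partitions";

  static int partitionCount(Map<String, String> config,
                            List<Integer> inputPartitions,
                            List<Integer> outputPartitions) {
    // An explicit user setting takes precedence over the computed default.
    String configured = config.get(PARTITIONS_KEY);
    if (configured != null) {
      return Integer.parseInt(configured.trim());
    }
    // Default: MAX(MAX(input stream partitions), MAX(output stream partitions)).
    // Assumption: an empty list contributes 0 to the maximum.
    int maxInput = inputPartitions.isEmpty() ? 0 : Collections.max(inputPartitions);
    int maxOutput = outputPartitions.isEmpty() ? 0 : Collections.max(outputPartitions);
    return Math.max(maxInput, maxOutput);
  }

  public static void main(String[] args) {
    // Inputs with 4 and 8 partitions, one output with 16 partitions, no override.
    System.out.println(partitionCount(Map.of(), List.of(4, 8), List.of(16)));
    // Same streams, but the user sets the partition count explicitly.
    System.out.println(partitionCount(Map.of(PARTITIONS_KEY, "32"), List.of(4, 8), List.of(16)));
  }
}
```

With no override, the first call takes the larger of the input and output maxima; the second call returns the configured value regardless of the stream partitions.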

  • ApplicationRunner - the runner for the execution. Depending on the execution environment, there are three kinds of runners:

    • RemoteApplicationRunner - RemoteRunner submits the application for remote execution. It is intended for clusters such as YARN. RemoteRunner is expected to run at a single entry point (the RM in YARN) and to exit once the application has been submitted.

    • LocalApplicationRunner - LocalRunner executes the application locally. In this mode, LocalRunner runs on every node that the application is deployed to. LocalRunner.start() blocks until the user stops the runner or an exception occurs.

    • TestApplicationRunner - TestRunner executes the application in a testing environment. Input can be Java collections or files, and planning and physical intermediate streams might not be needed.
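To make the ProcessorGraph terminology concrete, here is a simplified sketch of how StreamEdges could be classified as sources, sinks, or intermediate streams. The class ProcessorGraphSketch, its nested types, and the rule "no producer node means source, no consumer node means sink" are assumptions made for illustration; the proposal does not specify the actual data model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the ProcessorGraph described above; not Samza's classes.
class ProcessorGraphSketch {
  enum EdgeKind { SOURCE, SINK, INTERMEDIATE }

  static final class StreamEdge {
    final String streamName;
    final String producerNode; // null when the stream is produced outside the application
    final String consumerNode; // null when the stream is consumed outside the application

    StreamEdge(String streamName, String producerNode, String consumerNode) {
      this.streamName = streamName;
      this.producerNode = producerNode;
      this.consumerNode = consumerNode;
    }

    // Assumed rule: an edge with no producer is a source, one with no consumer is a sink,
    // and an edge with both endpoints inside the graph is an intermediate stream.
    EdgeKind kind() {
      if (producerNode == null) {
        return EdgeKind.SOURCE;
      }
      if (consumerNode == null) {
        return EdgeKind.SINK;
      }
      return EdgeKind.INTERMEDIATE;
    }
  }

  private final List<StreamEdge> edges = new ArrayList<>();

  void addEdge(StreamEdge edge) {
    edges.add(edge);
  }

  // Returns the names of all streams of the given kind, in insertion order.
  List<String> streamsOfKind(EdgeKind kind) {
    List<String> names = new ArrayList<>();
    for (StreamEdge edge : edges) {
      if (edge.kind() == kind) {
        names.add(edge.streamName);
      }
    }
    return names;
  }
}
```

In the single-node case described above, an intermediate stream would have the same ProcessorNode as both producer and consumer, for example a stream that the node writes to and reads back after repartitioning.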

Public Interfaces

/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface ApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}
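The single-Config constructor requirement exists so that a runner can be instantiated reflectively from configuration. A minimal sketch of that mechanism follows. Everything in it is a stand-in so the example compiles on its own: the nested Runner interface, the Map-backed Config class, EchoRunner, and the body of fromConfig are hypothetical, and the real fromConfig may differ. The config key app.application.runner.class is the one this proposal uses to select the runner.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical stand-ins so the sketch is self-contained; these are not Samza's types.
class RunnerFactorySketch {
  static final String RUNNER_CLASS_KEY = "app.application.runner.class";

  interface Runner {
    String name();
  }

  static final class Config {
    private final Map<String, String> values;

    Config(Map<String, String> values) {
      this.values = values;
    }

    String get(String key) {
      return values.get(key);
    }
  }

  // Example implementation exposing the required public single-Config constructor.
  public static final class EchoRunner implements Runner {
    public EchoRunner(Config config) {
      // A real runner would keep the config; this stand-in ignores it.
    }

    @Override
    public String name() {
      return "echo";
    }
  }

  // Looks up the configured class name and invokes its (Config) constructor reflectively.
  static Runner fromConfig(Config config) {
    String className = config.get(RUNNER_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + RUNNER_CLASS_KEY);
    }
    try {
      Class<?> runnerClass = Class.forName(className);
      Constructor<?> constructor = runnerClass.getConstructor(Config.class);
      return (Runner) constructor.newInstance(config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate runner " + className, e);
    }
  }
}
```

A caller would build a Config whose app.application.runner.class value is the fully qualified class name (for the stand-in above, EchoRunner.class.getName()) and pass it to fromConfig. A class without a public single-Config constructor fails at the getConstructor call, which is why the interface contract requires one.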

Implementation and Test Plan

1. RemoteApplicationRunner

When the RemoteRunner executes the ProcessorGraph, each ProcessorNode is executed as a physical job. (For now only a single-node ProcessorGraph is supported, so only one physical job is created.) The jobs are submitted through the JobRunner, which is the runner for a single remote job. JobRunner is cluster agnostic, meaning it should be able to submit jobs to different types of clusters based on configs (right now only YARN is supported). The following pseudocode shows the start of RemoteRunner:

 

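// RemoteRunner.run()
void run(StreamApplication app) {
  // Build the logical StreamGraph from the user-defined application.
  StreamGraph streamGraph = createStreamGraph(app);
  // Plan the physical execution of the StreamGraph.
  ProcessorGraph processorGraph = executionPlanner.plan(streamGraph);
  // Create the intermediate streams before any job is submitted.
  streamsManager.createStreams(processorGraph.getIntermediateStreams());
  // Submit each ProcessorNode as a physical job.
  processorGraph.getProcessorNodes().forEach(processor ->
      new JobRunner().run(processor.generateConfig()));
}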

RemoteApplicationRunner.stop() is analogous: it calls JobRunner.stop() instead of JobRunner.run().

Note: this depends on SAMZA-1089, which adds stop() and status() to JobRunner.


How this works in real deployment

  1. The user sets app.application.runner.class to RemoteRunner.

  2. The application is deployed to the RM.

  3. The user executes run-app.sh and stop-app.sh to start and stop the application. In the scripts, ApplicationRunnerMain.main() is invoked to run RemoteApplicationRunner (based on the config in step 1).

2. LocalApplicationRunner

 

Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives

 
