...

Motivation

 

Proposed Changes

ApplicationRunner is the layer between the high-level fluent API and the physical execution. It is responsible for planning the execution, materializing the streams and configs, and running the physical execution. There are three main components involved in running an application:

  • ProcessorGraph - the graph for execution. It differs from the StreamGraph, which is the logical representation of the stream operations: the ProcessorGraph describes how the StreamGraph will be executed. A ProcessorNode is one stage of the execution and executes a sub-graph of the StreamGraph. ProcessorNodes are connected by StreamEdges, and each StreamEdge represents a physical stream in the system. The input streams are called sources, the output streams are called sinks, and the internal streams are called intermediate streams, which are created and managed by Samza.

  • ExecutionPlanner - the planner for execution. The planner generates the ProcessorGraph that executes the StreamGraph. Note: currently the StreamGraph is executed in a single stage, so the ProcessorGraph contains only one node; no logic has been built yet for splitting the StreamGraph into multiple stages. Based on the StreamGraph and the ProcessorGraph, the planner determines the partition counts of the intermediate streams. The algorithm enforces the following rules:

    • All input topics to a join must have the same number of partitions.

    • The partition count of the remaining intermediate topics can be configured with job.intermediate.stream.partitions. Without an explicit user config, the partition count is decided by the following formula:

      Partition_count = MAX(MAX(input stream partitions), MAX(output stream partitions))

    After the partition counts are calculated, the planner creates the intermediate streams as planned and generates the configs for each ProcessorNode.

  • ApplicationRunner - the runner for the execution. Depending on the execution environment, there are three kinds of runners:

    • RemoteApplicationRunner - the RemoteRunner submits the application for remote execution on clusters such as YARN. The RemoteRunner is expected to run at a single entry point (the RM in YARN) and exits once the application has been submitted.

    • LocalApplicationRunner - the LocalRunner executes the application locally. In this mode, the LocalRunner runs on every node the application is deployed to. LocalRunner.start() blocks until the user stops the runner or an exception occurs.

    • TestApplicationRunner - the TestRunner executes the application in a testing environment. Input can be Java collections or files, and planning and physical intermediate streams may not be needed.
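The two partition rules enforced by the ExecutionPlanner can be sketched in Java as follows. This is a minimal illustration, not the planner's actual code: the class PartitionPlannerSketch, its methods, and the UNKNOWN marker for an intermediate stream whose count is not yet decided are all hypothetical names. The sketch assumes that intermediate streams feeding a join inherit the join's common partition count, while job.intermediate.stream.partitions and the MAX formula apply only to the remaining intermediate streams.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

/**
 * Hypothetical sketch of the planner's two partition rules.
 * None of these names are Samza APIs; they only illustrate the logic.
 */
public class PartitionPlannerSketch {

  /** Marker for an intermediate stream whose partition count is not decided yet. */
  public static final int UNKNOWN = -1;

  /**
   * Rule 1: all inputs to a join must have the same partition count.
   * Known counts must agree; UNKNOWN (intermediate) inputs inherit the common count.
   */
  public static int resolveJoinPartitions(int... inputPartitions) {
    int common = UNKNOWN;
    for (int p : inputPartitions) {
      if (p == UNKNOWN) {
        continue; // intermediate stream: it will be created with the common count
      }
      if (common == UNKNOWN) {
        common = p;
      } else if (common != p) {
        throw new IllegalStateException(
            "Join inputs have mismatched partition counts: " + Arrays.toString(inputPartitions));
      }
    }
    if (common == UNKNOWN) {
      throw new IllegalStateException("No join input has a known partition count");
    }
    return common;
  }

  /**
   * Rule 2: for the remaining intermediate streams, an explicit
   * job.intermediate.stream.partitions value wins; otherwise use
   * MAX(MAX(input stream partitions), MAX(output stream partitions)).
   *
   * @param configured the configured value, or null if the user did not set one
   */
  public static int intermediatePartitions(Integer configured,
                                           Collection<Integer> inputPartitions,
                                           Collection<Integer> outputPartitions) {
    if (configured != null) {
      return configured;
    }
    int maxInput = inputPartitions.isEmpty() ? 0 : Collections.max(inputPartitions);
    int maxOutput = outputPartitions.isEmpty() ? 0 : Collections.max(outputPartitions);
    return Math.max(maxInput, maxOutput);
  }

  public static void main(String[] args) {
    // Inputs with 4 and 16 partitions, output with 8: the default picks 16.
    System.out.println(intermediatePartitions(null, Arrays.asList(4, 16), Arrays.asList(8)));
    // An explicit job.intermediate.stream.partitions=32 overrides the default.
    System.out.println(intermediatePartitions(32, Arrays.asList(4, 16), Arrays.asList(8)));
    // A join over two 8-partition inputs and one undecided intermediate stream resolves to 8.
    System.out.println(resolveJoinPartitions(8, UNKNOWN, 8));
  }
}
```

Running main prints 16, 32 and 8. A join whose known inputs disagree (for example 4 and 8 partitions) is rejected instead of being silently repartitioned, which mirrors the "must have the same number of partitions" rule.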

Public Interfaces

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;

/**
 * Interface to be implemented by a physical execution engine to deploy the configs and jobs that run the {@link org.apache.samza.operators.StreamGraph}.
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface ApplicationRunner {
  /**
   * Deploys and runs the actual Samza application.
   *
   * @param app  the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);
}
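The single-Config constructor requirement exists so that a runner can be instantiated reflectively from configuration. The sketch below illustrates that contract with simplified stand-ins for Config, StreamApplication and ApplicationRunner; the config key app.runner.class, the LocalRunner class, and the body of fromConfig are assumptions for illustration, not the Samza implementation.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the "constructor with a single Config" contract.
 * Config, StreamApplication and ApplicationRunner are simplified stand-ins,
 * not the Samza classes; "app.runner.class" is an assumed config key.
 */
public class RunnerFactorySketch {

  /** Stand-in for a Samza Config: an immutable string-to-string map. */
  public static class Config {
    private final Map<String, String> values;

    public Config(Map<String, String> values) {
      this.values = new HashMap<>(values);
    }

    public String get(String key, String defaultValue) {
      return values.getOrDefault(key, defaultValue);
    }
  }

  /** Stand-in for the user-defined application (only a name here). */
  public interface StreamApplication {
    String name();
  }

  /** Simplified ApplicationRunner with a reflective fromConfig factory. */
  public interface ApplicationRunner {
    void run(StreamApplication app);

    /** Loads the configured runner class and calls its single-Config constructor. */
    static ApplicationRunner fromConfig(Config config) {
      // Assumed key; falls back to the hypothetical LocalRunner when unset.
      String className = config.get("app.runner.class", LocalRunner.class.getName());
      try {
        Class<?> runnerClass = Class.forName(className);
        return (ApplicationRunner) runnerClass.getConstructor(Config.class).newInstance(config);
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot create runner " + className, e);
      }
    }
  }

  /** Hypothetical runner that only reports what it would run. */
  public static class LocalRunner implements ApplicationRunner {
    private final Config config; // kept so a real runner could read its settings

    public LocalRunner(Config config) {
      this.config = config;
    }

    @Override
    public void run(StreamApplication app) {
      System.out.println("running " + app.name() + " locally");
    }
  }

  public static void main(String[] args) {
    // No runner class configured, so fromConfig falls back to LocalRunner.
    ApplicationRunner runner = ApplicationRunner.fromConfig(new Config(new HashMap<>()));
    runner.run(() -> "word-count"); // prints "running word-count locally"
  }
}
```

Because fromConfig only looks up a constructor taking exactly one Config, a runner implementation that omits it fails at startup with a clear error rather than at compile time, which is why the interface documents the requirement explicitly.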

Implementation and Test Plan

...