
Status

Current state: Partially Accepted. Superseded by SEP-13.

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E

...

An overview of ApplicationRunner is shown below:

[Figure: ApplicationRunner overview]

  • ExecutionPlanner: generates the execution plan from the user-defined StreamGraph. The plan includes all the job configs, the intermediate streams to create, and a JSON representation of the JobGraph. A JobGraph consists of JobNodes and StreamEdges: each JobNode represents a physical Samza job to run, and each StreamEdge represents an intermediate stream between jobs.

  • StreamManager: creates the intermediate streams before the actual job(s) are deployed.

  • JobRunner: deploys the jobs in the JobGraph, launching one physical job for each JobNode.
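To make the planner's output concrete, here is a minimal, Samza-free sketch of a JobGraph. It assumes, for illustration only, that the repartition example in Table 1 is planned as two jobs joined by one intermediate stream. The classes, the stream name `pageview-by-member` and the `toJson()` layout are hypothetical; they are not Samza's planner classes or its actual JSON format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified JobGraph (not Samza's classes): JobNodes are physical jobs,
// StreamEdges are the intermediate streams between them.
final class JobGraph {
  static final class JobNode {
    final String jobName;

    JobNode(String jobName) {
      this.jobName = jobName;
    }
  }

  static final class StreamEdge {
    final String streamId;
    final JobNode source; // job that produces to the intermediate stream
    final JobNode target; // job that consumes from it

    StreamEdge(String streamId, JobNode source, JobNode target) {
      this.streamId = streamId;
      this.source = source;
      this.target = target;
    }
  }

  private final List<JobNode> jobs = new ArrayList<>();
  private final List<StreamEdge> edges = new ArrayList<>();

  JobNode addJob(String jobName) {
    JobNode node = new JobNode(jobName);
    jobs.add(node);
    return node;
  }

  void addIntermediateStream(String streamId, JobNode source, JobNode target) {
    edges.add(new StreamEdge(streamId, source, target));
  }

  // Streams a StreamManager would create before any job is deployed.
  List<String> intermediateStreamIds() {
    List<String> ids = new ArrayList<>();
    for (StreamEdge e : edges) {
      ids.add(e.streamId);
    }
    return ids;
  }

  // Illustrative JSON layout only; names are assumed to need no escaping.
  String toJson() {
    StringBuilder sb = new StringBuilder("{\"jobs\":[");
    for (int i = 0; i < jobs.size(); i++) {
      if (i > 0) sb.append(',');
      sb.append('"').append(jobs.get(i).jobName).append('"');
    }
    sb.append("],\"streams\":[");
    for (int i = 0; i < edges.size(); i++) {
      StreamEdge e = edges.get(i);
      if (i > 0) sb.append(',');
      sb.append("{\"id\":\"").append(e.streamId)
          .append("\",\"from\":\"").append(e.source.jobName)
          .append("\",\"to\":\"").append(e.target.jobName)
          .append("\"}");
    }
    return sb.append("]}").toString();
  }
}
```

For a graph with jobs `partition` and `count` joined by `pageview-by-member`, `toJson()` returns `{"jobs":["partition","count"],"streams":[{"id":"pageview-by-member","from":"partition","to":"count"}]}`, and `intermediateStreamIds()` lists the single stream the StreamManager would need to create.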

The behavior of JobRunner differs depending on the execution environment:

...

    • LocalJobRunner launches a StreamProcessor for each JobNode within the local JVM. In this mode, the LocalJobRunner runs on every host the application is deployed to. It is also used to launch jobs in the test environment.

    • RemoteJobRunner submits a Samza job for remote execution for each JobNode. It is intended for clusters such as YARN.
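The split between the two runners can be pictured as a strategy interface: the launch loop is identical, and only the runner implementation changes with the environment. The `JobRunner` signature and both stub classes below are hypothetical; instead of starting a StreamProcessor or contacting YARN, each stub returns a description of what the real runner would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Samza code): one launch per JobNode, with the runner
// implementation chosen by the execution environment.
interface JobRunner {
  String launch(String jobName);
}

// Stands in for LocalJobRunner: would start a StreamProcessor inside this JVM.
final class LocalJobRunnerStub implements JobRunner {
  @Override
  public String launch(String jobName) {
    return "started StreamProcessor for " + jobName + " in local JVM";
  }
}

// Stands in for RemoteJobRunner: would submit the job to a cluster such as YARN.
final class RemoteJobRunnerStub implements JobRunner {
  @Override
  public String launch(String jobName) {
    return "submitted " + jobName + " to remote cluster";
  }
}

final class JobLauncher {
  // Launches one physical job per JobNode name, using whichever runner fits the environment.
  static List<String> launchAll(List<String> jobNodeNames, JobRunner runner) {
    List<String> launched = new ArrayList<>();
    for (String name : jobNodeNames) {
      launched.add(runner.launch(name));
    }
    return launched;
  }
}
```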

...

  • LocalApplicationRunner: deploys and launches jobs in the local JVM process. This is the public programming API used in the main() method of a user-defined application.

  • RemoteApplicationRunner: submits the Samza job(s) to a remote cluster executor (such as YARN). This is not a public programming API; it is only used and implemented by service providers to deploy Samza jobs to a remote cluster.

  • TestApplicationRunner: uses the same LocalJobRunner to launch jobs locally, with a test runtime environment that provides file/collection-based inputs and outputs. This is a non-production public programming API.



LocalApplicationRunner

When the LocalApplicationRunner executes the JobGraph, each JobNode is executed by a LocalJobRunner. The LocalJobRunner starts and stops a StreamProcessor by invoking StreamProcessor.start() and StreamProcessor.stop(). LocalApplicationRunner.run() is a blocking call, so it can be invoked directly from the application's main thread (see Table 1 below).
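A minimal sketch of why a blocking run() fits in main(): it starts one processor per job and waits on a latch until all of them have shut down. `StubStreamProcessor` is a hypothetical stand-in for StreamProcessor that consumes a bounded in-memory list (as a test run might) rather than a real stream, and `BlockingLocalRunner` is likewise illustrative, not the actual LocalApplicationRunner.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for StreamProcessor: consumes a bounded in-memory input on its own
// thread, then signals completion. Only the start()/stop() lifecycle is modeled.
final class StubStreamProcessor {
  private final List<String> input;
  private final AtomicInteger processed;
  private final CountDownLatch done;
  private volatile boolean stopRequested = false;

  StubStreamProcessor(List<String> input, AtomicInteger processed, CountDownLatch done) {
    this.input = input;
    this.processed = processed;
    this.done = done;
  }

  void start() {
    new Thread(() -> {
      try {
        for (String message : input) {
          if (stopRequested) {
            break;                     // stop() ends processing early
          }
          processed.incrementAndGet(); // "process" one message
        }
      } finally {
        done.countDown();              // report shutdown to the runner
      }
    }).start();
  }

  void stop() {
    stopRequested = true;
  }
}

// Sketch of a blocking run(): start one processor per job, then block the caller
// (for example the application's main thread) until every processor has finished.
final class BlockingLocalRunner {
  int run(List<List<String>> inputPerJob) {
    AtomicInteger processed = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(inputPerJob.size());
    for (List<String> input : inputPerJob) {
      new StubStreamProcessor(input, processed, done).start();
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for jobs", e);
    }
    return processed.get();
  }
}
```

Calling `new BlockingLocalRunner().run(...)` with two jobs holding 2 and 3 messages returns only after both processors finish, reporting 5 processed messages.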

 

Code Block
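import java.time.Duration;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.util.CommandLine;

/**
 * Example code to implement a window-based counter with a re-partition stage.
 * PageViewEvent and MyStreamOutput are user-defined message classes.
 */
public class PageViewCounterExample implements StreamApplication {

  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream("myinput");
    MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream("myoutput");

    pageViewEvents
        // re-partition by member ID so that all events for a member reach the same task
        .partitionBy(m -> m.getMessage().memberId)
        // count events per member in 10-second tumbling windows, firing early every 5 messages
        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
                m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING))
        .map(MyStreamOutput::new)
        .sendTo(pageViewPerMemberCounters);
  }

  public static void main(String[] args) throws Exception {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
    // run() blocks until the application finishes
    localRunner.run(new PageViewCounterExample());
  }
}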

 

Table 1: Repartition counter example using new fluent API
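The window in Table 1 combines a keyed 10-second tumbling window, a repeating early trigger every 5 messages, and DISCARDING accumulation. The plain-Java simulation below approximates those semantics so they can be checked by hand. It is not the Samza windowing implementation: timestamps are passed in explicitly to keep it deterministic, and the `key=count` output format is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java approximation (not the Samza API) of the Table 1 window semantics:
// per-member counts in 10-second tumbling windows, an early firing every 5 events for a
// key, and DISCARDING accumulation (a key's count restarts at 0 after each firing).
final class KeyedCountWindowSim {
  static final long WINDOW_MS = 10_000;
  static final int EARLY_TRIGGER_COUNT = 5;

  private final Map<String, Integer> counts = new TreeMap<>(); // sorted for deterministic output
  private long windowStart = -1;
  final List<String> emitted = new ArrayList<>();

  void onEvent(String memberId, long timestampMs) {
    long start = timestampMs - timestampMs % WINDOW_MS; // window this event belongs to
    if (windowStart >= 0 && start != windowStart) {
      close();                             // previous window ended: emit what is left
    }
    windowStart = start;
    int count = counts.merge(memberId, 1, Integer::sum);
    if (count == EARLY_TRIGGER_COUNT) {
      emitted.add(memberId + "=" + count); // early firing
      counts.remove(memberId);             // DISCARDING: drop the fired pane
    }
  }

  // Emits the remaining partial counts of the current window (the end-of-window firing).
  void close() {
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      emitted.add(e.getKey() + "=" + e.getValue());
    }
    counts.clear();
  }
}
```

For example, one event for bob at 0 s, seven events for alice from 0 s to 6 s, and one more for alice at 10.5 s followed by `close()` emit `alice=5` (early firing), then `alice=2` and `bob=1` when the first window closes, and finally `alice=1`. With ACCUMULATING mode instead, the end-of-window count for alice would include the 5 already fired.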

...

Table 3: User test code using TestApplicationRunner

 

...

Interaction between RuntimeEnvironment and ApplicationRunners:

LocalApplicationRunner:

The figure below shows the interaction between LocalApplicationRunner and LocalRuntimeEnvironment. Methods in LocalRuntimeEnvironment are highlighted in red. The interaction between TestApplicationRunner and TestRuntimeEnvironment is the same as in the local case, except that TestApplicationRunner and TestRuntimeEnvironment support additional methods to add file/Java-collection-based streams and stores.

[Figure: interaction between LocalApplicationRunner and LocalRuntimeEnvironment]

RemoteApplicationRunner:

The figure below shows the interaction between RemoteApplicationRunner and RemoteRuntimeEnvironment. Methods in RemoteRuntimeEnvironment are highlighted in red.

[Figure: interaction between RemoteApplicationRunner and RemoteRuntimeEnvironment]

Public Interfaces

LocalApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}


TestApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app  the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);

  /**
   * Method to add a Java collection based input/output stream
   * @param streamId  the logical identifier for a stream
   * @param messages  the queue of messages in the stream
   */
  <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);

  /**
   * Method to add a Java collection based KV-store
   * @param storeId  the logical identifier for a KV-store
   * @param storeEntries  the map that contains all KV-store entries
   */
  <K, V> void addStore(String storeId, Map<K, V> storeEntries);
}

Non-public API and implementation classes

RemoteApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app  the user-defined {@link StreamApplication} object
   */
  void start(StreamApplication app);

  /**
   * Method to be invoked to stop the actual Samza application
   * @param app  the user-defined {@link StreamApplication} object
   */
  void stop(StreamApplication app);

  /**
   * Method to be invoked to get the status of the actual Samza application
   * @param app  the user-defined {@link StreamApplication} object
   */
  void status(StreamApplication app);

}


RuntimeEnvironment

Code Block
/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 *
 */
public interface RuntimeEnvironment {

  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.storage.kv.KeyValueStore}
   * with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);

}

TestRuntimeEnvironment (implementation class)

Code Block
public class TestRuntimeEnvironment implements RuntimeEnvironment {
  public StreamSpec getStreamSpec(String streamId) {...}
  public SystemFactory getSystemFactory(String system) {...}
  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}

  /**
   * Additional method to add Java collection based input/output streams
   */
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}

  /**
   * Additional method to add a Java collection based KV-store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}
}
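The additional TestRuntimeEnvironment methods essentially map logical IDs to Java collections. The sketch below shows that idea without any Samza types. `InMemoryTestEnvironment`, its `stream()`/`store()` lookups and the `demo()` usage are hypothetical; the real class would presumably expose these collections through the StreamSpec, SystemFactory and StorageEngineFactory it returns.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, Samza-free sketch: tests register Java collections under logical IDs,
// and the test runtime resolves those IDs to the collections instead of real systems.
final class InMemoryTestEnvironment {
  private final Map<String, Queue<?>> streams = new HashMap<>();
  private final Map<String, Map<?, ?>> stores = new HashMap<>();

  <K, V> void addStream(String streamId, Queue<Map.Entry<K, V>> messages) {
    streams.put(streamId, messages);
  }

  <K, V> void addStore(String storeId, Map<K, V> storeEntries) {
    stores.put(storeId, storeEntries);
  }

  // Unchecked casts: the registry holds collections of different key/value types.
  @SuppressWarnings("unchecked")
  <K, V> Queue<Map.Entry<K, V>> stream(String streamId) {
    Queue<?> messages = streams.get(streamId);
    if (messages == null) {
      throw new IllegalArgumentException("unknown stream: " + streamId);
    }
    return (Queue<Map.Entry<K, V>>) messages;
  }

  @SuppressWarnings("unchecked")
  <K, V> Map<K, V> store(String storeId) {
    Map<?, ?> entries = stores.get(storeId);
    if (entries == null) {
      throw new IllegalArgumentException("unknown store: " + storeId);
    }
    return (Map<K, V>) entries;
  }

  // Usage: a test feeds page views through a queue and counts them per member in a store.
  static Map<String, Integer> demo() {
    InMemoryTestEnvironment env = new InMemoryTestEnvironment();
    Queue<Map.Entry<String, String>> pageViews = new ArrayDeque<>();
    pageViews.add(new AbstractMap.SimpleEntry<>("member-1", "/home"));
    pageViews.add(new AbstractMap.SimpleEntry<>("member-2", "/jobs"));
    pageViews.add(new AbstractMap.SimpleEntry<>("member-1", "/feed"));
    env.addStream("myinput", pageViews);
    env.addStore("counts", new HashMap<String, Integer>());

    // Stand-in for the job: drain the input stream and update the store.
    Queue<Map.Entry<String, String>> input = env.stream("myinput");
    Map<String, Integer> counts = env.store("counts");
    Map.Entry<String, String> message;
    while ((message = input.poll()) != null) {
      counts.merge(message.getKey(), 1, Integer::sum);
    }
    return counts;
  }
}
```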

Implementation and Test Plan

...

Code Block
LocalApplicationRunner.run() {

  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    // The application runs on every host, so only the elected leader creates
    // the intermediate streams; all other instances wait on the latch.
    startLeaderElection(() -> {   // onBecomeLeader()
      streamManager.createStreams();
      releaseLatch();
    });

    waitForLatch(); // blocked until the latch is released
  }

  // Launch one LocalJobRunner per job config in the plan
  List<LocalJobRunner> jobs = new ArrayList<>();
  plan.getJobConfigs().forEach(jobConfig -> {
    LocalJobRunner jobRunner = new LocalJobRunner();
    jobRunner.start(jobConfig);
    jobs.add(jobRunner);
  });

  // run() is blocking: wait for every job to complete
  jobs.forEach(job -> {
    wait for job complete;
  });
}
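The leader-election step exists because the same application runs on every host in local mode, while the intermediate streams must be created exactly once before any job starts. A single-process sketch of that handshake follows. `AtomicBoolean.compareAndSet` stands in for leader election and `CountDownLatch` for the latch; both are assumptions that only hold within one JVM, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Single-JVM stand-in for the coordination in LocalApplicationRunner.run(): every instance
// calls ensureIntermediateStreams(); exactly one wins the "election", creates the streams and
// releases the latch, and all instances wait on the latch before starting their jobs.
final class StreamCreationCoordinator {
  private final AtomicBoolean leaderElected = new AtomicBoolean(false);
  private final CountDownLatch streamsCreated = new CountDownLatch(1);
  final AtomicInteger createStreamsCalls = new AtomicInteger();

  void ensureIntermediateStreams() {
    if (leaderElected.compareAndSet(false, true)) {
      // onBecomeLeader(): stands in for streamManager.createStreams(), then releaseLatch()
      createStreamsCalls.incrementAndGet();
      streamsCreated.countDown();
    }
    // waitForLatch(): every instance, leader included, blocks until the streams exist
    boolean interrupted = false;
    while (true) {
      try {
        streamsCreated.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true; // keep waiting, restore the flag afterwards
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  // Runs n concurrent instances and returns how many got past the latch and "started jobs".
  static int simulateInstances(StreamCreationCoordinator coordinator, int n) {
    AtomicInteger jobsStarted = new AtomicInteger();
    List<Thread> instances = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      Thread t = new Thread(() -> {
        coordinator.ensureIntermediateStreams();
        jobsStarted.incrementAndGet(); // jobs start only after the streams exist
      });
      instances.add(t);
      t.start();
    }
    for (Thread t : instances) {
      while (t.isAlive()) {
        try {
          t.join();
        } catch (InterruptedException e) {
          // keep waiting for this instance to finish
        }
      }
    }
    return jobsStarted.get();
  }
}
```

In a real multi-host deployment the election and the latch would have to live in a shared coordination service. A leader that fails before releasing the latch would leave the other instances waiting, so bounding the wait with a timeout is worth considering.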

...

  • Configure app.class = <user-app-name> and task.execute = run-local-app.sh

  • Deploy the application to a single deployment gateway (the RM in YARN).

  • Run run-app.sh on the deployment gateway to start the whole application. Run stop-app.sh on the same node to stop it.

    • RemoteApplicationRunner automatically configures task.execute = run-local-app.sh for each job to be deployed.

  • When the cluster executor actually launches the Samza container processes, it runs run-local-app.sh on the target host, exactly as in the standalone deployment.
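The config rewrite performed by RemoteApplicationRunner can be sketched as follows, with job configs modeled as plain string maps. Only the keys `task.execute` and `app.class` and the script name come from the steps above; the class and method names are hypothetical, and a real runner would operate on Samza Config objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: before submission, every job's config gets
// task.execute = run-local-app.sh, so each container on the target host
// starts through the same script as in standalone mode.
final class RemoteDeploySketch {
  static final String TASK_EXECUTE = "task.execute";
  static final String LOCAL_APP_SCRIPT = "run-local-app.sh";

  static List<Map<String, String>> prepareJobConfigs(List<Map<String, String>> jobConfigs) {
    List<Map<String, String>> prepared = new ArrayList<>();
    for (Map<String, String> jobConfig : jobConfigs) {
      Map<String, String> copy = new HashMap<>(jobConfig); // leave the planner's config unchanged
      copy.put(TASK_EXECUTE, LOCAL_APP_SCRIPT);
      prepared.add(copy);
    }
    return prepared;
  }
}
```

Copying each config rather than mutating it keeps the planner's output reusable, for example for writing the plan JSON after submission.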

...