Status
Current state: Partially Accepted. Superseded by SEP-13.
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E
JIRA: SAMZA-1130
Released:
Problem
With the introduction of the fluent API (SAMZA-1073) and standalone Samza (SAMZA-1063), we have the following new scenarios:
...
An overview of ApplicationRunner is shown below:
ExecutionPlanner: the planner generates the execution plan from the user-defined StreamGraph, including all the job configs, the intermediate streams to create, and a JSON representation of the JobGraph. A JobGraph consists of JobNodes and StreamEdges: each JobNode represents a physical Samza job to run, and each StreamEdge represents an intermediate stream between jobs.
StreamManager: creates the intermediate streams before the actual job(s) are deployed.
JobRunner: deploys the jobs in the JobGraph, launching one physical job for each JobNode.
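To make the division of labor concrete, the JobGraph can be modeled as a minimal sketch. JobNode, StreamEdge and JobGraph below are simplified, hypothetical classes (not Samza's actual planner classes), and streamsToCreate()/jobsToLaunch() are illustrative names for what the planner would hand to the StreamManager and the JobRunner respectively.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the JobGraph; not Samza's planner classes.
final class JobNode {
  final String jobName;
  JobNode(String jobName) { this.jobName = jobName; }
}

// An intermediate stream produced by one job and consumed by another.
final class StreamEdge {
  final String streamId;
  final JobNode producer;
  final JobNode consumer;
  StreamEdge(String streamId, JobNode producer, JobNode consumer) {
    this.streamId = streamId;
    this.producer = producer;
    this.consumer = consumer;
  }
}

final class JobGraph {
  final List<JobNode> jobNodes = new ArrayList<>();
  final List<StreamEdge> streamEdges = new ArrayList<>();

  JobNode addJob(String jobName) {
    JobNode node = new JobNode(jobName);
    jobNodes.add(node);
    return node;
  }

  void connect(String streamId, JobNode producer, JobNode consumer) {
    streamEdges.add(new StreamEdge(streamId, producer, consumer));
  }

  // What the StreamManager would create before deployment: each intermediate
  // stream once, even if several edges share it (one producer, two consumers).
  Set<String> streamsToCreate() {
    Set<String> ids = new LinkedHashSet<>();
    for (StreamEdge edge : streamEdges) {
      ids.add(edge.streamId);
    }
    return ids;
  }

  // What the JobRunner would launch: one physical job per JobNode.
  List<String> jobsToLaunch() {
    List<String> names = new ArrayList<>();
    for (JobNode node : jobNodes) {
      names.add(node.jobName);
    }
    return names;
  }
}
```

Keeping the stream set deduplicated reflects that a stream read by several downstream jobs still only needs to be created once.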
The behavior of JobRunner differs depending on the execution environment:
...
LocalJobRunner launches a StreamProcessor for each JobNode within the local JVM. In this mode, the LocalJobRunner runs on every host that the application is deployed to. It is also used to launch jobs in the test environment.
RemoteJobRunner submits a Samza job for remote execution for each JobNode. It is used with clusters such as YARN.
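The two runner flavors can be sketched as one interface with an implementation per environment. This is a hypothetical illustration: the JobRunner interface, the runAll() helper and the recording lists are assumptions, and neither implementation actually starts a StreamProcessor or contacts a cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JobRunner abstraction; both implementations only record what they would do.
interface JobRunner {
  void run(String jobName);
}

final class LocalJobRunner implements JobRunner {
  final List<String> startedInJvm = new ArrayList<>();
  @Override public void run(String jobName) {
    // Stand-in for creating and starting a StreamProcessor inside this JVM.
    startedInJvm.add(jobName);
  }
}

final class RemoteJobRunner implements JobRunner {
  final List<String> submitted = new ArrayList<>();
  @Override public void run(String jobName) {
    // Stand-in for submitting the job to a remote cluster such as YARN.
    submitted.add(jobName);
  }
}

final class JobRunners {
  // One JobRunner call per JobNode, regardless of which environment was chosen.
  static void runAll(List<String> jobNodes, JobRunner runner) {
    for (String job : jobNodes) {
      runner.run(job);
    }
  }
}
```

Because the caller only depends on the interface, the same plan can be executed locally or remotely by swapping the runner.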
...
LocalApplicationRunner: deploys and launches jobs in the local JVM process. This is the public programming API used in the user-defined application's main() method.
RemoteApplicationRunner: submits the Samza job(s) to a remote cluster executor (such as YARN). This is not a public programming API; it is only used and implemented by service providers to deploy Samza jobs to a remote cluster.
TestApplicationRunner: uses the same LocalJobRunner to launch jobs locally, with a test runtime environment that provides file- or collection-based inputs and outputs. This is a non-production public programming API.
LocalApplicationRunner
When the LocalApplicationRunner executes the JobGraph, each JobNode is executed by a LocalJobRunner, which starts and stops a StreamProcessor by invoking StreamProcessor.start() and StreamProcessor.stop(). LocalApplicationRunner.run() is a blocking call, so it can be invoked directly from the application's main thread (see Table 1 below).
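What "blocking" means here can be shown with a minimal sketch, assuming each job is modeled as a Runnable standing in for a StreamProcessor. BlockingRunnerSketch is a hypothetical name, not Samza's implementation; it starts one thread per job and keeps the caller blocked on a CountDownLatch until all of them finish.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a blocking run(): one thread per job and a latch that
// keeps the calling thread (e.g. main()) blocked until every job has finished.
final class BlockingRunnerSketch {
  static void run(List<Runnable> jobs) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(jobs.size());
    for (Runnable job : jobs) {
      Thread t = new Thread(() -> {
        try {
          job.run();
        } finally {
          done.countDown(); // count down even if the job throws
        }
      });
      t.start();
    }
    done.await(); // returns only after all jobs have completed
  }
}
```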
```java
import java.time.Duration;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.util.CommandLine;

/**
 * Example code to implement a window-based counter with a re-partition stage
 */
public class PageViewCounterExample implements StreamApplication {

  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream("myinput");
    MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream("myoutput");

    pageViewEvents
        .partitionBy(m -> m.getMessage().memberId)
        .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
                m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING))
        .map(MyStreamOutput::new)
        .sendTo(pageViewPerMemberCounters);
  }

  public static void main(String[] args) throws Exception {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
    localRunner.run(new PageViewCounterExample());
  }
}
```
Table 1: Repartition counter example using new fluent API
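The window in Table 1 can be illustrated with a simplified, single-threaded model. This is not Samza's window operator; TumblingCountModel is hypothetical and assumes: 10-second tumbling windows keyed by memberId, an early trigger that fires every 5 messages per window, DISCARDING accumulation (each firing emits only the count since the previous firing), and a final firing of any non-zero remainder when the windows close.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the Table 1 window semantics under the assumptions stated above.
final class TumblingCountModel {
  static final long WINDOW_MS = 10_000L;
  static final int EARLY_TRIGGER_COUNT = 5;

  private final Map<String, Integer> pending = new LinkedHashMap<>();
  final List<String> emitted = new ArrayList<>();

  // Same key format as getWindowKey() in the Table 3 test code.
  static String windowKey(String memberId, long timestampMs) {
    return String.format("%s-%d", memberId, (timestampMs / WINDOW_MS) * WINDOW_MS);
  }

  void onMessage(String memberId, long timestampMs) {
    String key = windowKey(memberId, timestampMs);
    int count = pending.merge(key, 1, Integer::sum);
    if (count == EARLY_TRIGGER_COUNT) {
      emitted.add(key + "=" + count);
      pending.put(key, 0); // DISCARDING: the next firing starts counting from zero
    }
  }

  // Called when the windows end: emit whatever accumulated since the last firing.
  void closeWindows() {
    pending.forEach((key, count) -> {
      if (count > 0) {
        emitted.add(key + "=" + count);
      }
    });
    pending.clear();
  }
}
```

With seven messages for one member inside a single window, this model emits an early count of 5 followed by a final count of 2; under ACCUMULATING mode the second firing would instead report the running total.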
...
```java
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.junit.Before;
import org.junit.Test;

public class TestPageViewCountExample {
  private PageViewCounterExample userApp = new PageViewCounterExample();
  Queue<Entry<String, PageViewCounterExample.PageViewEvent>> inputMessages = new LinkedBlockingQueue<>();
  Queue<Entry<String, PageViewCounterExample.PageViewCount>> outgoingMessages = new LinkedBlockingQueue<>();
  TestApplicationRunner testRunner;

  @Before
  public void setup() {
    // preparation for config, input, output, and local store
    Config config = mock(Config.class);
    testRunner = new TestApplicationRunner(new AppConfig(config));
    testRunner.addStream("pageViewEventStream", inputMessages);
    testRunner.addStream("pageViewEventPerMemberStream", outgoingMessages);
    inputMessages.offer(new Entry<>("my-page-id",
        new PageViewCounterExample.PageViewEvent("my-page-id", "my-member-id", 1234556L)));
  }

  @Test
  public void test() throws InterruptedException {
    // run the user app with bounded input stream
    this.testRunner.run(userApp);
    // validate the test results
    Entry<String, PageViewCounterExample.PageViewCount> countEvent = outgoingMessages.poll();
    String wndKey = getWindowKey("my-member-id", 1234556L);
    assertTrue(countEvent.getKey().equals(wndKey));
    assertTrue(countEvent.getValue().count == 1);
  }

  private String getWindowKey(String s, long timestamp) {
    return String.format("%s-%d", s, (timestamp / 10000) * 10000);
  }
}
```
Table 3: User test code using TestApplicationRunner
...
Interaction between RuntimeEnvironment and ApplicationRunners:
LocalApplicationRunner:
The figure below shows the interaction between LocalApplicationRunner and LocalRuntimeEnvironment; methods in LocalRuntimeEnvironment are highlighted in red. The interaction between TestApplicationRunner and TestRuntimeEnvironment is exactly the same as in the local case, except that TestApplicationRunner and TestRuntimeEnvironment support additional methods to add file- or Java collection-based streams and stores.
RemoteApplicationRunner:
The figure below shows the interaction between RemoteApplicationRunner and RemoteRuntimeEnvironment; methods in RemoteRuntimeEnvironment are highlighted in red.
Public Interfaces
LocalApplicationRunner
```java
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);
}
```
TestApplicationRunner
```java
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);

  /**
   * Method to add a Java collection based input/output stream
   * @param streamId the logical identifier for a stream
   * @param messages the queue of messages in the stream
   */
  <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);

  /**
   * Method to add a Java collection based KV-store
   * @param storeId the logical identifier for a KV-store
   * @param storeEntries the map containing all KV-store entries
   */
  <K, V> void addStore(String storeId, Map<K, V> storeEntries);
}
```
Non-public API and implementation classes
RemoteApplicationRunner
```java
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void start(StreamApplication app);

  /**
   * Method to be invoked to stop the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void stop(StreamApplication app);

  /**
   * Method to be invoked to get the status of the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void status(StreamApplication app);
}
```
RuntimeEnvironment
```java
/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 */
public interface RuntimeEnvironment {
  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for
   * {@link org.apache.samza.storage.kv.KeyValueStore} with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);
}
```
TestRuntimeEnvironment (implementation class)
```java
public class TestRuntimeEnvironment implements RuntimeEnvironment {
  public StreamSpec getStreamSpec(String streamId) {...}

  public SystemFactory getSystemFactory(String system) {...}

  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}

  /**
   * Additional methods to add Java collection based input/output streams
   */
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}

  /**
   * Additional methods to add Java collection based KV-store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}
}
```
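The registration-and-lookup pattern behind a collection-backed test environment can be sketched in plain Java. This is a hypothetical illustration, not the proposed implementation: InMemoryTestEnvironment and its stream()/store() lookups are assumed names, Samza's StreamSpec, SystemFactory and StorageEngineFactory are replaced by plain collections, and java.util.Map.Entry stands in for the Entry type used in the interfaces above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for a test runtime environment.
final class InMemoryTestEnvironment {
  private final Map<String, Queue<?>> streams = new HashMap<>();
  private final Map<String, Map<?, ?>> stores = new HashMap<>();

  <K, V> void addStream(String streamId, Queue<Map.Entry<K, V>> messages) {
    streams.put(streamId, messages);
  }

  <K, V> void addStore(String storeId, Map<K, V> storeEntries) {
    stores.put(storeId, storeEntries);
  }

  // Resolve a logical stream ID to its backing queue, failing fast on unknown IDs.
  @SuppressWarnings("unchecked")
  <K, V> Queue<Map.Entry<K, V>> stream(String streamId) {
    Queue<?> queue = streams.get(streamId);
    if (queue == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamId);
    }
    return (Queue<Map.Entry<K, V>>) queue;
  }

  // Resolve a logical store ID to its backing map, failing fast on unknown IDs.
  @SuppressWarnings("unchecked")
  <K, V> Map<K, V> store(String storeId) {
    Map<?, ?> entries = stores.get(storeId);
    if (entries == null) {
      throw new IllegalArgumentException("Unknown store: " + storeId);
    }
    return (Map<K, V>) entries;
  }
}
```

Failing fast on an unregistered ID surfaces a mismatch between the stream IDs used in the application and those registered by the test, instead of letting the job silently read nothing.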
Implementation and Test Plan
...
```
LocalApplicationRunner.run() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    // the processor elected as leader creates the intermediate streams and
    // releases the latch; every processor blocks on the latch until then
    startLeaderElection(() -> { // onBecomeLeader()
      streamManager.createStreams();
      releaseLatch();
    });
    waitForLatch(); // blocked until the latch is released
  }

  List<LocalJobRunner> jobs = new ArrayList<>();
  plan.getJobConfigs().forEach(jobConfig -> {
    LocalJobRunner jobRunner = new LocalJobRunner();
    jobRunner.start(jobConfig);
    jobs.add(jobRunner);
  });

  jobs.forEach(job -> { wait for job complete; });
}
```
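The "create intermediate streams once, then start jobs" step of the pseudocode can be simulated with runnable Java. This is a sketch under stated assumptions: leader election is replaced by a single AtomicBoolean.compareAndSet (a real deployment would rely on a coordination service), stream creation and job startup are reduced to counters, and StreamCreationSimulation is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation: several processors race to become leader; only the
// leader creates the streams, and all processors wait on the latch before starting jobs.
final class StreamCreationSimulation {
  final AtomicBoolean leaderElected = new AtomicBoolean(false);
  final CountDownLatch streamsCreated = new CountDownLatch(1);
  final AtomicInteger createStreamsCalls = new AtomicInteger();
  final AtomicInteger jobsStartedAfterStreamsExist = new AtomicInteger();

  void runProcessor() throws InterruptedException {
    // Only the first processor to win the "election" creates the streams.
    if (leaderElected.compareAndSet(false, true)) {
      createStreamsCalls.incrementAndGet(); // stands in for streamManager.createStreams()
      streamsCreated.countDown();           // stands in for releaseLatch()
    }
    streamsCreated.await();                 // stands in for waitForLatch()
    if (createStreamsCalls.get() == 1) {
      jobsStartedAfterStreamsExist.incrementAndGet(); // stands in for starting the jobs
    }
  }

  static StreamCreationSimulation runWith(int processors) throws InterruptedException {
    StreamCreationSimulation sim = new StreamCreationSimulation();
    Thread[] threads = new Thread[processors];
    for (int i = 0; i < processors; i++) {
      threads[i] = new Thread(() -> {
        try {
          sim.runProcessor();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return sim;
  }
}
```

The sketch deliberately ignores the case where the leader fails before releasing the latch; in that case the other processors would wait indefinitely unless a new leader is elected.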
...
Configure app.class = <user-app-name> and task.execute = run-local-app.sh
Deploy the application to a single deployment gateway (the RM in YARN).
Run run-app.sh on the deployment gateway to start the whole application. Run stop-app.sh on the same node to stop it.
- RemoteApplicationRunner will automatically configure task.execute = run-local-app.sh for each job to be deployed
When the cluster executor actually launches the Samza container processes, it runs run-local-app.sh on the target host, exactly like in the standalone deployment.
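The per-job config rewrite described above can be sketched as follows. This is a hypothetical illustration: plain Map<String, String> objects stand in for Samza's Config, and RemoteJobConfigRewriter and prepareForSubmission() are assumed names; only the task.execute key and the run-local-app.sh script name come from the steps above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: before submitting each job, set task.execute so that the
// cluster executor launches run-local-app.sh on the target host.
final class RemoteJobConfigRewriter {
  static final String TASK_EXECUTE = "task.execute";
  static final String LOCAL_APP_SCRIPT = "run-local-app.sh";

  static List<Map<String, String>> prepareForSubmission(List<Map<String, String>> jobConfigs) {
    List<Map<String, String>> prepared = new ArrayList<>();
    for (Map<String, String> jobConfig : jobConfigs) {
      Map<String, String> copy = new HashMap<>(jobConfig); // leave the planner's output untouched
      copy.put(TASK_EXECUTE, LOCAL_APP_SCRIPT);
      prepared.add(copy);
    }
    return prepared;
  }
}
```

Copying each config keeps the execution plan's job configs unchanged, so the same plan could still be inspected or reused after submission.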
...