Status
Current state: Partially Accepted. Superseded by SEP-13.
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E
...
An overview of ApplicationRunner is shown below:
ExecutionPlanner - the planner will generate the plan for execution, including all the job configs, intermediate streams to create and a JSON representation of the JobGraph from the user-defined StreamGraph. A JobGraph includes JobNodes and StreamEdges, in which each JobNode represents a physical Samza job to run and each StreamEdge represents an intermediate stream between jobs.
StreamManager - the manager responsible for creating intermediate streams before deploying the actual job(s).
JobRunner - the runner to deploy the jobs in the JobGraph. It will launch a physical job for each JobNode in the JobGraph.
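The division of labour among the three components above can be pictured with a small sketch. This is not the Samza implementation: `SketchJobGraph` and `SketchRunnerFlow` are hypothetical names, job nodes and stream edges are reduced to plain strings, and the plan is hard-coded rather than derived from a StreamGraph. It only illustrates the ordering implied by the text: plan first, create intermediate streams next, then launch one job per JobNode.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the JobGraph described above. */
final class SketchJobGraph {
    final List<String> jobNodes = new ArrayList<>();     // one entry per physical job
    final List<String> streamEdges = new ArrayList<>();  // intermediate streams between jobs
}

/** Hypothetical flow that records each action so the ordering can be inspected. */
final class SketchRunnerFlow {
    final List<String> actions = new ArrayList<>();

    // Planner role: produce a JobGraph (hard-coded here as an assumption).
    SketchJobGraph plan() {
        SketchJobGraph graph = new SketchJobGraph();
        graph.jobNodes.add("repartition-job");
        graph.jobNodes.add("counter-job");
        graph.streamEdges.add("repartitioned-page-views");
        return graph;
    }

    // StreamManager role first (create streams), then JobRunner role (one job per node).
    List<String> execute(SketchJobGraph graph) {
        for (String edge : graph.streamEdges) {
            actions.add("create-stream:" + edge);
        }
        for (String node : graph.jobNodes) {
            actions.add("run-job:" + node);
        }
        return actions;
    }
}
```

Calling `flow.execute(flow.plan())` on a fresh `SketchRunnerFlow` yields the stream-creation action before any job launch, which is the property the StreamManager description relies on.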
...
LocalApplicationRunner: deploys and launches jobs in the local JVM process. This is the public programming API used in the main() method of a user-defined application.
RemoteApplicationRunner: submits the Samza job(s) to a remote cluster executor (such as YARN). Not a public programming API; it is used and implemented only by service providers to deploy Samza jobs to a remote cluster.
TestApplicationRunner: uses the same LocalJobRunner to launch jobs locally, with a test runtime environment providing file/collection-based inputs and outputs. A non-production public programming API.
LocalApplicationRunner
When the LocalApplicationRunner executes the JobGraph, each JobNode is executed by a LocalJobRunner. The LocalJobRunner starts and stops a StreamProcessor by invoking StreamProcessor.start() and StreamProcessor.stop(). LocalApplicationRunner.run() is a blocking call, so it can be invoked directly from the application's main thread (see Table 1 below).
Code Block |
---|
/**
 * Example code to implement window-based counter with a re-partition stage
 */
public class PageViewCounterExample implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream("myinput");
    MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream("myoutput");
    pageViewEvents.
      partitionBy(m -> m.getMessage().memberId).
      window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
        setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
        setAccumulationMode(AccumulationMode.DISCARDING)).
      map(MyStreamOutput::new).
      sendTo(pageViewPerMemberCounters);
  }
  public static void main(String[] args) throws Exception {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
    localRunner.run(new PageViewCounterExample());
  }
} |
Table 1: Repartition counter example using new fluent API
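The blocking behaviour of LocalApplicationRunner.run() described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: `SketchProcessor` stands in for StreamProcessor (only start() and stop() are taken from the text), and `SketchBlockingRunner` with its `requestShutdown()` method is a hypothetical name invented for this example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for StreamProcessor; only start() and stop() come from the text. */
interface SketchProcessor {
    void start();
    void stop();
}

/** Hypothetical blocking runner: starts every processor, waits, then stops them. */
final class SketchBlockingRunner {
    private final CountDownLatch shutdown = new CountDownLatch(1);

    /** Blocks the calling thread until requestShutdown() has been invoked. */
    void run(List<SketchProcessor> processors) {
        for (SketchProcessor p : processors) {
            p.start();
        }
        try {
            shutdown.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to stop the processors.
            Thread.currentThread().interrupt();
        } finally {
            for (SketchProcessor p : processors) {
                p.stop();
            }
        }
    }

    /** Releases the thread blocked in run(); may be called from another thread. */
    void requestShutdown() {
        shutdown.countDown();
    }
}
```

Because run() does not return until shutdown is requested, a main() method can call it as its last statement, as in Table 1, and the process stays alive for as long as the processors are running.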
...
Table 3: User test code using TestApplicationRunner
...
Interaction between RuntimeEnvironment and ApplicationRunners:
LocalApplicationRunner:
The figure below shows the interaction between LocalApplicationRunner and the LocalRuntimeEnvironment. Note that the methods in LocalRuntimeEnvironment are highlighted in red. The interaction between TestApplicationRunner and TestRuntimeEnvironment is exactly the same as in the local case, except that TestApplicationRunner / TestRuntimeEnvironment support additional methods to add file/Java collection based streams and stores.
RemoteApplicationRunner:
The figure below shows the interaction between RemoteApplicationRunner and the RemoteRuntimeEnvironment. Note that the methods in RemoteRuntimeEnvironment are highlighted in red.
Public Interfaces
LocalApplicationRunner
Code Block |
---|
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);
} |
TestApplicationRunner
Code Block |
---|
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void run(StreamApplication app);
  /**
   * Method to add a Java collection based input/output stream
   * @param streamId the logic identifier for a stream
   * @param messages the queue of messages in the stream
   */
  <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);
  /**
   * Method to add a Java collection based KV-store
   * @param storeId the logic identifier for a KV-store
   * @param storeEntries the map contains all KV-store entries
   */
  <K, V> void addStore(String storeId, Map<K, V> storeEntries);
} |
Non-public API and implementation classes
RemoteApplicationRunner
Code Block |
---|
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
  /**
   * Method to be invoked to deploy and run the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void start(StreamApplication app);
  /**
   * Method to be invoked to stop the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void stop(StreamApplication app);
  /**
   * Method to be invoked to get the status of the actual Samza application
   * @param app the user-defined {@link StreamApplication} object
   */
  void status(StreamApplication app);
} |
RuntimeEnvironment
Code Block |
---|
/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 */
public interface RuntimeEnvironment {
  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);
  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);
  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.storage.kv.KeyValueStore}
   * with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);
} |
TestRuntimeEnvironment (implementation class)
Code Block |
---|
public class TestRuntimeEnvironment implements RuntimeEnvironment {
  public StreamSpec getStreamSpec(String streamId) {...}
  public SystemFactory getSystemFactory(String system) {...}
  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}
  /**
   * Additional method to add Java collection based input/output streams
   */
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}
  /**
   * Additional method to add a Java collection based KV-store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}
} |
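To make the two additional methods more concrete, the sketch below shows one way a test environment could keep collection-based streams and stores in memory. It is an illustration only, not the actual TestRuntimeEnvironment: the class name `SketchTestEnvironment` and the helper methods `pendingMessages` and `lookup` are hypothetical, and `java.util.Map.Entry` is assumed as the `Entry` type, which the proposal does not specify.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical in-memory registry; not the actual TestRuntimeEnvironment. */
final class SketchTestEnvironment {
    // Streams and stores are keyed by their logical identifiers.
    private final Map<String, Queue<? extends Map.Entry<?, ?>>> streams = new HashMap<>();
    private final Map<String, Map<?, ?>> stores = new HashMap<>();

    /** Registers a queue of key/value messages as the stream with the given ID. */
    <K, V> void addStream(String streamId, Queue<Map.Entry<K, V>> messages) {
        streams.put(streamId, messages);
    }

    /** Registers a map as the KV-store with the given ID. */
    <K, V> void addStore(String storeId, Map<K, V> storeEntries) {
        stores.put(storeId, storeEntries);
    }

    /** Hypothetical helper: number of messages still queued, or 0 for an unknown stream. */
    int pendingMessages(String streamId) {
        Queue<? extends Map.Entry<?, ?>> queue = streams.get(streamId);
        return queue == null ? 0 : queue.size();
    }

    /** Hypothetical helper: value stored under the key, or null if the store or key is absent. */
    Object lookup(String storeId, Object key) {
        Map<?, ?> store = stores.get(storeId);
        return store == null ? null : store.get(key);
    }
}
```

In a test, the caller would populate a queue and a map with fixture data, register them under the stream and store IDs the application expects, and then run the application against this environment.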
Implementation and Test Plan
...