...
Table 3: User test code using TestApplicationRunner
Public Interfaces
LocalApplicationRunner
Interaction between RuntimeEnvironment and ApplicationRunners:
LocalApplicationRunner:
The figure below shows the interaction between LocalApplicationRunner and the LocalRuntimeEnvironment. Note that the methods in LocalRuntimeEnvionment are high-lighted with RED font. Interaction between TestApplicationRunner and TestRuntimeEnvironment is exactly the same as the local ones, except that TestApplicationRunner / TestRuntimeEnvironment supports additional methods to add file/Java collection based streams and stores.
RemoteApplicationRunner:
The figure below shows the interaction between RemoteApplicationRunner and the RemoteRuntimeEnvironment. Note that the methods in RemoteRuntimeEnvionment are high-lighted with RED font.
Public Interfaces
LocalApplicationRunner
Code Block |
---|
/**
* Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
*
* Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
* to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
*/
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
/**
* Method to be invoked to deploy and run the actual Samza application
* @param app the user-defined {@link StreamApplication} object
*/
void run(StreamApplication app);
} |
TestApplicationRunner
Code Block |
---|
/**
* Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
*
* Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
* to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
*/
@InterfaceStability.Unstable
public interface TestApplicationRunner {
/**
* Method to be invoked to deploy and run the actual Samza application
* @param app the user-defined {@link StreamApplication} object
*/
void run(StreamApplication app);
/**
* Method to be invoked to deploy and run the actual Samza application
* @param streamId the logic identifier for a stream
* @param messages the queue of messages in the stream
*/
<K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);
|
Code Block |
/** * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication} * * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor. */ @InterfaceStability.Unstable public interface LocalApplicationRunner { /** * Method to be invoked to deploy and run the actual Samza application * @param appstoreId the user-defined {@link StreamApplication} object logic identifier for a KV-store * @param storeEntries the map contains all KV-store entries */ <K, V> void run(StreamApplication appaddStore(String storeId, Map<K, V> storeEntries); } |
Non-public API and implementation classes
RemoteApplicationRunnerTestApplicationRunner
Code Block |
---|
/** * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication} * * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor. */ @InterfaceStability.Unstable public interface TestApplicationRunner {/ @InterfaceStability.Unstable public interface RemoteApplicationRunner { /** * Method to be invoked to deploy and run the actual Samza application * @param app the user-defined {@link StreamApplication} object */ void start(StreamApplication app); /** * Method to be invoked to deploy and runstop the actual Samza application * @param app the user-defined {@link StreamApplication} object */ void runstop(StreamApplication app); /** * Method to be invoked to get deploythe andstatus runof the actual Samza application * @param streamIdapp the logic identifier for a stream * @param messages the queue of messages in the stream */ <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages); /** user-defined {@link StreamApplication} object */ void status(StreamApplication app); } |
RuntimeEnvironment
Code Block |
---|
/** * Interface to be implemented by any environment that supports physical input/output streams and local stores * */ public interface RuntimeEnvironment { /** * Method to be invoked to deploy and run the actual Samza application * @param storeId the logic identifier for a KV-store * @param storeEntries the map contains all KV-store entries */ <K, V> void addStore(String storeId, Map<K, V> storeEntries); } |
Non-public API
RemoteApplicationRunner
Code Block |
---|
/** * Interface to be implemented by physical execution engine to deploy the config and jobs to run the get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId} */ StreamSpec getStreamSpec(String streamId); /** * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system} */ SystemFactory getSystemFactory(String system); /** * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.applicationstorage.kv.StreamApplicationKeyValueStore} * with the name {@code storeId} */ @InterfaceStability.Unstable public interface RemoteApplicationRunner { /** * Method to be invoked to deploy and run the actual Samza application * @param app the user-defined {@link StreamApplication} object */ void start(StreamApplication app); /** * Method to be invoked to stop the actual Samza application * @param app the user-defined {@link StreamApplication} object */ void stop(StreamApplication app); /** * Method to be invoked to get the status of the actual Samza application * @param app the user-defined {@link StreamApplication} object */ void status(StreamApplication app); StorageEngineFactory getStorageEngineFactory(String storeId); } |
TestRuntimeEnvironment (implementation class)
Code Block |
---|
public class TestRuntimeEnvironment implements RuntimeEnvironment {
public StreamSpec getStreamSpec(String streamId) {...}
public SystemFactory getSystemFactory(String system) {...}
public StorageEngineFactory getStorageEngineFactory(String storeId) {...}
/**
* Additional methods to add Java collection based input/output streams
*/
<K,V> public void addStream(String streamId, Queue<Entry<K, V>> messages) {...}
/**
* Additional methods to add Java collection based KV-store
*/
<K,V> public void addStore(String storeId, Map<K, V> storeEntries) {...}
} |
Implementation and Test Plan
...