...

Table 3: User test code using TestApplicationRunner

 

Interaction between RuntimeEnvironment and ApplicationRunners:

LocalApplicationRunner:

The figure below shows the interaction between LocalApplicationRunner and LocalRuntimeEnvironment. The methods in LocalRuntimeEnvironment are highlighted in red. The interaction between TestApplicationRunner and TestRuntimeEnvironment is the same as in the local case, except that TestApplicationRunner and TestRuntimeEnvironment support additional methods for adding file-based or Java-collection-based streams and stores.

[Figure: interaction between LocalApplicationRunner and LocalRuntimeEnvironment]

RemoteApplicationRunner:

The figure below shows the interaction between RemoteApplicationRunner and RemoteRuntimeEnvironment. The methods in RemoteRuntimeEnvironment are highlighted in red.

[Figure: interaction between RemoteApplicationRunner and RemoteRuntimeEnvironment]

Public Interfaces

LocalApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}
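
The Javadoc above requires a constructor with a single Config argument so that a static factory can create the runner reflectively. A minimal sketch of that pattern follows. SketchConfig, SketchRunner, SketchLocalRunner, RunnerFactorySketch and the config key used here are hypothetical stand-ins introduced only for illustration; the actual ApplicationRunner#fromConfig(Config) may be implemented differently.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Samza's Config so that the sketch compiles on its own.
class SketchConfig {
  private final Map<String, String> values = new HashMap<>();

  SketchConfig put(String key, String value) {
    values.put(key, value);
    return this;
  }

  String get(String key) {
    return values.get(key);
  }
}

// Hypothetical stand-in for the runner contract.
interface SketchRunner {
  String describe();
}

// Follows the convention: one public constructor taking only the config.
class SketchLocalRunner implements SketchRunner {
  private final SketchConfig config;

  public SketchLocalRunner(SketchConfig config) {
    this.config = config;
  }

  @Override
  public String describe() {
    return "local runner for " + config.get("app.name");
  }
}

class RunnerFactorySketch {
  // Assumed key naming the runner class; not taken from this document.
  static final String RUNNER_CLASS_KEY = "app.runner.class";

  // Looks up the configured class and invokes its single-argument constructor.
  static SketchRunner fromConfig(SketchConfig config) {
    String className = config.get(RUNNER_CLASS_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config key " + RUNNER_CLASS_KEY);
    }
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> ctor = clazz.getConstructor(SketchConfig.class);
      return (SketchRunner) ctor.newInstance(config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate runner " + className, e);
    }
  }
}
```

With this convention the choice of runner becomes a configuration decision: the calling code only names a class, and a runner without the expected constructor fails at start-up rather than later.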


TestApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);

	/**
	 * Method to be invoked to add a Java-collection-based stream to the test environment
	 * @param streamId  the logical identifier for a stream
	 * @param messages  the queue of messages in the stream
	 */
	<K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);

	/**
	 * Method to be invoked to add a Java-collection-based KV-store to the test environment
	 * @param storeId  the logical identifier for a KV-store
	 * @param storeEntries  the map that contains all KV-store entries
	 */
	<K, V> void addStore(String storeId, Map<K, V> storeEntries);

}
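
To illustrate how addStream and addStore let a test supply its inputs and inspect its outputs through plain Java collections, here is a minimal in-memory sketch. InMemoryTestRunnerSketch and its copyStreamToStore method are hypothetical; copyStreamToStore merely stands in for run(StreamApplication), and Entry is assumed here to be java.util.Map.Entry, which the document does not specify.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;

// Hypothetical in-memory stand-in; it mirrors only addStream/addStore from the interface above.
class InMemoryTestRunnerSketch {
  private final Map<String, Queue<?>> streams = new HashMap<>();
  private final Map<String, Map<?, ?>> stores = new HashMap<>();

  // Registers a caller-owned queue as the content of the stream.
  <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {
    streams.put(streamId, messages);
  }

  // Registers a caller-owned map as the backing data of the store.
  <K, V> void addStore(String storeId, Map<K, V> storeEntries) {
    stores.put(storeId, storeEntries);
  }

  // Stand-in for run(StreamApplication): drains one stream into one store, last write wins.
  @SuppressWarnings("unchecked")
  <K, V> void copyStreamToStore(String streamId, String storeId) {
    Queue<Entry<K, V>> input = (Queue<Entry<K, V>>) streams.get(streamId);
    Map<K, V> store = (Map<K, V>) stores.get(storeId);
    if (input == null || store == null) {
      throw new IllegalArgumentException("Unknown stream or store: " + streamId + ", " + storeId);
    }
    Entry<K, V> message;
    while ((message = input.poll()) != null) {
      store.put(message.getKey(), message.getValue());
    }
  }
}
```

Because the test keeps references to the queue and the map it registered, it can assert on the store contents after the run without any external system.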

Non-public API and implementation classes

RemoteApplicationRunner

Code Block
/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void start(StreamApplication app);

	/**
	 * Method to be invoked to stop the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void stop(StreamApplication app);

	/**
	 * Method to be invoked to get the status of the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void status(StreamApplication app);

}


RuntimeEnvironment

Code Block
/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 *
 */
public interface RuntimeEnvironment {

  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.storage.kv.KeyValueStore}
   * with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);

}

 

TestRuntimeEnvironment (implementation class)

 

Code Block
public class TestRuntimeEnvironment implements RuntimeEnvironment {
  public StreamSpec getStreamSpec(String streamId) {...}
  public SystemFactory getSystemFactory(String system) {...}
  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}
 
  /**
   * Additional method to add a Java-collection-based input/output stream
   */
  public <K, V> void addStream(String streamId, Queue<Entry<K, V>> messages) {...}
 
  /**
   * Additional method to add a Java-collection-based KV-store
   */
  public <K, V> void addStore(String storeId, Map<K, V> storeEntries) {...}
 
}
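
The division of labor described above, where a runner resolves logical stream IDs through whichever RuntimeEnvironment it is given, can be sketched as follows. All types here (SpecSketch, EnvironmentSketch, TestEnvironmentSketch, RunnerSketch) are simplified, hypothetical stand-ins, and the "in-memory" system name is an assumption; the real StreamSpec and TestRuntimeEnvironment carry more information than shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for StreamSpec: a system name plus a physical stream name.
final class SpecSketch {
  final String system;
  final String physicalName;

  SpecSketch(String system, String physicalName) {
    this.system = system;
    this.physicalName = physicalName;
  }
}

// Hypothetical stand-in for the lookup side of RuntimeEnvironment.
interface EnvironmentSketch {
  SpecSketch getStreamSpec(String streamId);
}

// Test flavor: streams registered through addStream resolve to an assumed "in-memory" system.
class TestEnvironmentSketch implements EnvironmentSketch {
  private final Map<String, SpecSketch> specs = new HashMap<>();

  void addStream(String streamId) {
    specs.put(streamId, new SpecSketch("in-memory", streamId));
  }

  @Override
  public SpecSketch getStreamSpec(String streamId) {
    SpecSketch spec = specs.get(streamId);
    if (spec == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamId);
    }
    return spec;
  }
}

// A runner written only against the interface; it does not know which environment it received.
class RunnerSketch {
  private final EnvironmentSketch environment;

  RunnerSketch(EnvironmentSketch environment) {
    this.environment = environment;
  }

  // Resolves a logical stream ID to a "system.stream" string via the environment.
  String resolve(String streamId) {
    SpecSketch spec = environment.getStreamSpec(streamId);
    return spec.system + "." + spec.physicalName;
  }
}
```

Under this arrangement the same runner logic could be exercised against a local, remote, or test environment, with only the environment implementation changing.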

Implementation and Test Plan

...