Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleTestTask
/**
 * TestTask provides static factories for quickly setting up a Samza environment to test the low-level API.
 * Users configure the input streams the task consumes from and the output streams it produces to, and
 * pass in their Samza job logic as a StreamTask or an AsyncStreamTask.
 */
public class TestTask {

  // Static factories to configure and create a runner for the low-level API
  public static TestTask create(StreamTask task) {...}
  public static TestTask create(AsyncStreamTask task) {...}

  // Add/override any custom configs
  public TestTask addOverrideConfigs(Map<String,String> config) {...}

  // Set the container mode: either single container or multi container
  public TestTask setContainerMode(Mode mode) {...}

  // Multithreading configs for users
  // NOTE(review): setTaskMaxConcurrency(Integer) was listed twice; the repeated declaration is dropped
  // because two methods with the same signature cannot coexist. If it stood for another setting, add it
  // here under its own name -- TODO confirm.
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}

  // Configure state for the application
  public TestTask addState(String storeName) {...}

  // Configure an input stream for the Samza system that the task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}

  // Configure an output stream for the Samza system that the task can produce to
  public TestTask addOutputStream(CollectionStream stream) {...}

  // Run the app
  public void run() {...}
}

 
Code Block
languagejava
themeEclipse
titleTestApplication
/**
 * TestApplication provides static factories for quickly setting up a Samza environment to test the
 * high-level API. Users configure input streams, apply operators on the resulting message streams,
 * and then run the application.
 */
public class TestApplication<T> {
  // Static factories to configure and create a runner for the high-level API
  // NOTE(review): these return the raw TestApplication type although the class declares <T> -- confirm
  // whether the class-level type parameter is needed at all.
  public static TestApplication create(Map<String, String> config) {...}
  public static TestApplication create(StreamApplication app) {...}

  // Add/override any custom configs
  public TestApplication addOverrideConfigs(Map<String,String> config) {...}

  // Set the container mode: either single container or multi container
  public TestApplication setContainerMode(Mode mode) {...}

  // Multithreading configs for users
  // NOTE(review): setTaskMaxConcurrency(Integer) was listed twice; the repeated declaration is dropped
  // because two methods with the same signature cannot coexist. If it stood for another setting, add it
  // here under its own name -- TODO confirm.
  public TestApplication setTaskMaxConcurrency(Integer value) {...}
  public TestApplication setTaskCallBackTimeoutMS(Integer value) {...}

  // Configure any kind of input stream for the Samza system and get a handle on the message stream
  // to apply operators on. The method-level <T> below shadows the class-level <T>.
  public <T> MessageStream<T> getInputStream(EventStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(CollectionStream<T> stream) {...}
  public <T> MessageStream<T> getInputStream(FileStream<T> stream) {...}

  // Run the app
  public void run() {...}
}

...

Code Block
languagejava
themeEclipse
titleSimple High Level Api Test
/**
* Simple Test case using a collection as an input for a High level application
*/
 
// Initialize and run the test framework.
// The whole setup is one fluent chain, so no statement terminator may appear before run().
// NOTE(review): {1,2,3} is shorthand for the input collection and is not a valid Java expression as
// written; replace it with whatever CollectionStream.of actually accepts -- TODO confirm.
TestApplication
    .create(config)
    .getInputStream(CollectionStream.of("test-samza.input", {1,2,3}))
    .map(s -> "processed " + s)
    .run();

// Assertions on the outputs: inputs 1, 2, 3 mapped through "processed " + s
StreamAssert.that("test-samza.input").contains(Arrays.asList("processed 1", "processed 2", "processed 3"));

...