...

  1. The system depends on the in-memory system to spin up in-memory consumers and producers for in-memory data streams.

  2. Single Container mode runs on SingleContainerGrouperFactory, which means these tests are meant to run in a single-container environment.

  3. Multi Container mode uses Samza as a library (Samza standalone), with Zookeeper as the coordination service for setting up the test.

  4. To provide a testing API with minimal config management, basic configs are set up for users, with the flexibility to add or override custom configs.

  5. Testing is always done on bounded streams, terminated by EndOfStreamMessage.

  6. The system should consume directly from a file stream or a local Kafka stream without any additional configs. The file stream implementation is out of scope for this SEP, although this SEP suggests what its design should look like.

  7. Bounded message streams from Kafka (using either Latch or EndOfStreamMessage) are beyond the scope of this SEP.

  8. For stateful testing, the system may use RocksDB, since it ships with Samza, or maintain the state in memory.
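
Point 5 relies on an end-of-stream marker to make a test terminate deterministically. The following is a minimal sketch of that idea only, not the framework's implementation: `BoundedStreamSketch`, `END_OF_STREAM`, `boundedStream`, and `drain` are hypothetical names introduced for illustration, and a plain queue stands in for the in-memory system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a bounded in-memory stream; not part of Samza.
class BoundedStreamSketch {
  // Sentinel that marks the end of the stream (compared by identity).
  static final Object END_OF_STREAM = new Object();

  // Copies the messages into a queue and appends the end-of-stream marker.
  static Queue<Object> boundedStream(List<?> messages) {
    Queue<Object> queue = new ArrayDeque<>(messages);
    queue.add(END_OF_STREAM);
    return queue;
  }

  // Consumes messages until the marker (or an empty queue) is reached.
  static List<Object> drain(Queue<Object> stream) {
    List<Object> consumed = new ArrayList<>();
    Object message;
    while ((message = stream.poll()) != null && message != END_OF_STREAM) {
      consumed.add(message);
    }
    return consumed;
  }

  public static void main(String[] args) {
    System.out.println(drain(boundedStream(Arrays.asList(1, 2, 3))));
  }
}
```

Because the consumer stops at the marker, anything enqueued after it is never read, which is what lets a test know the input has been fully processed.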

Design

We propose a three-step process for configuring an integration test for your Samza job. The three steps are data injection, transformation, and validation.
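
To make the three steps concrete, here is a small sketch that uses plain Java collections in place of the proposed test API. `ThreeStepTestSketch` and `runJob` are hypothetical names, and the per-message function merely stands in for a Samza task's logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of inject -> transform -> validate; not the Samza test API.
class ThreeStepTestSketch {
  // Transformation: applies the logic under test to every input message.
  static <I, O> List<O> runJob(List<I> input, Function<I, O> logic) {
    List<O> output = new ArrayList<>();
    for (I message : input) {
      output.add(logic.apply(message));
    }
    return output;
  }

  public static void main(String[] args) {
    // Step 1, data injection: a bounded in-memory input.
    List<Integer> input = Arrays.asList(1, 2, 3);
    // Step 2, transformation: run the logic under test.
    List<Integer> output = runJob(input, x -> x * 10);
    // Step 3, validation: compare against the expected output.
    if (!output.equals(Arrays.asList(10, 20, 30))) {
      throw new AssertionError("unexpected output: " + output);
    }
    System.out.println("validated " + output);
  }
}
```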

...

With the low-level API, once the user runs the job, they can assert on data from any stream the job uses. The Samza fluent API chains jobs, so only the final expected output can be compared in that case. TaskAssert spins up a consumer for the system under test, reads the messages in the stream, and compares the result with the expected value. The assertion utilities provided are:
  • Task Assert: assert utilities on low level api
  • Stream Assert: assert utilities on low level api  


StreamAssert          TaskAssert            StateAssert
contains              contains              contains
containsInAnyOrder    containsInAnyOrder    satisfies
inWindow              isOnTimePane
inFinalPane           satisfies
isOnTimePane
satisfies
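
The table lists assertion names without defining their semantics. As a rough sketch, assuming `contains` means an exact ordered match, `containsInAnyOrder` means the same elements regardless of order, and `satisfies` applies a user-supplied condition (semantics borrowed from common matcher libraries, not confirmed by this SEP), the checks could look like this. `AssertSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helpers; the semantics are assumptions, not the SEP's definitions.
class AssertSketch {
  // Assumed semantics: same messages in the same order.
  static <T> boolean contains(List<T> actual, List<T> expected) {
    return actual.equals(expected);
  }

  // Assumed semantics: same messages with the same multiplicity, in any order.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    List<T> remaining = new ArrayList<>(actual);
    for (T item : expected) {
      if (!remaining.remove(item)) {
        return false; // an expected message is missing
      }
    }
    return remaining.isEmpty(); // no unexpected extras
  }

  // Assumed semantics: the whole output satisfies a caller-supplied condition.
  static <T> boolean satisfies(List<T> actual, Predicate<List<T>> condition) {
    return condition.test(actual);
  }

  public static void main(String[] args) {
    List<Integer> output = Arrays.asList(3, 1, 2);
    System.out.println(contains(output, Arrays.asList(1, 2, 3)));
    System.out.println(containsInAnyOrder(output, Arrays.asList(1, 2, 3)));
    System.out.println(satisfies(output, messages -> messages.size() == 3));
  }
}
```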

 

Data Types & Partitions:

The framework will provide complete flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any Serde configuration. The test framework will provide APIs for initializing input streams (both single- and multi-partition) and for validating data on single-partition and multi-partition bounded streams.
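
One way to picture single- and multi-partition initialization and per-partition validation is a map from partition id to its messages. This sketch assumes partition ids are simply the positions of the inner lists; `PartitionedStreamSketch` and its methods are hypothetical and use plain objects, so no Serde is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a partitioned bounded stream; not the Samza API.
class PartitionedStreamSketch {
  // Multi-partition: the i-th inner list becomes partition i (an assumption).
  static <T> Map<Integer, List<T>> ofPartitions(List<List<T>> partitions) {
    Map<Integer, List<T>> byPartition = new LinkedHashMap<>();
    for (int i = 0; i < partitions.size(); i++) {
      byPartition.put(i, new ArrayList<>(partitions.get(i)));
    }
    return byPartition;
  }

  // Single partition: all messages land in partition 0.
  static <T> Map<Integer, List<T>> of(List<T> messages) {
    return ofPartitions(Collections.singletonList(messages));
  }

  // Validation: checks the messages of one partition against the expected list.
  static <T> boolean partitionEquals(Map<Integer, List<T>> stream, int partition, List<T> expected) {
    return expected.equals(stream.get(partition));
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> stream =
        ofPartitions(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
    System.out.println(partitionEquals(stream, 1, Arrays.asList("c")));
  }
}
```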

...

Traditionally, we ask users to set up the config for any Samza job. For test purposes, we set up the basic config boilerplate for users while still letting them add custom configs (rarely needed). The API exposes functions to configure single-container or multi-container mode (the latter using Zookeeper). It also provides functions to configure the concurrency semantics of the Samza job.
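
The "defaults plus optional overrides" approach can be sketched as a simple map merge in which user-supplied entries win. `TestConfigSketch` is a hypothetical class, and the default values shown are illustrative assumptions rather than the defaults the framework would actually ship.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of boilerplate test config with user overrides.
class TestConfigSketch {
  // Baseline config the framework would set up on the user's behalf.
  // The values here are placeholders chosen for illustration.
  static Map<String, String> defaults() {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "test-job");
    config.put("task.max.concurrency", "1");
    return config;
  }

  // User-supplied entries replace defaults with the same key.
  static Map<String, String> withOverrides(Map<String, String> overrides) {
    Map<String, String> merged = defaults();
    merged.putAll(overrides);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("task.max.concurrency", "4");
    System.out.println(withOverrides(overrides));
  }
}
```

Keeping the merge in one place means a test only states what differs from the baseline, which is the point of hiding the boilerplate.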

 


Stateful & Stateless Testing:

The test framework will support both stateful and stateless testing. Stateful testing can either use RocksDB, which ships with Samza, or maintain the state in memory.

Future Changes with Stream Descriptors:

...

TestTask (Java)
import java.util.HashMap;

import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;

/**
 * TestTask provides static factories for quick setup of a Samza environment for testing the low-level API. Users configure the
 * input streams the task consumes from and the output streams it produces to, and pass in their job logic as a StreamTask or
 * an AsyncStreamTask.
 */
public class TestTask {
  private TestTask(String systemName, StreamTask task, HashMap<String, String> config) {...}
  private TestTask(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Static factories to configure and create a runner for the low-level API
  public static TestTask create(String systemName, StreamTask task, HashMap<String, String> config) {...}
  public static TestTask create(String systemName, AsyncStreamTask task, HashMap<String, String> config) {...}

  // Multithreading configs for users
  public TestTask setTaskMaxConcurrency(Integer value) {...}
  public TestTask setTaskCallBackTimeoutMS(Integer value) {...}

  // Configures an input stream for the Samza system that the task can consume from
  public TestTask addInputStream(CollectionStream stream) {...}

  // Configures an output stream for the Samza system that the task can produce to
  public TestTask addOutputStream(CollectionStream stream) {...}

  // Runs the app
  public void run() {...}
}

 

...

CollectionStream (Java)
import java.util.List;
import java.util.Map;

import org.apache.samza.operators.KV;

/**
 * CollectionStream provides utilities to build an in-memory input stream from a collection (list or map). It also supports
 * initializing multiple partitions for an input stream.
 */
public class CollectionStream<T> {
  private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
  private CollectionStream(String systemStream) {...}

  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String systemStream) {...}

  // Create a stream of messages from an input list with a single partition
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {...}

  // Create a stream of messages from input lists with multiple partitions
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection) {...}

  // Create a stream of key-value (KV) messages from an input map
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}

...