...

This is the Samza job you write, using either the low-level API or the fluent high-level API. The test framework provides an API to set up tests for both cases, in both single-container and multi-container mode. Users implement StreamTask, AsyncStreamTask, or StreamApplication in the same way as they do for their Samza job. For the high-level API, users pass an instance of their StreamApplication (see the sample example below); alternatively, they can configure message streams of any type and apply operators on them directly, without writing a class that implements StreamApplication.
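To make the two styles concrete, here is a minimal, self-contained sketch of what a test harness does in each case: feed an in-memory collection into user logic and capture what it emits. The names `InMemoryHarnessSketch`, `PerMessageTask`, `runTask` and `runOperators` are hypothetical stand-ins, not Samza or test-framework classes. `PerMessageTask` only mimics the shape of a low-level `StreamTask.process` callback (one message in, zero or more messages emitted through a collector), while the high-level style is modeled as a plain `java.util.function.Function` operator chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical sketch, not the Samza test framework: drives user logic
 * from an in-memory collection and captures everything it emits.
 */
public class InMemoryHarnessSketch {

    /** Stand-in for a low-level task: one message in, output emitted via a collector. */
    @FunctionalInterface
    interface PerMessageTask<I, O> {
        void process(I message, Consumer<O> collector);
    }

    /** Low-level style: invoke the task once per input message, collecting its output. */
    static <I, O> List<O> runTask(List<I> input, PerMessageTask<I, O> task) {
        List<O> output = new ArrayList<>();
        for (I message : input) {
            task.process(message, output::add);
        }
        return output;
    }

    /** High-level style: apply an operator chain (here a single map) to each message. */
    static <I, O> List<O> runOperators(List<I> input, Function<I, O> operators) {
        List<O> output = new ArrayList<>();
        for (I message : input) {
            output.add(operators.apply(message));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3);

        // The same logic written both ways: prefix each number with "processed ".
        List<String> lowLevel = runTask(input, (n, out) -> out.accept("processed " + n));
        List<String> highLevel = runOperators(input, n -> "processed " + n);

        System.out.println(lowLevel);   // [processed 1, processed 2, processed 3]
        System.out.println(highLevel);  // [processed 1, processed 2, processed 3]
    }
}
```

The point of the sketch is that both styles reduce to "collection in, captured collection out", which is what lets a single framework test either kind of job.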

Data Validation: 

For both the low-level and high-level APIs, once the job has run, users can assert on the data in any stream the job produces to. Because the fluent high-level API chains operators within the job, only the final output of the chain can be compared in that case. StreamAssert spins up a consumer for the system under test, reads the messages in the stream, and compares the result with the expected value. The assertion functions provided are:

...
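The comparison step can be sketched as follows. This is a hypothetical stand-in, not the actual `StreamAssert` implementation, and all names in it are illustrative. `containsInAnyOrder` treats the consumed messages as a multiset, which suits multi-partition outputs where the interleaving across partitions is not deterministic; `containsInOrder` is a stricter check that is only meaningful within a single partition. The per-partition map (partition id to messages read back) is an assumed representation of what a consumer would return.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of stream assertions; not the real StreamAssert API. */
public class StreamAssertSketch {

    /** Multiset equality: same elements with the same counts, order ignored. */
    static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
        if (actual.size() != expected.size()) {
            return false;
        }
        Map<T, Integer> counts = new HashMap<>();
        for (T item : expected) {
            counts.merge(item, 1, Integer::sum);
        }
        for (T item : actual) {
            Integer remaining = counts.get(item);
            if (remaining == null || remaining == 0) {
                return false;  // unexpected element, or seen more often than expected
            }
            counts.put(item, remaining - 1);
        }
        return true;
    }

    /** Exact sequence equality; only meaningful within a single partition. */
    static <T> boolean containsInOrder(List<T> actual, List<T> expected) {
        return Objects.equals(actual, expected);
    }

    /** Checks ordering partition by partition (partition id -> messages read back). */
    static <T> boolean partitionsContainInOrder(Map<Integer, List<T>> actual,
                                                Map<Integer, List<T>> expected) {
        if (!actual.keySet().equals(expected.keySet())) {
            return false;
        }
        for (Map.Entry<Integer, List<T>> entry : expected.entrySet()) {
            if (!containsInOrder(actual.get(entry.getKey()), entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> consumed = List.of("processed 2", "processed 1", "processed 3");
        List<String> expected = List.of("processed 1", "processed 2", "processed 3");

        System.out.println(containsInAnyOrder(consumed, expected)); // true
        System.out.println(containsInOrder(consumed, expected));    // false
    }
}
```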

The framework will provide complete flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any serde configuration. Serdes will also be required if users want to maintain state. The test framework will provide APIs to initialize input streams (both single-partition and multi-partition) and to validate data on single-partition and multi-partition bounded streams.
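As a rough illustration of what initializing a multi-partition input stream involves, the sketch below distributes keyed messages across `p` partitions; a single-partition stream is simply the `p = 1` case. The class and method names are hypothetical, and `Math.floorMod(hashCode, p)` is a simple stand-in chosen for illustration; it is not the partitioner that Kafka or Samza actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical sketch: distribute keyed messages over a fixed number of partitions. */
public class PartitionedInputSketch {

    /**
     * Assigns each (key, value) message to partition floorMod(hash(key), partitionCount).
     * Messages with the same key land in the same partition, and their relative
     * order within that partition is preserved.
     */
    static <K, V> Map<Integer, List<Map.Entry<K, V>>> partition(
            List<Map.Entry<K, V>> messages, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be >= 1");
        }
        Map<Integer, List<Map.Entry<K, V>>> partitions = new TreeMap<>();
        for (int p = 0; p < partitionCount; p++) {
            partitions.put(p, new ArrayList<>());  // keep empty partitions visible
        }
        for (Map.Entry<K, V> message : messages) {
            int p = Math.floorMod(Objects.hashCode(message.getKey()), partitionCount);
            partitions.get(p).add(message);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pageViews = List.of(
            Map.entry("user-1", "view A"),
            Map.entry("user-2", "view B"),
            Map.entry("user-1", "view C"));

        // Single partition (p = 1): every message goes to partition 0, in input order.
        System.out.println(partition(pageViews, 1));
        // Four partitions: both "user-1" messages share one partition, in order.
        System.out.println(partition(pageViews, 4));
    }
}
```

Keeping same-key messages together in input order is what makes per-partition ordering assertions meaningful on the output side.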

...

The test framework is designed to require no or minimal Samza configuration from users. In the future, we intend to use StreamDescriptors in the test framework to generate Samza configs. This would cause a small change in the way users pass their custom configs (if any) to the test framework. With StreamDescriptors and the high-level API, another wrapper could be provided over the job to run its configuration in a test environment.

Test Matrix (Plan)


Samza API | Concurrency | Partitions | Containers | Expected Result
StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= n | 1 / n | In-order processing
StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= n | 1 / n | Out-of-order processing
AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= n | 1 / n | In-order processing
AsyncStreamTask | task.max.concurrency = n | 1 <= p <= n | 1 / n | Out-of-order processing
WindowableTask | N/A | 1 <= p <= n | 1 / n | Expect n messages to be processed in a window of time t
InitableTask | N/A | 1 <= p <= n | 1 / n | Stateful testing (assertions on the KV store)
ClosableTask | N/A | 1 <= p <= n | 1 / n | ?
map / flatMap / filter / partitionBy / merge / sendTo / sink / join / window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p < n | 1 / n | In-order processing
map / flatMap / filter / partitionBy / merge / sendTo / sink / join / window | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p < n | 1 / n | Out-of-order processing
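The in-order versus out-of-order expectations in the matrix follow from how many messages may be in flight at once. The following self-contained simulation uses plain `java.util.concurrent` and no Samza classes: it dispatches one partition's messages to a thread pool of a given size while a `Semaphore` caps in-flight work at `maxConcurrency`, as a rough stand-in for `task.max.concurrency`. With a cap of 1 the next message is not dispatched until the previous one completes, so completion order always equals input order; with a higher cap, completions may interleave. The artificial sleep (later messages finish faster) is an assumption added only to make reordering likely, and `ConcurrencySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** Hypothetical simulation of the concurrency rows of the test matrix. */
public class ConcurrencySketch {

    /**
     * Processes one partition's messages with at most maxConcurrency in flight on a
     * pool of poolSize threads, and returns the order in which processing completed.
     */
    static List<Integer> process(List<Integer> messages, int maxConcurrency, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore inFlight = new Semaphore(maxConcurrency);
        CountDownLatch done = new CountDownLatch(messages.size());
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < messages.size(); i++) {
                int message = messages.get(i);
                long delayMs = (messages.size() - i) * 5L;  // later messages finish faster
                inFlight.acquire();  // wait for a free concurrency slot before dispatching
                pool.submit(() -> {
                    try {
                        Thread.sleep(delayMs);
                        completed.add(message);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.release();
                        done.countDown();
                    }
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        List<Integer> partition = List.of(1, 2, 3, 4, 5);
        // Cap of 1: completion order always matches input order, [1, 2, 3, 4, 5].
        System.out.println(process(partition, 1, 4));
        // Cap > 1: completion order may differ from input order.
        System.out.println(process(partition, 5, 4));
    }
}
```

Only the cap-of-1 result is deterministic, which is why the matrix states "in-order" as a hard expectation only for that setting; a test for the higher-concurrency rows can check that every message was processed, but not the order.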

...

TestRunner
```java
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.StreamTask;

/**
 * TestRunner provides a static factory for quickly setting up a Samza environment to test
 * both low-level and high-level API jobs. Users configure the input streams their job
 * consumes from and the output streams it produces to, and pass in their job logic as a
 * StreamTask, an AsyncStreamTask, or a StreamApplication.
 */
public class TestRunner {

  // Static factories to configure and create a runner for the low-level API
  // (StreamTask, AsyncStreamTask) and the high-level API (StreamApplication)
  public static TestRunner of(StreamTask task) {...}
  public static TestRunner of(AsyncStreamTask task) {...}
  public static TestRunner of(StreamApplication app) {...}

  // Add or override custom configs
  public TestRunner addOverrideConfigs(Map<String, String> configs) {...}
  public TestRunner addOverrideConfigs(String configUri) {...}
  public TestRunner addOverrideConfig(String key, String val) {...}

  // Set the container mode: single container or multi-container
  public TestRunner setContainerMode(Mode mode) {...}

  // Multithreading configs for users
  public TestRunner setTaskMaxConcurrency(Integer value) {...}
  public TestRunner setTaskCallBackTimeoutMS(Integer value) {...}
  public TestRunner setJobContainerThreadPoolSize(Integer value) {...}

  // Configure a state store for the application
  public TestRunner addState(String storeName) {...}

  // Configure an input stream that the task can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}
  // Configure an output stream that the task can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}

  // Run the app
  public void run() {...}
}
```
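One plausible way for the builder methods above to work is to accumulate Samza configuration entries that are applied when `run()` is called, with user overrides taking precedence over framework defaults. The sketch below assumes exactly that; the class name `TestRunnerConfigSketch`, the use of `int` instead of `Integer`, and the method-to-key mapping are assumptions for illustration, not the framework's implementation. The keys themselves (`task.max.concurrency`, `task.callback.timeout.ms`, `job.container.thread.pool.size`, and the `stores.<name>.*` and `serializers.registry.*` entries that state needs for its serdes) are standard Samza configuration properties; the in-memory store factory and string serde are illustrative defaults.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of how TestRunner-style builder calls could map to Samza
 * config keys. Not the framework's implementation.
 */
public class TestRunnerConfigSketch {
    private final Map<String, String> defaults = new LinkedHashMap<>();
    private final Map<String, String> overrides = new LinkedHashMap<>();

    public TestRunnerConfigSketch setTaskMaxConcurrency(int value) {
        defaults.put("task.max.concurrency", Integer.toString(value));
        return this;
    }

    public TestRunnerConfigSketch setTaskCallBackTimeoutMS(int value) {
        defaults.put("task.callback.timeout.ms", Integer.toString(value));
        return this;
    }

    public TestRunnerConfigSketch setJobContainerThreadPoolSize(int value) {
        defaults.put("job.container.thread.pool.size", Integer.toString(value));
        return this;
    }

    /** State needs serdes: register a string serde and an in-memory store that uses it. */
    public TestRunnerConfigSketch addState(String storeName) {
        defaults.put("serializers.registry.string.class",
            "org.apache.samza.serializers.StringSerdeFactory");
        defaults.put("stores." + storeName + ".factory",
            "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
        defaults.put("stores." + storeName + ".key.serde", "string");
        defaults.put("stores." + storeName + ".msg.serde", "string");
        return this;
    }

    public TestRunnerConfigSketch addOverrideConfig(String key, String value) {
        overrides.put(key, value);
        return this;
    }

    /** Effective config: framework defaults first, then user overrides win. */
    public Map<String, String> build() {
        Map<String, String> config = new LinkedHashMap<>(defaults);
        config.putAll(overrides);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = new TestRunnerConfigSketch()
            .setTaskMaxConcurrency(1)
            .setJobContainerThreadPoolSize(4)
            .addState("word-counts")                       // hypothetical store name
            .addOverrideConfig("task.max.concurrency", "2") // user override wins
            .build();
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Separating defaults from overrides keeps the "no or minimal config" goal: the framework fills in sensible values, and `addOverrideConfig` remains the single escape hatch for anything the user needs to change.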

 

...

Simple High-Level API Test
```java
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;

/**
 * Simple test case using a collection as the input for a high-level application
 */
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<Integer> numbers = graph.getInputStream("test-samza.input");

    // Prefix each input number and send it to the output stream
    numbers.map(n -> "processed " + n)
           .sendTo(graph.getOutputStream("test-samza.output"));
  }
}

// Initialize and run the test framework
TestRunner
    .of(new MyStreamApplication())
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the output stream
StreamAssert.that("test-samza.output")
    .contains(Arrays.asList("processed 1", "processed 2", "processed 3"));
```

Implementation and Test Plan

...