...

  • The Data Injection phase configures the input source for the Samza job.
  • The Data Transformation phase is the API logic of the Samza job (low-level or high-level).
  • The Data Validation phase compares the expected results against the actual results computed by running the Samza job.

 

Data Injection:

In the context of this test framework, an input stream is a stream that a Samza job may consume from, and an output stream is a stream that a Samza job may produce to. Because this is a test, the initial source of a data stream is always expected to be bounded, and it may originate from one of the sources listed below.

...

With the low-level API, once the job has run, users can assert on data from any stream the job uses. The Samza fluent API chains jobs together, so in that case only the final expected output can be compared. StreamAssert spins up a consumer for the system under test, reads the messages in the stream, and compares them with the expected values. The assertion functions provided are:

 

Assertion classes: StreamAssert, TaskAssert, StateAssert

Assertion functions:

  • contains
  • containsInAnyOrder
  • inWindow
  • inFinalPane
  • isOnTimePane
  • satisfies
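To make the difference between an ordered and an order-insensitive assertion concrete, here is a minimal sketch in plain Java. It does not use the proposed framework: the class `InAnyOrderCheck` and its methods are hypothetical names introduced only for illustration, and the multiset comparison is one plausible reading of what a `containsInAnyOrder` check does, not the framework's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Samza: illustrates order-insensitive matching.
final class InAnyOrderCheck {

  // Counts how often each element occurs, so duplicates are respected.
  static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> result = new HashMap<>();
    for (T item : items) {
      result.merge(item, 1, Integer::sum);
    }
    return result;
  }

  // True when both lists hold the same elements with the same multiplicities.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return counts(actual).equals(counts(expected));
  }

  public static void main(String[] args) {
    List<Integer> actual = Arrays.asList(10, 20, 30, 40);
    List<Integer> expected = Arrays.asList(10, 30, 20, 40);
    // Order-insensitive comparison succeeds.
    System.out.println(containsInAnyOrder(actual, expected)); // prints "true"
    // Strict, ordered comparison of the same data fails.
    System.out.println(actual.equals(expected)); // prints "false"
  }
}
```

An order-insensitive check matters for multi-threaded or multi-partition jobs, where the relative order of output messages is generally not deterministic.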

 

Data Types & Partitions:

The framework will give users full flexibility to use primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams do not require any Serde configuration. The test framework will provide APIs for initializing input streams (both single-partition and multi-partition), and for validating data on single-partition and multi-partition bounded streams.
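As a rough illustration of what initializing a multi-partition bounded input involves, the sketch below splits an in-memory collection into partitions. It is plain Java and independent of the proposed framework: `PartitionedInput` and `partition` are hypothetical names, and hashing each message to pick a partition is an assumption made for the example, not the framework's partitioning strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Samza: splits a bounded collection into partitions.
final class PartitionedInput {

  // Assigns each message to a partition by its hash code.
  // Math.floorMod keeps the index non-negative even for negative hash codes.
  static <T> List<List<T>> partition(List<T> messages, int partitionCount) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive");
    }
    List<List<T>> partitions = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
    for (T message : messages) {
      int index = Math.floorMod(message.hashCode(), partitionCount);
      partitions.get(index).add(message);
    }
    return partitions;
  }

  public static void main(String[] args) {
    // An Integer's hash code is its value, so even numbers land in partition 0.
    System.out.println(partition(Arrays.asList(1, 2, 3, 4), 2)); // prints "[[2, 4], [1, 3]]"
  }
}
```

Order is preserved within each partition but not across partitions, which is why validation on multi-partition streams usually needs either per-partition assertions or an order-insensitive comparison.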

 

Running Config

Traditionally, users are asked to set up the config for any Samza job. For test purposes, the framework sets up a basic config boilerplate for users and still lets them add custom config where needed (which should be rare). The API exposes functions to configure single-container or multi-container mode (using ZooKeeper), and it also provides functions to configure the concurrency semantics of the Samza job.
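The boilerplate-plus-overrides idea can be sketched with plain Java maps. This is only an illustration under stated assumptions: `TestConfigSketch` is a hypothetical class, the default values are invented for the example, and the key names follow Samza's usual config naming but should be checked against the Samza configuration reference before use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: merges test defaults with user overrides.
final class TestConfigSketch {

  // Boilerplate defaults a test harness might supply (values are assumptions).
  static Map<String, String> defaults() {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "test-samza-job");
    config.put("job.container.thread.pool.size", "1");
    config.put("task.max.concurrency", "1");
    return config;
  }

  // User-supplied entries win over the defaults; untouched defaults are kept.
  static Map<String, String> withOverrides(Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(defaults());
    merged.putAll(overrides);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    overrides.put("task.max.concurrency", "4");
    Map<String, String> config = withOverrides(overrides);
    System.out.println(config.get("task.max.concurrency")); // prints "4"
    System.out.println(config.get("job.name")); // prints "test-samza-job"
  }
}
```

Builder-style calls such as `setTaskMaxConcurrency(4)` in the examples on this page could be thin wrappers that write one such entry into the merged config.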


Stateful & Stateless Testing:

...

Simple StreamTask Test (Java):

/**
 * Simple test case using a collection as the input for a low-level application. It demonstrates the setup and
 * assertions of a test with minimal config (here, none): the task reads an input stream of integers and
 * multiplies each integer by 10.
 */
 
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
 
// Initialize and run the test framework
TestTask
    .create(myTask, config, Mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));

Simple AsyncStreamTask Test (Java):

/**
 * Simple test case using a collection as the input for a low-level application in async mode.
 */

// Create an AsyncStreamTask that calls a REST endpoint asynchronously
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        // process your message
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
 

// Initialize and run the test framework
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.PageView", pageviews))
    .addOutputStream(CollectionStream.empty("test-samza.Output"))
    .run();

// Assertions on the outputs
StreamAssert.that("test-samza.Output").containsInAnyOrder(pageviews);

...

As this is a new feature, no plans are required for compatibility, deprecation and migration.

Rejected Alternatives