Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Metadata Streams | Tests
Changelog Stream | Assertions on the changelog stream in stateful tests
Checkpoint Stream | Assertions on the checkpoint stream (Kafka or in-memory)
Coordinator Stream | Assertions on the coordinator stream (Kafka or in-memory)



Public Interfaces

There are two APIs for writing tests: the Low Level Test API (TestTask) and the High Level API (TestApplication).

 

Code Block
languagejava
themeEclipse
titleTestTaskTestRunner
/**
 * TestTaskTestRunner provides static factories for quickly setting up a Samza environment to test
 * jobs written against the low-level API or the high-level API. Users configure the input streams
 * the job consumes from and the output streams it produces to, and pass in their job logic as a
 * StreamTask, an AsyncStreamTask, or a StreamApplication.
 *
 * NOTE(review): the signatures below mix the return types TestTaskTestRunner and TestRunner, and
 * the usage examples on this page call create(...) while the factories here are named createof(...)
 * and of(...) - confirm the intended class and factory names.
 */ 

public class TestTaskTestRunner {
  
  // Static factories that create a runner from the job logic under test:
  // StreamTask / AsyncStreamTask for the low-level API, StreamApplication for the high-level API
  public static TestTaskTestRunner createof(StreamTask task) {...}
  public static TestTaskTestRunner createof(AsyncStreamTask task) {...}
  public static TestRunner of(StreamApplication app) {...}
  
  // Add/override custom configs: as a map, from a config URI, or as a single key/value pair
  public TestTaskTestRunner addOverrideConfigs(Map<String,String> configs) {...}
  public TestTaskTestRunner addOverrideConfigs(String configUri) {...}
  public TestRunner addConfig(String key, String val){...} 
  // Set the container mode: either single container or multi container
  public TestTaskTestRunner setContainerMode(Mode mode) {...}
  
  // Multithreading configs for users
  public TestTaskTestRunner setTaskMaxConcurrency(Integer value) {...}
  public TestTaskTestRunner setTaskCallBackTimeoutMS(Integer value) {...}
  public TestTaskTestRunner setJobContainerThreadPoolSize(Integer value) {...}
 
  // Configure state for the application, identified by store name
  public TestTaskTestRunner addState(String storeName) {...}  
  
  // Configure an input stream for the Samza system that the task can consume from
  public TestTaskTestRunner addInputStream(CollectionStream stream) {...}
  // Configure an output stream for the Samza system that the task can produce to
  public TestTaskTestRunner addOutputStream(CollectionStream stream) {...}
  
  // Run the app
  public void run() {...}
}

 

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
 * Simple test case that uses a collection as the input of a low-level application. It demonstrates
 * setting up a test with minimal configuration and comparing its output: the task reads an input
 * stream of integers and multiplies each integer by 10.
 */
 
// Create a StreamTask that multiplies every incoming integer by 10
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    // The message is cast to Integer, matching the integer collection used as input below
    Integer obj = (Integer) envelope.getMessage();
    // Send the value multiplied by 10 to SystemStream("test-samza","output")
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
  }
};
 
// Initialize and run the test framework: one collection-backed input stream, one empty output stream
TestTaskTestRunner
    .create(myTask)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", {1,2,3,4}))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();
 
// Assertions on the outputs
// NOTE(review): the expected values are listed as {10,30,20,40}, not in input order - confirm
// whether contains(...) is order-sensitive (the async example uses containsInAnyOrder instead)
StreamAssert.that("test-samza.output").contains({10,30,20,40});
Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/

/**
 * Example AsyncStreamTask that issues an asynchronous GET request for every incoming message and,
 * once the request completes, sends the message value multiplied by 10 to
 * SystemStream("test-samza","output").
 */
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  /**
   * Creates the web client and the request target used by processAsync.
   *
   * @param config      job configuration (unused in this example)
   * @param taskContext task context (unused in this example)
   */
  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  /**
   * Starts a non-blocking GET request for the message; the outcome is reported through the callback.
   *
   * @param envelope    incoming message; its payload is cast to Integer
   * @param collector   collector used to send the result
   * @param coordinator task coordinator (unused in this example)
   * @param callback    completed after the result was sent, failed on any error
   */
  @Override
  public void processAsync(final IncomingMessageEnvelope envelope, final MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        try {
          Integer obj = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
        } catch (RuntimeException e) {
          // Without this, an exception thrown here (e.g. a failed cast) would leave the
          // callback neither completed nor failed.
          callback.failure(e);
          return;
        } finally {
          // The response body is not read, so close it explicitly to release its resources.
          response.close();
        }
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  /**
   * Closes the web client, if init created one.
   */
  @Override
  public void close() throws Exception {
    // client stays null when init was never run or failed before assigning it
    if (client != null) {
      client.close();
    }
  }
}
 

// Initialize and run the test framework with the async task, a 200 ms callback timeout
// and a max task concurrency of 4
// NOTE(review): the streams here are named "test-samza.Input"/"test-samza.Output" (capitalized),
// while AsyncRestTask sends to SystemStream("test-samza","output") - confirm whether stream
// names are case-sensitive
TestTaskTestRunner
    .create(new AsyncRestTask())
	.setTaskCallBackTimeoutMS(200)
	.setTaskMaxConcurrency(4)
	.addInputStream(CollectionStream.of("test-samza.Input", {1,2,3,4}))
	.addOutputStream(CollectionStream.empty("test-samza.Output"))
	.run();
 
// Assertions on the outputs; the expected values are compared in any order
StreamAssert.that("test-samza.Output").containsInAnyOrder({10,20,30,40});

...