Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The test framework will support both stateful and stateless testing. Stateful testing can be done either using RocksDB or by maintaining state in memory.

Future Changes with Stream Descriptors:

...

Code Block
languagejava
themeEclipse
titleTestApplication
/**
 * TestApplication provides a static factory for quickly setting up a Samza environment to test the
 * High Level API. Users configure input streams, apply operators on the returned message streams,
 * and then run the application.
 */

public class TestApplication<T> {
  // Static factory to configure and create a runner for the High Level API.
  // Declared generic so callers receive a parameterized instance; a raw return type would
  // erase the generics of every method invoked on the result of the fluent chain.
  public static <T> TestApplication<T> create(HashMap<String, String> config, Mode mode) {...}

  // Configure state for the application.
  // NOTE(review): presumably returns this application so calls can be chained - confirm.
  public TestApplication<T> addState(Serde<?> serde) {...}

  // Configure any kind of input stream for the Samza system, and get a handle to the message
  // stream to apply operators on. The message type is named M so that it does not shadow the
  // class-level type parameter T.
  public <M> MessageStream<M> getInputStream(EventStream<M> stream) {...}
  public <M> MessageStream<M> getInputStream(CollectionStream<M> stream) {...}
  public <M> MessageStream<M> getInputStream(FileStream<M> stream) {...}

  // Run the app
  public void run() {...}
}

...

Code Block
languagejava
themeEclipse
titleCollectionStream
/**
 * CollectionStream provides utilities to build an in-memory input stream from collections
 * (list, map). It also supports initializing multiple partitions for an input stream.
 */

public class CollectionStream<T> {
  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String streamId) {...}

  // Create a stream of messages from an input list, with a single partition
  public static <T> CollectionStream<T> of(String streamId, List<T> collection) {...}

  // Create a stream of messages from input lists, with multiple partitions
  // (NOTE(review): presumably one inner list per partition - confirm)
  public static <T> CollectionStream<T> ofPartitions(String streamId, List<List<T>> collection) {...}

  // Create a stream of key-value messages from input maps, with multiple partitions
  // (NOTE(review): presumably one map per partition - confirm).
  // NOTE(review): this overload has the same erasure as the List<List<T>> overload above
  // (both erase to ofPartitions(String, List)), which javac rejects as a name clash; one of
  // the two will need a distinct name before this API can be implemented.
  public static <K, V> CollectionStream<KV<K, V>> ofPartitions(String streamId, List<Map<K, V>> collection) {...}

  // Create a stream of key-value messages from an input map
  public static <K, V> CollectionStream<KV<K, V>> of(String streamId, Map<K, V> elems) {...}
}

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
 * Simple test case using a collection as the input for a low level application. It demonstrates
 * the setup and output comparison of a test with minimal (here: none) configs. The task reads an
 * input stream of integers and multiplies each integer by 10.
 */

// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    // Emit the input value multiplied by 10 to the "output" stream of the "test-samza" system
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// Initialize and run the test framework
TestTask
    .create(myTask, config, Mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs (order-insensitive)
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));
Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/

// Async task that issues a non-blocking REST call for every incoming message and, once the
// call completes, emits the message value multiplied by 10.
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(final IncomingMessageEnvelope envelope, final MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        try {
          Integer obj = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
        } catch (RuntimeException e) {
          // Report the failure instead of leaving the callback unsignalled
          callback.failure(e);
          return;
        } finally {
          // Release the resources held by the response
          response.close();
        }
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
 

// Initialize and run the test framework
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.Input", Arrays.asList(1, 2, 3, 4)))
    // Output stream id matches the SystemStream("test-samza", "output") that AsyncRestTask sends to
    // (NOTE(review): assumes stream ids are case-sensitive - confirm)
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs (order-insensitive)
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
Code Block
languagejava
themeEclipse
titleSimple High Level Api Test
/**
 * Simple test case using a collection as the input for a High Level API application.
 */

// Initialize and run the test framework
// NOTE(review): in the TestApplication sketch, run() is declared on TestApplication while
// getInputStream(...) returns a MessageStream; confirm how run() is meant to be reached
// from the end of this operator chain.
TestApplication
    .create(config, Mode.SINGLE_CONTAINER)
    .getInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3)))
    .map(s -> "processed " + s)
    .run();

// Assertions on the outputs (order-insensitive); inputs 1, 2, 3 map to "processed 1..3"
// NOTE(review): this asserts on the input stream id because the snippet declares no output
// stream - confirm which stream the mapped messages should be read from.
StreamAssert.that("test-samza.input").containsInAnyOrder(Arrays.asList("processed 1", "processed 2", "processed 3"));

...