Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titleTestRunner
/**
* TestRunner provides static factories for quick setup of a Samza environment for testing the low-level and high-level APIs. Users can configure the input streams
* they consume from and the output streams they produce to, and pass in their Samza job logic via StreamTask/AsyncStreamTask/StreamApplication.
*/ 

public class TestRunner {

  // Static factory to config & create runner for low level api; takes the task class
  public static TestRunner of(Class taskClass) {...}
  // Static factory to config & create runner for high level api
  public static TestRunner of(StreamApplication app) {...}

  // Add/Override any custom configs
  public TestRunner addConfigs(Map<String,String> configs) {...}
  public TestRunner addConfigs(Config configs) {...}
  public TestRunner addOverrideConfig(String key, String val){...}

  // Configure state for application
  public TestRunner addState(String storeName) {...}

  // Configure an input stream for samza system, that job can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}
  // Configure an output stream for samza system, that job can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}

  // Run the app
  public void run() {...}
}

 

 

Types of Input Streams

 

CollectionStream:

Code Block
languagejava
themeEclipse
titleCollectionStream
/**
* CollectionStream acts as a descriptor that can be used to build an in-memory input stream (single/multiple partition) of collections (list, map).
*/
 
public class CollectionStream<T> {
  // Create an empty stream that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String streamName) {...}

  // Create a stream of messages from input list with single partition
  public static <T> CollectionStream<T> of(String streamName, Iterable<T> collection) {...}
}

 

 

Types of Input Streams

 

Code Block
languagejava
themeEclipse
titleCollectionStream
/**
* CollectionStream provides utilities to build an in memory input stream of collections(list, map). It also supports initialization of 
* multiple partitions for an input stream 
*/
 
public class CollectionStream<T> {
  // Create an empty stream that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String streamName) {...}

  // Create a stream of messages from input list with multiple partition, key of partitions map is partitionId
  public static <T> CollectionStream<T> of(String streamName, Map<Integer,? extends Iterable<T>> partitions) {...}

  // Associate this stream with the given CollectionStreamSystem and return the stream for chaining
  // NOTE(review): exact semantics are not spelled out in this sketch; inferred from the
  // `CollectionStream.of(...).from(system)` usage in the examples -- confirm.
  public CollectionStream<T> from(CollectionStreamSystem system) {...}

  // Create a stream of messages from input list with single partition
  public static <T> CollectionStream<T> of(String streamName, Iterable<T> collection) {...}
}
Code Block
languagejava
themeEclipse
titleCollectionStreamSystem
/**
* CollectionStreamSystem provides utilities to create and initialize an in memory input stream.
*/
 
public class CollectionStreamSystem {

  // Create a CollectionStreamSystem
  public static CollectionStreamSystem create(String name) {...}

  // Register an input stream on this system; key of partitions map is partitionId.
  // Returns this system so calls can be chained.
  public <T> CollectionStreamSystem addInput(String streamName, Map<Integer, Iterable<T>> partitions) {...}

  // Register an output stream on this system. Returns this system so calls can be chained.
  // NOTE(review): an output stream taking pre-populated partitions looks unusual -- confirm the intended signature.
  public <T> CollectionStreamSystem addOutput(String streamName, Map<Integer, Iterable<T>> partitions) {...}

}

 

 

EventStream:

Code Block
Code Block
languagejava
themeEclipse
titleEventStream
/**
* EventStream provides utilities to build an in-memory input stream of events. It helps mimic the runtime environment of your job,
* for example by advancing time for windowing functions.
*/
 
public class EventStream<T> {
  /**
   * Step-by-step builder for an EventStream. Every mutator returns the builder
   * (typed as Builder<T> rather than the raw Builder, so the element type is
   * preserved across a call chain) and build() produces the stream.
   */
  public abstract static class Builder<T> {
    // NOTE(review): no element parameter is declared in this sketch -- confirm how the element is supplied
    public abstract Builder<T> addElement();
    // NOTE(review): no exception parameter is declared in this sketch -- confirm how the exception is supplied
    public abstract Builder<T> addException();
    // Advance the stream's notion of time to the given value (unit not specified here)
    public abstract Builder<T> advanceTimeTo(long time);
    // Finish building and return the EventStream
    public abstract EventStream<T> build();
  }

  // Entry point: obtain a Builder for an EventStream of T
  public static <T> Builder<T> builder() {...}
}
 
// System descriptor for EventStream-based tests.
// NOTE(review): presumably the EventStream counterpart of CollectionStreamSystem -- confirm.
public class EventStreamSystem {
  // Static factory: returns an EventStreamSystem for the given name (body elided in this sketch)
  public static EventStreamSystem create(String name) {...}
}
Code Block
languagejava
themeEclipse
titleFileStream
/**
* FileStream provides utilities to build a stream of messages from a file on disk
*/
 
// Stream descriptor whose messages come from a file on disk.
public class FileStream<T> {
  // Static factory: returns a FileStream for the file identified by fileUri (body elided in this sketch)
  // NOTE(review): how file contents are mapped to messages of type T is not shown here -- confirm.
  public static <T> FileStream<T> of(String fileUri) {...}
}

...


 
// System descriptor for FileStream-based tests.
// NOTE(review): presumably the FileStream counterpart of CollectionStreamSystem -- confirm.
public class FileStreamSystem {
  // Static factory: returns a FileStreamSystem for the given name (body elided in this sketch)
  public static FileStreamSystem create(String name) {...}
}


Example Usages of the Test API:


Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
*

...

Code Block
languagejava
themeEclipse
titleSimple StreamTask Test
/**
* Simple Test case using a collection as an input for a low level application. It demonstrates set up and comparison of a test with 
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10
*/
 
// Create a StreamTask: multiplies every incoming Integer by 10 and emits it on the "output" stream.
// Declared as a named class because the runner below is handed MyStreamTestTask.class.
public class MyStreamTestTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    // The system name must match the CollectionStreamSystem created below ("test"),
    // otherwise the messages never reach the stream the assertion reads.
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test","output"), obj*10));
  }
}

// In-memory system that owns the input and output streams of this test
CollectionStreamSystem system = CollectionStreamSystem.create("test");

// Single-partition input stream holding the integers 1..4
CollectionStream<Integer> input = CollectionStream
    .of("input", {1,2,3,4})
    .from(system);

// Empty stream the task produces to
CollectionStream output = CollectionStream
    .empty("output")
    .from(system);

// Initialize and run the test framework
TestRunner
    .of(MyStreamTestTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();


// Assertions on the outputs: each input multiplied by 10, in order
Assert.assertThat(StreamUtils.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,40}));

 

 

Code Block
languagejava
themeEclipse
titleSimple StreamTask with StreamDescriptorsAsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application. Itin demonstratesthe set up and comparison of a test with 
* minimal(eg. here none) configs and it reads an input stream of integers and multiplies each integer with 10
*/

HashMap<String,String> configs = new HashMap<String,String>();
 
class MyStreamTaskFactory implements StreamTaskFactory {
    @Override
    public StreamTask createInstance() {
      return (envelope, collector, coordinator) -> {
        return envelope * 10;async mode
*/

// 
public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      };
TaskCoordinator coordinator, final TaskCallback callback) }{
   }
}

StreamDescriptor.Input input = StreamDescriptor.<String, String>input("input")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

StreamDescriptor.Output output = StreamDescriptor.<String, String>output("output")
    .withKeySerde(new StringSerde("UTF-8"))
    .withMsgSerde(new StringSerde("UTF-8"));

// InMemorySystem maintains system and stream config
InMemoryTestSystem testingSystem = InMemoryTestSystem.create("samza-test")
    .addInput(input, {1,2,3,4,5})
    .addOutput(output);

 
// JOB Specific Config
configs.put(JobConfig.JOB_NAME(), JOB_NAME);


// Default Single Container configs
configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1");
configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());


 
StreamTaskApplication app = StreamApplications.createStreamTaskApp(new MapConfig(configs), new MyStreamTaskFactory());
app.addInputs(Collections.singletonList(input)).addOutputs(Collections.singletonList(output)).run();
app.waitForFinish();
 
// Assertions on the outputs
Assert.assertThat(testingSystem.getStreamState(output), IsIterableContainingInOrder.contains({10,20,30,40,50})));
Code Block
languagejava
themeEclipse
titleSimple AsyncStreamTask Test
/**
* Simple Test case using a collection as an input for a low level application in the async mode
*/

// 
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target; 

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer obj = (Integer) envelope.getMessage();
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
     target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer obj = (Integer) envelope.getMessage();
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza","output"), obj*10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}
 

//CollectionStreamSystem Initializesystem and run the test framework
TestRunner
    .of(new AsyncRestTask())
	.setTaskCallBackTimeoutMS(200)
	.setTaskMaxConcurrency(4)
	.addInputStream(CollectionStream.of("test-samza.Input= CollectionStreamSystem.create("test"); 

CollectionStream<Integer> input = CollectionStream
	.of("input", {1,2,3,4}))
	.addOutputStreamfrom(system);

CollectionStream output = CollectionStream
	.empty("test-samza.output"))
	output")
	.from(system);

TestRunner
    .of(MyAsyncStreamTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();
 
// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder({10,20,30,40});
Code Block
languagejava
themeEclipse
titleSimple High Level Api Test
/**
* Simple Test case using a collection as an input for a High level application
*/
 
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”);

    pageViews.map(s -> "processed " + s)
             .sendTo(graph.getOutputStream(“test-samza.output”));
 }
}


/
 
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream<Integer> pageViews = graph.getInputStream(“test-samza.page-views”);
    pageViews.map(s -> "processed " + s)
             .sendTo(graph.getOutputStream(“test-samza.output”));
 }
}
 
// In-memory system that owns the input and output streams of this test
CollectionStreamSystem system = CollectionStreamSystem.create("test");

// Single-partition input stream holding the integers 1..4
CollectionStream<Integer> input = CollectionStream
    .of("input", {1,2,3,4})
    .from(system);

// Empty stream the application produces to
CollectionStream output = CollectionStream
    .empty("output")
    .from(system);

// Initialize and run the test framework
TestRunner
    .of(new MyStreamApplication())
    // Use the 4-element stream declared above: the assertion below expects "processed 4"
    .addInputStream(input)
    .addOutputStream(output)
    .addOverrideConfig("job.default.system", "test")
    .run();

// Assertions on the outputs
StreamAssert.that(output).contains({"processed 1", "processed 2", "processed 4"});

...