
Status

Current state: [ UNDER DISCUSSION  ACCEPTED ]

Discussion thread: DISCUSS

...

[Diagram: Flow New]

Data Injection:

In the context of this test framework, an input stream is a stream that a Samza job consumes from, and an output stream is a stream that a Samza job produces to. Because this is a test, the initial source of data is always expected to be bounded. It may originate from one of the sources listed below.
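To make the input/output terminology concrete, here is a minimal, hypothetical sketch in plain Java with no Samza classes. `BoundedStreamDemo` and `runToCompletion` are illustrative names, not part of the proposal. A bounded input is consumed completely and everything the "job" produces is collected, which is what lets a test wait for completion and then assert on the whole output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration, no Samza classes: the "input stream" is a bounded Iterable the
// job consumes from, and the "output stream" is a List the job produces to.
final class BoundedStreamDemo {

  // Applies the job's per-message logic to every input message and collects what it produces.
  // The loop terminates because the input is finite, so a test can wait for completion and then
  // assert on the whole output.
  static <I, O> List<O> runToCompletion(Iterable<I> input, Function<I, O> process) {
    List<O> output = new ArrayList<>();
    for (I message : input) {
      output.add(process.apply(message));
    }
    return output;
  }
}
```

With an unbounded source the same loop would never return, which is why the framework requires bounded test data.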

...

 

StreamAssert | StateAssert
contains | contains
containsInAnyOrder | satisfies
inWindowPane |
satisfies |
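The difference between the two list-style stream assertions can be sketched as follows. This is a hypothetical helper, not the proposed StreamAssert implementation. It assumes `contains` means an exact, in-order match and `containsInAnyOrder` means the same elements with the same multiplicities in any order, mirroring Hamcrest's `contains` and `containsInAnyOrder` matchers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not the proposed StreamAssert: illustrates the assumed semantics only.
final class StreamAssertSketch {

  // "contains": same elements, same count, same order.
  static <T> boolean contains(List<T> actual, List<T> expected) {
    return actual.equals(expected);
  }

  // "containsInAnyOrder": same elements with the same multiplicities, order ignored.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    if (actual.size() != expected.size()) {
      return false;
    }
    // Count occurrences in the actual output...
    Map<T, Integer> counts = new HashMap<>();
    for (T t : actual) {
      counts.merge(t, 1, Integer::sum);
    }
    // ...and consume one occurrence per expected element.
    for (T t : expected) {
      Integer c = counts.get(t);
      if (c == null || c == 0) {
        return false;
      }
      counts.put(t, c - 1);
    }
    return true;
  }
}
```

For example, an output of `[20, 10, 40, 30]` fails `contains` against `[10, 20, 30, 40]` but passes `containsInAnyOrder`.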

 

Data Types & Partitions:

The framework will support both primitive and derived (user-defined) data types. Serdes are required for local Kafka streams and file streams, but in-memory streams do not require any serde configuration. Serdes are also required if users want to maintain state. The test framework will provide APIs to initialize input streams (both single- and multi-partition) and to validate data on single- and multi-partition bounded streams.
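Serdes matter only for some stream types because a Kafka topic or a file stores bytes: each message must be serialized on write and deserialized on read, whereas an in-memory stream can hand the same Java object from producer to consumer. The sketch below uses a hypothetical `SimpleSerde` interface, shaped loosely like Samza's `toBytes`/`fromBytes` methods but not Samza's own type, with an Integer serde as the example.

```java
import java.nio.ByteBuffer;

// Hypothetical serde interface for illustration; it is not Samza's Serde type.
interface SimpleSerde<T> {
  byte[] toBytes(T value);
  T fromBytes(byte[] bytes);
}

// Encodes an Integer as 4 big-endian bytes, as a byte-oriented stream (Kafka topic, file) would need.
final class IntSerde implements SimpleSerde<Integer> {
  @Override
  public byte[] toBytes(Integer value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  @Override
  public Integer fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

An in-memory stream skips this round trip entirely, which is why it needs no serde configuration.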

...

Below is a targeted test matrix for testing various Samza components in p2.

n = number of threads

x = number of partitions

m = number of containers

Samza API | Concurrency | Partitions | Containers | Expected Result
StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / m | in-order processing
StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / m | out-of-order processing
AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= nx | 1 / m | in-order processing
AsyncStreamTask | task.max.concurrency = n | 1 <= p <= nx | 1 / m | out-of-order processing
WindowableTask | N/A | 1 <= p <= nx | 1 / m | expect n messages to be processed per window of time t
InitableTask | N/A | 1 <= p <= nx | 1 / m | stateful testing (assertions on the KV store)
ClosableTask | N/A | 1 <= p <= nx | 1 / m | verify the client is closed?
Map / Flatmap / Filter / Partition By / Merge / SendTo / Sink / Join / Window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / m | in-order processing
Map / Flatmap / Filter / Partition By / Merge / SendTo / Sink / Join / Window | task.max.concurrency > 1, job.container.thread.pool.size = n | | | out-of-order processing
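One way to check the "in-order processing" expectation is per partition: assuming it means that messages from one partition are processed in offset order, with no ordering implied across partitions, a test task could log what it processed and the log could be verified afterwards. `ProcessedRecord` and `OrderChecker` are hypothetical names used only for this sketch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test record: one entry per processed message, as a test task might log it.
final class ProcessedRecord {
  final int partition;
  final long offset;

  ProcessedRecord(int partition, long offset) {
    this.partition = partition;
    this.offset = offset;
  }
}

final class OrderChecker {
  // Returns true if, within every partition, offsets were processed in strictly increasing order.
  // Interleaving between different partitions is allowed.
  static boolean inOrderPerPartition(List<ProcessedRecord> log) {
    Map<Integer, Long> lastOffset = new HashMap<>();
    for (ProcessedRecord r : log) {
      Long prev = lastOffset.get(r.partition);
      if (prev != null && r.offset <= prev) {
        return false;
      }
      lastOffset.put(r.partition, r.offset);
    }
    return true;
  }
}
```

For the out-of-order rows, such a check is not expected to hold, so those rows would instead assert on the set of outputs.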

...

TestRunner:

```java
/**
 * TestRunner provides a static factory for quickly setting up a Samza environment to test low-level and high-level
 * API jobs. Users configure the input streams their job consumes from and the output streams it produces to, and
 * pass in their job logic as a StreamTask/AsyncStreamTask class or a StreamApplication.
 */
public class TestRunner {

  // Static factory to configure and create a runner for the low-level API
  public static TestRunner of(Class taskClass) {...}
  // Static factory to configure and create a runner for the high-level API
  public static TestRunner of(StreamApplication app) {...}

  // Add or override custom configs
  public TestRunner addConfigs(Map<String, String> configs) {...}
  public TestRunner addConfigs(Config configs) {...}
  public TestRunner addOverrideConfig(String key, String val) {...}

  // Configure a named state store for the application
  public TestRunner addState(String storeName) {...}

  // Configure an input stream that the job can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}
  // Configure an output stream that the job can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}

  // Consume messages from a stream; the result maps each partition id to its messages
  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream<T> stream, Integer timeout) {...}

  // Run the TestRunner
  public void run() {...}
}
```
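TestRunner exposes both `addConfigs` and `addOverrideConfig`, but the proposal does not spell out their precedence. One natural reading, sketched below purely as an assumption, is that config maps are merged in call order and explicit overrides are applied last so they always win. `ConfigLayers` is a hypothetical class built on plain maps, not the proposed TestRunner implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an assumed precedence for a TestRunner-like builder:
// defaults < addConfigs (in call order) < addOverrideConfig. Not the proposed implementation.
final class ConfigLayers {
  private final Map<String, String> configs = new HashMap<>();
  private final Map<String, String> overrides = new HashMap<>();

  ConfigLayers(Map<String, String> defaults) {
    configs.putAll(defaults);
  }

  ConfigLayers addConfigs(Map<String, String> more) {
    configs.putAll(more); // later calls replace earlier values for the same key
    return this;
  }

  ConfigLayers addOverrideConfig(String key, String value) {
    overrides.put(key, value);
    return this;
  }

  // Effective config: overrides are applied last, so they win regardless of call order.
  Map<String, String> build() {
    Map<String, String> effective = new HashMap<>(configs);
    effective.putAll(overrides);
    return effective;
  }
}
```

Keeping overrides in a separate layer makes the result independent of whether `addOverrideConfig` is called before or after `addConfigs`.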

 

...

CollectionStream:

```java
/**
 * CollectionStream acts as a descriptor that can be used to build an in-memory input stream (single or multiple
 * partitions) from Java collections
 */
public class CollectionStream<T> {
  // Create an empty stream with a single partition that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName) {...}

  // Create an empty stream with multiple partitions that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName, Integer partitionCount) {...}

  // Create a single-partition stream of messages from an input collection
  public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> collection) {...}

  // Create a multi-partition stream of messages; the key of the partitions map is the partition id
  public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {...}

  // Associate this CollectionStream with a CollectionStreamSystem
  public CollectionStream<T> from(CollectionStreamSystem system) {...}
}
```
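The two `of` factories differ only in partition layout. Assuming, since the proposal does not say so explicitly, that a plain collection becomes partition 0 and that `empty(..., partitionCount)` creates partitions 0 to partitionCount - 1, both forms can be normalized to the same `Map<Integer, List<T>>` shape that `TestRunner.consumeStream` returns. `PartitionedCollection` is a hypothetical stand-in, not the proposed class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for CollectionStream's partition handling; the
// "single partition == partition 0" rule is an assumption for illustration.
final class PartitionedCollection<T> {
  final Map<Integer, List<T>> partitions;

  private PartitionedCollection(Map<Integer, List<T>> partitions) {
    this.partitions = partitions;
  }

  // Single-partition input: every message goes to partition 0.
  static <T> PartitionedCollection<T> of(Iterable<T> messages) {
    Map<Integer, Iterable<T>> single = new TreeMap<>();
    single.put(0, messages);
    return of(single);
  }

  // Multi-partition input: the map key is the partition id; messages are copied in iteration order.
  static <T> PartitionedCollection<T> of(Map<Integer, ? extends Iterable<T>> input) {
    Map<Integer, List<T>> copy = new TreeMap<>();
    input.forEach((id, msgs) -> {
      List<T> list = new ArrayList<>();
      msgs.forEach(list::add);
      copy.put(id, list);
    });
    return new PartitionedCollection<>(copy);
  }

  // Empty output stream with partitions 0..partitionCount-1.
  static <T> PartitionedCollection<T> empty(int partitionCount) {
    Map<Integer, List<T>> copy = new TreeMap<>();
    for (int i = 0; i < partitionCount; i++) {
      copy.put(i, new ArrayList<>());
    }
    return new PartitionedCollection<>(copy);
  }
}
```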


CollectionStreamSystem:

```java
/**
 * CollectionStreamSystem provides utilities to create and initialize in-memory streams.
 */
public class CollectionStreamSystem {

  // Create a CollectionStreamSystem
  public static CollectionStreamSystem create(String name) {...}
  // Create an in-memory stream and initialize it with messages from the partitions map
  public <T> CollectionStreamSystem addInput(String streamName, Map<Integer, Iterable<T>> partitions) {...}
  // Create an empty in-memory stream with partitionCount partitions
  public <T> CollectionStreamSystem addOutput(String streamName, Integer partitionCount) {...}
}
```
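On the output side, a system like this holds named streams that a job produces to. The proposal does not say how a message sent to a multi-partition output stream picks its partition; the hypothetical `InMemorySystem` below assumes hash-by-key routing purely for illustration, and is not the proposed CollectionStreamSystem.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory system, not the proposed CollectionStreamSystem:
// stream name -> (partition id -> messages produced so far).
final class InMemorySystem {
  final String name;
  private final Map<String, Map<Integer, List<Object>>> streams = new HashMap<>();

  private InMemorySystem(String name) {
    this.name = name;
  }

  static InMemorySystem create(String name) {
    return new InMemorySystem(name);
  }

  // Registers an empty output stream with partitions 0..partitionCount-1.
  InMemorySystem addOutput(String streamName, int partitionCount) {
    Map<Integer, List<Object>> partitions = new HashMap<>();
    for (int i = 0; i < partitionCount; i++) {
      partitions.put(i, new ArrayList<>());
    }
    streams.put(streamName, partitions);
    return this;
  }

  // Appends a message to the partition chosen by hashing its key (an assumed routing policy).
  void send(String streamName, Object key, Object message) {
    Map<Integer, List<Object>> partitions = streams.get(streamName);
    if (partitions == null) {
      throw new IllegalArgumentException("Unknown stream: " + streamName);
    }
    int partition = Math.floorMod(key.hashCode(), partitions.size());
    partitions.get(partition).add(message);
  }

  // Returns the messages produced so far to one partition.
  List<Object> read(String streamName, int partition) {
    return streams.get(streamName).get(partition);
  }
}
```

`Math.floorMod` keeps the partition index non-negative even for keys with negative hash codes.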

 

 

EventStream:

```java
/**
 * EventStream provides utilities to build an in-memory input stream of events. It helps mimic the runtime environment
 * of your job, e.g. advancing time for windowing functions
 */
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder<T> addElement();
    public abstract Builder<T> addException();
    public abstract Builder<T> advanceTimeTo(long time);
    public abstract EventStream<T> build();
  }

  public static <T> Builder<T> builder() {...}
}

/**
 * EventStreamSystem provides utilities to create and initialize an in-memory input stream.
 */
public class EventStreamSystem {

  // Create an EventStreamSystem
  public static EventStreamSystem create(String name) {...}
}
```
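The point of `advanceTimeTo` is that a test, rather than the wall clock, decides when windows close. A hypothetical sketch of that idea follows: `ScriptedEvent` and `TumblingWindowDriver` are illustrative names, not EventStream internals, and exception injection (`addException`) is left out. A script of elements and clock advances drives a fixed-size tumbling window that emits its element count each time the clock reaches a window boundary, starting from time 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical scripted event: either a data element or a clock advance.
final class ScriptedEvent<T> {
  final T element;       // non-null for data events
  final long advanceTo;  // used only when element == null

  private ScriptedEvent(T element, long advanceTo) {
    this.element = element;
    this.advanceTo = advanceTo;
  }

  static <E> ScriptedEvent<E> element(E e) { return new ScriptedEvent<>(e, -1); }
  static <E> ScriptedEvent<E> advanceTimeTo(long t) { return new ScriptedEvent<>(null, t); }
}

final class TumblingWindowDriver {
  // Replays the script against tumbling windows of length windowMs starting at time 0.
  // Whenever the clock reaches the end of the current window, the number of elements seen
  // in that window is emitted (0 for a window with no elements).
  static <T> List<Integer> run(List<ScriptedEvent<T>> script, long windowMs) {
    List<Integer> emitted = new ArrayList<>();
    long windowEnd = windowMs;
    int count = 0;
    for (ScriptedEvent<T> ev : script) {
      if (ev.element != null) {
        count++;
        continue;
      }
      // Close every window whose end is at or before the new time.
      while (ev.advanceTo >= windowEnd) {
        emitted.add(count);
        count = 0;
        windowEnd += windowMs;
      }
    }
    return emitted;
  }
}
```

With 1000 ms windows, the script "a, b, advance to 1000, c, advance to 3000" emits `[2, 1, 0]`: two elements in the first window, one in the second, none in the third.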

 

FileStream:

```java
/**
 * FileStream provides utilities to build a stream of messages from a file on disk
 */
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}

public class FileStreamSystem {
  public static FileStreamSystem create(String name) {...}
}
```
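The proposal does not fix a file format for `FileStream.of(fileUri)`. Assuming one message per line, with a caller-supplied parser standing in for a serde, reading such a file could look like the hypothetical `LineFileReader` below; it is not the proposed FileStream implementation.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical reader: one message per line, decoded by a caller-supplied parser.
final class LineFileReader {
  // fileUri must be an absolute file: URI, e.g. produced by Path.toUri().
  static <T> List<T> readMessages(String fileUri, Function<String, T> parser) throws IOException {
    List<T> messages = new ArrayList<>();
    for (String line : Files.readAllLines(Paths.get(URI.create(fileUri)), StandardCharsets.UTF_8)) {
      if (!line.isEmpty()) { // skip blank lines
        messages.add(parser.apply(line));
      }
    }
    return messages;
  }
}
```

Because the whole file is read up front, the resulting stream is bounded, as the framework requires.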

...

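Simple StreamTask Test:

```java
import java.util.Arrays;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;

/**
 * Simple test case using a collection as the input for a low-level application. It demonstrates setting up a test
 * with minimal (here, no) configs: the job reads an input stream of integers and multiplies each integer by 10.
 */
public class TestMyStreamTask {

  // A StreamTask that multiplies each incoming integer by 10 and sends it to test.output
  public static class MyStreamTestTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
      Integer obj = (Integer) envelope.getMessage();
      collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
    }
  }

  @Test
  public void testMultiplyByTen() {
    CollectionStreamSystem system = CollectionStreamSystem.create("test");

    CollectionStream<Integer> input = CollectionStream
        .of("test", "input", Arrays.asList(1, 2, 3, 4))
        .from(system);

    CollectionStream<Integer> output = CollectionStream
        .<Integer>empty("test", "output")
        .from(system);

    TestRunner
        .of(MyStreamTestTask.class)
        .addInputStream(input)
        .addOutputStream(output)
        .run();

    // Assertions on the outputs: consumeStream returns messages keyed by partition id;
    // the single-partition output is assumed to be partition 0 (timeout units are not yet specified)
    Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
        IsIterableContainingInOrder.contains(10, 20, 30, 40));
  }
}
```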

...

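Simple AsyncStreamTask Test:

```java
import java.util.Arrays;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;

/**
 * Simple test case using a collection as the input for a low-level application in async mode
 */
public class TestMyAsyncStreamTask {

  // Async task: for each message, calls a web service and emits message * 10 once the call completes
  public static class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
    private Client client;
    private WebTarget target;

    @Override
    public void init(Config config, TaskContext taskContext) throws Exception {
      // Initialize the web client
      client = ClientBuilder.newClient();
      target = client.target("http://example.com/resource/").path("hello");
    }

    @Override
    public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator, final TaskCallback callback) {
      target.request().async().get(new InvocationCallback<Response>() {
        @Override
        public void completed(Response response) {
          Integer obj = (Integer) envelope.getMessage();
          collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
          callback.complete();
        }

        @Override
        public void failed(Throwable throwable) {
          System.out.println("Invocation failed.");
          callback.failure(throwable);
        }
      });
    }

    @Override
    public void close() throws Exception {
      client.close();
    }
  }

  @Test
  public void testAsyncMultiplyByTen() {
    CollectionStreamSystem system = CollectionStreamSystem.create("test");

    CollectionStream<Integer> input = CollectionStream
        .of("test", "input", Arrays.asList(1, 2, 3, 4))
        .from(system);

    CollectionStream<Integer> output = CollectionStream
        .<Integer>empty("test", "output")
        .from(system);

    TestRunner
        .of(MyAsyncStreamTask.class)
        .addInputStream(input)
        .addOutputStream(output)
        .run();

    // Assertions on the outputs: async callbacks may complete out of order
    StreamAssert.that(output).containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
  }
}
```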
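The async example asserts with `containsInAnyOrder` because completion callbacks can fire in a different order than the messages arrived. The hypothetical plain-Java sketch below (no Samza or JAX-RS involved) makes this deterministic with `CompletableFuture`: each message waits on its own pending "remote call", output is produced from the completion callback as in `processAsync`, and the calls are completed in reverse order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical simulation of callback-driven output, not Samza's async run loop.
final class OutOfOrderDemo {
  static List<Integer> run(List<Integer> inputs) {
    List<Integer> output = Collections.synchronizedList(new ArrayList<>());
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (Integer msg : inputs) {
      CompletableFuture<Void> call = new CompletableFuture<>();
      // Completion callback: emits the result when the simulated call finishes.
      call.thenRun(() -> output.add(msg * 10));
      pending.add(call);
    }
    // Complete the simulated calls in reverse order of arrival; each callback runs
    // synchronously on this thread when its future is completed.
    for (int i = pending.size() - 1; i >= 0; i--) {
      pending.get(i).complete(null);
    }
    return output;
  }
}
```

For inputs `1, 2, 3, 4` this returns `[40, 30, 20, 10]`: the same elements an in-order run would produce, in a different order, which is exactly the case `containsInAnyOrder` accepts.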

...

Simple High Level API Test:

```java
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.junit.Test;

/**
 * Simple test case using a collection as the input for a high-level application
 */
public class TestMyStreamApplication {

  // Prefixes every input message with "processed " and sends it to the output stream
  public static class MyStreamApplication implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      // Stream ids use job.default.system ("test"), matching the CollectionStreams below
      MessageStream<Integer> pageViews = graph.getInputStream("input");
      pageViews.map(s -> "processed " + s)
               .sendTo(graph.getOutputStream("output"));
    }
  }

  @Test
  public void testMapAndSendTo() {
    CollectionStreamSystem system = CollectionStreamSystem.create("test");

    CollectionStream<Integer> input = CollectionStream
        .of("test", "input", Arrays.asList(1, 2, 3, 4))
        .from(system);

    CollectionStream<String> output = CollectionStream
        .<String>empty("test", "output")
        .from(system);

    // Initialize and run the test framework
    TestRunner
        .of(new MyStreamApplication())
        .addInputStream(input)
        .addOutputStream(output)
        .addOverrideConfig("job.default.system", "test")
        .run();

    // Assertions on the outputs
    StreamAssert.that(output).contains(Arrays.asList("processed 1", "processed 2", "processed 3", "processed 4"));
  }
}
```

...