Status
Current state: [ UNDER DISCUSSION ACCEPTED ]
Discussion thread: DISCUSS
...
Data Injection:
In the context of this test framework, an input stream is a stream that a Samza job may consume from, and an output stream is a stream that a Samza job can produce to. Because this is a test, the initial source of data is always bounded, and it may originate from one of the sources listed below.
...
StreamAssert | StateAssert |
---|---|
contains | contains |
containsInAnyOrder | satisfies |
inWindowPane | |
satisfies | |
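To make the difference between the `StreamAssert` checks concrete, here is a minimal sketch over a plain `List`. It assumes `contains` means an exact, ordered match (as with Hamcrest's `IsIterableContainingInOrder.contains`), `containsInAnyOrder` means the same elements with the same multiplicities in any order, and `satisfies` applies an arbitrary predicate. `AssertSketch` is a hypothetical name, not the framework's API, and `inWindowPane` is left out because its windowing semantics are not defined here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration of the assertion semantics; not the framework's StreamAssert API.
final class AssertSketch {
  private AssertSketch() {}

  // Exact match, including order (Hamcrest-style "contains").
  static <T> boolean contains(List<T> actual, List<T> expected) {
    return actual.equals(expected);
  }

  // Same elements with the same multiplicities; order is ignored.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return counts(actual).equals(counts(expected));
  }

  // Arbitrary user-supplied check over the whole output.
  static <T> boolean satisfies(List<T> actual, Predicate<List<T>> check) {
    return check.test(actual);
  }

  // Counts occurrences of each element so duplicates are compared correctly.
  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> counts = new HashMap<>();
    for (T item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }
}
```

An output of `[20, 10, 40, 30]` fails `contains([10, 20, 30, 40])` but passes `containsInAnyOrder`, which is why the unordered check pairs with out-of-order processing.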
Data Types & Partitions:
The framework will provide full flexibility in using both primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any serde configuration. Serdes are also required if users want to maintain state. The test framework will provide APIs for initializing input streams (both single- and multi-partition) and for validating data on single- and multi-partition bounded streams.
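A serde is just a pair of conversions between a message object and bytes. A minimal sketch, assuming a local `Serde<T>` interface that mirrors the `toBytes`/`fromBytes` pair of Samza's `org.apache.samza.serializers.Serde` (declared here so the example compiles without Samza on the classpath); `IntSerdeSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Local stand-in mirroring the method pair of Samza's org.apache.samza.serializers.Serde.
interface Serde<T> {
  byte[] toBytes(T object);

  T fromBytes(byte[] bytes);
}

// Hypothetical serde for Integer messages using a 4-byte big-endian encoding.
final class IntSerdeSketch implements Serde<Integer> {
  @Override
  public byte[] toBytes(Integer object) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(object).array();
  }

  @Override
  public Integer fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

In a real job one of Samza's own serdes (for example `IntegerSerde`) would normally be configured rather than a hand-written one; the sketch only shows the round trip a Kafka or file stream relies on.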
...
The targeted test matrix below covers the Samza components to be tested in p2.
n = number of threads
x = number of partitions
m = number of containers
Samza API | Concurrency | Partitions | Containers | Expected Result |
---|---|---|---|---|
StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / nm | in-order processing |
StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / nm | out-of-order processing |
AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= nx | 1 / nm | in-order processing |
AsyncStreamTask | task.max.concurrency = n | 1 <= p <= nx | 1 / nm | out-of-order processing |
WindowableTask | N/A | 1 <= p <= nx | 1 / nm | n messages processed per window of time t |
InitableTask | N/A | 1 <= p <= nx | 1 / nm | stateful testing (assertions on the KV store) |
ClosableTask | N/A | 1 <= p <= nx | 1 / nm | verify closing the client? |
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / nm | in-order processing |
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= nx | 1 / nm | out-of-order processing |
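The concurrency column maps directly onto job configuration. A minimal sketch, assuming the overrides are handed to the runner as a `Map<String, String>`; `MatrixRowSketch` is a hypothetical helper that builds the config for one StreamTask row and derives the ordering the matrix predicts for it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for one StreamTask row of the test matrix.
final class MatrixRowSketch {
  private MatrixRowSketch() {}

  // Config overrides for the given task.max.concurrency and thread pool size n.
  static Map<String, String> overrides(int maxConcurrency, int threadPoolSize) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("task.max.concurrency", Integer.toString(maxConcurrency));
    config.put("job.container.thread.pool.size", Integer.toString(threadPoolSize));
    return config;
  }

  // Per the matrix, in-order processing is expected only when task.max.concurrency = 1.
  static boolean expectsInOrder(Map<String, String> config) {
    return Integer.parseInt(config.get("task.max.concurrency")) == 1;
  }
}
```

In-order rows pair naturally with an ordered assertion such as `contains`, and out-of-order rows with `containsInAnyOrder`.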
...
```java
/**
 * CollectionStreamSystem provides utilities to create and initialize an in-memory input stream.
 */
public class CollectionStreamSystem {
  // Create a CollectionStreamSystem
  public static CollectionStreamSystem create(String name) {...}
}
```
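An in-memory system can be pictured as a map from stream name to a fixed set of bounded partitions. A minimal sketch, assuming messages are routed to partitions by key hash (a common choice, not something this proposal specifies); `InMemoryStreamsSketch` and its methods are hypothetical and are not the `CollectionStreamSystem` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory store of bounded, partitioned streams.
final class InMemoryStreamsSketch {
  private final Map<String, List<List<Object>>> streams = new HashMap<>();

  // Creates an empty stream with a fixed number of partitions.
  void createStream(String stream, int partitionCount) {
    List<List<Object>> partitions = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
    streams.put(stream, partitions);
  }

  // Appends a message to the partition chosen by the key's hash.
  void send(String stream, Object key, Object message) {
    List<List<Object>> partitions = streams.get(stream);
    int partition = Math.floorMod(key.hashCode(), partitions.size());
    partitions.get(partition).add(message);
  }

  // Returns a read-only view of one partition, in insertion order.
  List<Object> read(String stream, int partition) {
    return Collections.unmodifiableList(streams.get(stream).get(partition));
  }
}
```

A single-partition stream is simply the case `partitionCount = 1`, where every message lands in partition 0 and order is preserved end to end.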
EventStream:
```java
/**
 * EventStream provides utilities to build an in-memory input stream of events. It helps mimic the runtime
 * environment of your job, for example by advancing time for windowing functions.
 */
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder<T> addElement(T element);
    public abstract Builder<T> addException(Throwable exception);
    public abstract Builder<T> advanceTimeTo(long time);
    public abstract EventStream<T> build();
  }

  public static <T> Builder<T> builder() {...}
}

public class EventStreamSystem {
  public static EventStreamSystem create(String name) {...}
}
```
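One way to picture the builder: it records an ordered script of elements, exceptions, and time advances, stamping each element with the current event time so a windowing test knows which window it falls into. A minimal sketch, assuming time starts at 0 and may only move forward; `EventScriptSketch` and its nested `Event` type are hypothetical and not the proposed `EventStream` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of an event stream script; not the proposed EventStream API.
final class EventScriptSketch<T> {
  // An element or an exception, stamped with the event time at which it was added.
  static final class Event<T> {
    final long time;
    final T element;           // null when this event is an exception
    final Throwable exception; // null when this event is an element

    Event(long time, T element, Throwable exception) {
      this.time = time;
      this.element = element;
      this.exception = exception;
    }
  }

  private final List<Event<T>> events = new ArrayList<>();
  private long currentTime = 0L;

  EventScriptSketch<T> addElement(T element) {
    events.add(new Event<>(currentTime, element, null));
    return this;
  }

  EventScriptSketch<T> addException(Throwable exception) {
    events.add(new Event<>(currentTime, null, exception));
    return this;
  }

  // Moves event time forward; moving backwards is rejected.
  EventScriptSketch<T> advanceTimeTo(long time) {
    if (time < currentTime) {
      throw new IllegalArgumentException("time cannot move backwards: " + time + " < " + currentTime);
    }
    currentTime = time;
    return this;
  }

  // Returns an immutable snapshot of the recorded events.
  List<Event<T>> build() {
    return Collections.unmodifiableList(new ArrayList<>(events));
  }
}
```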
FileStream:
```java
/**
 * FileStream provides utilities to build a stream of messages from a file on disk.
 */
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}

public class FileStreamSystem {
  public static FileStreamSystem create(String name) {...}
}
```
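A file-backed stream reduces to reading a bounded sequence of messages and decoding each one, which is where the serde requirement for file streams comes in. A minimal sketch, assuming one UTF-8 message per line and a caller-supplied decoder standing in for a serde; `FileLinesSketch` is hypothetical and not the proposed `FileStream` API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical line-oriented file reader; not the proposed FileStream API.
final class FileLinesSketch {
  private FileLinesSketch() {}

  // Decodes each line from the reader into one message.
  static <T> List<T> parse(BufferedReader reader, Function<String, T> decoder) {
    return reader.lines().map(decoder).collect(Collectors.toList());
  }

  // Reads a file: URI into a bounded list of messages, one per line.
  static <T> List<T> of(String fileUri, Function<String, T> decoder) {
    Path path = Paths.get(URI.create(fileUri));
    try (BufferedReader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
      return parse(reader, decoder);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Collecting inside the `try` block matters: `BufferedReader.lines()` is lazy, so the list must be fully built before the reader is closed.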
...
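```java
import java.util.Arrays;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;

/**
 * Simple test case using a collection as an input for a low-level application. It demonstrates setting up and
 * verifying a test with minimal (here, no) configs: it reads an input stream of integers and multiplies each
 * integer by 10.
 */
// The StreamTask under test
public class MyStreamTestTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
  }
}

// Test body: the in-memory system backs both the input and the output stream
CollectionStreamSystem system = CollectionStreamSystem.create("test");
CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4))
    .from(system);
CollectionStream<Integer> output = CollectionStream
    .empty("test", "output")
    .from(system);

TestRunner
    .of(MyStreamTestTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();

// Assertions on the outputs: exact values, in order
Assert.assertThat(TestRunner.consumeStream(output), IsIterableContainingInOrder.contains(10, 20, 30, 40));
```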
...
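```java
import java.util.Arrays;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/**
 * Simple test case using a collection as an input for a low-level application in async mode.
 */
public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer obj = (Integer) envelope.getMessage();
        // Send to the system/stream the test's output stream is bound to ("test", "output")
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

// Test body
CollectionStreamSystem system = CollectionStreamSystem.create("test");
CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4))
    .from(system);
CollectionStream<Integer> output = CollectionStream
    .empty("test", "output")
    .from(system);

TestRunner
    .of(MyAsyncStreamTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();

// Assertions on the outputs: async callbacks may complete in any order
StreamAssert.that(output).containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
```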
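The async test asserts with `containsInAnyOrder` because callbacks can finish in any order. A minimal sketch of that effect in plain Java, assuming a `CompletableFuture` on a small thread pool as a stand-in for the HTTP call so the example runs without a network; `AsyncOrderSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical simulation of async processing whose results arrive in completion order.
final class AsyncOrderSketch {
  private AsyncOrderSketch() {}

  // Multiplies each input by 10 asynchronously; output order is whatever order the tasks finish in.
  static List<Integer> process(List<Integer> inputs) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Integer> output = Collections.synchronizedList(new ArrayList<>());
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (Integer value : inputs) {
        futures.add(CompletableFuture.runAsync(() -> output.add(value * 10), pool));
      }
      // Wait for every task, analogous to each TaskCallback completing.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
    return new ArrayList<>(output);
  }
}
```

The multiset of results is deterministic even though their order is not, so only an order-insensitive check is reliable here.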
...
```java
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;

/**
 * Simple test case using a collection as an input for a high-level application.
 */
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    // Stream IDs resolve against job.default.system ("test"), matching the streams created below
    MessageStream<Integer> pageViews = graph.getInputStream("input");
    pageViews
        .map(s -> "processed " + s)
        .sendTo(graph.getOutputStream("output"));
  }
}

// Test body
CollectionStreamSystem system = CollectionStreamSystem.create("test");
CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4))
    .from(system);
CollectionStream<String> output = CollectionStream
    .empty("test", "output")
    .from(system);

// Initialize and run the test framework
TestRunner
    .of(new MyStreamApplication())
    .addInputStream(input)
    .addOutputStream(output)
    .addOverrideConfig("job.default.system", "test")
    .run();

// Assertions on the outputs: one "processed" message per input element
StreamAssert.that(output).contains(Arrays.asList("processed 1", "processed 2", "processed 3", "processed 4"));
```
...