...
- The Data Injection phase configures the input sources for the Samza job.
- The Data Transformation phase is the API logic of the Samza job (low-level or high-level API).
- The Data Validation phase asserts that the actual results computed by running the Samza job match the expected results.
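As a rough, dependency-free illustration of the three phases, the Java sketch below injects a bounded in-memory list, applies a stand-in transformation (multiply by 10, borrowed from the low-level example later in this document) and validates the result against the expected output. The class and method names (`ThreePhaseSketch`, `inject`, `transform`, `validate`) are hypothetical and are not part of the proposed framework API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, dependency-free illustration of the three test phases; not the framework's API.
public class ThreePhaseSketch {

    // Data Injection: the input is a bounded, in-memory collection.
    static List<Integer> inject() {
        return Arrays.asList(1, 2, 3, 4);
    }

    // Data Transformation: `logic` stands in for the job's processing code.
    static List<Integer> transform(List<Integer> input, Function<Integer, Integer> logic) {
        List<Integer> output = new ArrayList<>();
        for (Integer message : input) {
            output.add(logic.apply(message));
        }
        return output;
    }

    // Data Validation: compare the actual output with the expected output (order-sensitive here).
    static boolean validate(List<Integer> actual, List<Integer> expected) {
        return actual.equals(expected);
    }

    public static void main(String[] args) {
        List<Integer> actual = transform(inject(), x -> x * 10);
        System.out.println(validate(actual, Arrays.asList(10, 20, 30, 40))); // true
    }
}
```

Because the input is bounded, the transformation finishes and the validation step can compare complete outputs rather than waiting on an open-ended stream.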
Data Injection:
In the context of this test framework, an input stream is a stream that a Samza job may consume from, and an output stream is a stream that a Samza job can produce to. Since this is a test, the initial source of the data is always expected to be bounded, and it may originate from one of the following sources:
...
StreamAssert | TaskAssert | StateAssert
---|---|---
contains | contains | contains
containsInAnyOrder | containsInAnyOrder | satisfies
inWindow | satisfies |
inFinalPane | isOnTimePane |
isOnTimePane | |
satisfies | |
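To make the difference between the order-sensitive and order-insensitive checks concrete, here is a minimal sketch of how `contains`, `containsInAnyOrder` and `satisfies` might behave on a finished, bounded output. `AssertSketch` is a hypothetical helper, and the semantics (exact order for `contains`, same elements with the same multiplicities for `containsInAnyOrder`, an arbitrary predicate for `satisfies`) are assumptions inferred from the method names. The window and pane assertions are not modeled because they depend on windowing metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical assertion helper; NOT the proposed StreamAssert/TaskAssert/StateAssert API.
public class AssertSketch<T> {
    private final List<T> actual;

    private AssertSketch(List<T> actual) {
        this.actual = new ArrayList<>(actual);
    }

    static <T> AssertSketch<T> that(List<T> actual) {
        return new AssertSketch<>(actual);
    }

    // Same elements in the same order.
    AssertSketch<T> contains(List<T> expected) {
        check(actual.equals(expected), "expected " + expected + " in order, got " + actual);
        return this;
    }

    // Same elements with the same multiplicities, order ignored.
    AssertSketch<T> containsInAnyOrder(List<T> expected) {
        check(counts(actual).equals(counts(expected)),
              "expected " + expected + " in any order, got " + actual);
        return this;
    }

    // Arbitrary user-supplied check over the whole output.
    AssertSketch<T> satisfies(Predicate<List<T>> condition) {
        check(condition.test(actual), "output " + actual + " did not satisfy the condition");
        return this;
    }

    // Counts occurrences of each element so duplicates are compared correctly.
    private static <T> Map<T, Integer> counts(List<T> values) {
        Map<T, Integer> counts = new HashMap<>();
        for (T v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    private static void check(boolean ok, String message) {
        if (!ok) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        List<Integer> output = Arrays.asList(10, 20, 30, 40);
        AssertSketch.that(output)
            .contains(Arrays.asList(10, 20, 30, 40))
            .containsInAnyOrder(Arrays.asList(40, 30, 20, 10))
            .satisfies(xs -> xs.stream().allMatch(x -> x % 10 == 0));
        System.out.println("all assertions passed");
    }
}
```

`containsInAnyOrder` is usually the safer choice for multi-threaded or multi-partition runs, where the relative order of outputs across partitions is not deterministic.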
Data Types & Partitions:
The framework will provide full flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any Serde configuration. The test framework will provide APIs for initializing input streams (both single-partition and multi-partition), and for validating data on both single-partition and multi-partition bounded streams.
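One way to picture a multi-partition bounded input is as a map from partition id to an ordered list of messages. The sketch below is a hypothetical helper, not the framework's API: it assigns each message to a partition with `Math.floorMod(message.hashCode(), partitionCount)`, an assumed scheme chosen for illustration, so that validation can then be done partition by partition, where message order is preserved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that splits a bounded input into partitions; not the framework's API.
public class PartitionSketch {

    // Assigns each message to a partition by hashing it; order within a partition follows input order.
    static <T> Map<Integer, List<T>> partition(List<T> messages, int partitionCount) {
        Map<Integer, List<T>> partitions = new TreeMap<>();
        for (int p = 0; p < partitionCount; p++) {
            partitions.put(p, new ArrayList<>());
        }
        for (T message : messages) {
            // floorMod keeps the partition id non-negative even for negative hash codes
            int p = Math.floorMod(message.hashCode(), partitionCount);
            partitions.get(p).add(message);
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> input = partition(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
        System.out.println(input); // {0=[2, 4, 6], 1=[1, 3, 5]}
    }
}
```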
Running Config
Traditionally, users must set up the config for any Samza job. For test purposes, the framework sets up the basic config boilerplate for users while still giving them the option to add any custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode (using ZooKeeper), and it also provides functions to configure the concurrency semantics of the Samza job.
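The "boilerplate plus optional overrides" idea can be sketched as a plain map merge in which user entries take precedence. `TestConfigSketch`, its `Mode` enum and `build` are hypothetical names; the config keys are assumed to mirror Samza's configuration reference (`job.container.thread.pool.size`, `task.max.concurrency`, `job.coordinator.factory`, `job.coordinator.zk.connect`) and should be checked against the Samza version in use, and the ZooKeeper address is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the framework's API: boilerplate defaults plus optional user overrides.
public class TestConfigSketch {

    enum Mode { SINGLE_CONTAINER, MULTI_CONTAINER }

    static Map<String, String> build(String jobName, Mode mode, Map<String, String> overrides) {
        Map<String, String> config = new HashMap<>();
        // Boilerplate the user would otherwise write (key names assumed from Samza's config reference).
        config.put("job.name", jobName);
        config.put("job.container.thread.pool.size", "1");
        config.put("task.max.concurrency", "1");
        if (mode == Mode.MULTI_CONTAINER) {
            // Multi-container mode coordinates containers through ZooKeeper (address is a placeholder).
            config.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
            config.put("job.coordinator.zk.connect", "localhost:2181");
        }
        // Custom entries (rarely needed) take precedence over the defaults.
        config.putAll(overrides);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> custom = new HashMap<>();
        custom.put("task.max.concurrency", "4");
        Map<String, String> config = build("test-job", Mode.SINGLE_CONTAINER, custom);
        System.out.println(config.get("task.max.concurrency"));            // 4
        System.out.println(config.containsKey("job.coordinator.factory")); // false
    }
}
```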
Stateful & Stateless Testing:
...
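```java
import java.util.Arrays;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
// TestTask, Mode, CollectionStream and StreamAssert are the test framework APIs proposed in this document.

/**
 * Simple test case using a collection as the input for a low-level application. It demonstrates
 * setting up and checking a test with minimal (here, no) configs: it reads an input stream of
 * integers and multiplies each integer by 10.
 */

// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// No custom configs are needed for this test
Config config = new MapConfig();

// Initialize and run the test framework
TestTask
    .create(myTask, config, Mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("test-samza.input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("test-samza.output"))
    .run();

// Assertions on the outputs
StreamAssert.that("test-samza.output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));
```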
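```java
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
// TestTask, Mode, CollectionStream and StreamAssert are the test framework APIs proposed in this document.

/**
 * Simple test case using a collection as the input for a low-level application in async mode.
 */
public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        // Process your message; here it is forwarded unchanged so that the assertion below holds
        collector.send(new OutgoingMessageEnvelope(
            new SystemStream("test-samza", "Output"), envelope.getMessage()));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

// Initialize and run the test framework (`pageviews` is a bounded list of test messages)
Config config = new MapConfig();
TestTask
    .create(new AsyncRestTask(), config, Mode.SINGLE_CONTAINER)
    .setTaskCallBackTimeoutMS(200)
    .setTaskMaxConcurrency(4)
    .addInputStream(CollectionStream.of("test-samza.PageView", pageviews))
    .addOutputStream(CollectionStream.empty("test-samza.Output"))
    .run();

// Assertions on the outputs
StreamAssert.that("test-samza.Output").containsInAnyOrder(pageviews);
```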
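The async example relies on two settings: each message's callback must complete within the callback timeout, and only a bounded number of messages are processed at once. The plain-Java sketch below approximates that contract with `CompletableFuture` and a fixed-size thread pool. `AsyncCallbackSketch` and `runAsync` are hypothetical, and the timeout is applied while waiting on each result in turn, which is a simplification rather than Samza's actual callback-timeout mechanism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch, not Samza's implementation: bounded concurrency plus a per-result timeout.
public class AsyncCallbackSketch {

    // Runs `work` for every message on a pool of `maxConcurrency` threads, then waits up to
    // `timeoutMs` for each result in input order; a slow result surfaces as a TimeoutException
    // and a failed one as an ExecutionException.
    static <I, O> List<O> runAsync(List<I> messages, Function<I, O> work,
                                   int maxConcurrency, long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<CompletableFuture<O>> pending = new ArrayList<>();
            for (I message : messages) {
                // Stand-in for processAsync: the future completing plays the role of callback.complete()
                pending.add(CompletableFuture.supplyAsync(() -> work.apply(message), pool));
            }
            List<O> results = new ArrayList<>();
            for (CompletableFuture<O> future : pending) {
                results.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
            }
            return results;
        } finally {
            // Interrupt any work still running so the pool threads can exit.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> results = runAsync(Arrays.asList(1, 2, 3, 4), x -> x * 10, 4, 200);
        System.out.println(results); // [10, 20, 30, 40]
    }
}
```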
...
As this is a new feature, no plans are required for compatibility, deprecation and migration.
Rejected Alternatives