Status
Current state: [ UNDER DISCUSSION ACCEPTED ]
Discussion thread: <link to mailing list DISCUSS thread>
JIRA:
Released:
Problem:
Samza today provides various APIs: the high-level (fluent/application) API for chaining complex jobs, the low-level (task) API with deployment configurations such as Samza-Yarn and Samza as a library (Samza standalone), and, in the near future, an in-memory system for in-memory streams that a user can produce to and consume from. Still, Samza users have to get their hands dirty with too many configuration details and depend on external systems (like Kafka) to write integration tests. To alleviate this, we intend to provide a fluent test API to users. It will help them set up quick tests against their system without depending on an incoming stream from another system, e.g. a Kafka topic.
The goal of this SEP is to provide a standardized framework for setting up continuous integration tests for the High Level API and the Low Level API, in both single-container and multi-container configurations (Samza standalone using Zookeeper). Users won't have to worry about the underlying Samza config details or understand implementation details. This abstraction will increase developer productivity and help clients make their Samza systems robust.
Motivation
Adding this Continuous Integration Test Framework will address:
Lack of a standardized way for users to set up integration tests
Verbose test code and the need for a detailed understanding of low-level Samza API configs to set up Samza systems
Lack of a pluggable test system for any Samza single- or multi-container configuration
Lack of a robust set of tests for the current Samza API
Assumptions
The system depends on the In Memory system to spin up in-memory system consumers and producers
Single-container mode runs on SingleContainerGrouperFactory, which means these tests are meant to be run in a single-container environment
Multi-container mode leverages Samza as a library (Samza standalone), using Zookeeper as the coordination service
To provide a testing API with minimal config management, basic configs are set up for users, but they can still add or override custom configs
Proposed Changes
We propose a three-step process for configuring an integration test for your Samza job. The three facets are data injection, transformation and validation. The injection phase configures the input source for the Samza job. The transformation phase is the API logic of the Samza job (low-level/high-level). The validation phase asserts that the actual results computed after running the job match the expected results.
Data Injection:
The initial source of the data stream is always bounded, since this is a test, and it may originate from one of the following sources:
- In Memory Data Stream
  Users can produce to and consume from in-memory system partitions after SEP-8. The In Memory Data system removes the need for serialization/deserialization of data because it does not persist data. We take advantage of this and provide very succinct Stream classes to serve as input data sources:
  - Collection Stream
    Users can plug in a collection (either List or Map) to create an in-memory input stream,
    e.g: CollectionStream.of(...,{1,2,3,4})
  - Event Builder Stream
    The event builder helps users mimic the runtime Samza processing environment in their tests, for example by adding an exception to the stream or advancing time for window functions.
- File Stream
  Users can create an input stream by reading from and writing to a local file,
  e.g: FileStream.of("/path/to/file")
- Local Kafka Stream
  Users can also consume bounded streams from a Kafka topic, which serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. Kafka streams need serde configurations.
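To make the List/Map distinction concrete, here is a minimal, dependency-free sketch of how a collection could be laid out across partitions. A list becomes partition 0, and a map keyed by partition id becomes one partition per key. The class `PartitionedCollection` and its methods are hypothetical illustrations, not the proposed `CollectionStream` API (which takes `Iterable`s rather than `List`s).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a collection-backed input stream; not the proposed CollectionStream class.
final class PartitionedCollection<T> {
  // partitionId -> messages in the order they will be delivered on that partition
  private final Map<Integer, List<T>> partitions = new TreeMap<>();

  private PartitionedCollection() {}

  // A plain list becomes a single-partition stream (partition 0).
  static <T> PartitionedCollection<T> of(List<T> messages) {
    PartitionedCollection<T> stream = new PartitionedCollection<>();
    stream.partitions.put(0, new ArrayList<>(messages));
    return stream;
  }

  // A map keyed by partitionId becomes a multi-partition stream.
  static <T> PartitionedCollection<T> of(Map<Integer, ? extends List<T>> byPartition) {
    PartitionedCollection<T> stream = new PartitionedCollection<>();
    byPartition.forEach((id, messages) -> stream.partitions.put(id, new ArrayList<>(messages)));
    return stream;
  }

  int partitionCount() {
    return partitions.size();
  }

  // Read-only view of one partition; empty if the partition was never populated.
  List<T> partition(int id) {
    return Collections.unmodifiableList(partitions.getOrDefault(id, Collections.<T>emptyList()));
  }
}
```

Copying each input collection keeps the stream's contents fixed even if the test later mutates the list it passed in.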
Data Transformation:
The framework supports Samza job application logic written with either the Low Level API (StreamTask and AsyncStreamTask) or the High Level API. Users write this logic the same way they write their Samza job, but instead of writing verbose configs, they just pass the class implementing this logic to the API.
Data Validation:
Data Types & Partitions:
Samza provides complete flexibility in the data types used for input streams, and this framework will likewise support both primitive and derived data types. The test framework will provide APIs for initializing input streams (data injection), reading from and writing to single-partition and multi-partition bounded streams (data transformation), and verifying actual results against expected results (data validation).
Running Config
Traditionally, users must set up the config for every Samza job. For test purposes, we set up the basic config boilerplate for users while still letting them add any custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode (using Zookeeper), as well as the concurrency semantics of the job.
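As a rough illustration of the "basic config boilerplate plus overrides" idea, the sketch below builds a config map for either mode and then applies user overrides last, so they win. The helper `TestJobConfig` and its `Mode` enum are hypothetical. The config keys and factory class names reflect my understanding of Samza's standalone settings (pass-through and Zookeeper coordination) and of the SEP-8 in-memory system. Treat them as assumptions to verify against the Samza configuration reference.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating "boilerplate defaults + user overrides"; not part of the proposed API.
final class TestJobConfig {
  enum Mode { SINGLE_CONTAINER, MULTI_CONTAINER }

  static Map<String, String> build(String jobName, Mode mode, String zkConnect, Map<String, String> overrides) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("job.name", jobName);
    cfg.put("job.default.system", "test");
    // Assumption: the SEP-8 in-memory system is registered under this factory class name.
    cfg.put("systems.test.samza.factory", "org.apache.samza.system.inmemory.InMemorySystemFactory");
    if (mode == Mode.SINGLE_CONTAINER) {
      // One processor: pass-through coordination plus the single-container grouper.
      cfg.put("job.coordinator.factory", "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
      cfg.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
      cfg.put("processor.id", "1");
    } else {
      // Several processors coordinated through Zookeeper.
      cfg.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
      cfg.put("job.coordinator.zk.connect", zkConnect);
    }
    cfg.put("task.max.concurrency", "1");
    // User-supplied values are applied last, so they override any default above.
    cfg.putAll(overrides);
    return cfg;
  }
}
```

Applying overrides with a single `putAll` at the end keeps the precedence rule obvious: anything the test author sets explicitly replaces the framework default.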
Public Interfaces
The two APIs for writing tests are the Low Level Test API (TestTask) and the High Level API (TestApplication).
This SEP targets two sets of users:
- Samza job owners: to make their Samza jobs robust
- Samza dev teams: to make the Samza infrastructure robust
This SEP targets a two-phase development of the framework:
- PHASE I: testing Samza's public APIs, targeting Samza job owners
- PHASE II: testing various components of Samza, targeting Samza dev teams
Terminology
- p1: refers to APIs targeted for development in Phase I
- p2: refers to APIs targeted for development in Phase II
Motivation
Adding this Integration Test Framework will address:
Lack of a standardized way for users to set up integration tests
Verbose test code and the need for a detailed understanding of low-level configs to set up Samza systems
Lack of a pluggable test system for any Samza configuration (single- and multi-container)
Lack of a comprehensive set of tests for the Samza API
Assumptions
The system depends on the In Memory system to spin up in-memory system consumers and producers for in-memory data streams
Single-container mode runs on SingleContainerGrouperFactory
Multi-container mode leverages Samza as a library (Samza standalone), using Zookeeper as the coordination service for setting up tests
To provide a testing API with minimal config management, basic configs are set up for users, but they can still add or override custom configs
Testing is always done on bounded streams, terminated with an EndOfStreamMessage
The system should consume directly from a File stream or a Local Kafka stream without any additional configs. The File Stream implementation is out of scope for this SEP, although this SEP suggests what its design should look like
Bounded message streams from Kafka (using either a latch or EndOfStreamMessage) are beyond the scope of this SEP
For stateful testing, the system may use RocksDB (as it ships with Samza) or maintain the state in memory
Design
We propose a three-step process for configuring an integration test for your Samza job. The three facets are data injection, transformation and validation.
- The Data Injection phase configures the input source for the Samza job.
- The Data Transformation phase is the API logic of the Samza job (low-level/high-level).
- The Data Validation phase asserts that the actual results computed after running the Samza job match the expected results.
Data Injection:
In the context of this test framework, an input stream is a stream that a Samza job may consume from, and an output stream is a stream that a Samza job can produce to. The initial source of the data stream is always bounded, since this is a test, and it may originate from one of the sources listed below.
- In Memory Data Stream
  Users can produce to and consume from in-memory system partitions after SEP-8. The In Memory Data system removes the need for serialization/deserialization of data because it does not persist data. We take advantage of this and provide very succinct Stream classes to serve as input data sources:
  - Collection Stream (p1)
    Users can plug in a collection (either List or Map) to create an in-memory input stream,
    e.g: CollectionStream.of(...,{1,2,3,4})
  - Event Builder Stream (p2)
    The event builder helps users mimic the runtime Samza processing environment in their tests, for example by adding an exception to the stream or advancing time for window functions.
- File Stream
  Users can create an input stream by reading from and writing to a local file,
  e.g: FileStream.of("/path/to/file")
- Local Kafka Stream
  Users can also consume bounded streams from a Kafka topic, which serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. Kafka streams need serde configurations; however, implementing bounded streams in Kafka (using a latch message or EndOfStreamMessage) is beyond the scope of this SEP.
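The Event Builder Stream is easiest to picture as a scripted sequence that a test driver replays: elements, injected failures, and clock advances that close windows. The sketch below is a hypothetical, dependency-free model of that idea (`ScriptedEvents`, `TumblingCounter`). It is not the proposed `EventStream` API. In particular, `addException` taking a `RuntimeException` is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical scripted event stream: elements, injected failures and clock advances, replayed in order.
final class ScriptedEvents<T> {
  enum Kind { ELEMENT, EXCEPTION, ADVANCE_TIME }

  static final class Event<T> {
    final Kind kind;
    final T element;               // used by ELEMENT
    final RuntimeException error;  // used by EXCEPTION
    final long time;               // used by ADVANCE_TIME

    Event(Kind kind, T element, RuntimeException error, long time) {
      this.kind = kind;
      this.element = element;
      this.error = error;
      this.time = time;
    }
  }

  private final List<Event<T>> events = new ArrayList<>();

  ScriptedEvents<T> addElement(T element) {
    events.add(new Event<>(Kind.ELEMENT, element, null, 0L));
    return this;
  }

  ScriptedEvents<T> addException(RuntimeException error) {
    events.add(new Event<>(Kind.EXCEPTION, null, error, 0L));
    return this;
  }

  ScriptedEvents<T> advanceTimeTo(long time) {
    events.add(new Event<>(Kind.ADVANCE_TIME, null, null, time));
    return this;
  }

  List<Event<T>> events() {
    return events;
  }
}

// Toy consumer: counts elements per tumbling window of windowMs, with the first window starting at time 0.
final class TumblingCounter {
  static <T> List<Long> run(ScriptedEvents<T> script, long windowMs) {
    List<Long> closedWindowCounts = new ArrayList<>();
    long windowEnd = windowMs;
    long count = 0;
    for (ScriptedEvents.Event<T> e : script.events()) {
      switch (e.kind) {
        case ELEMENT:
          count++;
          break;
        case EXCEPTION:
          throw e.error; // surfaces the injected failure to the test
        case ADVANCE_TIME:
          // Close every window whose end the clock has reached.
          while (e.time >= windowEnd) {
            closedWindowCounts.add(count);
            count = 0;
            windowEnd += windowMs;
          }
          break;
      }
    }
    return closedWindowCounts;
  }
}
```

Because time only moves when the script says so, window boundaries in such a test are deterministic rather than dependent on wall-clock timing.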
For in-memory streams, the API initializes the stream by spinning up a Samza producer (an InMemorySystemProducer) that writes the stream; this is how a collection of data or events gets initialized as a stream. It also configures any output streams the user has added.
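The producer/consumer contract for a bounded in-memory stream can be modelled with one queue per partition. The producer writes a partition's messages followed by an end-of-stream marker, and the consumer drains each partition up to that marker or fails after a timeout. This is a hypothetical sketch (`BoundedInMemoryStream`, with a plain `Object` sentinel standing in for Samza's end-of-stream message). It is not the InMemorySystemProducer/consumer implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an in-memory system stream: one queue per partition.
final class BoundedInMemoryStream<T> {
  // Sentinel playing the role of Samza's end-of-stream message.
  private static final Object END_OF_STREAM = new Object();
  private final Map<Integer, BlockingQueue<Object>> partitions = new HashMap<>();

  BoundedInMemoryStream(int partitionCount) {
    for (int p = 0; p < partitionCount; p++) {
      partitions.put(p, new LinkedBlockingQueue<>());
    }
  }

  // Producer side: write one partition's messages, then mark that partition as finished.
  void writeBounded(int partition, List<T> messages) {
    BlockingQueue<Object> queue = partitions.get(partition);
    queue.addAll(messages);
    queue.add(END_OF_STREAM);
  }

  // Consumer side: drain each partition up to its end-of-stream marker, failing if timeoutMs elapses first.
  @SuppressWarnings("unchecked")
  Map<Integer, List<T>> consumeAll(long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    Map<Integer, List<T>> result = new HashMap<>();
    for (Map.Entry<Integer, BlockingQueue<Object>> entry : partitions.entrySet()) {
      List<T> messages = new ArrayList<>();
      while (true) {
        Object next;
        try {
          next = entry.getValue().poll(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while consuming", e);
        }
        if (next == null) {
          throw new IllegalStateException("no end-of-stream on partition " + entry.getKey() + " before timeout");
        }
        if (next == END_OF_STREAM) {
          break;
        }
        messages.add((T) next);
      }
      result.put(entry.getKey(), messages);
    }
    return result;
  }
}
```

The returned `Map<Integer, List<T>>` has the same shape as the proposed `TestRunner.consumeStream`. The timeout makes an unterminated stream fail the test quickly instead of hanging it.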
Data Transformation:
This is the Samza job you write, using either the Low Level API or the fluent High Level API. The test framework provides APIs to set up tests for both cases, in both single-container and multi-container mode. Users implement StreamTask/AsyncStreamTask/StreamApplication the same way they do for their Samza job (or pass an instance of their Samza job).
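One plausible reason the low-level entry point takes a task class rather than an instance is that a Samza job instantiates its task class once per task. The sketch below models that with a hypothetical `ToyTask` interface (a stand-in for `StreamTask` with no Samza dependency) and a harness that creates one fresh instance per input partition through the class's no-arg constructor. `ToyTask`, `ToyTaskHarness` and `TimesTenTask` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, dependency-free stand-in for Samza's StreamTask.
interface ToyTask<I, O> {
  void process(I message, Consumer<O> collector);
}

final class ToyTaskHarness {
  // One fresh task instance per input partition, created from the task class via its no-arg constructor.
  static <I, O> Map<Integer, List<O>> runPerPartition(
      Class<? extends ToyTask<I, O>> taskClass, Map<Integer, List<I>> input) {
    Map<Integer, List<O>> output = new HashMap<>();
    for (Map.Entry<Integer, List<I>> partition : input.entrySet()) {
      ToyTask<I, O> task;
      try {
        task = taskClass.getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("task class needs an accessible no-arg constructor", e);
      }
      List<O> collected = new ArrayList<>();
      for (I message : partition.getValue()) {
        task.process(message, collected::add);
      }
      output.put(partition.getKey(), collected);
    }
    return output;
  }
}

// Example user logic: multiply each integer by 10, as in the examples in this SEP.
final class TimesTenTask implements ToyTask<Integer, Integer> {
  @Override
  public void process(Integer message, Consumer<Integer> collector) {
    collector.accept(message * 10);
  }
}
```

Giving each partition its own instance means any per-task fields start clean, which mirrors how task state is scoped in a real job.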
Data Validation:
StreamAssert | StateAssert |
---|---|
contains | contains |
containsInAnyOrder | satisfies |
inWindowPane | |
satisfies | |
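Assume output is read back as partitionId → messages, the shape returned by the proposed `consumeStream`. Under that assumption, one plausible reading of `contains` and `containsInAnyOrder` is sketched below: `contains` checks exact order within a partition, and `containsInAnyOrder` compares the multiset of all messages across partitions. `OutputChecks` is a hypothetical helper, not the proposed `StreamAssert`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of assertion semantics over output consumed as partitionId -> messages.
final class OutputChecks {
  // Flatten all partitions, in partition-id order, into one list.
  static <T> List<T> flatten(Map<Integer, List<T>> byPartition) {
    List<T> all = new ArrayList<>();
    byPartition.keySet().stream().sorted().forEach(p -> all.addAll(byPartition.get(p)));
    return all;
  }

  // contains: exact messages in exact order within a single partition.
  static <T> boolean contains(List<T> partition, List<T> expected) {
    return partition.equals(expected);
  }

  // containsInAnyOrder: same messages with the same multiplicities across all partitions, order ignored.
  static <T> boolean containsInAnyOrder(Map<Integer, List<T>> byPartition, Collection<T> expected) {
    return counts(flatten(byPartition)).equals(counts(expected));
  }

  private static <T> Map<T, Integer> counts(Collection<T> items) {
    Map<T, Integer> c = new HashMap<>();
    for (T item : items) {
      c.merge(item, 1, Integer::sum);
    }
    return c;
  }
}
```

Counting occurrences rather than comparing sets means duplicated or dropped messages still fail `containsInAnyOrder`.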
Data Types & Partitions:
The framework will provide complete flexibility in the use of primitive and derived data types. Serdes are required for local Kafka streams and file streams, but in-memory streams don't require any serde configuration. Serdes will also be required if users want to maintain state. The test framework will provide APIs for initializing input streams (both single- and multi-partition), as well as for data validation on single-partition and multi-partition bounded streams.
Running Config
Traditionally, users must set up the config for every Samza job. For test purposes, we set up the basic config boilerplate for users while still letting them add any custom config (rarely needed). The API exposes functions to configure single-container or multi-container mode (using Zookeeper), as well as the concurrency semantics of their Samza job.
Stateful & Stateless Testing:
The test framework will support both stateful and stateless testing; stateful testing can be done either with RocksDB or by maintaining state in memory.
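For the in-memory option, a stateful test boils down to running the logic against a store and then asserting on the store's contents, as `StateAssert.contains` in the validation table suggests. The sketch below uses a hypothetical `InMemoryKvStore` (a plain `HashMap`, not Samza's `KeyValueStore`) and a hypothetical word-count step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory key-value store standing in for a RocksDB-backed store in tests.
final class InMemoryKvStore<K, V> {
  private final Map<K, V> data = new HashMap<>();

  V get(K key) {
    return data.get(key);
  }

  void put(K key, V value) {
    data.put(key, value);
  }

  // Copy of the current contents, so assertions cannot mutate the store.
  Map<K, V> snapshot() {
    return new HashMap<>(data);
  }
}

// Example stateful logic: count how many times each word has been seen.
final class WordCountLogic {
  private final InMemoryKvStore<String, Integer> store;

  WordCountLogic(InMemoryKvStore<String, Integer> store) {
    this.store = store;
  }

  void process(String word) {
    Integer current = store.get(word);
    store.put(word, current == null ? 1 : current + 1);
  }
}
```

Swapping this for a RocksDB-backed store would keep the assertions the same but would require serdes for keys and values, as noted under Data Types & Partitions.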
Future Changes with Stream Descriptors:
The test framework is designed to require no or minimal Samza configs from users. In the future, we intend to use StreamDescriptors in the test framework to generate Samza configs. With StreamDescriptors and the High Level API, another wrapper can be provided over the job to run its configuration in a test environment.
Test Matrix (Plan)
Below is the targeted test matrix for testing various components of Samza in p2.
n = number of threads
x = maximum number of partitions (p is the number of partitions under test)
m = number of containers
Samza API | Concurrency | Partitions | Containers | Expected Result |
---|---|---|---|---|
StreamTask | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | in-order processing |
StreamTask | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | out-of-order processing |
AsyncStreamTask | task.max.concurrency = 1 | 1 <= p <= x | 1 / m | in-order processing |
AsyncStreamTask | task.max.concurrency = n | 1 <= p <= x | 1 / m | out-of-order processing |
WindowableTask | N/A | 1 <= p <= x | 1 / m | n messages processed for a window of time t |
InitableTask | N/A | 1 <= p <= x | 1 / m | stateful testing (assertions on the KV store) |
ClosableTask | N/A | 1 <= p <= x | 1 / m | verify that the client is closed |
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency = 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | in-order processing |
Map / FlatMap / Filter / PartitionBy / Merge / SendTo / Sink / Join / Window | task.max.concurrency > 1, job.container.thread.pool.size = n | 1 <= p <= x | 1 / m | out-of-order processing |
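The in-order versus out-of-order expectations in the matrix can be reasoned about with a small model. Suppose each message takes a different amount of time to process and at most `task.max.concurrency` messages are in flight. Then completion order is guaranteed to match input order only when that limit is 1. The sketch below is a hypothetical discrete-event model (`ConcurrencyModel`) with made-up latencies, not Samza's actual scheduler.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical discrete-event model of async processing with a cap on in-flight messages.
final class ConcurrencyModel {
  // Returns messages in the order their processing completes.
  static List<Integer> completionOrder(List<Integer> messages, long[] latencies, int maxConcurrency) {
    if (maxConcurrency < 1) {
      throw new IllegalArgumentException("maxConcurrency must be >= 1");
    }
    // Times at which each of the maxConcurrency slots becomes free.
    PriorityQueue<Long> slotFreeAt = new PriorityQueue<>();
    for (int i = 0; i < maxConcurrency; i++) {
      slotFreeAt.add(0L);
    }
    List<long[]> finished = new ArrayList<>(); // {completionTime, messageIndex}
    for (int i = 0; i < messages.size(); i++) {
      long start = slotFreeAt.poll(); // dispatch in input order on the earliest free slot
      long end = start + latencies[i];
      slotFreeAt.add(end);
      finished.add(new long[] {end, i});
    }
    // Earlier completion first; ties broken by input position.
    finished.sort(Comparator.<long[]>comparingLong(f -> f[0]).thenComparingLong(f -> f[1]));
    List<Integer> order = new ArrayList<>();
    for (long[] f : finished) {
      order.add(messages.get((int) f[1]));
    }
    return order;
  }
}
```

With latencies {50, 10, 10, 10}, a limit of 1 yields 1, 2, 3, 4. A limit of 2 lets the slow first message finish last. This is why tests for concurrency above 1 should assert with `containsInAnyOrder` rather than `contains`.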
Metadata Stream | Tests |
---|---|
Changelog Stream | Assertions on the changelog stream in stateful tests |
Checkpoint Stream | Assertions on the checkpoint stream (Kafka or InMemory) |
Coordinator Stream | Assertions on the coordinator stream (Kafka or InMemory) |
Public Interfaces
```java
/**
 * TestRunner provides a static factory for quickly setting up a Samza environment to test the Low Level API and the High Level API.
 * Users configure the input streams they consume from and the output streams they produce to, and pass in their Samza job
 * logic via StreamTask/AsyncStreamTask/StreamApplication.
 */
public class TestRunner {
  // Static factory to configure & create a runner for the low level api
  public static TestRunner of(Class taskClass) {...}
  // Static factory to configure & create a runner for the high level api
  public static TestRunner of(StreamApplication app) {...}
  // Add/override any custom configs
  public TestRunner addConfigs(Config configs) {...}
  public TestRunner addOverrideConfig(String key, String val) {...}
  // Configure state for the application
  public TestRunner addState(String storeName) {...}
  // Configure an input stream for the samza system that the job can consume from
  public TestRunner addInputStream(CollectionStream stream) {...}
  // Configure an output stream for the samza system that the job can produce to
  public TestRunner addOutputStream(CollectionStream stream) {...}
  // Consume messages from a stream, grouped by partition (partitionId -> messages)
  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) {...}
  // Run the TestRunner
  public void run() {...}
}
```
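To show how the fluent calls map onto the three phases (inject, transform, validate), here is a hypothetical, dependency-free model of the runner's flow. `ToyRunner` replaces the user's task or application with a plain `Function` and keeps streams in memory. It illustrates the call sequence only and is not the proposed `TestRunner` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, dependency-free model of the TestRunner flow; not the proposed implementation.
final class ToyRunner<I, O> {
  private final Function<I, O> logic; // stands in for the user's task or application
  private final Map<String, String> configs = new HashMap<>();
  private final Map<String, List<I>> inputs = new HashMap<>();
  private final Map<String, List<O>> outputs = new HashMap<>();
  private String outputStream;

  private ToyRunner(Function<I, O> logic) {
    this.logic = logic;
  }

  static <I, O> ToyRunner<I, O> of(Function<I, O> logic) {
    return new ToyRunner<>(logic);
  }

  ToyRunner<I, O> addOverrideConfig(String key, String value) {
    configs.put(key, value);
    return this;
  }

  // Data injection: register a bounded input stream.
  ToyRunner<I, O> addInputStream(String name, List<I> messages) {
    inputs.put(name, new ArrayList<>(messages));
    return this;
  }

  // Register the (initially empty) stream the job produces to.
  ToyRunner<I, O> addOutputStream(String name) {
    outputStream = name;
    outputs.put(name, new ArrayList<>());
    return this;
  }

  String config(String key) {
    return configs.get(key);
  }

  // Data transformation: apply the logic to every input message and write results to the output stream.
  void run() {
    if (outputStream == null) {
      throw new IllegalStateException("no output stream configured");
    }
    List<O> sink = outputs.get(outputStream);
    for (List<I> messages : inputs.values()) {
      for (I message : messages) {
        sink.add(logic.apply(message));
      }
    }
  }

  // Data validation: read back what the job produced.
  List<O> consumeStream(String name) {
    return outputs.getOrDefault(name, new ArrayList<>());
  }
}
```

A chain such as `ToyRunner.<Integer, Integer>of(x -> x * 10).addInputStream("input", ...).addOutputStream("output").run()` mirrors the shape of the usage examples in this SEP.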
Types of Input Streams
CollectionStream:
```java
/**
 * CollectionStream acts as a descriptor that can be used to build an in-memory input stream (single/multiple partitions) of Java collections
 */
public class CollectionStream<T> {
  // Create an empty stream with a single partition that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName) {...}
  // Create an empty stream with multiple partitions that a Samza job can produce to
  public static <T> CollectionStream<T> empty(String systemName, String streamName, Integer partitionCount) {...}
  // Create a stream of messages from an input collection with a single partition
  public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> collection) {...}
  // Create a stream of messages with multiple partitions; the key of the partitions map is the partitionId
  public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {...}
}
```
```java
/**
 * CollectionStreamSystem provides utilities to create and initialize an in-memory input stream.
 */
public class CollectionStreamSystem {
  // Create a CollectionStreamSystem
  public static CollectionStreamSystem create(String name) {...}
}
```
FileStream:
```java
/**
 * FileStream provides utilities to build a stream of messages from a file on disk
 */
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}

public class FileStreamSystem {
  public static FileStreamSystem create(String name) {...}
}
```
Types of Input Streams
```java
/**
 * CollectionStream provides utilities to build an in-memory input stream of collections (list, map). It also supports initialization of
 * multiple partitions for an input stream
 */
public class CollectionStream<T> {
  private CollectionStream(String systemStream, List<T> collection, Integer partitionCount) {...}
  private CollectionStream(String systemStream) {...}
  // Create an empty stream that a Samza task can produce to
  public static <T> CollectionStream<T> empty(String systemStream) {...}
  // Create a stream of messages from an input list with a single partition
  public static <T> CollectionStream<T> of(String systemStream, List<T> collection) {...}
  // Create a stream of messages with multiple partitions, one inner list per partition
  public static <T> CollectionStream<T> ofPartitions(String systemStream, List<List<T>> collection) {...}
  // Create a stream of key-value messages from an input map
  public static <K, V> CollectionStream<KV<K, V>> of(String systemStream, Map<K, V> elems) {...}
}
```
```java
/**
 * EventStream provides utilities to build an in-memory input stream of events. It helps mimic the runtime environment of your job,
 * e.g. advancing time for windowing functions
 */
public class EventStream<T> {
  public static abstract class Builder<T> {
    public abstract Builder<T> addElement(T element);
    public abstract Builder<T> addException();
    public abstract Builder<T> advanceTimeTo(long time);
    public abstract EventStream<T> build();
  }
  public static <T> Builder<T> builder() {...}
}
```
```java
/**
 * FileStream provides utilities to build a stream of messages from a file on disk
 */
public class FileStream<T> {
  public static <T> FileStream<T> of(String fileUri) {...}
}
```
Example Usages of the Test API:
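```java
import java.util.Arrays;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
// TestRunner and CollectionStream are the test framework classes proposed in this SEP

/**
 * Simple test case using a collection as the input for a low level application. It demonstrates setting up and checking a test with
 * minimal (here, no) configs: it reads an input stream of integers and multiplies each integer by 10
 */
// Create a StreamTask
public class MyStreamTestTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
  }
}

CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4));
CollectionStream<Integer> output = CollectionStream
    .empty("test", "output");
TestRunner
    .of(MyStreamTestTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();
// Assertions on the outputs: partition 0 of the single-partition output stream (the timeout value is illustrative)
Assert.assertThat(TestRunner.<Integer>consumeStream(output, 1000).get(0),
    IsIterableContainingInOrder.contains(10, 20, 30, 40));
```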
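```java
import java.util.Arrays;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.ClosableTask;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/**
 * Simple test case using a collection as the input for a low level application in async mode
 */
// An AsyncStreamTask that calls a web service for each message and multiplies the message by 10 once the call completes
public class MyAsyncStreamTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    // Your initialization of web client code goes here
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback<Response>() {
      @Override
      public void completed(Response response) {
        Integer obj = (Integer) envelope.getMessage();
        collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4));
CollectionStream<Integer> output = CollectionStream
    .empty("test", "output");
TestRunner
    .of(MyAsyncStreamTask.class)
    .addInputStream(input)
    .addOutputStream(output)
    .run();
// Assertions on the outputs: async completions may arrive in any order
StreamAssert.that(output).containsInAnyOrder(Arrays.asList(10, 20, 30, 40));
```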
```java
import java.util.Arrays;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;

/**
 * Simple test case using a collection as the input for a high level application
 */
public class MyStreamApplication implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    // Stream IDs match the stream names of the CollectionStreams below
    MessageStream<Integer> numbers = graph.getInputStream("input");
    numbers.map(s -> "processed " + s)
        .sendTo(graph.getOutputStream("output"));
  }
}

CollectionStream<Integer> input = CollectionStream
    .of("test", "input", Arrays.asList(1, 2, 3, 4));
CollectionStream<String> output = CollectionStream
    .empty("test", "output");
```
...
```java
import java.util.Arrays;
import java.util.HashMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Sample test case using a collection as the input for a low level application. It demonstrates setting up and checking a test with
 * minimal (here, no) configs: it reads an input stream of integers and multiplies
 * each integer by 10
 */
// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};
// No custom configs are needed for this test
HashMap<String, String> config = new HashMap<>();
// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config, Mode.SINGLE_CONTAINER)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("output"))
    .run();
// Assertions on the outputs
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));
```
```java
// Initialize and run the test framework
TestRunner
    .of(new MyStreamApplication())
    .addInputStream(input)
    .addOutputStream(output)
    .addOverrideConfig("job.default.system", "test")
    .run();
// Assertions on the outputs
StreamAssert.that(output).contains(Arrays.asList("processed 1", "processed 2", "processed 3", "processed 4"));
```
Implementation and Test Plan
- Introduce the new interfaces and API for the test framework
- Use the same API to write comprehensive integration tests for:
  - the Samza Low Level Async API (single- and multi-container mode)
  - the Samza Low Level Synchronous API (single- and multi-container mode)
  - the Samza High Level API (single- and multi-container mode)
Compatibility, Deprecation, and Migration Plan
As this is a new feature, no plans are required for compatibility, deprecation and migration.
Rejected Alternatives