Status
Current state: [ UNDER DISCUSSION ]
Discussion thread: <link to mailing list DISCUSS thread>
JIRA:
Released:
Problem:
Samza today provides several APIs: a high-level (fluent) API for chaining complex jobs and a low-level (task) API, usable in deployments such as Samza on YARN and Samza as a library (Samza standalone). In the near future Samza will also support an in-memory system with in-memory streams that a user can produce to and consume from. Even so, Samza users still have to deal with many configuration details and depend on external systems (such as Kafka) to write integration tests. To alleviate this, we intend to provide a fluent test API. It will help users set up quick tests against their system without depending on an underlying incoming stream, e.g. a Kafka topic. The goal of the proposed SEP is to provide a standardized framework for setting up continuous integration tests for the High Level API and the Low Level API in any Samza configuration with ease. Users won't have to worry about the underlying Samza config details or understand the low-level implementation. This abstraction will increase developer productivity and help users make their Samza systems robust.
Motivation
Adding this Continuous Integration Test Framework will address:
Lack of a standardized way for users to set up integration tests
Lack of brevity in test code, and the detailed understanding of low-level Samza API config that is currently needed to set up Samza systems
Lack of a pluggable test system that works with any Samza configuration
Lack of a robust set of tests for the current Samza API
Assumptions
The framework depends on the In Memory system to spin up in-memory system consumers and producers wherever applicable
The initial framework runs on SingleContainerGrouperFactory, which means these tests are meant to be run in a single-container environment
To provide a testing API with minimal config management, basic configs are set up for users, with the flexibility to add or override custom config
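The last assumption (preset basic configs that users may override) could be realized along the following lines. This is only a minimal sketch: `TestConfigSketch` and its methods are hypothetical names that are not part of Samza, and the choice of which keys to preset is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: merges framework defaults with user overrides.
final class TestConfigSketch {

  // Illustrative defaults only; the keys a real framework would preset are an assumption.
  static Map<String, String> defaults() {
    Map<String, String> base = new HashMap<>();
    base.put("job.name", "test-samza-job");
    base.put("task.name.grouper.factory",
        "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
    return base;
  }

  // User-supplied entries win over defaults; neither input map is modified.
  static Map<String, String> withOverrides(Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(defaults());
    merged.putAll(overrides);
    return merged;
  }
}
```

A user who passes an empty map gets the defaults unchanged, while a user who sets `job.name` replaces only that entry.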
Proposed Changes
DATA INJECTION:
The initial source of a data stream is always assumed to be bounded, since this is a test. The data may originate from any of the following sources:
- In Memory Data Stream
Users have the ability to produce to and consume from in-memory system partitions after SEP-8. The In Memory data system removes the need for serialization/deserialization of data since it does not persist data. We take advantage of this and provide very succinct Stream classes to serve as input data sources, which are the following:
  - Collection Stream
  Users can plug in a collection (either List or Map) to create an in-memory input stream
  e.g: CollectionStream.of(...,{1,2,3,4})
  - Event Builder Stream
  Event builder helps a user mimic the runtime Samza processing environment in tests, for example by adding an exception to the stream or advancing time for window functions
- File Stream
Users can create a stream that reads from and writes to a local file
e.g: FileStream.of("/path/to/file")
- Local Kafka Stream
Users can also consume bounded streams from a Kafka topic that serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. For Kafka streams, serde configurations are required.
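To make the Collection Stream idea more concrete, here is a minimal sketch of a bounded, in-memory stream wrapper. The class name `CollectionStreamSketch` and its accessors are hypothetical; the SEP does not specify the real `CollectionStream` class, so this only illustrates the shape such a class might take for the List case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of a bounded in-memory stream backed by a List.
final class CollectionStreamSketch<T> {
  private final String streamName;
  private final List<T> messages;

  private CollectionStreamSketch(String streamName, List<T> messages) {
    this.streamName = streamName;
    // Defensive copy so later changes to the caller's list do not affect the stream.
    this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
  }

  // Bounded input stream pre-populated with the given messages.
  static <T> CollectionStreamSketch<T> of(String streamName, List<T> messages) {
    return new CollectionStreamSketch<>(streamName, messages);
  }

  // Empty stream, e.g. to be declared as an output.
  static <T> CollectionStreamSketch<T> empty(String streamName) {
    return new CollectionStreamSketch<>(streamName, Collections.<T>emptyList());
  }

  String getStreamName() {
    return streamName;
  }

  List<T> getMessages() {
    return messages;
  }
}
```

Because the data never leaves the JVM, no serde is involved, which is the property the in-memory system relies on.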
DATA TYPES:
Samza provides complete flexibility in the data types used for input streams; this framework will likewise support both primitive and derived data types.
Public Interfaces
/**
 * Sample test case using a collection as the input for a low-level application.
 * It demonstrates setting up and asserting on a test with minimal (here, none) configs:
 * the task reads an input stream of integers and multiplies each integer by 10.
 */

// Create a StreamTask
StreamTask myTask = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// Initialize and run the test framework
TestTask
    .create("test-samza", myTask, config)  // arguments: systemName, task, config
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))  // streamName, data
    .addOutputStream(CollectionStream.empty("output"))  // streamName
    .run();

// Assertions on the outputs
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));
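The order-insensitive assertion used in the sample matters because a job running with several threads may emit results in any order. One way such a check could work is a multiset comparison, sketched below. `AnyOrderSketch` is a hypothetical name and this is not the proposed `TaskAssert` implementation, only an illustration of the semantics.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an order-insensitive comparison that respects duplicates.
final class AnyOrderSketch {

  // Returns true when both lists hold the same elements with the same multiplicities.
  static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    if (actual.size() != expected.size()) {
      return false;
    }
    // Count how many times each expected element should appear.
    Map<T, Integer> counts = new HashMap<>();
    for (T item : expected) {
      counts.merge(item, 1, Integer::sum);
    }
    // Consume one expected occurrence for every actual element.
    for (T item : actual) {
      Integer remaining = counts.get(item);
      if (remaining == null) {
        return false;
      }
      if (remaining == 1) {
        counts.remove(item);
      } else {
        counts.put(item, remaining - 1);
      }
    }
    return counts.isEmpty();
  }
}
```

Counting occurrences rather than comparing sets keeps duplicates significant, so an output of two 10s is not mistaken for a single 10.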
Implementation and Test Plan
Compatibility, Deprecation, and Migration Plan
Rejected Alternatives