...
- In-Memory Data Stream
Since SEP-8, users can produce to and consume from in-memory system partitions. Because the in-memory system does not persist data, it removes the need to serialize and deserialize messages.
We take advantage of this and provide very succinct Stream classes to serve as input data sources:
  - Collection Stream
    Users can plug in a collection (either a List or a Map) to create an in-memory input stream,
    e.g.: CollectionStream.of(...,{1,2,3,4})
  - File Stream
    Users can create an in-memory input stream that reads from a local file,
    e.g.: FileStream.of("/path/to/file")
  - Event Builder Stream
    The event builder helps users mimic the runtime Samza processing environment in their tests, for example by injecting an exception into the stream or advancing time for window functions.
- Local Kafka Stream
Users can also consume bounded streams from a Kafka topic that serves as the initial input. Samza already provides APIs to consume from and produce to Kafka. Unlike in-memory streams, Kafka streams require serde configuration.
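For the Local Kafka Stream option, the serde configuration might look roughly like the following sketch. The keys follow Samza's standard system and serializer configuration (serializers.registry.*, systems.*.samza.*); the system name "kafka", the choice of string keys with integer messages, and the localhost ZooKeeper and broker addresses are assumptions for illustration, and the class name KafkaSerdeConfigSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a minimal Samza config for a local Kafka system.
// The system name "kafka", the serde choices and the addresses are assumptions.
public class KafkaSerdeConfigSketch {

  public static Map<String, String> kafkaTestConfig() {
    Map<String, String> config = new HashMap<>();
    // Register serde factories under short names that systems can refer to.
    config.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
    config.put("serializers.registry.integer.class", "org.apache.samza.serializers.IntegerSerdeFactory");
    // Declare the "kafka" system and the serdes used for message keys and values.
    config.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
    config.put("systems.kafka.samza.key.serde", "string");
    config.put("systems.kafka.samza.msg.serde", "integer");
    // Assumed addresses of a local test cluster.
    config.put("systems.kafka.consumer.zookeeper.connect", "localhost:2181");
    config.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
    return config;
  }

  public static void main(String[] args) {
    // Print the entries in key=value form, as they would appear in a properties file.
    kafkaTestConfig().forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

The in-memory streams above need none of these entries, which is why the sample test further down can run with an empty config.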
...
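The Event Builder Stream idea (scripting messages, injected exceptions and time advances for a test) can be modelled as an ordered list of events that a harness replays. EventBuilderSketch and all of its methods are hypothetical and are not part of Samza or of the proposed framework; the sketch only illustrates the concept, using a simulated clock instead of real window timers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not a Samza API: a test script is an ordered list of events
// that a harness replays into the code under test.
public class EventBuilderSketch {
  public enum Kind { MESSAGE, ADVANCE_TIME, EXCEPTION }

  // One scripted event; only the field matching its kind is meaningful.
  public static final class Event {
    final Kind kind;
    final Object message;
    final long millis;
    final RuntimeException error;

    Event(Kind kind, Object message, long millis, RuntimeException error) {
      this.kind = kind;
      this.message = message;
      this.millis = millis;
      this.error = error;
    }
  }

  private final List<Event> events = new ArrayList<>();

  // Appends a regular input message.
  public EventBuilderSketch addMessage(Object message) {
    events.add(new Event(Kind.MESSAGE, message, 0L, null));
    return this;
  }

  // Advances the simulated clock, e.g. to move a window past its boundary.
  public EventBuilderSketch advanceTimeBy(long millis) {
    events.add(new Event(Kind.ADVANCE_TIME, null, millis, null));
    return this;
  }

  // Injects a failure at this point in the stream.
  public EventBuilderSketch addException(RuntimeException error) {
    events.add(new Event(Kind.EXCEPTION, null, 0L, error));
    return this;
  }

  // Returns an immutable snapshot of the scripted events.
  public List<Event> build() {
    return Collections.unmodifiableList(new ArrayList<>(events));
  }

  // Replays events in order: messages go to the handler, time events advance the
  // simulated clock, exception events are thrown. Returns the final time in ms.
  public static long replay(List<Event> events, Consumer<Object> handler) {
    long clockMs = 0L;
    for (Event event : events) {
      switch (event.kind) {
        case MESSAGE:
          handler.accept(event.message);
          break;
        case ADVANCE_TIME:
          clockMs += event.millis;
          break;
        case EXCEPTION:
          throw event.error;
      }
    }
    return clockMs;
  }

  public static void main(String[] args) {
    List<Event> script = new EventBuilderSketch()
        .addMessage(1)
        .advanceTimeBy(60_000L)
        .addMessage(2)
        .build();
    long finalTime = replay(script, message -> System.out.println("processed " + message));
    System.out.println("simulated time: " + finalTime + " ms");
  }
}
```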
Samza provides complete flexibility in the data types used for input streams, and this framework will likewise provide complete flexibility for both primitive and derived data types. Th
...
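Because in-memory streams hold message objects directly rather than serialized bytes, one generic stream type can carry primitive wrappers and user-defined types alike. A minimal sketch, assuming a hypothetical InMemoryStreamSketch<T> class and an illustrative PageView type (neither is a Samza or framework API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not the framework's API: an in-memory stream keeps message
// objects by reference, so any Java type can be used without a serde.
public class InMemoryStreamSketch<T> {
  private final String streamName;
  private final List<T> messages;

  private InMemoryStreamSketch(String streamName, List<T> messages) {
    this.streamName = streamName;
    this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
  }

  // Builds a named stream from the given messages, preserving their order.
  @SafeVarargs
  public static <E> InMemoryStreamSketch<E> of(String streamName, E... messages) {
    return new InMemoryStreamSketch<>(streamName, Arrays.asList(messages));
  }

  public String getStreamName() {
    return streamName;
  }

  public List<T> getMessages() {
    return messages;
  }

  // Illustrative user-defined ("derived") message type.
  public static final class PageView {
    public final String pageKey;
    public final long viewTimeMs;

    public PageView(String pageKey, long viewTimeMs) {
      this.pageKey = pageKey;
      this.viewTimeMs = viewTimeMs;
    }
  }

  public static void main(String[] args) {
    // Primitive values travel as their wrapper type (Integer here).
    InMemoryStreamSketch<Integer> numbers = InMemoryStreamSketch.of("input", 1, 2, 3, 4);
    // User-defined types are stored as-is; nothing is serialized.
    PageView view = new PageView("home", 1_000L);
    InMemoryStreamSketch<PageView> views = InMemoryStreamSketch.of("page-views", view);

    System.out.println(numbers.getStreamName() + " " + numbers.getMessages()); // input [1, 2, 3, 4]
    System.out.println(views.getMessages().get(0) == view);                    // true
  }
}
```

Storing references instead of bytes is what removes the serde requirement; the trade-off is that such tests do not exercise the serdes a production job would use.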
Public Interfaces
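```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
// TestTask, CollectionStream and TaskAssert are the test framework classes proposed here.

/**
 * Sample test case using a collection as the input for a low-level application. It demonstrates
 * setting up and verifying a test with minimal (here, no) configs: it reads an input stream of
 * integers and multiplies each integer by 10.
 */
// Minimal config (assumed to be a key/value map): no entries are needed for in-memory streams
Map<String, String> config = new HashMap<>();

// Create a StreamTask that multiplies every incoming integer by 10
StreamTask task = new StreamTask() {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    Integer obj = (Integer) envelope.getMessage();
    collector.send(new OutgoingMessageEnvelope(new SystemStream("test-samza", "output"), obj * 10));
  }
};

// Initialize and run the test framework with the task defined above
TestTask
    .create("test-samza", task, config)
    .setJobContainerThreadPoolSize(4)
    .addInputStream(CollectionStream.of("input", Arrays.asList(1, 2, 3, 4)))
    .addOutputStream(CollectionStream.empty("output"))
    .run();

// Assertions on the outputs (stream name must match the one used by the task)
TaskAssert.that("test-samza", "output").containsInAnyOrder(Arrays.asList(10, 30, 20, 40));
```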
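Independently of the proposed TestTask API, the behaviour this sample checks can be reproduced in plain Java: apply the per-message logic to the input collection, then compare the outputs as a multiset, which is what an order-insensitive assertion such as containsInAnyOrder expresses (output order may not be deterministic when messages are processed concurrently). SampleTaskSimulation and its methods are hypothetical helpers for illustration only; the real framework also routes messages through system streams and containers, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of the sample test; no Samza classes involved.
public class SampleTaskSimulation {

  // Applies the per-message logic to every input message and collects what the
  // task would send to its output stream.
  public static <I, O> List<O> runTask(List<I> input, Function<I, O> process) {
    List<O> output = new ArrayList<>();
    for (I message : input) {
      output.add(process.apply(message));
    }
    return output;
  }

  // Order-insensitive comparison: both lists must contain the same elements
  // with the same multiplicities.
  public static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    return counts(actual).equals(counts(expected));
  }

  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> counts = new HashMap<>();
    for (T item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Integer> output = runTask(Arrays.asList(1, 2, 3, 4), x -> x * 10);
    System.out.println(containsInAnyOrder(output, Arrays.asList(10, 30, 20, 40))); // prints true
  }
}
```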
Implementation and Test Plan
...