...
In-memory system is applicable only for jobs in local execution environment. Remote execution environment isn’t supported.
The scope of in-memory system and the data it handles are limited to a container. I.e. there is no support for process to process interaction or sharing.
Checkpointing is not supported and consumers always start from the beginning in case of restart.
In-memory system doesn’t support persistence and is not the source of truth for the data. The data in the queue is lost when the job restarts or shutdowns unexpectedly.
Design
Data Source & Sink
The input data source for the in-memory system can be broadly classified as bounded and unbounded data. We are limiting the scope of this SEP to only bounded data source that is immutable as the input source. It simplifies the view of the data and also the initialization step for the consumers. However, in-memory system for intermediate streams supports both bounded and unbounded data. We will introduce a configuration to tune memory pool for the intermediate queue. . The sink a.k.a output source is modeled to be mutable.
Data Partitioning
Samza is a distributed stream processing framework that achieves parallelism with partitioned data. With a bounded data source, we need to think about how the data is going to be partitioned and how do we map data to SystemStreamPartition in Samza.
...
In order to exploit the parallelism that the Samza framework offers and to enable users test their job with multiple tasks, we need to support multiple partitioning. There are couple of ways to support multiple partitions.
...
Partitioning at source
In this approach, we push the partitioning to the source. For e.g. we can read of a `Collection<Collection<T>>` and have each collection within the collection assigned to one partition. This is surprisingly simple yet powerful since it eliminates the need for repartitioning phase and allows the user to group the data at his/her whim. The downside w/ this approach is the input collections can be skewed and Samza don’t control the evenness in the distribution of the data. Since the primary use case is testing, the skew should have negligible impact.
B. Partitioning within Samza
Takes a collection from the user and applies a partitioning strategy. The strategy could be as simple as a round robin strategy or random assignment strategy. How do we determine the partition count? We can either have the user specify the number of partitions (introduces new configuration in Samza). Alternatively, we can also automatically come up with partition number based on the input data source. TBD
C. Partitioning within Samza w/ configurable strategy
We follow a similar strategy as “Partitioning within Samza” with the additional optional of supporting user specified groupers. With this approach, we sign up for introducing a public interface that user has to implement and pass it to Samza using config. Downside being it introduces additional configurations and also add on to our existing class loading approach using reflection.
I am leaning towards approach ‘A’ - partitioning at source.
End of Stream
In-memory system will leverage the EOS feature introduced in SEP-6 to mark the end of stream for bounded sources.
Proposed Changes
Architecture
Implementation
- Approach A - Use existing `BlockingEnvelopeMap` and have one common class that shares the responsibility of consumer as well as producer. The class will be responsible for handling both producing and consuming messages off the same queue.
- Approach B - Have separate producer and consumer. Tie up the consumer with the producer so that producer has hooks to produce to the same underlying `BlockingEnvelopeMap` that consumer uses.
- Approach C - Have separate consumer and producer. Introduce a custom queue that are shared between consumer and producer. The queue lifecycle is managed by the SystemAdmin.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Sample test case w/ multiple input partition using collection based system. */ ... ... ImmutableSet<IV>ImmutableList<IV> inputA = ... ImmutableSet<IV>ImmutableList<IV> inputB = ... Set<OV>List<OV> outputData = ... // mutable StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream") .from(ImmutableSetImmutableList.of(inputA, inputB)); StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream") .from(outputData); // application logic StreamApplication app = StreamApplication.create(...); app.from(input) .map(...) .sendTo(output); app.run(); app.waitForFinish(); // assertions on outputData |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Sample test case using collection based system for high level application with durable state. * Note: We don't provide the ability to recover data/bootstrap data for store using in-memory system. It should work seamlessly once we have StoreDecriptor. */ ... ... ImmutableSet<IV>ImmutableList<IV> inputA = ... ImmutableSet<IV>ImmutableList<IV> inputB = ... Set<OV>List<OV> outputData = ... // mutable StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream") .from(ImmutableSetImmutableList.of(inputA, inputB)); StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream") .from(outputData); // application logic StreamApplication app = StreamApplication.create(...); app.from(input) .map(...) .sendTo(output); app.run(); app.waitForFinish(); // assertions on outputData |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Sample test case using collection based system for low level application. */ ... ... ImmutableSet<IV>ImmutableList<IV> inputA = ... Set<OV>List<OV> outputData = ... // mutable StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app") .from(ImmutableSetImmutableList.of(inputA)); StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app") .from(outputData); // application logic StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory()) .addInputs(Collections.singletonList(input)) .addOutputs(Collections.singletonList(output)); app.run(); app.waitForFinish(); // assertions on outputData |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Sample test case using collection based system for low level application with durable state. * Note: We don't provide the ability to recover data/bootstrap data for store using in-memory system. It should work seamlessly once we have StoreDecriptor. */ ... ... ImmutableSet<IV>ImmutableList<IV> inputA = ... Set<OV>List<OV> outputData = ... // mutable StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app") .from(ImmutableSetImmutableList.of(inputA)); StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app") .from(outputData); // application logic StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory()) .addInputs(Collections.singletonList(input, changelog)) .addOutputs(Collections.singletonList(output)); app.run(); app.waitForFinish(); // assertions on outputData |
...