...

  1. The in-memory system is applicable only to jobs running in the local execution environment. Remote execution environments aren't supported.

  2. The scope of the in-memory system and the data it handles is limited to a single container, i.e. there is no support for process-to-process interaction or sharing.

  3. Checkpointing is not supported, so consumers always start from the beginning on restart.

  4. The in-memory system doesn't support persistence and is not the source of truth for the data. The data in the queue is lost when the job restarts or shuts down unexpectedly.
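To make limitations 3 and 4 concrete, here is a minimal Java sketch of a consumer over one bounded, in-memory partition. The class `InMemoryPartitionConsumer` and its methods are hypothetical and not part of Samza. The point is only that the read offset lives in memory and `checkpoint()` records nothing, so a restarted consumer rereads from the beginning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer over a single in-memory partition (not a Samza class).
// The read position lives only in this object, so nothing survives a restart.
class InMemoryPartitionConsumer<T> {
    private final List<T> partition; // bounded, immutable input data
    private int offset = 0;          // in-memory only; never persisted

    InMemoryPartitionConsumer(List<T> partition) {
        this.partition = List.copyOf(partition);
    }

    // Returns up to maxMessages messages starting at the current offset.
    List<T> poll(int maxMessages) {
        int end = Math.min(offset + maxMessages, partition.size());
        List<T> batch = new ArrayList<>(partition.subList(offset, end));
        offset = end;
        return batch;
    }

    // True once every message in the bounded partition has been read.
    boolean isEndOfStream() {
        return offset >= partition.size();
    }

    // Checkpointing is unsupported: the offset is deliberately not recorded.
    void checkpoint() {
        // no-op
    }
}
```

Because nothing is persisted, a restart amounts to constructing a new consumer over the same immutable collection, which begins again at offset 0.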

Design

 

Data Source

The input data source for the in-memory system can be broadly classified as bounded or unbounded. We limit the scope of this SEP to bounded, immutable input sources. This simplifies the view of the data as well as the initialization step for the consumers. However, the in-memory system supports both bounded and unbounded data for intermediate streams. We will introduce a configuration to tune the memory pool for the intermediate queue.
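One way to bound the memory used by an intermediate stream is a fixed-capacity blocking queue, so that a fast producer waits instead of growing the heap. The sketch below assumes the memory-pool configuration translates into a simple message-count capacity. The class `IntermediateQueue` and that mapping are illustrative assumptions, not the proposed configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical intermediate stream backed by a bounded queue. The capacity
// stands in for the memory-pool setting; its name and units are assumptions.
class IntermediateQueue<T> {
    private final BlockingQueue<T> queue;

    IntermediateQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Waits up to timeoutMs for space; returns false if the queue stays full,
    // applying back-pressure instead of growing memory without bound.
    boolean offer(T message, long timeoutMs) {
        try {
            return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    // Waits up to timeoutMs for a message; returns null if none arrives.
    T poll(long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return null;
        }
    }

    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

Capping by message count is the simplest choice; a byte-based pool would additionally need a size estimate for each message.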

...

Architecture


 

 


Implementation

 

  • Approach A - Use the existing `BlockingEnvelopeMap` and have one common class that acts as both consumer and producer. The class handles both producing and consuming messages on the same queue.
  • Approach B - Have a separate producer and consumer. Tie the consumer to the producer so that the producer has hooks to produce to the same underlying `BlockingEnvelopeMap` that the consumer uses.
  • Approach C - Have a separate consumer and producer. Introduce a custom queue that is shared between the consumer and producer. The queue lifecycle is managed by the SystemAdmin.
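A minimal Java sketch of Approach C, with hypothetical class names throughout: `InMemoryQueueManager` stands in for the SystemAdmin and owns the queue lifecycle, while separate producer and consumer classes share nothing but the manager. Stream partitions are identified by plain strings here as a stand-in for `SystemStreamPartition`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical owner of the shared queues, playing the SystemAdmin role:
// it creates and removes queues, one per stream partition.
class InMemoryQueueManager {
    private final Map<String, Queue<Object>> queues = new ConcurrentHashMap<>();

    // Idempotent: returns the existing queue if the partition already has one.
    Queue<Object> createQueue(String streamPartition) {
        return queues.computeIfAbsent(streamPartition, k -> new ConcurrentLinkedQueue<>());
    }

    Optional<Queue<Object>> getQueue(String streamPartition) {
        return Optional.ofNullable(queues.get(streamPartition));
    }

    void removeQueue(String streamPartition) {
        queues.remove(streamPartition);
    }
}

// Producer and consumer are separate classes; they only share the manager.
class InMemoryProducer {
    private final InMemoryQueueManager manager;

    InMemoryProducer(InMemoryQueueManager manager) {
        this.manager = manager;
    }

    void send(String streamPartition, Object message) {
        manager.createQueue(streamPartition).add(message);
    }
}

class InMemoryConsumer {
    private final InMemoryQueueManager manager;

    InMemoryConsumer(InMemoryQueueManager manager) {
        this.manager = manager;
    }

    // Returns the next message, or null if the queue is empty or absent.
    Object poll(String streamPartition) {
        return manager.getQueue(streamPartition).map(Queue::poll).orElse(null);
    }
}
```

Keeping creation and removal in one place means neither the producer nor the consumer needs a reference to the other, which is what distinguishes this from Approach B.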

Test Plan

High level application

 

```java
/**
 * Sample test case with multiple input partitions using the collection-based system.
 */

...
...

ImmutableSet<IV> inputA = ...
ImmutableSet<IV> inputB = ...

Set<OV> outputData = ...   // mutable; collects the job's output for assertions

// each inner set supplies one input partition of "test-stream"
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableSet.of(inputA, inputB));

StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);

// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);

app.run();
app.waitForFinish();

// assertions on outputData
```
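The sample leaves the concrete type of the mutable `outputData` set open. If tasks for different partitions can write to it from different threads (an assumption about the runtime, not something the sample states), a thread-safe set such as `ConcurrentHashMap.newKeySet()` avoids lost updates. The plain-Java sketch below simulates that situation with a hypothetical `runPartitions` helper that processes each input partition on its own thread; it does not use any Samza API.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class OutputCollectionSketch {
    // Hypothetical helper: applies mapFn to every message, processing each
    // partition on its own thread and writing into one shared, thread-safe set.
    static <I, O> Set<O> runPartitions(List<List<I>> partitions, Function<I, O> mapFn) {
        Set<O> outputData = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        for (List<I> partition : partitions) {
            pool.execute(() -> partition.forEach(msg -> outputData.add(mapFn.apply(msg))));
        }
        pool.shutdown(); // no new tasks; wait for the submitted ones to finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("partitions did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return outputData;
    }
}
```

With a plain `HashSet`, concurrent `add` calls from the partition threads could corrupt the set or drop elements; the concurrent key set makes the final assertions deterministic.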

High level application with durable state

```java
/**
 * Sample test case using the collection-based system for a high level application with durable state.
 * Note: The in-memory system does not support restoring or bootstrapping store data. This should work seamlessly once StoreDescriptor is available.
 */

...
...

ImmutableSet<IV> inputA = ...
ImmutableSet<IV> inputB = ...

Set<OV> outputData = ...   // mutable; collects the job's output for assertions

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableSet.of(inputA, inputB));

StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);

// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);

app.run();
app.waitForFinish();

// assertions on outputData
```
 

Low level application

Collection-based systems that do not support sharing state across processes are not applicable to low level applications, since such applications are fragmented by nature and run in different processes, even within the same host.

```java
/**
 * Sample test case using the collection-based system for a low level application.
 */

...
...

ImmutableSet<IV> inputA = ...

Set<OV> outputData = ... // mutable; collects the job's output for assertions

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableSet.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData
```

Low level application with durable state

```java
/**
 * Sample test case using the collection-based system for a low level application with durable state.
 * Note: The in-memory system does not support restoring or bootstrapping store data. This should work seamlessly once StoreDescriptor is available.
 */

...
...

ImmutableSet<IV> inputA = ...

Set<OV> outputData = ... // mutable; collects the job's output for assertions

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableSet.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic; `changelog` is the store's changelog stream descriptor (definition elided)
// Collections.singletonList accepts exactly one element, so use Arrays.asList for two inputs
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Arrays.asList(input, changelog))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData
```

Low level application with manual checkpoint

```java
/**
 * Sample test case using the collection-based system for a low level application with manual checkpointing.
 * Note: A checkpoint against the in-memory system should be a no-op.
 */

...
...

// Should not differ from testing any other low level application
```

Samza SQL application

Users should be able to leverage the in-memory, collection-based system to test Samza SQL applications.
Details: TBD

...