Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: [ UNDER DISCUSSION ]Accepted

Discussion threadAdd in-memory system

...

I am leaning towards Approach C as its simpler, not tied to BlockingEnvelopeMap and has separation of concerns.

...

Example Usages

High level stateless application

Code Block
languagejava
themeEmacs
collapsetrue
/**
 * Sample test case w/ multiple input partition using collection based system.
 */
 
...
...
 
ImmutableList<IV> inputA = ...
ImmutableList<IV> inputB = ...
 
List<OV> outputData = ...   // mutable
 
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableList.of(inputA, inputB));
 
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);
 
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);
 
app.run();
app.waitForFinish();
 
// assertions on outputData

...

Code Block
languagejava
themeEmacs
collapsetrue
/**
 * Sample test case using collection based system for high level application with durable state.
 * Note: We don't provide the ability to recover data/bootstrap data for store using in-memory system. It should work seamlessly once we have StoreDecriptor.
 */
 
...
...
 
ImmutableList<IV> inputA = ...
 
List<OV> outputData = ...   // mutable
 
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableList.of(inputA));
 
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);
 
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);
 
app.run();
app.waitForFinish();
 
// assertions on outputData
 


Low level stateless application

Code Block
languagejava
themeEmacs
collapsetrue
/**
 * Sample test case using collection based system for low level application.
 */

...
...

ImmutableList<IV> inputA = ...

List<OV> outputData = ... // mutable

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData


Low level application with
durable state

Code Block
languagejava
themeEmacs
collapsetrue
/**
 * Sample test case using collection based system for low level application with durable state.
 * Note: We don't provide the ability to recover data/bootstrap data for store using in-memory system. It should work seamlessly once we have StoreDecriptor.
 */

...
...

ImmutableList<IV> inputA = ...

List<OV> outputData = ... // mutable

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input, changelog))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData

Low level application with custom IME

Code Block
languagejava
themeEmacs
collapsetrue
/**
 * Sample test case using collection based system for low level application using custom IME.
 * It demonstrates the use of IME as a data source as opposed to raw message. The users are responsible for creating a 
 * complete IME object with partition information.
 */
 
...
...
 
ImmutableList<MyIME> inputData = Utils.createMyIME(...);
 
List<OV> outputData = ... // mutable
 
StreamDescriptor.Input<Object, Object> input = StreamDescriptor.<>input("input-stream-low-level-app")
	.from(inputData);
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);
 
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input, changelog))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData

Low level application with manual checkpoint

Unsupported Use cases

For In V1, we don't support checkpointing

...

languagejava
themeEmacs
collapsetrue

...

will not support the following use cases since it has a depdencies.

  • High level application with durable state
  • Low level application with durable state
  • Application with manual checkpoint. Note. Manual checkpointing will result in a no-op and might not result in desired behaviour.

...

Samza SQL application

Users should be able to leverage in-memory collection based system to test Samza SQL application provided Samza SQL integrates with SEP-2.

...