Status

Current state: [ UNDER DISCUSSION ]

...

Samza has a pluggable system design that allows users to implement their own system consumers and producers. Typically, a consumer consumes raw messages and wraps them in an IncomingMessageEnvelope (IME). However, some systems introduce subclasses of IME and pass those to the tasks instead. For this reason, the in-memory system needs to support two kinds of data in the in-memory collection:

  1. Raw messages: The in-memory system will behave like a typical consumer and wrap each raw message in an IME. It populates the offset and key fields itself: the offset is the position of the message in the collection, and the key is the hash code of the raw message. Users who need fine-grained control over these fields should construct their own IMEs.
  2. IMEs (including subclasses): The in-memory system acts as a pass-through system consumer, handing the message envelope to the task as-is, without any wrapping or processing.
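A minimal sketch of the two cases above. It assumes a stripped-down stand-in for Samza's `IncomingMessageEnvelope` (the `Envelope` class and `InMemoryWrapper.toEnvelopes` are hypothetical names, used so the example runs without Samza on the classpath). Raw messages get offset = position in the collection and key = hash code; elements that are already envelopes pass through untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for org.apache.samza.system.IncomingMessageEnvelope,
// reduced to the fields discussed above.
class Envelope {
  final String stream;
  final int partition;
  final String offset;
  final Object key;
  final Object message;

  Envelope(String stream, int partition, String offset, Object key, Object message) {
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
    this.key = key;
    this.message = message;
  }
}

class InMemoryWrapper {
  // Converts one partition's worth of collection data into envelopes.
  static List<Envelope> toEnvelopes(String stream, int partition, List<?> data) {
    List<Envelope> result = new ArrayList<>(data.size());
    for (int i = 0; i < data.size(); i++) {
      Object element = data.get(i);
      if (element instanceof Envelope) {
        // Case 2: the user already built the envelope; pass it through untouched.
        result.add((Envelope) element);
      } else {
        // Case 1: raw message. Offset = position in the collection,
        // key = hash code of the message (0 for null).
        result.add(new Envelope(stream, partition, String.valueOf(i), Objects.hashCode(element), element));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Object> data = new ArrayList<>();
    data.add("hello");
    data.add(new Envelope("test-stream", 3, "custom-offset", "k", "world"));
    for (Envelope e : toEnvelopes("test-stream", 0, data)) {
      System.out.println(e.partition + " " + e.offset + " " + e.key + " " + e.message);
    }
  }
}
```

Note that a pass-through envelope keeps its own partition and offset, which is why users who need control over those fields should build IMEs themselves.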

Data Partitioning

Samza is a distributed stream processing framework that achieves parallelism through partitioned data. With a bounded data source, we need to decide how the data is partitioned and how it is mapped to SystemStreamPartitions in Samza. Partitioning only matters when the input source consists of raw messages. An IME already carries its partition information, and the in-memory system will respect it.
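One plausible mapping, sketched under assumptions since the exact rule is not fixed here: for raw messages, each inner collection of the input (as in `ImmutableList.of(inputA, inputB)` in the test plan) becomes its own partition, numbered by position; for envelopes, the partition recorded in the envelope wins. `Envelope` and `Partitioner.assign` are hypothetical illustrations, not Samza APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an IME, reduced to the one field this sketch needs.
class Envelope {
  final int partition;
  final Object message;

  Envelope(int partition, Object message) {
    this.partition = partition;
    this.message = message;
  }
}

class Partitioner {
  // Raw messages: inner collection i -> partition i.
  // Envelopes: keep the partition the envelope already carries.
  static Map<Integer, List<Object>> assign(List<? extends List<?>> partitions) {
    Map<Integer, List<Object>> byPartition = new TreeMap<>();
    for (int i = 0; i < partitions.size(); i++) {
      for (Object element : partitions.get(i)) {
        int target = (element instanceof Envelope) ? ((Envelope) element).partition : i;
        byPartition.computeIfAbsent(target, p -> new ArrayList<>()).add(element);
      }
    }
    return byPartition;
  }

  public static void main(String[] args) {
    List<List<?>> input = new ArrayList<>();
    input.add(List.of("a", "b"));              // raw messages -> partition 0
    input.add(List.of("c"));                   // raw message  -> partition 1
    input.add(List.of(new Envelope(5, "d")));  // envelope keeps partition 5
    System.out.println(assign(input).keySet()); // prints [0, 1, 5]
  }
}
```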

...

  • Approach A - Use the existing BlockingEnvelopeMap and have one common class that acts as both consumer and producer, producing and consuming messages off the same queue.
  • Approach B - Have a separate producer and consumer. Tie the consumer to the producer so that the producer has hooks to produce into the same underlying `BlockingEnvelopeMap` that the consumer uses.
  • Approach C - Have a separate consumer and producer. Introduce a custom queue that is shared between the consumer and producer. The queue lifecycle is managed by the SystemAdmin.
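To make Approach C concrete, here is a minimal sketch of a shared queue registry whose lifecycle is controlled by an admin-like owner, while the producer and consumer only hold a reference to it. `InMemoryQueues`, `createStream`, `send`, `poll`, and `clear` are hypothetical names, not Samza APIs; a real implementation would sit behind `SystemAdmin`, `SystemProducer`, and `SystemConsumer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical shared queue registry; in Approach C the SystemAdmin would own its lifecycle.
class InMemoryQueues {
  private final Map<String, LinkedBlockingQueue<Object>> queues = new ConcurrentHashMap<>();

  private static String id(String stream, int partition) {
    return stream + "#" + partition;
  }

  // Admin side: create one queue per stream partition before anyone uses it.
  void createStream(String stream, int partitionCount) {
    for (int p = 0; p < partitionCount; p++) {
      queues.putIfAbsent(id(stream, p), new LinkedBlockingQueue<>());
    }
  }

  // Producer side: append a message to the partition's queue.
  void send(String stream, int partition, Object message) {
    LinkedBlockingQueue<Object> q = queues.get(id(stream, partition));
    if (q == null) {
      throw new IllegalStateException("stream partition not created: " + id(stream, partition));
    }
    q.add(message);
  }

  // Consumer side: drain whatever is currently available, without blocking.
  List<Object> poll(String stream, int partition) {
    List<Object> batch = new ArrayList<>();
    LinkedBlockingQueue<Object> q = queues.get(id(stream, partition));
    if (q != null) {
      q.drainTo(batch);
    }
    return batch;
  }

  // Admin side: drop all queues, e.g. when a test finishes.
  void clear() {
    queues.clear();
  }

  public static void main(String[] args) {
    InMemoryQueues queues = new InMemoryQueues();
    queues.createStream("output-test-stream", 2);
    queues.send("output-test-stream", 1, "x");
    queues.send("output-test-stream", 1, "y");
    System.out.println(queues.poll("output-test-stream", 1)); // prints [x, y]
    System.out.println(queues.poll("output-test-stream", 1)); // prints []
  }
}
```

Keeping the registry outside both producer and consumer is what distinguishes this from Approaches A and B: neither side needs a reference to the other.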

Test Plan

High level stateless application

Code Block
/**
 * Sample test case with multiple input partitions using the collection-based system.
 */
 
...
...
 
ImmutableList<IV> inputA = ...
ImmutableList<IV> inputB = ...
 
List<OV> outputData = ...   // mutable
 
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableList.of(inputA, inputB));
 
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);
 
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);
 
app.run();
app.waitForFinish();
 
// assertions on outputData

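The test plans leave `// assertions on outputData` elided. One hedged way to write them, assuming tasks may append to the shared output list from several threads and that output order across partitions is not guaranteed: collect into a synchronized list and compare contents ignoring order. `OutputAssertions` is a hypothetical helper, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutputAssertions {
  // A list that is safe to append to from multiple task threads.
  static <T> List<T> newOutputSink() {
    return Collections.synchronizedList(new ArrayList<>());
  }

  // Counts occurrences of each element so duplicates are compared too.
  private static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> result = new HashMap<>();
    for (T item : items) {
      result.merge(item, 1, Integer::sum);
    }
    return result;
  }

  // Order-insensitive comparison: same elements with the same multiplicities.
  static <T> void assertSameElements(List<T> expected, List<T> actual) {
    List<T> snapshot;
    synchronized (actual) {
      snapshot = new ArrayList<>(actual);
    }
    if (!counts(expected).equals(counts(snapshot))) {
      throw new AssertionError("expected " + expected + " (any order) but got " + snapshot);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> outputData = newOutputSink();
    Thread t0 = new Thread(() -> outputData.add("a"));
    Thread t1 = new Thread(() -> outputData.add("b"));
    t0.start();
    t1.start();
    t0.join();
    t1.join();
    assertSameElements(List.of("b", "a"), outputData);
    System.out.println("ok");
  }
}
```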
...

Code Block
/**
 * Sample test case using the collection-based system for a high-level application with durable state.
 * Note: We don't provide the ability to recover or bootstrap store data using the in-memory system. It should work seamlessly once we have StoreDescriptor.
 */
 
...
...
 
ImmutableList<IV> inputA = ...
 
List<OV> outputData = ...   // mutable
 
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableList.of(inputA));
 
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);
 
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);
 
app.run();
app.waitForFinish();
 
// assertions on outputData
 

Low level stateless application

Code Block
/**
 * Sample test case using the collection-based system for a low-level application.
 */

...
...

ImmutableList<IV> inputA = ...

List<OV> outputData = ... // mutable

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData

...

Code Block
/**
 * Sample test case using the collection-based system for a low-level application with durable state.
 * Note: We don't provide the ability to recover or bootstrap store data using the in-memory system. It should work seamlessly once we have StoreDescriptor.
 */

...
...

ImmutableList<IV> inputA = ...

List<OV> outputData = ... // mutable

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Arrays.asList(input, changelog)) // changelog: the store's changelog stream descriptor (definition elided)
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData

Low level application with custom IME

 

Code Block
/**
 * Sample test case using the collection-based system for a low-level application with a custom IME.
 * It demonstrates using IMEs as the data source instead of raw messages. Users are responsible for creating
 * complete IME objects, including partition information.
 */
 
...
...
 
ImmutableList<MyIME> inputData = Utils.createMyIME(...);
 
List<OV> outputData = ... // mutable
 
StreamDescriptor.Input<Object, Object> input = StreamDescriptor.<Object, Object>input("input-stream-low-level-app")
	.from(inputData);
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);
 
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData
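`Utils.createMyIME(...)` is elided in the test above. As an illustration only, a helper like the hypothetical `MyIMEFactory.create` could spread messages round-robin over N partitions and give each partition its own consecutive offsets, so that every envelope carries complete partition information. `MyIME` here is a stand-in class, not the real IME subclass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user-defined IncomingMessageEnvelope subclass.
class MyIME {
  final String stream;
  final int partition;
  final String offset;
  final Object key;
  final Object message;

  MyIME(String stream, int partition, String offset, Object key, Object message) {
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
    this.key = key;
    this.message = message;
  }
}

class MyIMEFactory {
  // Spreads messages round-robin over partitionCount partitions and assigns
  // consecutive offsets ("0", "1", ...) independently within each partition.
  static List<MyIME> create(String stream, int partitionCount, List<?> messages) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive");
    }
    long[] nextOffset = new long[partitionCount];
    List<MyIME> result = new ArrayList<>(messages.size());
    for (int i = 0; i < messages.size(); i++) {
      int partition = i % partitionCount;
      String offset = Long.toString(nextOffset[partition]++);
      Object message = messages.get(i);
      // Using the message itself as the key is an arbitrary choice for this sketch.
      result.add(new MyIME(stream, partition, offset, message, message));
    }
    return result;
  }

  public static void main(String[] args) {
    List<MyIME> imes = create("input-stream-low-level-app", 2, List.of("a", "b", "c"));
    for (MyIME ime : imes) {
      System.out.println(ime.message + " -> partition " + ime.partition + ", offset " + ime.offset);
    }
  }
}
```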

Low level application with manual checkpoint

In V1, we don't support checkpointing.

Code Block
/**
 * Sample test case using the collection-based system for a low-level application with manual checkpointing.
 * Note: Checkpointing against the in-memory system should result in a no-op.
 */

...
...

// Shouldn't be any different from testing a low level application 
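Since V1 does not support checkpointing and a checkpoint against the in-memory system should be a no-op, the in-memory side could behave like the hypothetical sketch below: writes are accepted and discarded, and reads always report that no checkpoint exists, so a restarted task would not resume from a stored offset. `CheckpointStore` and `NoOpCheckpointStore` are illustrative stand-ins, loosely modeled on, but not identical to, Samza's `CheckpointManager`.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative checkpoint interface; names and signatures are hypothetical.
interface CheckpointStore {
  void write(String taskName, Map<String, String> offsets);

  Optional<Map<String, String>> readLast(String taskName);
}

// In-memory system behavior in V1: checkpointing is a no-op.
class NoOpCheckpointStore implements CheckpointStore {
  @Override
  public void write(String taskName, Map<String, String> offsets) {
    // Intentionally does nothing: offsets are not persisted anywhere.
  }

  @Override
  public Optional<Map<String, String>> readLast(String taskName) {
    // Nothing was ever stored, so there is never a checkpoint to resume from.
    return Optional.empty();
  }

  public static void main(String[] args) {
    CheckpointStore store = new NoOpCheckpointStore();
    store.write("Partition 0", Map.of("input-stream-low-level-app#0", "42"));
    System.out.println(store.readLast("Partition 0").isPresent()); // prints false
  }
}
```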

...

Users should be able to leverage the in-memory collection-based system to test Samza SQL applications.
Details: TBD, provided Samza SQL integrates with SEP-2.