...
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Released: Samza 1.0
|
location | top |
---|
Table of Contents |
---|
Purpose
This document outlines a proposal for extending Samza’s Execution Planner to verify agreement in partition count among the stream(s) behind Tables and other streams participating in Stream-Table Joins in applications written using Samza High-Level APIs.
...
For instance, to perform the operations illustrated in Fig. 1 on a stream of messages, a user can write the Samza app in listing 1 using Samza high-level API:
...
align | center |
---|
...
Fig. 1 — A logical workflow of stream processing operations |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamStreamJoinApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream s1 = graph .getInputStream("S1") .filter(/* Omitted for brevity */); MessageStream s2 = graph .getInputStream("S2"); OutputStream s3 = graph.getOutputStream("S3"); s1.join(s2, /* Omitted for brevity */) .sendTo(s3); } } | ||||||
Align | ||||||
| ||||||
Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow. |
This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an an OperatorSpec
object, a logical abstraction that describes the operation specified by the app author.
Some of these OperatorSpecs
represent message streams which can be thought of as channels propagating data into and out of the application, e.g.
InputOperatorSpecs
represent input data streams, e.g. S1 and S2, from which input messages are read.OutputOperatorSpecs
represent output data streams, e.g. S3, to which processed data is produced.
...
The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpec
s produced by Samza High-Level API to verify compliance to this requirement among all such sets of streams.
...
...
align | center |
---|
...
...
Fig. 3 — 2 examples cases of Stream-Stream Joins. After considering the partition counts of the joined input streams, Samza’s Execution Planner accepts the one to the left but rejects the one to the right. |
Inferring Partition Counts of Intermediate Streams
Another closely-related responsibility of Samza’s Execution Planner is inferring partition counts of all intermediate streams present in the OperatorSpec
graph. Such streams are introduced into the OperatorSpec
graph whenever the Partition-By operation is used, and are represented by the same type of OperatorSpec
s used to represent input streams, i.e. InputOperatorSpec
. Unlike input streams however, intermediate streams have no defined partition counts by default. As we said, it is the Execution Planner that decides the partition count of every intermediate stream after traversing the OperatorSpec
graph, according to the following rules:Inferring Partition Counts of Intermediate Streams
Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.
Any intermediate stream not covered by the first rule is assigned the partition count value specified by the Samza config property
job.intermediate.stream.partitions
.If no value is specified for
job.intermediate.stream.partitions
, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a maximum hard-coded value of 256.
...
align | center |
---|
...
Fig. 4 — The OperatorSpec graph of an example high-level Samza application that employs the Partition-By operation. The Execution Planner decides to assign the partition count value 16 to intermediate stream S2′, the same value of input stream S1, since they are joined together. |
...
align | center |
---|
It is important to realize there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with, a scenario that would cause the Execution Planner to signal an error and reject the whole application. Fig. 5 illustrates one such case.
...
Fig. 5 — The |
...
it is is joined with 2 input streams, S1 and S4, that have different partition counts. |
Tables and Stream-Table Joins
A recent addition to Samza is the introduction of Table, a key-value abstraction that facilitates accessing remotely stored data. And with this addition, it was also made possible to perform Join operations between Tables and streams.
The code sample below demonstrates how this can be achieved using Samza High-Level API.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamTableJoinApp implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream s1 = graph.getInputStream("S1"); MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */); // Assume local table Table t = graph.getTable(/* Omitted for brevity */); s1Prime.sendTo(t); MessageStream s2 = graph.getInputStream("S2"); OutputStream s3 = graph.getOutputStream("S3"); s2.join(t, /* Omitted for brevity */) .sendTo(s3); } } | ||||||
Align | ||||||
| ||||||
Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2. |
But since Tables can be populated with data flowing from input streams (aka local tables), it is still important to ensure that the stream used to populate the table has the same number of partitions as the stream the table is joined with. Failing to do so exposes Stream-Table Joins to the same class of problems Stream-Stream Joins could run into if Samza were to allow joining 2 streams with different partition counts, i.e. invalid Joins.
Side-Input Streams
Another recent addition to Samza that is related to Tables is Side-Input Streams. Simply put, this feature allows Samza application authors to specify that a table should be populated — and constantly updated — with data from one or more input streams. Such streams have been given the name Side-Input Streams or Side-Inputs for short.
...
The table below enumerates a number of cases in which Samza’s current Execution Planner does not enforce the necessary constraints on P1 and P2 to ensure the validity of the Stream-Table Join between table T and stream S2.
...
# | S1 | S2 | Required Constraint |
---|---|---|---|
1 | Input stream | Input stream | P1 must be equal to P2 |
2 | Input stream | Intermediate stream | P2 must be set to P1 |
3 | Intermediate stream | Input stream | P1 must be set to P2 |
4 | Intermediate stream | Intermediate stream | If the result of joining S1 and S2 is subsequently joined with an input stream S3, P1 and P2 must be |
...
set to P3. |
Cases #1 and #2 apply equally well if S1 is a side-input stream.
In all these cases, Samza application authors have no defense against essentially invalid Stream-Table Joins.Cases #1 and #2 apply equally well if S1 is a side-input stream.
Problem Analysis
As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner employs traversal of the OperatorSpec
graph to identify groups of input and/or intermediate streams involved in Join operations and verify agreement among their partition counts. To understand the reason why this does not work in the case of Stream-Table Joins, we examine the OperatorSpec
graph generated for the Samza application in the code sample below.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamTableJoinApp implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
Table<KV<Integer, String>> t = graph
.getTable(/* Omitted for brevity */);
MessageStream s1 = graph
.getInputStream("S1")
.filter(/* Omitted for brevity*/);
s1.sendTo(t);
MessageStream s2 = graph.getInputStream("S2");
OutputStream s3 = graph.getOutputStream("S3");
s2.join(t, /* Omitted for brevity*/)
.sendTo(s3);
}
}
| ||||||
Align | ||||||
| ||||||
Fig. 7 — A graph representing the OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow. |
It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:
A different
OperatorSpec
,StreamTableJoinOperator
, is used to represent Stream-Table Join operations.A new terminal
SendToTableOperatorSpec
is used to represent the operation of producing data to a table.The
StreamTableJoinOperatorSpec
is not connected to theSendToTableOperatorSpec
. It only has a reference to the table (TableSpec
) participating in the Stream-Table Join operation.
...
To extend Samza’s ExecutionPlanner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec
and all relevant StreamTableJoinOperatorSpec
s. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec
graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec
and all the StreamTableJoinOperatorSpec
s that reference the same table (TableSpec
) in the entire OperatorSpec
graph.
...
align | center |
---|
...
Fig. 8 — A graph representing the |
Modularizing Partition Count Calculation
I propose to break down the code for verifying the validity of Join operations, in ExecutionPlanner.calculatePartitions()
, into 3 different pieces:
OperatorSpec
graph analysisConversion to
StreamEdge
sJoin input validation
Details of each one of these steps are outlined below.
...
Operations in this step tackle StreamEdge
s exclusively. No further associations with other entities are necessary.
Unifying Graph Traversal
I propose to introduce the following general stateless graph traversal utility for use in OperatorSpec
Graph Analysis.
Code Block | ||||
---|---|---|---|---|
| ||||
class GraphUtils { public static <T> void traverse(T vertex, Consumer<T> visitor, Function<T, Iterable<T>> getNextVertexes) { visitor.accept(vertex); for (T nextVertex : getNextVertexes.apply(vertex)) { traverse(nextVertex, visitor, getNextVertexes); } } } |
The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpec
graph starting from its InputOperatorSpec
s:
Code Block | ||||
---|---|---|---|---|
| ||||
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
GraphUtils.traverse(inputOpSpec, System.out::println,
OperatorSpec::getRegisteredOperatorSpecs);
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor(); for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) { GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs); } class SendToTableVisitor implements Consumer<OperatorSpec> { /* Private fields omitted for brevity. */ /** * Invoked once with every {@link OperatorSpec} encountered * during traversal. */ @Override public void accept(OperatorSpec operatorSpec) { /* Examine operatorSpec to create association. */ } } /** * Used to retrieve association after traversal is complete. */ public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> getSendToTableToStreamTableJoin() { /* Omitted for brevity. */ } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Customizes {@link OperatorSpecGraph} traversal by simulating virtual
* connections between the associated {@link SendToTableOperatorSpec}s
* and {@link StreamTableJoinOperatorSpec}s.
*/
class SendToTableConnector
implements Function<OperatorSpec, Iterable<OperatorSpec>> {
private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
sendToTableToStreamTableJoin;
public SendToTableConnector(
Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
sendToTableToStreamTableJoin) {
this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
}
@Override
public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
if (opSpec instanceof SendToTableOperatorSpec) {
SendToTableOperatorSpec sendToTableOpSpec =
(SendToTableOperatorSpec) opSpec;
return Collections.unmodifiableCollection(
sendToTableToStreamTableJoin.get(sendToTableOpSpec));
}
return opSpec.getRegisteredOperatorSpecs();
}
} |
...