...
Fig. 1 — A logical workflow of stream processing operations |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamStreamJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream s1 = graph
        .getInputStream("S1")
        .filter(/* Omitted for brevity */);
    MessageStream s2 = graph
        .getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");
    s1.join(s2, /* Omitted for brevity */)
        .sendTo(s3);
  }
}
Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow. |
This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an OperatorSpec
object, a logical abstraction that describes the operation specified by the app author.
...
...
.
...
Some of these OperatorSpecs
represent message streams which can be thought of as channels propagating data into and out of the application, e.g.
...
The code sample below demonstrates how this can be achieved using the Samza High-Level API.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamTableJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream s1 = graph.getInputStream("S1");
    MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */);
    // Assume local table
    Table t = graph.getTable(/* Omitted for brevity */);
    s1Prime.sendTo(t);
    MessageStream s2 = graph.getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");
    s2.join(t, /* Omitted for brevity */)
        .sendTo(s3);
  }
}
...
Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2. |
But since Tables can be populated with data flowing from input streams (such Tables are known as local tables), it is still important to ensure that the stream used to populate a table has the same number of partitions as the stream the table is joined with. Failing to do so exposes Stream-Table Joins to the same class of problems that Stream-Stream Joins would run into if Samza allowed joining 2 streams with different partition counts, i.e. invalid Joins.
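To make the failure mode concrete, here is a minimal sketch in plain Java (no Samza classes). It assumes a hypothetical hash-modulo partitioner and assumes that a task only sees the same partition index of each of its inputs; `PartitionMismatchDemo`, `partitionFor`, and `misalignedKeys` are illustrative names, not part of Samza.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration: the same key can land on different partition indexes under different partition counts. */
class PartitionMismatchDemo {

    /** Hypothetical partitioner (an assumption of this sketch): non-negative hash modulo partition count. */
    static int partitionFor(int key, int partitionCount) {
        return Math.floorMod(Integer.hashCode(key), partitionCount);
    }

    /** Returns the keys in [0, keyCount) whose partition index differs between the two streams. */
    static List<Integer> misalignedKeys(int keyCount, int tablePartitions, int streamPartitions) {
        List<Integer> result = new ArrayList<>();
        for (int key = 0; key < keyCount; key++) {
            if (partitionFor(key, tablePartitions) != partitionFor(key, streamPartitions)) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stream populating the table has 4 partitions, joined stream has 8.
        System.out.println(misalignedKeys(16, 4, 8)); // prints [4, 5, 6, 7, 12, 13, 14, 15]
        // Equal partition counts: every key is co-located.
        System.out.println(misalignedKeys(16, 4, 4)); // prints []
    }
}
```

Under these assumptions, half of the keys in the first case would be looked up in a table partition that never received them, which is the kind of invalid Join the planner needs to reject.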
...
As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner employs traversal of the OperatorSpec
graph to identify groups of input and/or intermediate streams involved in Join operations and verify agreement among their partition counts. To understand the reason why this does not work in the case of Stream-Table Joins, we examine the OperatorSpec
graph generated for the Samza application in the code sample below.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class StreamTableJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    Table<KV<Integer, String>> t = graph
        .getTable(/* Omitted for brevity */);
    MessageStream s1 = graph
        .getInputStream("S1")
        .filter(/* Omitted for brevity */);
    s1.sendTo(t);
    MessageStream s2 = graph.getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");
    s2.join(t, /* Omitted for brevity */)
        .sendTo(s3);
  }
}
Listing 3 — Sample application using Samza high-level API to perform Stream-Table Join.
Fig. 7 — A graph representing the OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow. |
It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:
...
Fig. 8 — A graph representing the |
...
Modularizing Partition Count Calculation
I propose to break down the code for verifying the validity of Join operations, in ExecutionPlanner.calculatePartitions(), into 3 different pieces:
1. OperatorSpec graph analysis
2. Conversion to StreamEdges
3. Join input validation
Details of each one of these steps are outlined below.
OperatorSpec Graph Analysis
In this phase, we perform all the necessary traversals on the OperatorSpec graph. Specifically, we need 2 traversals of the entire graph starting from InputOperatorSpecs.
1. The first traversal is a preprocessing step for the second traversal, where we create an association between every SendToTableOperatorSpec and all StreamTableJoinOperatorSpecs sharing the same TableSpec, i.e. SendToTableOperatorSpec → {StreamTableJoinOperatorSpecs}.
2. The second traversal creates an association between InputOperatorSpecs and all relevant JoinOperatorSpecs, i.e. InputOperatorSpec → {JoinOperatorSpecs}. Stream-Table Joins are also covered in this traversal by customizing the OperatorSpec graph traversal through the association created in step 1 above to assume virtual connections.
Both traversals of the OperatorSpec graph are very similar. In essence, we’re just visiting every OperatorSpec and recording some observations upon encountering certain types of OperatorSpecs. This motivates the introduction of the graph traversal utility discussed later in Unifying Graph Traversal.
Operations in this step tackle OperatorSpecs exclusively. The outcome of this phase will be the association created in step 2 above, where InputOperatorSpecs are grouped by the Join OperatorSpecs they participate in.
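The two traversals can be sketched on a toy graph as follows. This is only an illustration under stated assumptions: `Node`, `Kind`, and every method name below are hypothetical stand-ins for the OperatorSpec types, tables are identified by a plain string id instead of a TableSpec, and the result is keyed by Join so that it matches the "grouped by Join" outcome described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for an OperatorSpec graph; none of these types are Samza classes. */
class GraphAnalysisSketch {

    enum Kind { INPUT, SEND_TO_TABLE, STREAM_TABLE_JOIN, OUTPUT }

    static final class Node {
        final String name;
        final Kind kind;
        final String tableId; // null unless the node reads or writes a table
        final List<Node> next = new ArrayList<>();

        Node(String name, Kind kind, String tableId) {
            this.name = name;
            this.kind = kind;
            this.tableId = tableId;
        }
    }

    /** Collects every node reachable from the given node. */
    static void collect(Node node, Set<Node> seen) {
        if (seen.add(node)) {
            for (Node successor : node.next) {
                collect(successor, seen);
            }
        }
    }

    /** Traversal 1: SendToTable -> {StreamTableJoin} for nodes sharing a table id. */
    static Map<Node, Set<Node>> linkSendToTableToJoins(List<Node> inputs) {
        Set<Node> all = new LinkedHashSet<>();
        for (Node input : inputs) {
            collect(input, all);
        }
        Map<Node, Set<Node>> links = new LinkedHashMap<>();
        for (Node send : all) {
            if (send.kind != Kind.SEND_TO_TABLE) {
                continue;
            }
            Set<Node> joins = links.computeIfAbsent(send, k -> new LinkedHashSet<>());
            for (Node join : all) {
                if (join.kind == Kind.STREAM_TABLE_JOIN && join.tableId.equals(send.tableId)) {
                    joins.add(join);
                }
            }
        }
        return links;
    }

    /** Traversal 2: join name -> {input names}, following the virtual links from traversal 1. */
    static Map<String, Set<String>> groupInputsByJoin(List<Node> inputs, Map<Node, Set<Node>> links) {
        Map<String, Set<String>> joinToInputs = new LinkedHashMap<>();
        for (Node input : inputs) {
            visit(input, input.name, links, joinToInputs);
        }
        return joinToInputs;
    }

    private static void visit(Node node, String inputName, Map<Node, Set<Node>> links,
            Map<String, Set<String>> joinToInputs) {
        if (node.kind == Kind.STREAM_TABLE_JOIN) {
            joinToInputs.computeIfAbsent(node.name, k -> new LinkedHashSet<>()).add(inputName);
        }
        Iterable<Node> successors = node.next;
        if (node.kind == Kind.SEND_TO_TABLE) {
            // Virtual connection: jump from the table writer to the joins reading the same table.
            successors = links.getOrDefault(node, Collections.emptySet());
        }
        for (Node successor : successors) {
            visit(successor, inputName, links, joinToInputs);
        }
    }

    /** Builds a simplified shape of listing 3: S1 -> sendTo(T); S2 -> join(T) -> S3. */
    static Map<String, Set<String>> demo() {
        Node s1 = new Node("S1", Kind.INPUT, null);
        Node send = new Node("sendToT", Kind.SEND_TO_TABLE, "T");
        Node s2 = new Node("S2", Kind.INPUT, null);
        Node join = new Node("joinT", Kind.STREAM_TABLE_JOIN, "T");
        Node s3 = new Node("S3", Kind.OUTPUT, null);
        s1.next.add(send);
        s2.next.add(join);
        join.next.add(s3);
        List<Node> inputs = new ArrayList<>();
        inputs.add(s1);
        inputs.add(s2);
        return groupInputsByJoin(inputs, linkSendToTableToJoins(inputs));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {joinT=[S1, S2]}
    }
}
```

Even though S1 has no physical edge to the join in this toy graph, the virtual link lets the second traversal report both S1 and S2 as inputs of the same Join.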
Conversion to StreamEdges
In this phase, we convert every entry in the association produced by the previous step into a StreamEdge set by simply dropping the key and replacing every InputOperatorSpec with its corresponding StreamEdge.
To support tables with side-input streams, we can also examine every key (Join OperatorSpec) in the association produced by the previous step, retrieve the relevant TableSpec if the key is a StreamTableJoinOperatorSpec, and add a new input StreamEdge that corresponds to the side-input stream to the appropriate set of StreamEdges.
The output of this step is a collection of StreamEdge sets, each of which represents a group of StreamEdges participating in a particular Join. The number of such sets should typically equal the number of Joins in the OperatorSpec graph.
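A rough sketch of this conversion, using toy types only: `Edge` stands in for StreamEdge, joins and inputs are identified by strings, and the `sideInputByJoin` lookup is a hypothetical simplification of retrieving the side-input stream through the TableSpec.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the conversion phase; Edge is a stand-in for StreamEdge, not the Samza class. */
class StreamEdgeConversionSketch {

    static final class Edge {
        final String streamId;
        final int partitionCount; // -1 means "not set yet" in this sketch

        Edge(String streamId, int partitionCount) {
            this.streamId = streamId;
            this.partitionCount = partitionCount;
        }

        @Override
        public String toString() {
            return streamId + "(" + partitionCount + ")";
        }
    }

    /** Drops the join key and replaces each input id with its edge, adding a side-input edge if any. */
    static List<Set<Edge>> toEdgeGroups(Map<String, Set<String>> joinToInputs,
            Map<String, Edge> edgeByStreamId, Map<String, String> sideInputByJoin) {
        List<Set<Edge>> groups = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : joinToInputs.entrySet()) {
            Set<Edge> group = new LinkedHashSet<>();
            for (String inputStreamId : entry.getValue()) {
                group.add(edgeByStreamId.get(inputStreamId));
            }
            String sideInput = sideInputByJoin.get(entry.getKey());
            if (sideInput != null) {
                group.add(edgeByStreamId.get(sideInput));
            }
            groups.add(group);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> joinToInputs = new LinkedHashMap<>();
        Set<String> inputs = new LinkedHashSet<>();
        inputs.add("S2");
        joinToInputs.put("joinT", inputs);

        Map<String, Edge> edges = new HashMap<>();
        edges.put("S2", new Edge("S2", 4));
        edges.put("SI", new Edge("SI", 8));

        Map<String, String> sideInputs = new HashMap<>();
        sideInputs.put("joinT", "SI");

        System.out.println(toEdgeGroups(joinToInputs, edges, sideInputs)); // prints [[S2(4), SI(8)]]
    }
}
```

The mismatching counts in the example group (4 and 8) are deliberate: detecting that disagreement is the job of the validation step, not of the conversion.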
Join Input Validation
In this step, we sort the StreamEdge groups produced by the previous step in the following order:
1. Groups of StreamEdges whose partition counts are all set come first.
2. Groups of StreamEdges with a mix of set and unset partition counts come next.
3. Groups of StreamEdges in which not a single StreamEdge has its partition count set come last.
By processing StreamEdge groups in this order, i.e. most constrained StreamEdges first, we guarantee that we can verify agreement among input streams and assign partition counts to intermediate streams in a single scan, even if one or more StreamEdges are present in several groups.
Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.
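The ordering and the single scan could look roughly like the sketch below. It is an illustration only: streams are plain strings, partition counts live in a map with -1 meaning unset, the rank of each group is computed once before the scan, and groups in which no count is known by the time they are reached are simply left untouched. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the validation phase; not the actual ExecutionPlanner code. */
class JoinInputValidationSketch {
    static final int UNSET = -1;

    /** 0 = all partition counts set, 1 = mix of set and unset, 2 = none set. */
    static int rank(Set<String> group, Map<String, Integer> partitionCounts) {
        long setCount = group.stream().filter(s -> partitionCounts.get(s) != UNSET).count();
        if (setCount == group.size()) {
            return 0;
        }
        return setCount > 0 ? 1 : 2;
    }

    /** Verifies agreement within each group and copies the agreed count to unset streams. */
    static void validateAndAssign(List<Set<String>> groups, Map<String, Integer> partitionCounts) {
        List<Set<String>> ordered = new ArrayList<>(groups);
        // Most constrained groups first.
        ordered.sort(Comparator.comparingInt(group -> rank(group, partitionCounts)));
        for (Set<String> group : ordered) {
            int agreed = UNSET;
            for (String stream : group) {
                int count = partitionCounts.get(stream);
                if (count == UNSET) {
                    continue;
                }
                if (agreed == UNSET) {
                    agreed = count;
                } else if (agreed != count) {
                    throw new IllegalStateException(
                        "Partition count mismatch in join group " + group);
                }
            }
            if (agreed != UNSET) {
                for (String stream : group) {
                    partitionCounts.put(stream, agreed);
                }
            }
        }
    }
}
```

For example, with S1 and S2 both set to 4 and intermediate streams I1 and I2 unset, the groups {I1, I2}, {S1, I1}, and {S1, S2} are processed as {S1, S2}, then {S1, I1}, then {I1, I2}, so I1 and I2 both end up with 4 partitions. A group such as {S1, S2} with counts 4 and 8 would be rejected.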
Unifying Graph Traversal
I propose to introduce the following general stateless graph traversal utility for use in OperatorSpec Graph Analysis.
Code Block | ||||
---|---|---|---|---|
| ||||
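import java.util.function.Consumer;
import java.util.function.Function;

class GraphUtils {
  /**
   * Visits the given vertex, then recursively visits every vertex reachable from it.
   * No visited-set is kept, so a vertex reachable through several paths is visited once per path.
   */
  public static <T> void traverse(T vertex, Consumer<T> visitor,
      Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}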
The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpecGraph starting from its InputOperatorSpecs:
Code Block | ||||
---|---|---|---|---|
| ||||
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
  GraphUtils.traverse(inputOpSpec, System.out::println,
      OperatorSpec::getRegisteredOperatorSpecs);
}
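As a self-contained illustration that runs without Samza, the same traversal can be exercised on a toy graph of strings. The `TraversalDemo` class and its adjacency map are hypothetical; the `traverse` method is a copy of the utility above so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Runs the proposed traversal on a small diamond-shaped graph of strings. */
class TraversalDemo {

    /** Copy of the proposed utility, repeated here to keep the example self-contained. */
    static <T> void traverse(T vertex, Consumer<T> visitor,
            Function<T, Iterable<T>> getNextVertexes) {
        visitor.accept(vertex);
        for (T nextVertex : getNextVertexes.apply(vertex)) {
            traverse(nextVertex, visitor, getNextVertexes);
        }
    }

    /** Returns the vertexes in the order the visitor sees them. */
    static List<String> visitOrder(Map<String, List<String>> adjacency, String start) {
        List<String> visited = new ArrayList<>();
        Function<String, Iterable<String>> next =
            v -> adjacency.getOrDefault(v, Collections.emptyList());
        traverse(start, visited::add, next);
        return visited;
    }

    public static void main(String[] args) {
        // Diamond: A -> B -> D and A -> C -> D.
        Map<String, List<String>> adjacency = new HashMap<>();
        adjacency.put("A", Arrays.asList("B", "C"));
        adjacency.put("B", Arrays.asList("D"));
        adjacency.put("C", Arrays.asList("D"));
        System.out.println(visitOrder(adjacency, "A")); // prints [A, B, D, C, D]
    }
}
```

Because the traversal keeps no state, D is reported twice, once per path. A visitor that needs each vertex only once (for example when a Join is reachable from both of its inputs) would have to deduplicate on its own, e.g. by collecting into a set.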
Introducing such a utility has several advantages:
It spares us having to write redundant code for the 2 traversals needed during the OperatorSpec Graph Analysis step.
It makes it easy to carry out more traversals of any part of the OperatorSpecGraph should the need arise.
It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph in isolation from the traversal code itself.
It allows us to customize the traversal progression so that virtual connections can be made between otherwise disconnected parts of the OperatorSpecGraph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpecGraph representation as close as possible to the way it was defined by the user without being limited by it.
This utility can be used to carry out the 2 traversals in the OperatorSpec Graph Analysis step as shown below.
The first traversal can be done as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
  GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs);
}

class SendToTableVisitor implements Consumer<OperatorSpec> {
  /* Private fields omitted for brevity. */

  /**
   * Invoked once with every {@link OperatorSpec} encountered
   * during traversal.
   */
  @Override
  public void accept(OperatorSpec operatorSpec) {
    /* Examine operatorSpec to create association. */
  }

  /**
   * Used to retrieve association after traversal is complete.
   */
  public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
      getSendToTableToStreamTableJoin() {
    /* Omitted for brevity. */
  }
}
The SendToTableVisitor is a type dedicated to observing the OperatorSpecGraph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpecs.
The second traversal can be similarly achieved, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.
Code Block | ||||
---|---|---|---|---|
| ||||
import java.util.Collections;
import java.util.function.Function;

import com.google.common.collect.Multimap;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;

/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s.
 */
class SendToTableConnector
    implements Function<OperatorSpec, Iterable<OperatorSpec>> {

  private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
      sendToTableToStreamTableJoin;

  public SendToTableConnector(
      Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
          sendToTableToStreamTableJoin) {
    this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
  }

  @Override
  public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
    if (opSpec instanceof SendToTableOperatorSpec) {
      // Virtual connection: continue at the joins that read the table this spec writes to.
      SendToTableOperatorSpec sendToTableOpSpec =
          (SendToTableOperatorSpec) opSpec;
      return Collections.unmodifiableCollection(
          sendToTableToStreamTableJoin.get(sendToTableOpSpec));
    }
    // All other specs: follow the physical connections.
    return opSpec.getRegisteredOperatorSpecs();
  }
}