...
Released: Samza 1.0
Purpose
This document outlines a proposal for extending Samza’s Execution Planner to verify agreement in partition count among the stream(s) behind Tables and other streams participating in Stream-Table Joins in applications written using Samza High-Level APIs.
Background
Motivating Example: Stream-Stream Join
...
For instance, to perform the operations illustrated in Fig. 1 on a stream of messages, a user can write the Samza application in listing 1 using the Samza High-Level API:

Fig. 1 — A logical workflow of stream processing operations
...
Listing 1:

```java
public class StreamStreamJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream s1 = graph
        .getInputStream("S1")
        .filter(/* Omitted for brevity */);
    MessageStream s2 = graph.getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");

    s1.join(s2, /* Omitted for brevity */)
      .sendTo(s3);
  }
}
```
Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.
...
The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpecs produced by the Samza High-Level API to verify compliance with this requirement among all such sets of streams.
Fig. 3 — Two example cases of Stream-Stream Joins. After considering the partition counts of the joined input streams, Samza’s Execution Planner accepts the one on the left but rejects the one on the right.
...
1. Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.
2. Any intermediate stream not covered by the first rule is assigned the partition count specified by the Samza config property job.intermediate.stream.partitions.
3. If no value is specified for job.intermediate.stream.partitions, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a hard-coded maximum of 256.
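As an illustration, the fallback rule above amounts to the following computation. This is a minimal self-contained sketch with hypothetical helper names, not Samza's actual implementation:

```java
import java.util.Collections;
import java.util.List;

public class PartitionCountFallback {
  // Hard-coded cap described in the text above.
  static final int MAX_INFERRED_PARTITION_COUNT = 256;

  // Hypothetical helper: maximum partition count among all input and
  // output streams, capped at 256.
  static int fallbackPartitionCount(List<Integer> inputPartitions,
                                    List<Integer> outputPartitions) {
    int max = Math.max(Collections.max(inputPartitions),
                       Collections.max(outputPartitions));
    return Math.min(max, MAX_INFERRED_PARTITION_COUNT);
  }

  public static void main(String[] args) {
    System.out.println(fallbackPartitionCount(List.of(16, 64), List.of(32)));  // 64
    System.out.println(fallbackPartitionCount(List.of(512), List.of(32)));     // capped at 256
  }
}
```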
Fig. 4 — The OperatorSpec graph of an example high-level Samza application that employs the Partition-By operation. The Execution Planner decides to assign the partition count value 16 to intermediate stream S2′, the same value as input stream S1, since they are joined together.
It is important to realize that there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with, a scenario that causes the Execution Planner to signal an error and reject the whole application. Fig. 5 illustrates one such case.
Fig. 5 — The
...
Listing 2:

```java
public class StreamTableJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    MessageStream s1 = graph.getInputStream("S1");
    MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */);

    // Assume local table
    Table t = graph.getTable(/* Omitted for brevity */);
    s1Prime.sendTo(t);

    MessageStream s2 = graph.getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");

    s2.join(t, /* Omitted for brevity */)
      .sendTo(s3);
  }
}
```
Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2.
...
The table below enumerates a number of cases in which Samza’s current Execution Planner does not enforce the necessary constraints on P1 and P2 to ensure the validity of the Stream-Table Join between table T and stream S2.
| # | S1 | S2 | Required Constraint |
|---|----|----|---------------------|
| 1 | Input stream | Input stream | P1 must be equal to P2 |
| 2 | Input stream | Intermediate stream | P2 must be set to P1 |
| 3 | Intermediate stream | Input stream | P1 must be set to P2 |
| 4 | Intermediate stream | Intermediate stream | If the result of joining S1 and S2 is subsequently joined with an input stream S3, P1 and P2 must be set to P3. |
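To make the intent of cases 1 through 3 concrete, the sketch below shows the check/propagation they require. The names are hypothetical and this is not the planner's actual code; an empty OptionalInt stands for an intermediate stream whose partition count has not been set yet:

```java
import java.util.OptionalInt;

public class JoinConstraintSketch {
  // Returns the agreed partition count for two joined streams, or throws
  // if two set counts disagree (case 1). Cases 2 and 3 propagate the set
  // count to the unset side. Case 4 (both unset) must be deferred until
  // some other join constrains one of the two streams.
  static int agree(OptionalInt p1, OptionalInt p2) {
    if (p1.isPresent() && p2.isPresent()) {
      if (p1.getAsInt() != p2.getAsInt()) {
        throw new IllegalStateException("Joined streams disagree in partition count");
      }
      return p1.getAsInt();
    }
    // Propagate whichever count is defined; throws if both are unset.
    return p1.isPresent() ? p1.getAsInt() : p2.getAsInt();
  }

  public static void main(String[] args) {
    System.out.println(agree(OptionalInt.of(16), OptionalInt.of(16)));  // 16
    System.out.println(agree(OptionalInt.empty(), OptionalInt.of(8)));  // 8
  }
}
```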
...
Listing 3:

```java
public class StreamTableJoinApp implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    Table<KV<Integer, String>> t = graph
        .getTable(/* Omitted for brevity */);
    MessageStream s1 = graph
        .getInputStream("S1")
        .filter(/* Omitted for brevity */);
    s1.sendTo(t);

    MessageStream s2 = graph.getInputStream("S2");
    OutputStream s3 = graph.getOutputStream("S3");

    s2.join(t, /* Omitted for brevity */)
      .sendTo(s3);
  }
}
```
Fig. 7 — A graph representing the OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow.
...
To extend Samza’s Execution Planner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec and all the StreamTableJoinOperatorSpecs that reference the same table (TableSpec) in the entire OperatorSpec graph.
Fig. 8 — A graph representing the
...
1. OperatorSpec graph analysis
2. Conversion to StreamEdges
3. Join input validation

Details of each one of these steps are outlined below.
...
The output of this step is a collection of StreamEdge sets, each of which represents a group of StreamEdges participating in a particular Join. The number of such sets should typically be equal to the number of Joins in the OperatorSpec graph.
...
Processing Joined Streams
In this step, we process every group of StreamEdges produced by the previous step in the following order:

(a) Groups of StreamEdges whose partition counts are all set (i.e. input streams only) come first.
(b) Groups of StreamEdges with a mix of set and unset partition counts (i.e. input and intermediate streams) come next.
(c) Groups of StreamEdges in which no StreamEdge has its partition count set (i.e. intermediate streams only) come last.
Groups of type (b) are processed such that all StreamEdges with undefined partition counts in a group get assigned the same partition count as the other stream(s) whose partition count is defined in the same group. It is worth noting that this assignment can change the type of other group(s) from (c) to (b), because a single intermediate StreamEdge can appear in multiple groups. This entails that we perform this assignment iteratively and incrementally, in a way that accounts for the interdependencies between the different groups. Finally, StreamEdges in groups of type (c) are assigned the maximum partition count among all input and output StreamEdges, capped at a hard-coded maximum of 256.
At the end of this process, StreamEdges in every group must have the same partition count, or else the Execution Planner will reject the application. By processing StreamEdge groups in this order, i.e. most constrained StreamEdges first, we guarantee that we can verify agreement among input streams and assign partition counts to intermediate streams in a single scan, even if one or more StreamEdges are present in several groups.

Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.
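A simplified, self-contained sketch of this most-constrained-first processing might look as follows. The representation is hypothetical (stream names mapped to partition counts, with -1 marking an unset intermediate stream), and the dynamic reclassification of groups is approximated by a single up-front ordering; Samza's actual StreamEdge handling is more involved:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class JoinGroupSketch {
  static final int MAX_PARTITIONS = 256;

  // counts: stream name -> partition count, -1 if unset (intermediate stream).
  // groups: sets of streams participating in the same Join.
  static void process(Map<String, Integer> counts, List<Set<String>> groups) {
    // Most constrained first: groups with fewer unset members come earlier.
    List<Set<String>> ordered = new ArrayList<>(groups);
    ordered.sort(Comparator.comparingLong(
        (Set<String> g) -> g.stream().filter(s -> counts.get(s) == -1).count()));

    for (Set<String> group : ordered) {
      // Pick a partition count already defined within this group, if any.
      int target = group.stream().map(counts::get)
          .filter(c -> c != -1).findFirst().orElse(-1);
      if (target == -1) {
        // Type (c) group: fall back to the capped maximum of all set counts.
        target = Math.min(
            counts.values().stream().filter(c -> c != -1)
                .max(Integer::compare).orElse(1),
            MAX_PARTITIONS);
      }
      for (String stream : group) {
        if (counts.get(stream) == -1) {
          counts.put(stream, target);  // assign to intermediate stream
        } else if (counts.get(stream) != target) {
          // Input streams that disagree cause the application to be rejected.
          throw new IllegalStateException("Partition count mismatch in " + group);
        }
      }
    }
  }
}
```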
...
```java
class GraphUtils {
  public static <T> void traverse(T vertex, Consumer<T> visitor,
      Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}
```
...
The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpec graph starting from its InputOperatorSpecs:
```java
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
  GraphUtils.traverse(inputOpSpec, System.out::println,
      OperatorSpec::getRegisteredOperatorSpecs);
}
```
...
- It spares us having to write redundant code for the two traversals needed during the OperatorSpec Graph Analysis step.
- It makes it easy to carry out more traversals of any part of the OperatorSpec graph should the need arise.
- It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph, in isolation from the traversal code itself.
- It allows us to customize the traversal progression so that virtual connections can be made between otherwise disconnected parts of the OperatorSpec graph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpec graph representation as close as possible to the way it was defined by the user without being limited by it.
This utility can be used to carry out the two traversals in the OperatorSpec Graph Analysis step as shown below.
The first traversal can be done as follows:
```java
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
  GraphUtils.traverse(inputOpSpec, sendToTableVisitor,
      OperatorSpec::getRegisteredOperatorSpecs);
}

class SendToTableVisitor implements Consumer<OperatorSpec> {
  /* Private fields omitted for brevity. */

  /**
   * Invoked once with every {@link OperatorSpec} encountered
   * during traversal.
   */
  @Override
  public void accept(OperatorSpec operatorSpec) {
    /* Examine operatorSpec to create association. */
  }

  /**
   * Used to retrieve association after traversal is complete.
   */
  public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
      getSendToTableToStreamTableJoin() {
    /* Omitted for brevity. */
  }
}
```
The SendToTableVisitor is a type dedicated to observing the OperatorSpec graph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpecs.
The second traversal can be similarly achieved, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.
```java
/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s.
 */
class SendToTableConnector
    implements Function<OperatorSpec, Iterable<OperatorSpec>> {

  private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
      sendToTableToStreamTableJoin;

  public SendToTableConnector(
      Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
          sendToTableToStreamTableJoin) {
    this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
  }

  @Override
  public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
    if (opSpec instanceof SendToTableOperatorSpec) {
      SendToTableOperatorSpec sendToTableOpSpec =
          (SendToTableOperatorSpec) opSpec;
      return Collections.unmodifiableCollection(
          sendToTableToStreamTableJoin.get(sendToTableOpSpec));
    }
    return opSpec.getRegisteredOperatorSpecs();
  }
}
```
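Putting the pieces together, the second traversal simply passes the connector as the getNextVertexes function. The toy program below is a self-contained illustration of that effect, with strings standing in for OperatorSpecs and a hard-coded virtual connection in place of the Multimap; it is not Samza code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class VirtualConnectionDemo {
  // Same shape as GraphUtils.traverse above.
  static <T> void traverse(T vertex, Consumer<T> visitor,
                           Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T next : getNextVertexes.apply(vertex)) {
      traverse(next, visitor, getNextVertexes);
    }
  }

  static List<String> run() {
    // Toy graph: "sendToTable" has no registered successors, mirroring how
    // a SendToTableOperatorSpec is a dead end in the real OperatorSpec graph.
    Map<String, List<String>> registered = Map.of(
        "input", List.of("sendToTable"),
        "sendToTable", List.of(),
        "streamTableJoin", List.of());

    // Connector: redirect "sendToTable" to the joins that share its table,
    // as SendToTableConnector does via the Multimap.
    Function<String, Iterable<String>> connector = v ->
        v.equals("sendToTable")
            ? List.of("streamTableJoin")
            : registered.getOrDefault(v, List.of());

    List<String> visited = new ArrayList<>();
    traverse("input", visited::add, connector);
    return visited;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [input, sendToTable, streamTableJoin]
  }
}
```

Without the connector, the traversal would stop at the dead-end "sendToTable" vertex; with it, the join operator downstream of the table becomes reachable.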
...