...

Fig. 1 — A logical workflow of stream processing operations


Listing 1 — Sample application using Samza high-level API to perform Stream-Stream Join

public class StreamStreamJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
        MessageStream s1 = graph
            .getInputStream("S1")
            .filter(/* Omitted for brevity */);

        MessageStream s2 = graph
            .getInputStream("S2");

        OutputStream s3 = graph.getOutputStream("S3");

        s1.join(s2, /* Omitted for brevity */)
            .sendTo(s3);
    }
}


Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.

This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an OperatorSpec object, a logical abstraction that describes the operation specified by the app author.

...


Some of these OperatorSpecs represent message streams which can be thought of as channels propagating data into and out of the application, e.g.

...

The code sample below demonstrates how this can be achieved using the Samza High-Level API.

...

Listing 2 — Sample application using Samza high-level API to perform Stream-Table Join

public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
        MessageStream s1 = graph.getInputStream("S1");
        MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */);

        // Assume local table
        Table t = graph.getTable(/* Omitted for brevity */);

        s1Prime.sendTo(t);

        MessageStream s2 = graph.getInputStream("S2");
        OutputStream s3 = graph.getOutputStream("S3");

        s2.join(t, /* Omitted for brevity */)
            .sendTo(s3);
    }
}

...



Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2.


But since tables can be populated with data flowing from input streams (i.e. local tables), the stream used to populate a table must still have the same number of partitions as the stream the table is joined with. Failing to ensure this exposes Stream-Table Joins to the same class of problems that Stream-Stream Joins would run into if Samza allowed joining two streams with different partition counts, i.e. invalid Joins.
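To make the failure mode concrete, here is a minimal sketch. It assumes messages are routed to partitions by key hash modulo partition count, and that each task consumes the same partition number from every input stream. Both are illustrative assumptions rather than statements about Samza's actual partitioner, and the class and method names are hypothetical.

```java
class PartitionMismatchDemo {
    // Hypothetical partitioner: routes a key by hash modulo partition count.
    static int partitionFor(int key, int partitionCount) {
        return Math.floorMod(Integer.hashCode(key), partitionCount);
    }

    // True when the key lands on the same partition number in both streams,
    // i.e. a single task would see both sides of the join for this key.
    static boolean coLocated(int key, int countA, int countB) {
        return partitionFor(key, countA) == partitionFor(key, countB);
    }
}
```

Under these assumptions, with 4 partitions on the table-populating stream and 3 on the joined stream, key 5 maps to partition 1 on one side and partition 2 on the other, so the task performing the join would look the key up in a local table shard that never received it. Some keys (for example 12) still line up by coincidence, which is what makes such a misconfiguration easy to miss.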

...

As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner traverses the OperatorSpec graph to identify groups of input and/or intermediate streams involved in Join operations and verifies that their partition counts agree. To understand why this does not work for Stream-Table Joins, we examine the OperatorSpec graph generated for the Samza application in the code sample below.

...

Listing 3 — Sample application using Samza high-level API to perform Stream-Table Join.

public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
        Table<KV<Integer, String>> t = graph
            .getTable(/* Omitted for brevity */);

        MessageStream s1 = graph
            .getInputStream("S1")
            .filter(/* Omitted for brevity */);

        s1.sendTo(t);

        MessageStream s2 = graph.getInputStream("S2");

        OutputStream s3 = graph.getOutputStream("S3");

        s2.join(t, /* Omitted for brevity */)
            .sendTo(s3);
    }
}



Fig. 7 — A graph representing the OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow.


It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:

...

Fig. 8 — A graph representing the OperatorSpec graph generated for a hypothetical Samza High-Level application where stream S1 is filtered and sent to table T, which is subsequently joined with streams S2 and S3. The proposed change to Samza’s Execution Planner revolves around assuming virtual connections between SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs, as denoted by the dotted arrows.

...

Modularizing Partition Count Calculation

I propose to break down the code for verifying the validity of Join operations, in ExecutionPlanner.calculatePartitions(), into 3 different pieces:

  1. OperatorSpec graph analysis

  2. Conversion to StreamEdges

  3. Join input validation

Details of each one of these steps are outlined below.

OperatorSpec Graph Analysis

In this phase, we perform all the necessary traversals on the OperatorSpec graph. Specifically, we need 2 traversals of the entire graph starting from InputOperatorSpecs.

  1. The first traversal is a preprocessing step for the second traversal, where we create an association between every SendToTableOperatorSpec and all StreamTableJoinOperatorSpecs sharing the same TableSpec, i.e. SendToTableOperatorSpec → {StreamTableJoinOperatorSpecs}.

  2. The second traversal creates an association between InputOperatorSpecs and all relevant Join OperatorSpecs, i.e. InputOperatorSpec → {Join OperatorSpecs}. Stream-Table Joins are also covered in this traversal: the association created in step 1 above is used to customize the OperatorSpec graph traversal so that it assumes virtual connections.


Both traversals of the OperatorSpec graph are very similar. In essence, we’re just visiting every OperatorSpec and recording some observations upon encountering certain types of OperatorSpecs. This motivates the introduction of the graph traversal utility discussed later in Unifying Graph Traversal.

Operations in this step tackle OperatorSpecs exclusively. The outcome of this phase will be the association created in step 2 above, where InputOperatorSpecs are grouped by the Join OperatorSpecs they participate in.

Conversion to StreamEdges

In this phase, we convert every entry in the association produced by the previous step into a StreamEdge set by simply dropping the key and replacing every InputOperatorSpec with its corresponding StreamEdge.

To support tables with side-input streams, we can also examine every key (Join OperatorSpec) in the association produced by the previous step, retrieve the relevant TableSpec if the key is a StreamTableJoinOperatorSpec, and add a new input StreamEdge corresponding to the side-input stream to the appropriate set of StreamEdges.

The output of this step is a collection of StreamEdge sets, each of which represents a group of StreamEdges participating in a particular Join. The number of such sets should typically equal the number of Joins in the OperatorSpec graph.
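The conversion described above can be sketched as follows. This is an illustration only: the association is modeled as a plain Map keyed by Join, the generic parameters J, I and E stand in for Join OperatorSpec, InputOperatorSpec and StreamEdge, and the edgeOf lookup function is a hypothetical placeholder for however the planner resolves an input to its StreamEdge. Side-input handling is not shown.

```java
import java.util.*;
import java.util.function.Function;

class StreamEdgeConversion {
    // Drops the Join keys and maps every input to its edge, producing one
    // set of edges per Join.
    static <J, I, E> List<Set<E>> toStreamEdgeGroups(
            Map<J, ? extends Collection<I>> joinToInputs,
            Function<I, E> edgeOf) {
        List<Set<E>> groups = new ArrayList<>();
        for (Collection<I> inputs : joinToInputs.values()) {
            Set<E> group = new LinkedHashSet<>();
            for (I input : inputs) {
                group.add(edgeOf.apply(input));
            }
            groups.add(group);
        }
        return groups;
    }
}
```

Note that an input participating in two Joins ends up in two groups, which is what the ordering in the validation step is meant to handle.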

Join Input Validation

In this step, we sort the StreamEdge groups produced by the previous step as follows:

  1. Groups in which every StreamEdge has its partition count set come first.

  2. Groups with a mix of set and unset partition counts come next.

  3. Groups in which no StreamEdge has its partition count set come last.

By processing StreamEdge groups in this order, i.e. most constrained StreamEdges first, we guarantee that we can verify agreement among input streams and assign partition counts to intermediate streams in a single scan, even if one or more StreamEdges are present in several groups.

Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.
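A simplified sketch of this step is shown below. The Edge class is a hypothetical stand-in for StreamEdge, and UNSET is an assumed sentinel for a missing partition count. Groups that are still entirely unset when the scan reaches them are skipped here; assigning them a default count is outside the scope of the sketch.

```java
import java.util.*;

class JoinInputValidation {
    static final int UNSET = -1;

    // Hypothetical stand-in for StreamEdge: a name and a mutable partition count.
    static final class Edge {
        final String name;
        int partitions;
        Edge(String name, int partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    // 0 = every count set, 1 = mix of set and unset, 2 = no count set.
    static int rank(Set<Edge> group) {
        long set = group.stream().filter(e -> e.partitions != UNSET).count();
        return set == group.size() ? 0 : (set > 0 ? 1 : 2);
    }

    static void validate(List<Set<Edge>> groups) {
        List<Set<Edge>> ordered = new ArrayList<>(groups);
        ordered.sort(Comparator.comparingInt(JoinInputValidation::rank));
        for (Set<Edge> group : ordered) {
            int expected = UNSET;
            for (Edge e : group) {
                if (e.partitions == UNSET) continue;
                if (expected == UNSET) {
                    expected = e.partitions;
                } else if (expected != e.partitions) {
                    throw new IllegalStateException(
                        "Partition count mismatch at " + e.name);
                }
            }
            if (expected == UNSET) continue; // nothing known for this group yet
            for (Edge e : group) {
                e.partitions = expected; // propagate to unset edges
            }
        }
    }
}
```

Because Edge objects are shared between groups, a count assigned while processing one group is visible when a later group containing the same edge is scanned.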

Unifying Graph Traversal

I propose to introduce the following general stateless graph traversal utility for use in OperatorSpec Graph Analysis.

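import java.util.function.Consumer;
import java.util.function.Function;

class GraphUtils {
  /**
   * Visits the given vertex, then recursively visits every vertex returned
   * by getNextVertexes, depth first. No record of visited vertexes is kept.
   */
  public static <T> void traverse(T vertex, Consumer<T> visitor,
      Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}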


The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpecGraph starting from its InputOperatorSpecs:

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, System.out::println, 
       OperatorSpec::getRegisteredOperatorSpecs);
}
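To experiment with the traversal outside of Samza, the sketch below repeats the same traversal logic in a standalone class and runs it over a small string-labeled graph loosely modeled on Listing 1. The graph contents, class name and visitOrder helper are made up for illustration.

```java
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

class TraversalDemo {
    // Same logic as GraphUtils.traverse, repeated so the snippet compiles alone.
    static <T> void traverse(T vertex, Consumer<T> visitor,
            Function<T, Iterable<T>> getNextVertexes) {
        visitor.accept(vertex);
        for (T next : getNextVertexes.apply(vertex)) {
            traverse(next, visitor, getNextVertexes);
        }
    }

    // Hypothetical graph: S1 -> filter -> join, S2 -> join, join -> S3.
    static List<String> visitOrder(String start) {
        Map<String, List<String>> next = new HashMap<>();
        next.put("S1", Arrays.asList("filter"));
        next.put("filter", Arrays.asList("join"));
        next.put("S2", Arrays.asList("join"));
        next.put("join", Arrays.asList("S3"));

        List<String> visited = new ArrayList<>();
        traverse(start, visited::add,
            v -> next.getOrDefault(v, Collections.emptyList()));
        return visited;
    }
}
```

Starting from S1 the visitor sees S1, filter, join, S3; starting from S2 it sees S2, join, S3. The join vertex is therefore reported once per input that reaches it. Since the utility keeps no visited set, it also assumes the graph is acyclic, and a vertex reachable along several paths from the same start would be visited once per path.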


Introducing such a utility has several advantages:

  1. It spares us having to write redundant code for the 2 traversals needed during the OperatorSpec Graph Analysis step.

  2. It makes it easy to carry out more traversals of any part of the OperatorSpecGraph should the need arise.

  3. It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph in isolation from the traversal code itself.

  4. It allows us to customize the traversal progression so that virtual connections can be made between otherwise disconnected parts of the OperatorSpecGraph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpecGraph representation as close as possible to the way it was defined by the user without being limited by it.

This utility can be used to carry out the 2 traversals in the OperatorSpec Graph Analysis step as shown below.

  1. The first traversal can be done as follows:

SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs);
}


class SendToTableVisitor implements Consumer<OperatorSpec> {
    /* Private fields omitted for brevity. */

    /**
     * Invoked once with every {@link OperatorSpec} encountered
     * during traversal.
     */
    @Override
    public void accept(OperatorSpec operatorSpec) {
        /* Examine operatorSpec to create association. */
    }

    /**
     * Used to retrieve association after traversal is complete.
     */
    public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
        getSendToTableToStreamTableJoin() {
        /* Omitted for brevity. */
    }
}

The SendToTableVisitor is a type dedicated to observing the OperatorSpecGraph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpecs.
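One way such a visitor might build the association is sketched below. It does not use Samza's types: Op is a hypothetical stand-in for OperatorSpec, tableId plays the role of the TableSpec, and a plain Map of Sets replaces the Multimap. The omitted parts of the class above may well look different.

```java
import java.util.*;
import java.util.function.Consumer;

class SendToTableSketch {
    enum Kind { SEND_TO_TABLE, STREAM_TABLE_JOIN, OTHER }

    // Hypothetical stand-in for an OperatorSpec.
    static final class Op {
        final String id;
        final Kind kind;
        final String tableId; // null for operators not attached to a table
        Op(String id, Kind kind, String tableId) {
            this.id = id;
            this.kind = kind;
            this.tableId = tableId;
        }
    }

    static final class Visitor implements Consumer<Op> {
        private final Map<String, Set<Op>> sendersByTable = new LinkedHashMap<>();
        private final Map<String, Set<Op>> joinsByTable = new LinkedHashMap<>();

        @Override
        public void accept(Op op) {
            // Sets absorb repeat visits of an operator reached along several paths.
            if (op.kind == Kind.SEND_TO_TABLE) {
                sendersByTable.computeIfAbsent(op.tableId, k -> new LinkedHashSet<>()).add(op);
            } else if (op.kind == Kind.STREAM_TABLE_JOIN) {
                joinsByTable.computeIfAbsent(op.tableId, k -> new LinkedHashSet<>()).add(op);
            }
        }

        // Pairs every send-to-table operator with the joins reading the same table.
        Map<Op, Set<Op>> getSendToTableToStreamTableJoin() {
            Map<Op, Set<Op>> result = new LinkedHashMap<>();
            sendersByTable.forEach((tableId, senders) -> {
                Set<Op> joins = joinsByTable.getOrDefault(tableId, Collections.emptySet());
                for (Op sender : senders) {
                    result.put(sender, joins);
                }
            });
            return result;
        }
    }
}
```

The pairing is deferred to the getter because a send-to-table operator and the joins on its table may be encountered in any order during traversal.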

  2. The second traversal can be similarly achieved, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.

/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s.
 */
class SendToTableConnector
    implements Function<OperatorSpec, Iterable<OperatorSpec>> {

  private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
      sendToTableToStreamTableJoin;

  public SendToTableConnector(
      Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
          sendToTableToStreamTableJoin) {
    this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
  }

  @Override
  public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
    if (opSpec instanceof SendToTableOperatorSpec) {
      SendToTableOperatorSpec sendToTableOpSpec =
          (SendToTableOperatorSpec) opSpec;

      return Collections.unmodifiableCollection(
          sendToTableToStreamTableJoin.get(sendToTableOpSpec));
    }

    return opSpec.getRegisteredOperatorSpecs();
  }
}
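To illustrate the effect of such a connector end to end, the following sketch runs the second traversal over a tiny hand-built graph shaped like Listing 3. Everything here is a stand-in: Op replaces OperatorSpec, a plain Map replaces the Multimap, and the inputsByJoin helper is a hypothetical name for the visitor logic that groups inputs by the Joins they reach.

```java
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

class VirtualConnectionSketch {
    // Hypothetical stand-in for an OperatorSpec and its registered downstream specs.
    static final class Op {
        final String id;
        final boolean isJoin;
        final List<Op> next = new ArrayList<>();
        Op(String id, boolean isJoin) {
            this.id = id;
            this.isJoin = isJoin;
        }
    }

    static void traverse(Op op, Consumer<Op> visitor,
            Function<Op, Iterable<Op>> getNext) {
        visitor.accept(op);
        for (Op n : getNext.apply(op)) {
            traverse(n, visitor, getNext);
        }
    }

    // Returns join id -> ids of the inputs that reach that join.
    static Map<String, Set<String>> inputsByJoin(
            List<Op> inputs, Map<Op, List<Op>> sendToTableToJoins) {
        // Virtual connections: a send-to-table operator continues into the
        // joins on its table; every other operator uses its real successors.
        Function<Op, Iterable<Op>> connector = op ->
            sendToTableToJoins.containsKey(op) ? sendToTableToJoins.get(op) : op.next;

        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (Op input : inputs) {
            traverse(input, op -> {
                if (op.isJoin) {
                    result.computeIfAbsent(op.id, k -> new LinkedHashSet<>()).add(input.id);
                }
            }, connector);
        }
        return result;
    }
}
```

With the virtual connection from the send-to-table operator to the join in place, both S1 and S2 are associated with the join; without it, only S2 is, which is the gap this proposal sets out to close.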