...

JIRA: SAMZA-1889 (ASF JIRA)

Released: Samza 1.0


Purpose

This document outlines a proposal for extending Samza’s Execution Planner to verify agreement in partition count between the stream(s) populating Tables and the other streams participating in Stream-Table Joins, in applications written using Samza’s High-Level API.

Background

Motivating Example: Stream-Stream Join

...

For instance, to perform the operations illustrated in Fig. 1 on a stream of messages, a user can write the Samza app in listing 1 using the Samza high-level API:


Fig. 1 — A logical workflow of stream processing operations


Listing 1 — Sample application using Samza high-level API to perform Stream-Stream Join

public class StreamStreamJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      MessageStream s1 = graph
          .getInputStream("S1")
          .filter(/* Omitted for brevity */);

      MessageStream s2 = graph
          .getInputStream("S2");

      OutputStream s3 = graph.getOutputStream("S3");

      s1.join(s2, /* Omitted for brevity */)
        .sendTo(s3);
    }
}



Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.


This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an OperatorSpec object, a logical abstraction that describes the operation specified by the app author.

...


...


...

Some of these OperatorSpecs represent message streams which can be thought of as channels propagating data into and out of the application, e.g.

...

The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpecs produced by the Samza High-Level API and verifies compliance with this requirement among all such sets of streams.


Fig. 3 — Two example cases of Stream-Stream Joins. After considering the partition counts of the joined input streams, Samza’s Execution Planner accepts the one on the left but rejects the one on the right.

...

  1. Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.

  2. Any intermediate stream not covered by the first rule is assigned the partition count value specified by the Samza config property job.intermediate.stream.partitions.

  3. If no value is specified for job.intermediate.stream.partitions, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a maximum hard-coded value of 256.
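As a concrete illustration of rule 3, the fallback computation can be sketched as follows. This is a minimal sketch, not Samza's actual implementation; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class PartitionCountFallback {
    // Hypothetical constant mirroring the hard-coded 256-partition cap in rule 3.
    static final int MAX_INFERRED_PARTITION_COUNT = 256;

    /**
     * Computes the fallback partition count for intermediate streams when
     * job.intermediate.stream.partitions is not configured: the maximum
     * partition count among all input and output streams, capped at 256.
     */
    static int fallbackPartitionCount(List<Integer> inputOutputPartitionCounts) {
        int max = inputOutputPartitionCounts.stream()
            .mapToInt(Integer::intValue)
            .max()
            .orElseThrow(() -> new IllegalArgumentException("no input/output streams"));
        return Math.min(max, MAX_INFERRED_PARTITION_COUNT);
    }
}
```

For example, with input/output partition counts of 16, 64, and 32 the fallback would be 64, while counts of 512 and 128 would be capped at 256.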


Fig. 4 — The OperatorSpec graph of an example high-level Samza application that employs the Partition-By operation. The Execution Planner decides to assign the partition count value 16 to intermediate stream S2′, the same value as input stream S1, since they are joined together.


It is important to realize there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with, a scenario that would cause the Execution Planner to signal an error and reject the whole application. Fig. 5 illustrates one such case.


Fig. 5 — The OperatorSpec graph of an example high-level Samza application rejected by the Execution Planner due to the conflict encountered as it attempts to infer the partition count of S2′, since it is joined with 2 input streams, S1 and S4, that have different partition counts.

...

The code sample below demonstrates how this can be achieved using Samza High-Level API.

Listing 2 — Sample application using Samza high-level API to perform Stream-Table Join

public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {

      MessageStream s1 = graph.getInputStream("S1");
      MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */);

      // Assume local table
      Table t = graph.getTable(/* Omitted for brevity */);

      s1Prime.sendTo(t);

      MessageStream s2 = graph.getInputStream("S2");
      OutputStream s3 = graph.getOutputStream("S3");

      s2.join(t, /* Omitted for brevity */)
        .sendTo(s3);
    }
}




Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2.

But since Tables can be populated with data flowing from input streams (aka local tables), it is still important to ensure that the stream used to populate the table has the same number of partitions as the stream the table is joined with. Failing to do so exposes Stream-Table Joins to the same class of problems Stream-Stream Joins could run into if Samza were to allow joining 2 streams with different partition counts, i.e. invalid Joins.

...

The table below enumerates a number of cases in which Samza’s current Execution Planner does not enforce the necessary constraints on P1 and P2 to ensure the validity of the Stream-Table Join between table T and stream S2.

 # | S1                  | S2                  | Required Constraint
---+---------------------+---------------------+---------------------
 1 | Input stream        | Input stream        | P1 must be equal to P2
 2 | Input stream        | Intermediate stream | P2 must be set to P1
 3 | Intermediate stream | Input stream        | P1 must be set to P2
 4 | Intermediate stream | Intermediate stream | If the result of joining S1 and S2 is subsequently joined with an input stream S3, P1 and P2 must be set to P3.

...

As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner employs traversal of the OperatorSpec graph to identify groups of input and/or intermediate streams involved in Join operations and verify agreement among their partition counts. To understand the reason why this does not work in the case of Stream-Table Joins, we examine the OperatorSpec graph generated for the Samza application in the code sample below.

...

Listing 3 — Sample application using Samza high-level API to perform Stream-Table Join

public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      Table<KV<Integer, String>> t = graph
          .getTable(/* Omitted for brevity */);

      MessageStream s1 = graph
          .getInputStream("S1")
          .filter(/* Omitted for brevity */);

      s1.sendTo(t);

      MessageStream s2 = graph.getInputStream("S2");

      OutputStream s3 = graph.getOutputStream("S3");

      s2.join(t, /* Omitted for brevity*/)
        .sendTo(s3);
    }
}




Fig. 7 — The OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow.


It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:

...

To extend Samza’s ExecutionPlanner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec and all the StreamTableJoinOperatorSpecs that reference the same table (TableSpec) in the entire OperatorSpec graph.


Fig. 8 — The OperatorSpec graph generated for a hypothetical Samza High-Level application where stream S1 is filtered and sent to table T, which is subsequently joined with streams S2 and S3. The proposed change to Samza’s Execution Planner revolves around assuming virtual connections between SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs, as denoted by the dotted arrows.

...

Modularizing Partition Count Calculation

I propose to break down the code for verifying the validity of Join operations, in ExecutionPlanner.calculatePartitions(), into 3 different pieces:

  1. OperatorSpec graph analysis

  2. Conversion to StreamEdges

  3. Processing joined streams (Join input validation)


Details of each one of these steps are outlined below.

OperatorSpec Graph Analysis

In this phase, we perform all the necessary traversals on the OperatorSpec graph. Specifically, we need 2 traversals of the entire graph starting from InputOperatorSpecs.

  1. First traversal is a preprocessing step for the second traversal, where we create an association between every SendToTableOperatorSpec and all StreamTableJoinOperatorSpecs sharing the same TableSpec, i.e. SendToTableOperatorSpec → {StreamTableJoinOperatorSpecs}.

  2. Second traversal is for creating an association between InputOperatorSpecs and all relevant Join OperatorSpecs, i.e. InputOperatorSpec → {Join OperatorSpecs}. Stream-Table Joins are also covered in this traversal by customizing the OperatorSpec graph traversal through the association created in step 1 above to assume virtual connections.


Both traversals of the OperatorSpec graph are very similar. In essence, we’re just visiting every OperatorSpec and recording some observations upon encountering certain types of OperatorSpecs. This motivates the introduction of the graph traversal utility discussed later in Unifying Graph Traversal.

Operations in this step tackle OperatorSpecs exclusively. The outcome of this phase will be the association created in step 2 above, where InputOperatorSpecs are grouped by the Join OperatorSpecs they participate in.

Conversion to StreamEdges

In this phase, we convert every entry in the association produced by the previous step into a StreamEdge set by simply dropping the key and replacing every InputOperatorSpec with its corresponding StreamEdge.

To support tables with side-input streams, we can also examine every key (Join OperatorSpec) in the association produced by the previous step, retrieve the relevant TableSpec if the key is a StreamTableJoinOperatorSpec, and add a new input StreamEdge that corresponds to the side-input stream, to the appropriate set of StreamEdges.

The output of this step is a collection of StreamEdge sets, each of which represents a group of StreamEdges participating in a particular Join. The number of such sets should be typically equivalent to the number of Joins in the OperatorSpec graph.
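The conversion described above can be sketched as follows, using plain Strings as stand-ins for Join OperatorSpecs, InputOperatorSpecs, and StreamEdges. The class and method names are illustrative, not part of Samza; the lookup function stands in for the InputOperatorSpec-to-StreamEdge mapping.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class JoinedStreamEdgeGrouping {
    /**
     * Sketch of the conversion step: each entry of the association produced
     * by graph analysis (join -> joined input streams) becomes a set of
     * StreamEdges by dropping the key and mapping every input stream through
     * its StreamEdge lookup.
     */
    static Collection<Set<String>> toStreamEdgeSets(
            Map<String, Set<String>> joinToInputs,
            Function<String, String> inputToStreamEdge) {
        return joinToInputs.values().stream()
            .map(inputs -> inputs.stream()
                .map(inputToStreamEdge)
                .collect(Collectors.toSet()))
            .collect(Collectors.toList());
    }
}
```

Each resulting set groups the StreamEdges that must agree in partition count for one Join.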

Processing Joined Streams

In this step, we process every group of StreamEdges produced by the previous step. Each group can be composed of:

  a. StreamEdges with defined partition counts only (i.e. input streams), or

  b. StreamEdges with a mix of defined and undefined partition counts (i.e. both input and intermediate streams), or

  c. StreamEdges with undefined partition counts only (i.e. intermediate streams)

Groups of type (b) are processed such that all StreamEdges with undefined partition counts in a group are assigned the same partition count as the stream(s) in that group whose partition counts are defined. It is worth noting that this assignment can change the type of other group(s) from (c) to (b), because a single intermediate StreamEdge can appear in multiple groups. This means the assignment must be performed iteratively and incrementally, in a way that accounts for the interdependencies between the different groups. Finally, StreamEdges in groups of type (c) are assigned the maximum partition count among all input and output StreamEdges, capped at a maximum hardcoded value of 256.
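The iterative assignment can be sketched as a fixed-point computation over the groups. This is a simplified illustration, not Samza's actual code: stream names are plain Strings, known partition counts live in a map, and groups left entirely undefined after convergence would receive the max-capped-at-256 fallback (not shown).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PartitionCountPropagation {
    /**
     * Repeatedly propagates the defined partition count of each group to its
     * undefined members until no group changes. Assigning a count to an
     * intermediate stream may turn another group it appears in from type (c)
     * into type (b), which is why iteration is required. A group with two
     * different defined counts indicates an invalid Join and is rejected.
     */
    static void propagate(List<Set<String>> groups, Map<String, Integer> partitionCounts) {
        boolean changed = true;
        while (changed) {
            changed = false;
            for (Set<String> group : groups) {
                // Collect the distinct defined counts in this group.
                Set<Integer> defined = new HashSet<>();
                for (String stream : group) {
                    Integer count = partitionCounts.get(stream);
                    if (count != null) defined.add(count);
                }
                if (defined.size() > 1) {
                    throw new IllegalStateException("Conflicting partition counts in group " + group);
                }
                if (defined.size() == 1) {
                    int count = defined.iterator().next();
                    for (String stream : group) {
                        if (!partitionCounts.containsKey(stream)) {
                            partitionCounts.put(stream, count);
                            changed = true;  // may unlock another group next pass
                        }
                    }
                }
            }
        }
    }
}
```

For instance, if input stream S1 (16 partitions) is grouped with intermediate stream S2′, and S2′ is grouped with intermediate stream S3′ in a second Join, the first pass assigns 16 to S2′ and the propagation then reaches S3′ through the shared edge.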

At the end of this process, StreamEdges in every group must have the same partition count or else the Execution Planner will reject the application.

Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.

Unifying Graph Traversal

I propose to introduce the following general stateless graph traversal utility for use in OperatorSpec Graph Analysis.

import java.util.function.Consumer;
import java.util.function.Function;

class GraphUtils {
  public static <T> void traverse(T vertex, Consumer<T> visitor,
      Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}


The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpec graph starting from its InputOperatorSpecs:

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, System.out::println, 
        OperatorSpec::getRegisteredOperatorSpecs);
}


Introducing such a utility has several advantages:

  1. It spares us having to write redundant code for the 2 traversals needed during the OperatorSpec Graph Analysis step.

  2. It makes it easy to carry out more traversals of any part of the OperatorSpec graph should the need arise.

  3. It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph in isolation from the traversal code itself.

  4. It allows us to customize the traversal progression so that virtual connections can be made between otherwise disconnected parts of the OperatorSpec graph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpec graph representation as close as possible to the way it was defined by the user without being limited by it.

This utility can be used to carry out the 2 traversals in the OperatorSpec Graph Analysis step as shown below.

The first traversal can be done as follows:

SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, sendToTableVisitor, 
        OperatorSpec::getRegisteredOperatorSpecs);
}


class SendToTableVisitor implements Consumer<OperatorSpec> {
    /* Private fields omitted for brevity. */

    /**
     * Invoked once with every {@link OperatorSpec} encountered
     * during traversal.
     */
    @Override
    public void accept(OperatorSpec operatorSpec) {
        /* Examine operatorSpec to create association. */
    }

    /**
     * Used to retrieve association after traversal is complete.
     */
    public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
        getSendToTableToStreamTableJoin() {
        /* Omitted for brevity. */
    }
}

The SendToTableVisitor is a type dedicated to observing the OperatorSpec graph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpecs.

The second traversal can be similarly achieved, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.

/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s. 
 */ 
class SendToTableConnector 
        implements Function<OperatorSpec, Iterable<OperatorSpec>> {

    private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> 
        sendToTableToStreamTableJoin;

    public SendToTableConnector(
        Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableToStreamTableJoin) {
      this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
    }

    @Override
    public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {

      if (opSpec instanceof SendToTableOperatorSpec) {
        SendToTableOperatorSpec sendToTableOpSpec = 
            (SendToTableOperatorSpec) opSpec;

        return Collections.unmodifiableCollection(
            sendToTableToStreamTableJoin.get(sendToTableOpSpec));
      }

      return opSpec.getRegisteredOperatorSpecs();
    }
}