...

JIRA: SAMZA-1889

Released: Samza 1.0

Purpose

This document outlines a proposal for extending Samza’s Execution Planner to verify agreement in partition count among the stream(s) behind Tables and other streams participating in Stream-Table Joins in applications written using Samza High-Level APIs.

Background

Motivating Example: Stream-Stream Join

...

For instance, to perform the operations illustrated in Fig. 1 on a stream of messages, a user can write the Samza application in Listing 1 using the Samza high-level API:

Fig. 1 — A logical workflow of stream processing operations

...

Listing 1 — Sample application using Samza high-level API to perform Stream-Stream Join
public class StreamStreamJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      MessageStream s1 = graph
          .getInputStream("S1")
          .filter(/* Omitted for brevity */);

      MessageStream s2 = graph
          .getInputStream("S2");

      OutputStream s3 = graph.getOutputStream("S3");

      s1.join(s2, /* Omitted for brevity */)
        .sendTo(s3);
    }
}



Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.

...

The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpecs produced by the Samza High-Level API and checks that every such set of joined streams complies with this requirement.

Fig. 3 — Two example cases of Stream-Stream Joins. After considering the partition counts of the joined input streams, Samza’s Execution Planner accepts the one on the left but rejects the one on the right.
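The check described above can be sketched in a few lines. This is only an illustration of the idea, not Samza's actual implementation: the class `JoinPartitionCheck` and its method are hypothetical, and the partition counts in `main` are invented for the example rather than taken from Fig. 3.

```java
// Hypothetical helper (not part of Samza): decides whether the streams
// participating in ONE join agree in partition count.
class JoinPartitionCheck {

    /**
     * @param partitionCounts the partition count of each stream in the join
     * @return true if all counts are equal (trivially true for zero or one stream)
     */
    static boolean agree(int... partitionCounts) {
        for (int count : partitionCounts) {
            if (count != partitionCounts[0]) {
                return false; // at least two streams disagree
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agree(16, 16)); // prints "true": the join would be accepted
        System.out.println(agree(16, 64)); // prints "false": the join would be rejected
    }
}
```

A real planner would also report which streams disagree; the sketch returns only a boolean to keep the core condition visible.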

...

  1. Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.

  2. Any intermediate stream not covered by the first rule is assigned the partition count value specified by the Samza config property job.intermediate.stream.partitions.

  3. If no value is specified for job.intermediate.stream.partitions, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a maximum hard-coded value of 256.


Fig. 4 — The OperatorSpec graph of an example high-level Samza application that employs the Partition-By operation. The Execution Planner assigns intermediate stream S2′ a partition count of 16, the same value as input stream S1, because the two are joined together.


It is important to realize there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with, a scenario that would cause the Execution Planner to signal an error and reject the whole application. Fig. 5 illustrates one such case.


Fig. 5 — The OperatorSpec graph of an example high-level Samza application rejected by the Execution Planner. The planner cannot infer a partition count for S2′ because it is joined with two input streams, S1 and S4, that have different partition counts.
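The three assignment rules and the conflict case of Fig. 5 can be expressed as a single function. The sketch below is an assumption-laden illustration rather than the Execution Planner's code: `IntermediatePartitionRules`, `infer`, and the parameter names are hypothetical, the configured value stands in for job.intermediate.stream.partitions, and the list of input and output counts is assumed to be non-empty.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical helper (not part of Samza) mirroring the three rules above.
class IntermediatePartitionRules {
    static final int MAX_INFERRED_PARTITIONS = 256; // cap mentioned in rule 3

    /**
     * @param joinedInputCounts partition counts of the input streams joined with
     *                          the intermediate stream (empty if there are none)
     * @param configuredCount   value of job.intermediate.stream.partitions, if set
     * @param allInputAndOutputCounts partition counts of every input and output stream
     */
    static int infer(Collection<Integer> joinedInputCounts,
                     OptionalInt configuredCount,
                     Collection<Integer> allInputAndOutputCounts) {
        Set<Integer> distinct = new HashSet<>(joinedInputCounts);
        if (distinct.size() > 1) {
            // The Fig. 5 situation: no single count satisfies every join.
            throw new IllegalStateException(
                "Conflicting partition counts among joined input streams: " + distinct);
        }
        if (distinct.size() == 1) {
            return distinct.iterator().next();            // rule 1
        }
        if (configuredCount.isPresent()) {
            return configuredCount.getAsInt();            // rule 2
        }
        int max = Collections.max(allInputAndOutputCounts);
        return Math.min(max, MAX_INFERRED_PARTITIONS);    // rule 3
    }

    public static void main(String[] args) {
        // Joined with an input stream of 16 partitions, as in Fig. 4.
        System.out.println(infer(Arrays.asList(16), OptionalInt.empty(),
            Arrays.asList(16, 64)));                      // prints "16"
        // Not joined, nothing configured: maximum of 512 is capped at 256.
        System.out.println(infer(Collections.<Integer>emptyList(), OptionalInt.empty(),
            Arrays.asList(16, 512)));                     // prints "256"
    }
}
```

Passing two different counts in `joinedInputCounts`, for example 16 and 32, makes `infer` throw, which corresponds to the planner rejecting the application.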

...

Listing 2 — Sample application using Samza high-level API to perform Stream-Table Join
public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {

      MessageStream s1 = graph.getInputStream("S1");
      MessageStream s1Prime = s1.partitionBy(/* Omitted for brevity */);

      // Assume local table
      Table t = graph.getTable(/* Omitted for brevity */);

      s1Prime.sendTo(t);

      MessageStream s2 = graph.getInputStream("S2");
      OutputStream s3 = graph.getOutputStream("S3");

      s2.join(t, /* Omitted for brevity */)
        .sendTo(s3);
    }
}



Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in Listing 2. Stream S1 is partitioned and then sent to table T, which is then joined with stream S2.

...

Listing 3 — Sample application using Samza high-level API to perform Stream-Table Join
public class StreamTableJoinApp implements StreamApplication {
    @Override
    public void init(StreamGraph graph, Config config) {
      Table<KV<Integer, String>> t = graph
        .getTable(/* Omitted for brevity */);

      MessageStream s1 = graph
          .getInputStream("S1")
          .filter(/* Omitted for brevity*/);
      
      s1.sendTo(t);  
      
      MessageStream s2 = graph.getInputStream("S2");

      OutputStream s3 = graph.getOutputStream("S3");

      s2.join(t, /* Omitted for brevity*/)
        .sendTo(s3);
    }
}


Fig. 7 — The OperatorSpec graph generated by Samza for the application in Listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow.

...

To extend Samza’s ExecutionPlanner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec and all the StreamTableJoinOperatorSpecs that reference the same table (TableSpec) in the entire OperatorSpec graph.


Fig. 8 — The OperatorSpec graph generated for a hypothetical Samza High-Level application where stream S1 is filtered and sent to table T, which is subsequently joined with streams S2 and S3. The proposed change to Samza’s Execution Planner assumes virtual connections between each SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs, as denoted by the dotted arrows.

...

  1. OperatorSpec graph analysis

  2. Conversion to StreamEdges

  3. Join input validation

Each of these steps is detailed below.

...

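import java.util.function.Consumer;
import java.util.function.Function;

class GraphUtils {
  /**
   * Visits the given vertex, then recursively visits every vertex
   * reachable from it through getNextVertexes (depth-first, pre-order).
   */
  public static <T> void traverse(T vertex, Consumer<T> visitor,
      Function<T, Iterable<T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}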
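To see the traversal order concretely, the routine can be run on a toy graph of strings. The example below is a self-contained sketch: `TraverseDemo`, `sampleEdges`, and `visitOrder` are hypothetical names, the `traverse` method repeats the logic shown above so that the example compiles on its own, and the adjacency list only loosely mirrors the application in Listing 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

class TraverseDemo {
    // Same logic as GraphUtils.traverse, repeated to keep the example standalone.
    static <T> void traverse(T vertex, Consumer<T> visitor,
                             Function<T, Iterable<T>> getNextVertexes) {
        visitor.accept(vertex);
        for (T nextVertex : getNextVertexes.apply(vertex)) {
            traverse(nextVertex, visitor, getNextVertexes);
        }
    }

    // Toy graph: S1 -> filter -> join -> S3, and S2 -> join.
    static Map<String, List<String>> sampleEdges() {
        Map<String, List<String>> edges = new HashMap<>();
        edges.put("S1", Arrays.asList("filter"));
        edges.put("filter", Arrays.asList("join"));
        edges.put("S2", Arrays.asList("join"));
        edges.put("join", Arrays.asList("S3"));
        return edges;
    }

    // Records the order in which vertices are visited, starting from `start`.
    static List<String> visitOrder(Map<String, List<String>> edges, String start) {
        List<String> visited = new ArrayList<>();
        traverse(start, visited::add,
            v -> edges.getOrDefault(v, Collections.emptyList()));
        return visited;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = sampleEdges();
        System.out.println(visitOrder(edges, "S1")); // prints [S1, filter, join, S3]
        System.out.println(visitOrder(edges, "S2")); // prints [S2, join, S3]
    }
}
```

Note that the routine keeps no record of visited vertices, so a vertex reachable along several paths (here `join`, reached from both S1 and S2) is handed to the visitor once per path. A visitor that must act once per vertex has to handle duplicates itself.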

...

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, System.out::println, 
        OperatorSpec::getRegisteredOperatorSpecs);
}

...

SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, sendToTableVisitor, 
        OperatorSpec::getRegisteredOperatorSpecs);
}


class SendToTableVisitor implements Consumer<OperatorSpec> {
    /* Private fields omitted for brevity. */
    /**
     * Invoked once with every {@link OperatorSpec} encountered 
     * during traversal.
     */
    @Override
    public void accept(OperatorSpec operatorSpec) {
        /* Examine operatorSpec to create association. */
    }

    /**
     * Used to retrieve association after traversal is complete.
     */
    public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> 
         getSendToTableToStreamTableJoin() {
        /* Omitted for brevity. */
    }
}

...

/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s. 
 */ 
class SendToTableConnector 
        implements Function<OperatorSpec, Iterable<OperatorSpec>> {

    private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> 
        sendToTableToStreamTableJoin;

    public SendToTableConnector(
        Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
             sendToTableToStreamTableJoin) {
      this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
    }

    @Override
    public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {

      if (opSpec instanceof SendToTableOperatorSpec) {
        SendToTableOperatorSpec sendToTableOpSpec = 
            (SendToTableOperatorSpec) opSpec;

        return Collections.unmodifiableCollection(
            sendToTableToStreamTableJoin.get(sendToTableOpSpec));
      }

      return opSpec.getRegisteredOperatorSpecs();
    }
}
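The effect of the virtual connections can be tried out on a toy graph without any Samza dependency. In the sketch below every type is a hypothetical stand-in: `Op`, `SendToTable`, and `StreamTableJoin` imitate the corresponding OperatorSpec classes only as far as this example needs, tables are identified by a plain string instead of a TableSpec, and a `Map` replaces the Guava `Multimap`. The graph is loosely modelled on Fig. 8.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

class VirtualConnectionDemo {
    // Stand-in for OperatorSpec; `next` plays the role of getRegisteredOperatorSpecs().
    static class Op {
        final String id;
        final List<Op> next = new ArrayList<>();
        Op(String id) { this.id = id; }
    }
    static class SendToTable extends Op {
        final String tableId;
        SendToTable(String id, String tableId) { super(id); this.tableId = tableId; }
    }
    static class StreamTableJoin extends Op {
        final String tableId;
        StreamTableJoin(String id, String tableId) { super(id); this.tableId = tableId; }
    }

    static <T> void traverse(T vertex, Consumer<T> visitor, Function<T, Iterable<T>> getNext) {
        visitor.accept(vertex);
        for (T nextVertex : getNext.apply(vertex)) {
            traverse(nextVertex, visitor, getNext);
        }
    }

    // Pass 1: pair every send-to-table operator with the joins on the same table.
    static Map<Op, List<Op>> associate(List<Op> inputs) {
        Map<String, Set<Op>> sends = new LinkedHashMap<>();
        Map<String, Set<Op>> joins = new LinkedHashMap<>();
        for (Op input : inputs) {
            traverse(input, op -> {
                if (op instanceof SendToTable) {
                    sends.computeIfAbsent(((SendToTable) op).tableId,
                        k -> new LinkedHashSet<>()).add(op);
                } else if (op instanceof StreamTableJoin) {
                    joins.computeIfAbsent(((StreamTableJoin) op).tableId,
                        k -> new LinkedHashSet<>()).add(op);
                }
            }, op -> op.next);
        }
        Map<Op, List<Op>> association = new LinkedHashMap<>();
        sends.forEach((tableId, sendOps) -> {
            for (Op send : sendOps) {
                association.put(send,
                    new ArrayList<Op>(joins.getOrDefault(tableId, Collections.emptySet())));
            }
        });
        return association;
    }

    // Pass 2: successor function that follows virtual connections out of send-to-table operators.
    static Function<Op, Iterable<Op>> connector(Map<Op, List<Op>> association) {
        return op -> {
            if (op instanceof SendToTable) {
                return association.getOrDefault(op, Collections.emptyList());
            }
            return op.next;
        };
    }

    // Builds the toy graph and lists the operators reachable from S1.
    static List<String> demo(boolean withVirtualConnections) {
        Op s1 = new Op("S1"), filter = new Op("filter"), s2 = new Op("S2"), s3 = new Op("S3");
        Op send = new SendToTable("sendTo(T)", "T");
        Op join2 = new StreamTableJoin("join(S2,T)", "T");
        Op join3 = new StreamTableJoin("join(S3,T)", "T");
        s1.next.add(filter);
        filter.next.add(send);
        s2.next.add(join2);
        s3.next.add(join3);

        Function<Op, Iterable<Op>> getNext = withVirtualConnections
            ? connector(associate(Arrays.asList(s1, s2, s3)))
            : op -> op.next;
        List<String> ids = new ArrayList<>();
        traverse(s1, op -> ids.add(op.id), getNext);
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints [S1, filter, sendTo(T)]
        System.out.println(demo(true));  // prints [S1, filter, sendTo(T), join(S2,T), join(S3,T)]
    }
}
```

Without the connector the traversal from S1 stops at the send-to-table operator. With it, the traversal continues into both joins, which is what lets a planner treat S1, S2, and S3 as one set of streams that must agree in partition count.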

...