...

  1. OperatorSpec graph analysis

  2. Conversion to StreamEdges

  3. Join input validation


Details of each one of these steps are outlined below.

...

The output of this step is a collection of StreamEdge sets, each of which represents the group of StreamEdges participating in a particular Join. The number of such sets should typically equal the number of Joins in the OperatorSpec graph.

Join Input

...

Processing

In this step, we process the groups of StreamEdges produced by the previous step in the following order:

  1. Groups in which every StreamEdge has its partition count set (i.e. input streams only) come first.

  2. Groups with a mix of set and unset partition counts (i.e. input and intermediate streams) come next.

  3. Groups in which no StreamEdge has its partition count set (i.e. intermediate streams only) come last.

In a group with a mix of set and unset partition counts, every StreamEdge whose partition count is unset is assigned the partition count of the StreamEdge(s) in the same group whose partition count is set. This assignment can turn another group from one with no partition counts set into one with a mix of set and unset partition counts, because a single intermediate StreamEdge can appear in multiple groups, so the assignment has to account for the interdependencies between groups. Finally, StreamEdges in groups where no partition count is set are assigned the maximum partition count among all input and output StreamEdges, capped at a hardcoded maximum of 256.

At the end of this process, the StreamEdges in every group must have the same partition count, or else the Execution Planner will reject the application. By processing StreamEdge groups in this order, i.e. most constrained StreamEdges first, we can verify agreement among input streams and assign partition counts to intermediate streams in a single scan, even if one or more StreamEdges are present in several groups.
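The ordering described above can be illustrated with a minimal, self-contained sketch. It is not the Execution Planner's code: EdgeSketch, JoinPartitionSketch, the UNSET sentinel, and the use of IllegalStateException for rejection are all hypothetical names and choices made for illustration, and the fallback for groups with no partition count set (the maximum among input and output StreamEdges, capped at 256) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a StreamEdge; only the partition count matters here. */
final class EdgeSketch {
    static final int UNSET = -1;
    final String name;
    int partitions;

    EdgeSketch(String name, int partitions) {
        this.name = name;
        this.partitions = partitions;
    }

    boolean isSet() {
        return partitions != UNSET;
    }
}

/** Hypothetical sketch of processing join groups, most constrained first. */
final class JoinPartitionSketch {

    /** 0 = all partition counts set, 1 = mix of set and unset, 2 = none set. */
    static int category(List<EdgeSketch> group) {
        long set = group.stream().filter(EdgeSketch::isSet).count();
        if (set == group.size()) {
            return 0;
        }
        return set > 0 ? 1 : 2;
    }

    /**
     * Orders the groups once by category, then scans them once. Within a
     * group, set counts must agree and unset counts inherit the agreed value.
     * Groups that still have no count when reached are left untouched.
     */
    static void assign(List<List<EdgeSketch>> groups) {
        List<List<EdgeSketch>> ordered = new ArrayList<>(groups);
        ordered.sort(Comparator.comparingInt(JoinPartitionSketch::category));
        for (List<EdgeSketch> group : ordered) {
            int expected = EdgeSketch.UNSET;
            for (EdgeSketch edge : group) {
                if (!edge.isSet()) {
                    continue;
                }
                if (expected == EdgeSketch.UNSET) {
                    expected = edge.partitions;
                } else if (expected != edge.partitions) {
                    throw new IllegalStateException(
                        "Partition count mismatch at " + edge.name);
                }
            }
            if (expected == EdgeSketch.UNSET) {
                continue; // nothing known yet; a fallback would apply here
            }
            for (EdgeSketch edge : group) {
                edge.partitions = expected;
            }
        }
    }
}
```

In this sketch, an edge shared between a mixed group and a group with no counts set receives its count while the mixed group is processed, so the later group already sees it as set when its turn comes.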

Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.

...

The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpec graph starting from its InputOperatorSpecs:

```java
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, System.out::println,
        OperatorSpec::getRegisteredOperatorSpecs);
}
```

...

  1. It spares us having to write redundant code for the 2 traversals needed during the OperatorSpec Graph Analysis step.

  2. It makes it easy to carry out more traversals of any part of the OperatorSpec graph should the need arise.

  3. It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph in isolation from the traversal code itself.

  4. It allows us to customize the traversal progression so that virtual connections can be made between otherwise disconnected parts of the OperatorSpec graph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpec graph representation as close as possible to the way it was defined by the user without being limited by it.
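To make these points concrete, here is a minimal sketch of what a traversal utility with this shape might look like. It is not the actual GraphUtils implementation, which is not shown in this section: the class name TraversalSketch, the visitOrder helper, and the breadth-first visiting order are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for the traversal utility; not the actual GraphUtils. */
final class TraversalSketch {

    /**
     * Visits every vertex reachable from start exactly once. The visitor sees
     * each vertex; getNextVertexes decides where the traversal goes next.
     * Breadth-first order is an assumption of this sketch.
     */
    static <T> void traverse(T start, Consumer<T> visitor,
                             Function<T, Iterable<T>> getNextVertexes) {
        Set<T> seen = new HashSet<>();
        Deque<T> pending = new ArrayDeque<>();
        seen.add(start);
        pending.add(start);
        while (!pending.isEmpty()) {
            T vertex = pending.remove();
            visitor.accept(vertex);
            for (T next : getNextVertexes.apply(vertex)) {
                // Skip vertices already reached through another path.
                if (seen.add(next)) {
                    pending.add(next);
                }
            }
        }
    }

    /** Convenience wrapper that records the order in which vertices are visited. */
    static <T> List<T> visitOrder(T start, Function<T, Iterable<T>> getNextVertexes) {
        List<T> order = new ArrayList<>();
        traverse(start, order::add, getNextVertexes);
        return order;
    }
}
```

Because the successor function is a parameter, a caller can wrap it to return extra "virtual" successors for selected vertices without changing either the graph or the traversal code.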

This utility can be used to carry out the 2 traversals in the OperatorSpec Graph Analysis step as shown below.

The first traversal can be done as follows:

```java
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();

for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs);
}


class SendToTableVisitor implements Consumer<OperatorSpec> {
    /* Private fields omitted for brevity. */

    /**
     * Invoked once with every {@link OperatorSpec} encountered
     * during traversal.
     */
    @Override
    public void accept(OperatorSpec operatorSpec) {
        /* Examine operatorSpec to create association. */
    }

    /**
     * Used to retrieve association after traversal is complete.
     */
    public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
            getSendToTableToStreamTableJoin() {
        /* Omitted for brevity. */
    }
}
```

The SendToTableVisitor is a type dedicated to observing the OperatorSpec graph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpecs.
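Since the body of the visitor is omitted above, the following sketch shows one way such an association could be collected. It is only an illustration under stated assumptions: NodeSketch, its Kind enum, the string tableId, and TableAssociationVisitor are hypothetical stand-ins for the operator spec and TableSpec types, and a plain Map of lists is used in place of a Multimap to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for an operator spec that may reference a table. */
final class NodeSketch {
    enum Kind { SEND_TO_TABLE, STREAM_TABLE_JOIN, OTHER }

    final Kind kind;
    final String tableId; // null when the node does not touch a table

    NodeSketch(Kind kind, String tableId) {
        this.kind = kind;
        this.tableId = tableId;
    }
}

/** Records, per table, the nodes that write to it and the joins that read it. */
final class TableAssociationVisitor implements Consumer<NodeSketch> {
    private final Map<String, List<NodeSketch>> writers = new HashMap<>();
    private final Map<String, List<NodeSketch>> joins = new HashMap<>();

    /** Invoked once per node; only bookkeeping happens here, no traversal. */
    @Override
    public void accept(NodeSketch node) {
        if (node.kind == NodeSketch.Kind.SEND_TO_TABLE) {
            writers.computeIfAbsent(node.tableId, k -> new ArrayList<>()).add(node);
        } else if (node.kind == NodeSketch.Kind.STREAM_TABLE_JOIN) {
            joins.computeIfAbsent(node.tableId, k -> new ArrayList<>()).add(node);
        }
    }

    /** Builds the writer-to-joins association once all nodes have been seen. */
    Map<NodeSketch, List<NodeSketch>> writerToJoins() {
        Map<NodeSketch, List<NodeSketch>> result = new HashMap<>();
        for (Map.Entry<String, List<NodeSketch>> entry : writers.entrySet()) {
            List<NodeSketch> sharing =
                joins.getOrDefault(entry.getKey(), Collections.emptyList());
            for (NodeSketch writer : entry.getValue()) {
                result.put(writer, new ArrayList<>(sharing));
            }
        }
        return result;
    }
}
```

Deferring the pairing until after the traversal means the result does not depend on whether a writer or its matching join is visited first.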

The second traversal can be achieved similarly, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.

```java
import java.util.Collections;
import java.util.function.Function;

// Multimap is assumed here to be Guava's implementation.
import com.google.common.collect.Multimap;

/**
 * Customizes {@link OperatorSpecGraph} traversal by simulating virtual
 * connections between the associated {@link SendToTableOperatorSpec}s
 * and {@link StreamTableJoinOperatorSpec}s.
 */
class SendToTableConnector
        implements Function<OperatorSpec, Iterable<OperatorSpec>> {

    private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
        sendToTableToStreamTableJoin;

    public SendToTableConnector(
            Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
                sendToTableToStreamTableJoin) {
        this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
    }

    @Override
    public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
        // A SendToTableOperatorSpec continues at the joins that share its table.
        if (opSpec instanceof SendToTableOperatorSpec) {
            SendToTableOperatorSpec sendToTableOpSpec =
                (SendToTableOperatorSpec) opSpec;

            return Collections.unmodifiableCollection(
                sendToTableToStreamTableJoin.get(sendToTableOpSpec));
        }

        // Every other operator continues at its registered successors.
        return opSpec.getRegisteredOperatorSpecs();
    }
}
```

...