Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
/**
 * Generic helpers for walking graph-like structures.
 */
class GraphUtils {
  /**
   * Traverses the graph reachable from {@code vertex} depth-first, in
   * pre-order: {@code visitor} is invoked on a vertex before any of the
   * vertexes returned for it by {@code getNextVertexes}.
   *
   * <p>No record of visited vertexes is kept. A vertex reachable through
   * several paths is visited once per path, and a graph containing a cycle
   * causes unbounded recursion.
   *
   * @param vertex the vertex to start from
   * @param visitor invoked with every vertex encountered; may accept any
   *     supertype of {@code T}
   * @param getNextVertexes returns the successors of a vertex; may accept any
   *     supertype of {@code T} and return any {@link Iterable} of subtypes
   *     of {@code T}
   * @param <T> the vertex type
   */
  public static <T> void traverse(T vertex, Consumer<? super T> visitor,
      Function<? super T, ? extends Iterable<? extends T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      traverse(nextVertex, visitor, getNextVertexes);
    }
  }
}

...

Code Block
languagejava
themeEclipse
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
    GraphUtils.traverse(inputOpSpec, System.out::println, 
        OperatorSpec::getRegisteredOperatorSpecs);
}

...

Code Block
languagejava
themeEclipse
// A single visitor instance is shared across the traversals of all input
// operators, so whatever it records accumulates over the whole graph.
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();

specGraph.getInputOperators().values().forEach(inputOpSpec ->
    GraphUtils.traverse(inputOpSpec, sendToTableVisitor,
        OperatorSpec::getRegisteredOperatorSpecs));


class SendToTableVisitor implements Consumer<OperatorSpec> {
    /* Private fields omitted for brevity. */
    /**
     * Invoked once with every {@link OperatorSpec} encountered 
     * during traversal.
     */
    @Override
    public void accept(OperatorSpec operatorSpec) {
        /* Examine operatorSpec to create association. */
      }
    }

    /**
     * Used to retrieve association after traversal is complete.
     */
    public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> 
         getSendToTableToStreamTableJoin() {
        /* Omitted for brevity. */
    }
}

...

Code Block
languagejava
themeEclipse
/**
 * Successor function for {@link OperatorSpecGraph} traversal that adds
 * virtual edges from each {@link SendToTableOperatorSpec} to the
 * {@link StreamTableJoinOperatorSpec}s associated with it.
 *
 * <p>For a {@link SendToTableOperatorSpec} the successors are the
 * {@link StreamTableJoinOperatorSpec}s mapped to it in the supplied
 * multimap, returned as a read-only view. For any other operator spec
 * the successors are its registered operator specs.
 */
class SendToTableConnector
        implements Function<OperatorSpec, Iterable<OperatorSpec>> {

    /** Virtual edges, keyed by the send-to-table operator they start from. */
    private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
        joinsBySendToTable;

    /**
     * @param sendToTableToStreamTableJoin association used to look up the
     *     virtual successors of a {@link SendToTableOperatorSpec}
     */
    public SendToTableConnector(
        Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableToStreamTableJoin) {
      joinsBySendToTable = sendToTableToStreamTableJoin;
    }

    /**
     * Returns the vertexes to visit after {@code opSpec}.
     *
     * @param opSpec the operator spec whose successors are requested
     * @return the associated stream-table joins for a
     *     {@link SendToTableOperatorSpec}; otherwise the operator specs
     *     registered on {@code opSpec}
     */
    @Override
    public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
      // Ordinary operators keep their regular downstream connections.
      if (!(opSpec instanceof SendToTableOperatorSpec)) {
        return opSpec.getRegisteredOperatorSpecs();
      }

      // Send-to-table operators continue into the joins associated with them.
      return Collections.unmodifiableCollection(
          joinsBySendToTable.get((SendToTableOperatorSpec) opSpec));
    }
}

...