THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Static helpers for walking graph-like structures whose edges are supplied
 * by a caller-provided function.
 */
final class GraphUtils {

  private GraphUtils() {
    // Static utility holder; not meant to be instantiated.
  }

  /**
   * Performs a pre-order, depth-first walk starting at {@code vertex}.
   *
   * <p>The visitor is invoked with a vertex before any of its successors, and
   * successors are explored in the iteration order of the {@link Iterable}
   * returned by {@code getNextVertexes}.
   *
   * <p>No record of visited vertexes is kept: a vertex reachable along several
   * paths is visited once per path, and a cycle results in unbounded recursion.
   * Callers must therefore supply an acyclic successor function.
   *
   * @param vertex the vertex to start from; passed to {@code visitor} as is
   * @param visitor callback invoked with every vertex encountered
   * @param getNextVertexes returns the successors of a vertex; must not return
   *     {@code null}
   * @param <T> the vertex type
   * @throws NullPointerException if {@code visitor} or {@code getNextVertexes}
   *     is {@code null}
   */
  public static <T> void traverse(
      T vertex,
      Consumer<? super T> visitor,
      Function<? super T, ? extends Iterable<? extends T>> getNextVertexes) {
    // Fail before any vertex is visited rather than half-way through the walk.
    java.util.Objects.requireNonNull(visitor, "visitor");
    java.util.Objects.requireNonNull(getNextVertexes, "getNextVertexes");
    visit(vertex, visitor, getNextVertexes);
  }

  /** Recursive step of {@link #traverse}: visits {@code vertex}, then each successor in turn. */
  private static <T> void visit(
      T vertex,
      Consumer<? super T> visitor,
      Function<? super T, ? extends Iterable<? extends T>> getNextVertexes) {
    visitor.accept(vertex);
    for (T nextVertex : getNextVertexes.apply(vertex)) {
      visit(nextVertex, visitor, getNextVertexes);
    }
  }
}
...
Code Block | ||||
---|---|---|---|---|
| ||||
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) { GraphUtils.traverse(inputOpSpec, System.out::println, OperatorSpec::getRegisteredOperatorSpecs); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
// Walk the graph from every input operator, handing each operator spec that
// is encountered to the visitor.
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor();
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
  GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs);
}

class SendToTableVisitor implements Consumer<OperatorSpec> {
  /* Private fields omitted for brevity. */

  /**
   * Invoked with every {@link OperatorSpec} encountered during traversal.
   * An operator spec that is encountered more than once is passed in each
   * time it is encountered.
   */
  @Override
  public void accept(OperatorSpec operatorSpec) {
    /* Examine operatorSpec to create association. */
  }

  /**
   * Used to retrieve association after traversal is complete.
   */
  public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> getSendToTableToStreamTableJoin() {
    /* Omitted for brevity. */
  }
}
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Customizes {@link OperatorSpecGraph} traversal by simulating virtual
* connections between the associated {@link SendToTableOperatorSpec}s
* and {@link StreamTableJoinOperatorSpec}s.
*/
class SendToTableConnector
implements Function<OperatorSpec, Iterable<OperatorSpec>> {
private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec>
sendToTableToStreamTableJoin;
public SendToTableConnector(
Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableToStreamTableJoin) {
this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin;
}
@Override
public Iterable<OperatorSpec> apply(OperatorSpec opSpec) {
if (opSpec instanceof SendToTableOperatorSpec) {
SendToTableOperatorSpec sendToTableOpSpec =
(SendToTableOperatorSpec) opSpec;
return Collections.unmodifiableCollection(
sendToTableToStreamTableJoin.get(sendToTableOpSpec));
}
return opSpec.getRegisteredOperatorSpecs();
}
} |
...