THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) {
GraphUtils.traverse(inputOpSpec, System.out::println,
OperatorSpec::getRegisteredOperatorSpecs);
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
SendToTableVisitor sendToTableVisitor = new SendToTableVisitor(); for (InputOperatorSpec inputOpSpec : specGraph.getInputOperators().values()) { GraphUtils.traverse(inputOpSpec, sendToTableVisitor, OperatorSpec::getRegisteredOperatorSpecs); } class SendToTableVisitor implements Consumer<OperatorSpec> { /* Private fields omitted for brevity. */ /** * Invoked once with every {@link OperatorSpec} encountered * during traversal. */ @Override public void accept(OperatorSpec operatorSpec) { /* Examine operatorSpec to create association. */ } } /** * Used to retrieve association after traversal is complete. */ public Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> getSendToTableToStreamTableJoin() { /* Omitted for brevity. */ } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Customizes {@link OperatorSpecGraph} traversal by simulating virtual * connections between the associated {@link SendToTableOperatorSpec}s * and {@link StreamTableJoinOperatorSpec}s. */ class SendToTableConnector implements Function<OperatorSpec, Iterable<OperatorSpec>> { private final Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableToStreamTableJoin; public SendToTableConnector( Multimap<SendToTableOperatorSpec, StreamTableJoinOperatorSpec> sendToTableToStreamTableJoin) { this.sendToTableToStreamTableJoin = sendToTableToStreamTableJoin; } @Override public Iterable<OperatorSpec> apply(OperatorSpec opSpec) { if (opSpec instanceof SendToTableOperatorSpec) { SendToTableOperatorSpec sendToTableOpSpec = (SendToTableOperatorSpec) opSpec; return Collections.unmodifiableCollection( sendToTableToStreamTableJoin.get(sendToTableOpSpec)); } return opSpec.getRegisteredOperatorSpecs(); } } |
...