THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public static void main(String[] args) { StreamExecutionEnvironment env = ...; // create non-broadcast inputs DataStream <Integer><?> input1 = env.addSource(...); DataStream <Integer><?> input2 = env.addSource(...); // create inputs that needs to be broadcasted to other operators and put it in a map DataStream <?> broadcastInput1 = env.addSource(...); DataStream <?> broadcastInput2 = env.addSource(...); Map <String, DataStream <?>> broadcastMap = new HashMap <>(); broadcastMap.put("broadcastInput1", broadcastInput1); broadcastMap.put("broadcastInput2", broadcastInput2); // call withBroadcastStream DataStream <?> output = BroadcastUtils.withBroadcastStream( Arrays.asList(input1, input2), broadcastMap, dataStreams -> { DataStream in1 = dataStreams.get(0); DataStream in2 = dataStreams.get(1); return in1.connect(in2) .transform( "two-input", BasicTypeInfo.INT_TYPE_INFOTypeInformation.of(...), new MyTwoInputOp()) .name("broadcast"); }); output.addSink(...); env.execute(); } /** * A two-input StreamOperator that implements HasBroadcastVariable interface. */ private static class MyTwoInputOp extends AbstractStreamOperator<Integer>AbstractStreamOperator<?> implements TwoInputStreamOperator<IntegerTwoInputStreamOperator<?, Integer?, Integer>?>, HasBroadcastVariable { // a map used to store the broadcast variables. Map<String, List<?>> broadcastVariables = new HashMap <>(); @Override public void setBroadcastVariable(String name, List <?> broadcastVariable) { broadcastVariables.put(name, broadcastVariable); } @Override public void processElement1(StreamRecord <Integer><?> streamRecord) throws Exception { List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1"); List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2"); // process element using the broadcast inputs // ... } @Override public void processElement2(StreamRecord <Integer><?> streamRecord) throws Exception { List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1"); List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2"); // process element using the broadcast inputs // ... } } |
...