Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
/**
 * Example job: feeds two regular inputs and two broadcast inputs into a two-input operator
 * through BroadcastUtils.withBroadcastStream and sends the result to a sink.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job execution fails
 */
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = ...;
	// create non-broadcast inputs
	DataStream<Integer> input1 = env.addSource(...);
	DataStream<Integer> input2 = env.addSource(...);

	// create the inputs that need to be broadcast to other operators and put them in a map;
	// the keys are the names under which MyTwoInputOp looks the broadcast variables up
	DataStream<?> broadcastInput1 = env.addSource(...);
	DataStream<?> broadcastInput2 = env.addSource(...);
	Map<String, DataStream<?>> broadcastMap = new HashMap<>();
	broadcastMap.put("broadcastInput1", broadcastInput1);
	broadcastMap.put("broadcastInput2", broadcastInput2);

	// call withBroadcastStream
	DataStream<?> output = BroadcastUtils.withBroadcastStream(
		Arrays.asList(input1, input2),
		broadcastMap,
		dataStreams -> {
			// the list handed to this callback is indexed in the same order as the
			// non-broadcast inputs passed above
			DataStream in1 = dataStreams.get(0);
			DataStream in2 = dataStreams.get(1);
			return in1.connect(in2)
				.transform(
					"two-input",
					// output type information; matches MyTwoInputOp's Integer output
					BasicTypeInfo.INT_TYPE_INFO,
					new MyTwoInputOp())
				.name("broadcast");
		});

	output.addSink(...);
	env.execute();
}

/**
 * A two-input StreamOperator that implements HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<Integer>AbstractStreamOperator<?>
	implements TwoInputStreamOperator<IntegerTwoInputStreamOperator<?, Integer?, Integer>?>, HasBroadcastVariable {

	// a map used to store the broadcast variables.
	Map<String, List<?>> broadcastVariables = new HashMap <>();

	@Override
	public void setBroadcastVariable(String name, List <?> broadcastVariable) {
		broadcastVariables.put(name, broadcastVariable);
	}

	@Override
	public void processElement1(StreamRecord <Integer><?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process element using the broadcast inputs
		// ...
	}

	@Override
	public void processElement2(StreamRecord <Integer><?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process element using the broadcast inputs
		// ...
	}
}

...