[This FLIP proposal is a joint work between Zhipeng Zhang, Yun Gao and Dong Lin].

Motivation

When developing machine learning algorithms using DataStream, we found that DataStream lacks withBroadcast() functionality, which is widely used in machine learning. A common example of withBroadcast using the DataSet API is as follows:


DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction<?, ?>() {
	@Override
	public Object map(Object value) throws Exception {
		// read the broadcast sets registered below under the names "d2" and "d3"
		List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
		List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

		...;
	}
})
	.withBroadcastSet(d2, "d2")
	.withBroadcastSet(d3, "d3");

...

This section shows how to use the proposed API to support operators that use broadcast variables.


public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = ...;
	// create the non-broadcast inputs
	DataStream<Integer> input1 = env.addSource(...);
	DataStream<Integer> input2 = env.addSource(...);

	// create the inputs that need to be broadcast to other operators and put them in a map
	DataStream<?> broadcastInput1 = env.addSource(...);
	DataStream<?> broadcastInput2 = env.addSource(...);
	Map<String, DataStream<?>> broadcastMap = new HashMap<>();
	broadcastMap.put("broadcastInput1", broadcastInput1);
	broadcastMap.put("broadcastInput2", broadcastInput2);

	// call withBroadcastStream; the function builds the computation over the non-broadcast inputs
	DataStream<?> output = BroadcastUtils.withBroadcastStream(
		Arrays.asList(input1, input2),
		broadcastMap,
		dataStreams -> {
			DataStream in1 = dataStreams.get(0);
			DataStream in2 = dataStreams.get(1);
			return in1.connect(in2)
				.transform(
					"two-input",
					BasicTypeInfo.INT_TYPE_INFO,
					new MyTwoInputOp())
				.name("broadcast");
		});

	output.addSink(...);
	env.execute();
}

/**
 * A two-input StreamOperator that implements the HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<Integer>
	implements TwoInputStreamOperator<Integer, Integer, Integer>, HasBroadcastVariable {

	// a map used to store the broadcast variables, keyed by the names used in broadcastMap
	private final Map<String, List<?>> broadcastVariables = new HashMap<>();

	@Override
	public void setBroadcastVariable(String name, List<?> broadcastVariable) {
		broadcastVariables.put(name, broadcastVariable);
	}

	@Override
	public void processElement1(StreamRecord<Integer> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process the element from the first input using the broadcast inputs
		// ...
	}

	@Override
	public void processElement2(StreamRecord<Integer> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process the element from the second input using the broadcast inputs
		// ...
	}
}
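
The contract that the example relies on can be illustrated without any Flink dependency. The following is only a minimal sketch under stated assumptions: the `HasBroadcastVariable` interface here is a hypothetical stand-in inferred from the `setBroadcastVariable(String, List<?>)` method used above, `AddBroadcastSumOp` and `BroadcastSketch` are made-up names, and the driver simply assumes that every broadcast variable is injected before the first element is processed (as is the case for DataSet#withBroadcastSet). It is not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for HasBroadcastVariable, inferred from the example above. */
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** A toy operator (not a Flink StreamOperator) that adds the sum of a broadcast list to each element. */
class AddBroadcastSumOp implements HasBroadcastVariable {
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        broadcastVariables.put(name, broadcastVariable);
    }

    int processElement(int element) {
        List<?> offsets = broadcastVariables.get("offsets");
        if (offsets == null) {
            // Assumption: processing must not start before the broadcast variable is set.
            throw new IllegalStateException("broadcast variable 'offsets' was not set");
        }
        int sum = 0;
        for (Object o : offsets) {
            sum += (Integer) o;
        }
        return element + sum;
    }
}

class BroadcastSketch {
    /** Hypothetical driver: injects every broadcast variable first, then feeds the elements. */
    static List<Integer> run(List<Integer> input, Map<String, List<?>> broadcastMap) {
        AddBroadcastSumOp op = new AddBroadcastSumOp();
        for (Map.Entry<String, List<?>> entry : broadcastMap.entrySet()) {
            op.setBroadcastVariable(entry.getKey(), entry.getValue());
        }
        List<Integer> output = new ArrayList<>();
        for (int element : input) {
            output.add(op.processElement(element));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, List<?>> broadcastMap = new HashMap<>();
        broadcastMap.put("offsets", Arrays.asList(10, 20));
        System.out.println(run(Arrays.asList(1, 2, 3), broadcastMap)); // prints [31, 32, 33]
    }
}
```

Because the variables arrive through a setter on the operator itself, the toy operator needs no runtime context to read them, which is the property the setter-based interface in the example appears to be designed around.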

...

  • Code written against the DataSet API that uses DataSet#withBroadcastSet can migrate by calling BroadcastUtils#withBroadcastStream.
  • The DataStream API does not currently support withBroadcast(), so there are no compatibility issues.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

  • Option-1: Another possible way to support `withBroadcast` is to reuse StreamingRuntimeContext#getBroadcastVariable(...). However, we cannot assume that every stream operator has a StreamingRuntimeContext.