...
[This FLIP proposal is joint work by Zhipeng Zhang, Yun Gao and Dong Lin.]
Motivation
When developing machine learning algorithms using DataStream, we found that DataStream lacks withBroadcast()
...
functionality, which is widely used in machine learning. A common example of withBroadcast with the DataSet API is as follows:
```java
DataSet<Long> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;

d1.map(new RichMapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            // read the broadcast sets registered below under the names "d2" and "d3"
            List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
            List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");
            ...;
        }
    })
    .withBroadcastSet(d2, "d2")
    .withBroadcastSet(d3, "d3");
```
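In the DataSet API, the main input of the map is partitioned across its parallel instances, while each broadcast set is made available in full to every instance. The plain-Java model below (no Flink dependency) illustrates only that semantic; BroadcastSemanticsSketch, partition and run are hypothetical names, and the round-robin partitioning is an assumption chosen for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of DataSet#withBroadcastSet semantics:
 * the main input is partitioned across parallel subtasks, while every subtask
 * receives its own complete copy of the broadcast set. All names are hypothetical.
 */
public class BroadcastSemanticsSketch {

    /** Distributes the main input round-robin over `parallelism` subtasks (assumed strategy). */
    static List<List<Long>> partition(List<Long> input, int parallelism) {
        List<List<Long>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        return partitions;
    }

    /**
     * Runs one "map" per subtask. Each subtask adds the sum of its full
     * broadcast copy to its own elements, standing in for a RichMapFunction
     * that reads getRuntimeContext().getBroadcastVariable("d2").
     */
    static List<List<Long>> run(List<Long> d1, List<Long> d2, int parallelism) {
        List<List<Long>> outputs = new ArrayList<>();
        for (List<Long> subtaskInput : partition(d1, parallelism)) {
            List<Long> broadcastCopy = new ArrayList<>(d2); // replicated, not partitioned
            long broadcastSum = 0;
            for (long v : broadcastCopy) {
                broadcastSum += v;
            }
            List<Long> subtaskOutput = new ArrayList<>();
            for (long x : subtaskInput) {
                subtaskOutput.add(x + broadcastSum);
            }
            outputs.add(subtaskOutput);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // d1 = [1, 2, 3, 4] is split over 2 subtasks; both see all of d2 = [10, 20]
        System.out.println(run(List.of(1L, 2L, 3L, 4L), List.of(10L, 20L), 2));
        // prints [[31, 33], [32, 34]]
    }
}
```

Subtask 0 receives elements 1 and 3 and subtask 1 receives 2 and 4, yet both add the same sum 30, because neither sees only a slice of d2.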
...
This section shows how to use the proposed API to support operators that need to use broadcast variables.
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = ...;

    // create non-broadcast inputs
    DataStream<Integer> input1 = env.addSource(...);
    DataStream<Integer> input2 = env.addSource(...);

    // create inputs that need to be broadcast to other operators and put them in a map
    DataStream<?> broadcastInput1 = env.addSource(...);
    DataStream<?> broadcastInput2 = env.addSource(...);
    Map<String, DataStream<?>> broadcastMap = new HashMap<>();
    broadcastMap.put("broadcastInput1", broadcastInput1);
    broadcastMap.put("broadcastInput2", broadcastInput2);

    // call withBroadcastStream
    DataStream<?> output = BroadcastUtils.withBroadcastStream(
            Arrays.asList(input1, input2),
            broadcastMap,
            dataStreams -> {
                DataStream in1 = dataStreams.get(0);
                DataStream in2 = dataStreams.get(1);
                return in1.connect(in2)
                        .transform("two-input", BasicTypeInfo.INT_TYPE_INFO, new MyTwoInputOp())
                        .name("broadcast");
            });
    output.addSink(...);
    env.execute();
}

/**
 * A two-input StreamOperator that implements the HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<Integer>
        implements TwoInputStreamOperator<Integer, Integer, Integer>, HasBroadcastVariable {

    // a map used to store the broadcast variables, keyed by name
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        broadcastVariables.put(name, broadcastVariable);
    }

    @Override
    public void processElement1(StreamRecord<Integer> streamRecord) throws Exception {
        List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
        List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
        // process element using the broadcast inputs
        // ...
    }

    @Override
    public void processElement2(StreamRecord<Integer> streamRecord) throws Exception {
        List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
        List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
        // process element using the broadcast inputs
        // ...
    }
}
```
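The proposal specifies that an operator receives each broadcast variable through setBroadcastVariable(name, list), but this section does not say how the broadcast inputs are collected before that call. The plain-Java sketch below assumes one plausible strategy that mirrors DataSet semantics: records from the non-broadcast inputs are buffered until every broadcast input has ended, then the complete lists are handed to the operator and the buffered records are replayed. BufferingWrapper, processBroadcastElement, endBroadcastInput, SumOp and demo are hypothetical names, and the local HasBroadcastVariable interface only mirrors the method used above; it is not the proposed interface's actual definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

public class BroadcastBufferingSketch {

    /** Local stand-in mirroring the setBroadcastVariable method used above (assumption). */
    interface HasBroadcastVariable {
        void setBroadcastVariable(String name, List<?> broadcastVariable);
    }

    /** Hypothetical wrapper: buffers main-input records until all broadcast inputs have ended. */
    static class BufferingWrapper<T> {
        private final HasBroadcastVariable operator;
        private final Consumer<T> processFn;
        private final Map<String, List<Object>> broadcastBuffers = new HashMap<>();
        private final Set<String> pending;
        private final List<T> mainBuffer = new ArrayList<>();

        BufferingWrapper(HasBroadcastVariable operator, Consumer<T> processFn, Set<String> names) {
            this.operator = operator;
            this.processFn = processFn;
            this.pending = new HashSet<>(names);
            for (String name : names) {
                broadcastBuffers.put(name, new ArrayList<>());
            }
        }

        void processBroadcastElement(String name, Object value) {
            broadcastBuffers.get(name).add(value);
        }

        void endBroadcastInput(String name) {
            // only the call that completes the last pending broadcast input triggers the hand-off
            if (pending.remove(name) && pending.isEmpty()) {
                broadcastBuffers.forEach(operator::setBroadcastVariable);
                for (T record : mainBuffer) {
                    processFn.accept(record);
                }
                mainBuffer.clear();
            }
        }

        void processElement(T record) {
            if (pending.isEmpty()) {
                processFn.accept(record); // broadcast variables are complete
            } else {
                mainBuffer.add(record); // wait for the broadcast inputs
            }
        }
    }

    /** Example operator: adds the sum of "broadcastInput1" to every record. */
    static class SumOp implements HasBroadcastVariable {
        final Map<String, List<?>> broadcastVariables = new HashMap<>();
        final List<Integer> output = new ArrayList<>();

        @Override
        public void setBroadcastVariable(String name, List<?> broadcastVariable) {
            broadcastVariables.put(name, broadcastVariable);
        }

        void process(Integer value) {
            int sum = 0;
            for (Object o : broadcastVariables.get("broadcastInput1")) {
                sum += (Integer) o;
            }
            output.add(value + sum);
        }
    }

    static List<Integer> demo() {
        SumOp op = new SumOp();
        BufferingWrapper<Integer> wrapper =
                new BufferingWrapper<>(op, op::process, Set.of("broadcastInput1"));
        wrapper.processElement(1); // buffered: broadcast input not finished
        wrapper.processBroadcastElement("broadcastInput1", 10);
        wrapper.processElement(2); // still buffered
        wrapper.processBroadcastElement("broadcastInput1", 5);
        wrapper.endBroadcastInput("broadcastInput1"); // sets the variable, replays 1 and 2
        wrapper.processElement(3); // processed directly
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [16, 17, 18]
    }
}
```

The design choice illustrated is that the operator never observes a partially received broadcast variable, at the cost of holding early main-input records until all broadcast inputs are complete.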
...
- Code written with the DataSet API that uses DataSet#withBroadcastSet can migrate by calling BroadcastUtils#withBroadcastStream.
- The DataStream API does not support withBroadcast() for now, so there are no compatibility issues.
Test Plan
We will provide unit tests to validate the proposed changes.
Rejected Alternatives
- Option-1: Another possible solution to support `withBroadcast` is to reuse StreamingRuntimeContext#getBroadcastVariable(...). However, we cannot assume that every stream operator contains a StreamingRuntimeContext.