Status
Current state: "Under Discussion"
Discussion thread: <TODO>
JIRA: < >
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is joint work by Zhipeng Zhang, Yun Gao and Dong Lin.]
Motivation
When developing machine learning algorithms with the DataStream API, we found that it lacks a withBroadcast() function, which is widely used in machine learning. A common example of withBroadcast using the DataSet API is as follows:
DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction<?, ?>() {
        @Override
        public Object map(Object aLong) throws Exception {
            List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
            List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");
            ...;
        }
    })
    .withBroadcastSet(d2, "d2")
    .withBroadcastSet(d3, "d3");
When the user-defined map function above executes, it can access all elements of d2 and d3. This requires that all elements of d2 and d3 be collected and cached on each parallel instance of the MapOperator. It also implies that no element of d1 can be consumed until all elements of d2 and d3 have been consumed.
We summarize the requirements for supporting withBroadcast as follows:
- Supports accessing multiple broadcast inputs.
- Supports accessing broadcast variables in the user-defined function of an operator.
- Avoids the possible deadlock caused by priority-based data consumption.
Public Interfaces
We propose the following API changes to support the withBroadcast() functionality described above.
1) Add the HasBroadcastVariable interface.
A stream operator that needs to access broadcast variables must implement the following interface.
@PublicEvolving
public interface HasBroadcastVariable {
    /**
     * Sets a broadcast variable.
     *
     * @param name name of the broadcast variable.
     * @param broadcastVariable elements of the broadcast variable, as a list.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}
2) Add BroadcastUtils class.
We propose to add the following utility method to support the withBroadcast functionality.
/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * Supports withBroadcastStream in the DataStream API. Broadcast data streams are available at
     * all parallel instances of an operator that implements {@link HasBroadcastVariable}. An
     * operator that wants to access broadcast variables must implement {@link
     * HasBroadcastVariable}.
     *
     * <p>In detail, the broadcast input data streams are consumed first and then set via
     * {@code HasBroadcastVariable.setBroadcastVariable(...)}. For now, the non-broadcast inputs
     * are cached by default to avoid possible deadlocks.
     *
     * @param inputList non-broadcast input list.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user-defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function; otherwise an exception is raised.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) {...}
}
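The cache-then-replay behavior described in the Javadoc above can be illustrated with a small plain-Java simulation that has no Flink dependency. This is only a sketch of the semantics, not the proposed implementation: the Wrapper class, its onBroadcastElement/onBroadcastEnd/onElement methods and the demo() scenario are hypothetical names introduced for illustration, and HasBroadcastVariable is redeclared locally so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Plain-Java simulation of "consume broadcast inputs first, cache the rest". */
class CacheThenReplaySketch {

    /** Local stand-in mirroring the proposed interface (redeclared for self-containment). */
    interface HasBroadcastVariable {
        void setBroadcastVariable(String name, List<?> broadcastVariable);
    }

    /** Hypothetical wrapper: buffers main-input records until every broadcast input has ended. */
    static class Wrapper<T> {
        private final HasBroadcastVariable receiver;
        private final Consumer<T> process;
        private final Set<String> pending;
        private final Map<String, List<Object>> collected = new HashMap<>();
        private final List<T> cache = new ArrayList<>();

        Wrapper(HasBroadcastVariable receiver, Consumer<T> process, Set<String> broadcastNames) {
            this.receiver = receiver;
            this.process = process;
            this.pending = new HashSet<>(broadcastNames);
            for (String name : broadcastNames) {
                collected.put(name, new ArrayList<>());
            }
        }

        /** Collects one element of the named broadcast input. */
        void onBroadcastElement(String name, Object value) {
            collected.get(name).add(value);
        }

        /** Marks a broadcast input as finished; replays the cache once all have finished. */
        void onBroadcastEnd(String name) {
            if (!pending.remove(name)) {
                return; // unknown or already finished
            }
            receiver.setBroadcastVariable(name, collected.get(name));
            if (pending.isEmpty()) {
                for (T cached : cache) {
                    process.accept(cached);
                }
                cache.clear();
            }
        }

        /** Processes a main-input record, or caches it while broadcast inputs are incomplete. */
        void onElement(T value) {
            if (pending.isEmpty()) {
                process.accept(value);
            } else {
                cache.add(value);
            }
        }
    }

    /** Runs a small scenario and returns the outputs in emission order. */
    public static List<Integer> demo() {
        Map<String, List<?>> vars = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        Wrapper<Integer> w = new Wrapper<Integer>(
                vars::put,
                x -> out.add(x + (Integer) vars.get("offset").get(0)),
                Collections.singleton("offset"));
        w.onElement(1);                     // cached: "offset" is not complete yet
        w.onBroadcastElement("offset", 10);
        w.onElement(2);                     // cached as well
        w.onBroadcastEnd("offset");         // variable is set, then 1 and 2 are replayed
        w.onElement(3);                     // processed immediately
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [11, 12, 13]
    }
}
```

In this sketch the records that arrive before the broadcast input ends are held back and replayed in arrival order, so the processing logic never observes a missing broadcast variable. The cost is that the cache grows with the number of early main-input records.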
Proposed Changes
Example Usage
This section shows how to use the proposed API to support operators that need withBroadcast.
public static void main(String[] args) {
    StreamExecutionEnvironment env = ...;

    // create non-broadcast inputs
    DataStream<Integer> input1 = env.addSource(...);
    DataStream<Integer> input2 = env.addSource(...);

    // create inputs that need to be broadcast to other operators and put them in a map
    DataStream<?> broadcastInput1 = env.addSource(...);
    DataStream<?> broadcastInput2 = env.addSource(...);
    Map<String, DataStream<?>> broadcastMap = new HashMap<>();
    broadcastMap.put("broadcastInput1", broadcastInput1);
    broadcastMap.put("broadcastInput2", broadcastInput2);

    // call withBroadcastStream
    DataStream<?> output =
            BroadcastUtils.withBroadcastStream(
                    Arrays.asList(input1, input2),
                    broadcastMap,
                    dataStreams -> {
                        DataStream in1 = dataStreams.get(0);
                        DataStream in2 = dataStreams.get(1);
                        return in1.connect(in2)
                                .transform(
                                        "two-input",
                                        BasicTypeInfo.INT_TYPE_INFO,
                                        new MyTwoInputOp())
                                .name("broadcast");
                    });

    output.addSink(...);
    env.execute();
}

/**
 * A two-input StreamOperator that implements the HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<Integer>
        implements TwoInputStreamOperator<Integer, Integer, Integer>, HasBroadcastVariable {

    // a map used to store the broadcast variables.
    Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        broadcastVariables.put(name, broadcastVariable);
    }

    @Override
    public void processElement1(StreamRecord<Integer> streamRecord) throws Exception {
        List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
        List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
        // process element using the broadcast inputs
        // ...
    }

    @Override
    public void processElement2(StreamRecord<Integer> streamRecord) throws Exception {
        List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
        List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
        // process element using the broadcast inputs
        // ...
    }
}
Compatibility, Deprecation, and Migration Plan
- Code written in the DataSet API using DataSet#withBroadcastSet can call BroadcastUtils#withBroadcastStream for migration.
- The DataStream API does not support withBroadcast() for now, so there are no compatibility issues.
Test Plan
We will provide unit tests to validate the proposed changes.
Rejected Alternatives
- Option-1: Another possible way to support `withBroadcast` is to reuse StreamingRuntimeContext#getBroadcastVariable(...). However, we cannot assume that every stream operator has a StreamingRuntimeContext.