...
Code Block | ||||
---|---|---|---|---|
| ||||
DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction<?, ?>() {
    @Override
    public Object map(Object aLong) throws Exception {
        List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
        List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");
        ...;
    }
})
.withBroadcastSet(d2, "d2")
.withBroadcastSet(d3, "d3"); |
When the above user-defined map function executes, it can access all elements of d2 and d3. A common solution (adopted in the DataSet API) is to collect and cache all elements of d2 and d3 on each parallel instance of the MapOperator before invoking the map function. This means that no element of d1 can be consumed before all elements of d2 and d3 have been consumed.
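The blocking semantics described above can be modelled without Flink. The following is a minimal plain-Java sketch, not Flink code: the class `BroadcastThenMap` and its `run` method are hypothetical names introduced for illustration, and in-memory lists stand in for data sets. It caches every broadcast input completely before the first element of the main input is mapped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical model of the DataSet-style broadcast semantics; not part of Flink. */
class BroadcastThenMap {

    /**
     * Caches every broadcast input completely, then maps the main input.
     * No element of mainInput is read before all broadcast inputs are cached.
     */
    static <IN, OUT> List<OUT> run(
            List<IN> mainInput,
            Map<String, List<?>> broadcastInputs,
            BiFunction<IN, Map<String, List<?>>, OUT> mapper) {
        // Phase 1: collect and cache all broadcast elements under their names.
        Map<String, List<?>> cache = new HashMap<>();
        for (Map.Entry<String, List<?>> entry : broadcastInputs.entrySet()) {
            cache.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        // Phase 2: only now consume the main input, one element at a time.
        List<OUT> output = new ArrayList<>();
        for (IN element : mainInput) {
            output.add(mapper.apply(element, cache));
        }
        return output;
    }
}
```

In this model the ordering constraint is trivially satisfied because both phases run in one thread. In a distributed streaming job the inputs arrive concurrently, which is why the ordering has to be enforced explicitly.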
Currently, the DataStream API does not support withBroadcast. Given that Flink aims to deprecate the DataSet API, we want to support withBroadcast in the DataStream API. We summarize the requirements for supporting withBroadcast as follows:
- Supports accessing multiple broadcast inputs.
- Supports accessing broadcast variables in a stream operator, i.e., OneInputStreamOperator, TwoInputStreamOperator, or MultiInputStreamOperator.
- Avoids the possible deadlock caused by priority-based data consumption.
...
A stream operator that needs to access broadcast variables should implement the following interface.
Code Block | ||||
---|---|---|---|---|
| ||||
@PublicEvolving
public interface HasBroadcastVariable {
    /**
     * Sets a broadcast variable.
     *
     * @param name name of the broadcast variable.
     * @param broadcastVariable list of elements contained in this broadcast variable.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
} |
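To illustrate how an implementer might use this interface, here is a minimal sketch that compiles without Flink. It declares a local copy of `HasBroadcastVariable` (without the annotation) purely so the example is self-contained. The class `ThresholdFilter`, its `accept` method, and the broadcast variable name `"threshold"` are hypothetical. A real implementation would also be a Flink stream operator, which is omitted here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the proposed interface so that the sketch compiles on its own. */
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** Hypothetical operator-like class; a real one would also be a Flink stream operator. */
class ThresholdFilter implements HasBroadcastVariable {
    // Broadcast variables received so far, keyed by their name.
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        // Assumption: the framework calls this before regular elements are processed.
        broadcastVariables.put(name, broadcastVariable);
    }

    /** Returns true if value exceeds the first element of the "threshold" variable. */
    boolean accept(long value) {
        List<?> threshold = broadcastVariables.get("threshold");
        if (threshold == null || threshold.isEmpty()) {
            throw new IllegalStateException("broadcast variable 'threshold' is not set");
        }
        return value > (Long) threshold.get(0);
    }
}
```

Storing the variables in a map keyed by name lets one operator receive several broadcast inputs, which matches the first requirement listed above.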
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * Supports withBroadcastStream in the DataStream API. Broadcast data streams are available at
     * all parallel instances of an operator that implements {@link HasBroadcastVariable}. An
     * operator that wants to access broadcast variables must implement {@link
     * HasBroadcastVariable}.
     *
     * <p>In detail, the broadcast input data streams will be consumed first and further set by
     * {@code HasBroadcastVariable.setBroadcastVariable(...)}. For now the non-broadcast inputs are
     * cached by default to avoid possible deadlocks.
     *
     * @param inputList non-broadcast input list.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function, otherwise it raises an exception.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) throws Exception {...}
} |
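The Javadoc above states that non-broadcast inputs are cached by default to avoid deadlocks. The sketch below shows one way such caching could behave. It is plain Java, not the Flink implementation, and the class `CachingInput` and its methods are hypothetical. Elements that arrive before the broadcast inputs have ended are buffered and then replayed in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of caching non-broadcast input; not the Flink implementation. */
class CachingInput<T> {
    private final List<T> pending = new ArrayList<>();
    private final Consumer<T> downstream;
    private boolean broadcastReady = false;

    CachingInput(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Accepts a non-broadcast element at any time, so the sender never has to wait. */
    void onElement(T element) {
        if (broadcastReady) {
            downstream.accept(element);
        } else {
            pending.add(element); // cache until all broadcast inputs have ended
        }
    }

    /** Signals that every broadcast input has been fully consumed. */
    void onBroadcastFinished() {
        broadcastReady = true;
        for (T element : pending) {
            downstream.accept(element); // replay cached elements in arrival order
        }
        pending.clear();
    }
}
```

Because `onElement` never refuses input, an upstream producer that feeds both a broadcast and a non-broadcast input cannot stall on the non-broadcast side. The cost of this choice is memory (or, in a real system, storage) proportional to the number of elements buffered.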
Proposed Changes
Example Usage
...