...

```java
DataSet<Long> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            // All elements of d2 and d3 are visible here, on every parallel instance.
            List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
            List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

            ...;
        }
    })
    .withBroadcastSet(d2, "d2")
    .withBroadcastSet(d3, "d3");
```


When executing the user-defined map function above, users can access all elements of d2 and d3. This implies that no element of d1 can be consumed before all elements of d2 and d3 have been consumed. A common solution, adopted in the DataSet API, is to collect and cache all elements of d2 and d3 on each parallel instance of the MapOperator before the map function is invoked.
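To make the ordering constraint concrete, here is a minimal, framework-free Java sketch of what one parallel instance has to do: elements of the main input (d1) that arrive while any broadcast input (d2, d3) is still open are cached, and they are replayed once every broadcast input has ended. `BroadcastGate` and its methods are hypothetical names, not part of the DataSet or DataStream API, and the sketch assumes each broadcast input signals its end explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

/** Hypothetical model of one parallel instance that caches main input until broadcasts end. */
public class BroadcastGate<IN, OUT> {
    private final Set<String> pendingBroadcasts; // broadcast inputs that have not ended yet
    private final Map<String, List<Object>> broadcastVariables = new HashMap<>();
    private final List<IN> cachedMainInput = new ArrayList<>();
    private final BiFunction<IN, Map<String, List<Object>>, OUT> mapFunction;
    private final List<OUT> output = new ArrayList<>();

    public BroadcastGate(
            Set<String> broadcastNames, BiFunction<IN, Map<String, List<Object>>, OUT> mapFunction) {
        this.pendingBroadcasts = new HashSet<>(broadcastNames);
        for (String name : broadcastNames) {
            broadcastVariables.put(name, new ArrayList<>());
        }
        this.mapFunction = mapFunction;
    }

    /** Collects one element of the named broadcast input. */
    public void processBroadcastElement(String name, Object element) {
        broadcastVariables.get(name).add(element);
    }

    /** Marks a broadcast input as finished; replays cached main input once all have finished. */
    public void endBroadcastInput(String name) {
        pendingBroadcasts.remove(name);
        if (pendingBroadcasts.isEmpty()) {
            for (IN cached : cachedMainInput) {
                output.add(mapFunction.apply(cached, broadcastVariables));
            }
            cachedMainInput.clear();
        }
    }

    /** Maps a main-input element immediately if possible, otherwise caches it. */
    public void processMainElement(IN element) {
        if (pendingBroadcasts.isEmpty()) {
            output.add(mapFunction.apply(element, broadcastVariables));
        } else {
            cachedMainInput.add(element);
        }
    }

    public List<OUT> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        // The map function adds the total number of broadcast elements to each value.
        BroadcastGate<Long, Long> gate = new BroadcastGate<>(Set.of("d2", "d3"),
                (value, bc) -> value + bc.get("d2").size() + bc.get("d3").size());
        gate.processMainElement(1L);              // cached: d2 and d3 are still open
        gate.processBroadcastElement("d2", "a");
        gate.processBroadcastElement("d3", "x");
        gate.processBroadcastElement("d3", "y");
        gate.endBroadcastInput("d2");
        gate.processMainElement(2L);              // still cached: d3 is open
        gate.endBroadcastInput("d3");             // replays 1 and 2
        gate.processMainElement(3L);              // mapped immediately
        System.out.println(gate.getOutput());     // [4, 5, 6]
    }
}
```

The cache is what removes the need to block d1: the instance keeps reading every input, so upstream operators are never stalled waiting for it.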


Currently, the DataStream API does not support withBroadcast. Given that Flink aims to deprecate the DataSet API, we want to support withBroadcast in the DataStream API. We summarize the requirements for supporting withBroadcast as follows:

  • Supports accessing multiple broadcast inputs.
  • Supports accessing broadcast variables in a stream operator, i.e., OneInputStreamOperator, TwoInputStreamOperator, or MultipleInputStreamOperator.
  • Avoids the possible deadlock caused by priority-based data consumption, i.e., by blocking the non-broadcast inputs until all broadcast inputs have been fully consumed.

...

A stream operator that needs to access broadcast variables should implement the following interface.

```java
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface HasBroadcastVariable {

    /**
     * Sets a broadcast variable.
     *
     * @param name name of the broadcast variable.
     * @param broadcastVariable list of the elements contained in this broadcast variable.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}
```
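As an illustration of how an operator might hold on to the variables it receives, here is a self-contained Java sketch. It redeclares the interface locally without the `@PublicEvolving` annotation so it compiles without Flink, and `BroadcastAwareMapper` is a hypothetical class; a real operator would additionally implement one of the stream operator interfaces listed above, which is omitted here by assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the proposed interface, without the Flink annotation. */
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** Hypothetical operator body that stores broadcast variables and uses "d2" in its map logic. */
public class BroadcastAwareMapper implements HasBroadcastVariable {
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        // Keep a reference so the map logic can read every broadcast element by name.
        broadcastVariables.put(name, broadcastVariable);
    }

    /** Adds the number of "d2" elements to the value; valid only after "d2" has been set. */
    public long map(long value) {
        List<?> d2 = broadcastVariables.get("d2");
        if (d2 == null) {
            throw new IllegalStateException("Broadcast variable d2 has not been set yet.");
        }
        return value + d2.size();
    }

    public static void main(String[] args) {
        BroadcastAwareMapper mapper = new BroadcastAwareMapper();
        mapper.setBroadcastVariable("d2", List.of("a", "b", "c"));
        System.out.println(mapper.map(10L)); // 13
    }
}
```

Failing fast when a variable is missing makes an ordering mistake (non-broadcast input processed before the broadcast inputs are set) visible immediately instead of producing silently wrong results.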


...

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.datastream.DataStream;

/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * Supports withBroadcastStream in the DataStream API. Broadcast data streams are available at
     * all parallel instances of an operator that implements {@link HasBroadcastVariable}; an
     * operator that wants to access broadcast variables must implement this interface.
     *
     * <p>In detail, the broadcast input data streams are consumed first and then passed to the
     * operator via {@code HasBroadcastVariable.setBroadcastVariable(...)}. For now, the
     * non-broadcast inputs are cached by default to avoid possible deadlocks.
     *
     * @param inputList non-broadcast input list.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user-defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function; otherwise, an exception is raised.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) throws Exception {...}
}
```

Proposed Changes

Example Usage

...