...

[This FLIP proposal is joint work by Zhipeng Zhang, Yun Gao, and Dong Lin.]

Motivation

When developing machine learning algorithms with DataStream, we found that DataStream lacks the withBroadcast() functionality, which is widely used in machine learning. A common example of withBroadcast using the DataSet API is as follows:

...

Code Block
DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction<?, ?>() {
    @Override
    public Object map(Object value) throws Exception {
        // Both broadcast sets are fully available before map() is first invoked.
        List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
        List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

        ...;
    }
})
    .withBroadcastSet(d2, "d2")
    .withBroadcastSet(d3, "d3");


When executing the above user-defined map function, users can access all elements of d2 and d3. This means that no element of d1 can be consumed before all elements of d2 and d3 have been consumed. A common solution (adopted in the DataSet API) is to require that, before the map function is invoked, all elements of d2 and d3 are collected and cached on each parallel instance of the MapOperator.
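The ordering constraint above can be sketched in plain Java without any Flink dependency. The class BroadcastGate and its method names are hypothetical and exist only for illustration; this is not how the DataSet API is implemented. Records from the main input are buffered until the broadcast input has been fully received, and only then passed to the user function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical illustration: holds back main-input records until the broadcast input is complete. */
class BroadcastGate<IN, BC, OUT> {
    private final BiFunction<IN, List<BC>, OUT> userFunction;
    private final List<BC> broadcast = new ArrayList<>();
    private final List<IN> pending = new ArrayList<>();
    private final List<OUT> output = new ArrayList<>();
    private boolean broadcastComplete = false;

    BroadcastGate(BiFunction<IN, List<BC>, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    /** Caches one element of the broadcast input. */
    void onBroadcastElement(BC element) {
        if (broadcastComplete) {
            throw new IllegalStateException("broadcast input already ended");
        }
        broadcast.add(element);
    }

    /** Marks the broadcast input as complete and processes all buffered records. */
    void onBroadcastEnd() {
        broadcastComplete = true;
        for (IN record : pending) {
            output.add(userFunction.apply(record, broadcast));
        }
        pending.clear();
    }

    /** Processes a main-input record, or buffers it if the broadcast input is not complete yet. */
    void onElement(IN record) {
        if (broadcastComplete) {
            output.add(userFunction.apply(record, broadcast));
        } else {
            pending.add(record);
        }
    }

    List<OUT> output() {
        return output;
    }
}
```

In this sketch the buffer of pending records grows with the main input, which is the caching cost that the design discussion later in this proposal tries to limit.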

...

  • Supports accessing multiple broadcast inputs.
  • Supports accessing broadcast inputs in the user-defined function of a stream operator, e.g., OneInputStreamOperator, TwoInputStreamOperator, MultiInputStreamOperator.
  • Avoids the possible deadlock caused by priority-based data consumption.

...

We propose to make the following API changes to support the withBroadcast() functionality described above.

...

Code Block
@PublicEvolving
public interface HasBroadcastVariable {

    /**
     * Sets the broadcast variable.
     *
     * @param name name of the broadcast stream.
     * @param broadcastVariable list of elements contained in this broadcast stream.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}
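As a rough illustration of how user logic might consume this interface, the sketch below stores each broadcast variable by name and reads it when processing a record. It redeclares the proposed interface locally so that it compiles on its own. NearestCenterMapper, its map method, and the variable name "centers" are hypothetical; in the proposal the interface is implemented by a stream operator rather than by a plain class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local copy of the proposed interface, so that this sketch compiles without Flink.
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** Hypothetical user logic: maps a value to the closest element of the "centers" broadcast variable. */
class NearestCenterMapper implements HasBroadcastVariable {
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        // Assumed to be called once per broadcast input before any record is processed.
        broadcastVariables.put(name, broadcastVariable);
    }

    /** Returns the broadcast center closest to the given value. */
    double map(double value) {
        List<?> centers = broadcastVariables.get("centers");
        if (centers == null) {
            throw new IllegalStateException("broadcast variable 'centers' has not been set");
        }
        double best = Double.NaN;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (Object element : centers) {
            double center = ((Number) element).doubleValue();
            double distance = Math.abs(center - value);
            if (distance < bestDistance) {
                bestDistance = distance;
                best = center;
            }
        }
        return best;
    }
}
```

Because the element type is List<?>, the user code has to cast elements itself, as the cast to Number does here.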


...

Code Block
/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * Supports withBroadcast in the DataStream API. Broadcast data streams are available at all
     * parallel instances of an operator that implements {@link HasBroadcastVariable}. An operator
     * that wants to access broadcast variables must implement {@link HasBroadcastVariable}.
     *
     * <p>In detail, the broadcast input data streams will be consumed first and then set by
     * {@code HasBroadcastVariable.setBroadcastVariable(...)}. For now the non-broadcast inputs are
     * cached by default to avoid possible deadlocks.
     *
     * @param inputList non-broadcast input list.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function, otherwise it raises an exception.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) throws Exception {...}
}

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs. If there is no deadlock, we only need to cache the broadcast inputs. If there is a deadlock, we also need to cache the non-broadcast inputs that cause it.


To support withBroadcast in DataStream, there are several possible solutions.

  • Wrap all of the broadcast and non-broadcast inputs in a MultiInputStreamOperator and cache all of them in memory or on disk to avoid the possible deadlock.
    • We prefer not to adopt this option because caching every input is expensive.
  • Wrap all of the broadcast and non-broadcast inputs in a MultiInputStreamOperator and use InputSelectable to decide the order in which records are consumed from the different inputs. In this case, we only need to cache the broadcast inputs if there is no deadlock.
    • We cannot use this option because Flink currently does not support checkpointing a MultiInputStreamOperator that implements InputSelectable.
  • Wrap the broadcast inputs in a MultiInputStreamOperator and co-locate it with the operator that consumes the non-broadcast inputs. In this case, the two operators can share the cached inputs through a global static variable. In addition, we can analyze the JobGraph to decide whether each non-broadcast input needs to be cached.
    • We plan to choose this option because it (1) supports checkpointing and (2) lets us control whether to cache each input.
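The third option relies on two co-located operators sharing cached broadcast data through a global static variable. A minimal sketch of that idea in plain Java is shown below, assuming both operators run in the same JVM and agree on a key. All class names, method names, and the key format are hypothetical and are not part of the proposed API or of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical JVM-wide registry through which co-located operators share cached broadcast data. */
final class BroadcastRegistry {
    private static final Map<String, List<?>> CACHE = new ConcurrentHashMap<>();

    private BroadcastRegistry() {}

    static void publish(String key, List<?> elements) {
        CACHE.put(key, Collections.unmodifiableList(new ArrayList<>(elements)));
    }

    /** Returns the cached elements, or null if the broadcast input has not been published yet. */
    static List<?> lookup(String key) {
        return CACHE.get(key);
    }

    static void remove(String key) {
        CACHE.remove(key);
    }
}

/** Stand-in for the operator that consumes and caches the broadcast inputs. */
class BroadcastSideOperator {
    private final String key;
    private final List<Object> buffer = new ArrayList<>();

    BroadcastSideOperator(String key) {
        this.key = key;
    }

    void processElement(Object element) {
        buffer.add(element);
    }

    /** Publishes the cached elements once the broadcast input has ended. */
    void endInput() {
        BroadcastRegistry.publish(key, buffer);
    }
}

/** Stand-in for the co-located operator that consumes the non-broadcast input. */
class MainSideOperator {
    private final String key;
    private final List<Integer> pending = new ArrayList<>();
    private final List<Integer> output = new ArrayList<>();

    MainSideOperator(String key) {
        this.key = key;
    }

    /** Buffers the record, then processes all buffered records if the broadcast data is available. */
    void processElement(int value) {
        pending.add(value);
        List<?> broadcast = BroadcastRegistry.lookup(key);
        if (broadcast == null) {
            return; // Broadcast input not ready: keep the record cached.
        }
        for (int v : pending) {
            output.add(v * broadcast.size()); // Placeholder for the user-defined logic.
        }
        pending.clear();
    }

    List<Integer> output() {
        return output;
    }
}
```

In this sketch the key would have to identify the job, operator, and subtask uniquely, since a static map is shared by every task running in the same JVM, and the entry has to be removed when the operators finish.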

Example Usage

This section shows how to use the proposed API to support operators that use broadcast variables.

...