Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...


Vote thread
JIRA
Release


JIRA: <

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-24279
>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

[This FLIP proposal is a joint work between Zhipeng Zhang and Yun Gao and Dong Lin].


Motivation

When developing machine learning algorithms using DataStream, we found that DataStream lacks withBroadcast() functionality, which is widely used in machine learning. A common example of withBroadcast using DataSet API is as follows:

...

Code Block
languagejava
linenumberstrue
DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;
d1.map(new RichMapFunction <?, ?>() {
	@Override
	public Object map(Object aLong) throws Exception{
		List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
 		List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

        ...;
	}
})
	.withBroadcastSet(d2, "d2")
 	.withBroadcastSet(d3, "d3");


Basically, when When executing the above user-defined map function, users can access all elements of d2 and d3. This requires indicates that we cannot consume any element from d1 before we consumed all elements of d2 and d3. A common solution (adopted in DataSet API) is that before invoking the map function, we require that all elements of d2 and d3should be  be collected and cached on each parallel instances of the MapOperator. This indicates that we cannot consume any element from d1 before we consumed all elements of d2 and d3.


Currently, DataStream API does not support withBroadcast. Given that Flink aims to deprecate the DataSet API, we want to support withBroadcast on DataStream API. We summarize the requirements for supporting withBroadcast as follows:

  • Supports accessing multiple  broadcast inputs.
  • Supports accessing broadcast variables inputs in user defined function  of an operatora stream operator, e.g, OneInputStreamOperator, TwoInputStreamOperator, MultiInputStreamOperator.
  • Avoids the possible deadlock caused by the priority-base data-consuming.

...

We propose to make the following API changes to support withBroadcast() functionality described above. 

...

A stream operator that needs to access broadcast varaibles needs to should implement the follow interface.

Code Block
languagejava
linenumberstrue
@PublicEvolving
public interface HasBroadcastVariable {

    /**
     * sets broadcast variable.
     *
     * @param name name of the broadcast variablestream.
     * @param broadcastVariable list of theelements contained in this broadcast variablestream.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}


...

Code Block
languagejava
linenumberstrue
/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * supports withBroadcastStreamwithBroadcast in DataStream API. Broadcast data streams are available at all
     * parallel instances of an operator that implements {@link HasBroadcastVariable}. An operator
     * that wants to access broadcast variables must implement ${@link HasBroadcastVariable}.
     *
     * <p>In detail, the broadcast input data streams will be consumed first and further set by
     * {@code HasBroadcastVariable.setBroadcastVariable(...)}. For now the non-broadcast input are
     * cached by default to avoid the possible deadlocks.
     *
     * @param inputList non-broadcast input list.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function, otherwise it raises an exception.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) throw Exception {...}
}

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs. Basically we only need to cache the broacast inputs if there is no deadlock. If there is a deadlock, we would also need to cache the non-broadcat inputs those incur the deadlock.


To support withBroadcast in DataStream, there are several possible solutions.

  • Wraps all of the broadcast inputs and non-broadcast inputs in a MultiInputStreamOperator  and caches all of the broadcast and non-broadcast inputs in memory/disk to avoid the possible deadlock. 
    • We prefer not adopting this option because it incurs expensive cache cost.
  • Wraps all of the broadcast inputs and non-broadcast inputs in a MultiInputStreamOperator and use InputSelectable to decide the order of consuming records from different inputs. In this case, we only need to cache the broadcast inputs if there is no deadlock.
    • We cannot use this option because currently Flink does not support checkpointing a MultiInputStreamOperator that implements InputSelectable.
  • Wraps broadcast Inputs in a MultiInputStreamOperator and let it co-locate with the non-broadcast input operator. In this case, we can use global static variable to share the cached inputs in these two operators. Besides, we can analyze the jobGraph and decide whether we need to cache each of the non-broadcast input.
    • We plan to choose this option since we can (1) support checkpointing (2) and control whether to cache each of the input.

Example Usage

This sections shows how to use the proposed API to support operators that use broadcast variables.

...

Code Block
languagejava
public static void main(String[] args) {
	StreamExecutionEnvironment env = ...;
	// create non-broadcast inputs
	DataStream <Integer><?> input1 = env.addSource(...);
	DataStream <Integer><?> input2 = env.addSource(...);
		
	// create inputs that needs to be broadcasted to other operators and put it in a map
	DataStream <?> broadcastInput1 = env.addSource(...);
	DataStream <?> broadcastInput2 = env.addSource(...);
	Map <String, DataStream <?>> broadcastMap = new HashMap <>();
	broadcastMap.put("broadcastInput1", broadcastInput1);
	broadcastMap.put("broadcastInput2", broadcastInput2);

	// call withBroadcastStream
	DataStream <?> output = BroadcastUtils.withBroadcastStream(
		Arrays.asList(input1, input2),
		broadcastMap,
		dataStreams -> {
			DataStream in1 = dataStreams.get(0);
			DataStream in2 = dataStreams.get(1);
			return in1.connect(in2)
				.transform(
					"two-input",
					BasicTypeInfo.INT_TYPE_INFOTypeInformation.of(...),
					new MyTwoInputOp())
				.name("broadcast");
		});
		
	output.addSink(...);
	env.execute();
}

/**
 * A two-input StreamOperator that implements HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<Integer>AbstractStreamOperator<?>
	implements TwoInputStreamOperator<IntegerTwoInputStreamOperator<?, Integer?, Integer>?>, HasBroadcastVariable {

	// a map used to store the broadcast variables.
	Map<String, List<?>> broadcastVariables = new HashMap <>();

	@Override
	public void setBroadcastVariable(String name, List <?> broadcastVariable) {
		broadcastVariables.put(name, broadcastVariable);
	}

	@Override
	public void processElement1(StreamRecord <Integer><?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process element using the broadcast inputs
		// ...
	}

	@Override
	public void processElement2(StreamRecord <Integer><?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process element using the broadcast inputs
		// ...
	}
}

...