Status

Current state: "Under Discussion"

Discussion thread: <TODO>

JIRA: <TODO>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is joint work between Zhipeng Zhang, Yun Gao and Dong Lin.]

Motivation

While developing machine learning algorithms with the DataStream API, we found that DataStream lacks the withBroadcast() functionality, which is widely used in machine learning. A common example of withBroadcast in the DataSet API is as follows:


DataSet<Long> d1 = ...;
DataSet<Long> d2 = ...;
DataSet<Long> d3 = ...;
d1.map(new RichMapFunction<Long, Long>() {
	@Override
	public Long map(Long value) throws Exception {
		// Every parallel instance of the map function can read all elements of d2 and d3.
		List<Long> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
		List<Long> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

		...;
	}
})
	.withBroadcastSet(d2, "d2")
	.withBroadcastSet(d3, "d3");


When the above user-defined map function is executed, it can access all elements of d2 and d3. This requires that all elements of d2 and d3 be collected and cached on each parallel instance of the map operator. Consequently, no element of d1 can be consumed before all elements of d2 and d3 have been consumed.


We summarize the requirements for supporting withBroadcast as follows:

  • Supports accessing multiple broadcast inputs.
  • Supports accessing broadcast variables in the user-defined function of an operator.
  • Avoids possible deadlocks caused by priority-based data consumption, i.e., consuming all broadcast inputs before any non-broadcast input.
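The third requirement can be illustrated with a toy, single-threaded simulation in plain Java (no Flink dependency). It assumes a single upstream task that writes alternately to the main input and the broadcast input of the downstream operator (for example, when both inputs are derived from the same source) through bounded channels with an assumed capacity of 2. The class DeadlockSimulation and its parameters are hypothetical; they only model back-pressure and are not Flink's network stack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Toy simulation (hypothetical): one upstream task feeds both inputs via bounded channels. */
class DeadlockSimulation {

    // Assumed capacity of each bounded channel between upstream and downstream.
    static final int CAPACITY = 2;

    /**
     * Returns true if all records are processed, false if the pipeline gets stuck. The upstream
     * emits main(0), broadcast(0), main(1), broadcast(1), ... and blocks while the target channel
     * is full. The downstream always drains the broadcast channel first.
     */
    static boolean run(int records, boolean cacheMainInput) {
        ArrayDeque<Integer> mainChannel = new ArrayDeque<>();
        ArrayDeque<Integer> broadcastChannel = new ArrayDeque<>();
        List<Integer> cache = new ArrayList<>();
        int emitted = 0;
        int broadcastReceived = 0;
        int mainProcessed = 0;

        while (true) {
            boolean progress = false;

            // Upstream: emit the next record only if its target channel has room.
            if (emitted < 2 * records) {
                ArrayDeque<Integer> target = (emitted % 2 == 0) ? mainChannel : broadcastChannel;
                if (target.size() < CAPACITY) {
                    target.add(emitted / 2);
                    emitted++;
                    progress = true;
                }
            }

            // Downstream: the broadcast input is always consumed first.
            if (!broadcastChannel.isEmpty()) {
                broadcastChannel.poll();
                broadcastReceived++;
                progress = true;
            }
            boolean broadcastEnded = broadcastReceived == records;

            // Once all broadcast data is available, process the cached main-input records.
            if (broadcastEnded && !cache.isEmpty()) {
                mainProcessed += cache.size();
                cache.clear();
                progress = true;
            }

            // The main input is read only after the broadcast input ended, unless caching is on.
            if (!mainChannel.isEmpty() && (broadcastEnded || cacheMainInput)) {
                Integer record = mainChannel.poll();
                if (broadcastEnded) {
                    mainProcessed++;
                } else {
                    cache.add(record);
                }
                progress = true;
            }

            if (!progress) {
                break; // neither side can move: either finished or deadlocked
            }
        }
        return broadcastReceived == records && mainProcessed == records;
    }

    public static void main(String[] args) {
        System.out.println("without caching: " + run(5, false)); // prints "without caching: false"
        System.out.println("with caching:    " + run(5, true)); // prints "with caching:    true"
    }
}
```

In this model, run(2, false) still completes while run(3, false) gets stuck, so a larger buffer only postpones the deadlock to larger inputs; caching the main input locally avoids it for any input size, at the cost of memory or disk for the cache.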

Public Interfaces

We propose the following API changes to support the withBroadcast() functionality described above.


1) Add the HasBroadcastVariable interface.


A stream operator that needs to access broadcast variables must implement the following interface.

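import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

/** Implemented by stream operators that need to access broadcast variables. */
@PublicEvolving
public interface HasBroadcastVariable {

    /**
     * Sets the broadcast variable with the given name.
     *
     * @param name name of the broadcast variable.
     * @param broadcastVariable all elements of the corresponding broadcast input, as a list.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}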
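As an illustration of how an operator might satisfy this contract, here is a minimal sketch in plain Java. The helper BroadcastVariableStore and the operator logic AddOffsets are hypothetical, and the interface is copied locally (without the Flink annotation) so that the snippet compiles on its own. The generic getter mirrors the shape of RuntimeContext#getBroadcastVariable(String) from the DataSet API and fails fast if a variable is read before it has been set.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the proposed interface (without Flink annotations). */
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** Hypothetical helper that an operator can hold as a field and delegate to. */
final class BroadcastVariableStore {
    private final Map<String, List<?>> variables = new HashMap<>();

    void set(String name, List<?> broadcastVariable) {
        // Store a read-only view so operator code cannot modify the list through get(...).
        variables.put(name, Collections.unmodifiableList(broadcastVariable));
    }

    /** Returns the named broadcast variable; throws if it has not been set yet. */
    @SuppressWarnings("unchecked")
    <T> List<T> get(String name) {
        List<?> value = variables.get(name);
        if (value == null) {
            throw new IllegalStateException("Broadcast variable '" + name + "' is not set yet.");
        }
        return (List<T>) value;
    }
}

/** Hypothetical operator logic: adds the sum of all broadcast offsets to each input value. */
class AddOffsets implements HasBroadcastVariable {
    private final BroadcastVariableStore broadcastVariables = new BroadcastVariableStore();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        broadcastVariables.set(name, broadcastVariable);
    }

    long process(long value) {
        List<Long> offsets = broadcastVariables.get("offsets");
        long sum = 0;
        for (long offset : offsets) {
            sum += offset;
        }
        return value + sum;
    }

    public static void main(String[] args) {
        AddOffsets op = new AddOffsets();
        op.setBroadcastVariable("offsets", Arrays.asList(1L, 2L, 3L));
        System.out.println(op.process(10L)); // prints 16
    }
}
```

Delegating to a field rather than a shared base class is a deliberate choice in this sketch: a real stream operator typically already extends AbstractStreamOperator, and Java allows only one superclass.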



2) Add the BroadcastUtils class.

We propose to add the following utility function to support the withBroadcast functionality.

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Utility class to support withBroadcast in DataStream. */
public class BroadcastUtils {
    /**
     * Supports withBroadcast in the DataStream API. The broadcast data streams are available at
     * all parallel instances of the operator created in {@code userDefinedFunction}, which must
     * implement {@link HasBroadcastVariable} to access them.
     *
     * <p>In detail, the broadcast input data streams are consumed first and then passed to the
     * operator via {@code HasBroadcastVariable#setBroadcastVariable(...)}. For now, the
     * non-broadcast inputs are cached by default to avoid possible deadlocks.
     *
     * @param inputList list of the non-broadcast inputs.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user-defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function; otherwise, an exception is thrown.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) {...}
}
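The Javadoc above describes the runtime contract: broadcast inputs are consumed first and handed to the operator through setBroadcastVariable(...), while non-broadcast records are cached until then. The following single-threaded sketch models only that ordering in plain Java. BroadcastWrapper, MultiInputOperator and RecordingOperator are hypothetical names, not the proposed implementation, and the sketch ignores concerns such as checkpointing, memory limits and parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Local copy of the proposed interface (without Flink annotations). */
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

/** Hypothetical minimal operator contract used only in this sketch. */
interface MultiInputOperator {
    void processElement(int inputIndex, Object record);
}

/** Hypothetical wrapper: collect broadcast inputs first, cache other records until then. */
class BroadcastWrapper<OP extends MultiInputOperator & HasBroadcastVariable> {
    /** A non-broadcast record that arrived before all broadcast inputs ended. */
    private static final class CachedRecord {
        final int inputIndex;
        final Object record;

        CachedRecord(int inputIndex, Object record) {
            this.inputIndex = inputIndex;
            this.record = record;
        }
    }

    private final OP operator;
    private final Map<String, List<Object>> broadcastBuffers = new HashMap<>();
    private final Set<String> unfinishedBroadcastInputs;
    private final List<CachedRecord> cache = new ArrayList<>();

    BroadcastWrapper(OP operator, Set<String> broadcastNames) {
        this.operator = operator;
        this.unfinishedBroadcastInputs = new HashSet<>(broadcastNames);
        for (String name : broadcastNames) {
            broadcastBuffers.put(name, new ArrayList<>());
        }
    }

    void processBroadcastElement(String name, Object value) {
        broadcastBuffers.get(name).add(value);
    }

    void endBroadcastInput(String name) {
        if (unfinishedBroadcastInputs.remove(name) && unfinishedBroadcastInputs.isEmpty()) {
            // Every broadcast input is complete: hand all of them to the operator first,
            broadcastBuffers.forEach(operator::setBroadcastVariable);
            // then replay the cached records in arrival order.
            for (CachedRecord cached : cache) {
                operator.processElement(cached.inputIndex, cached.record);
            }
            cache.clear();
        }
    }

    void processElement(int inputIndex, Object record) {
        if (unfinishedBroadcastInputs.isEmpty()) {
            operator.processElement(inputIndex, record);
        } else {
            cache.add(new CachedRecord(inputIndex, record));
        }
    }
}

/** Hypothetical operator that records the calls it receives. */
class RecordingOperator implements MultiInputOperator, HasBroadcastVariable {
    final List<String> events = new ArrayList<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        events.add("set " + name + "=" + broadcastVariable);
    }

    @Override
    public void processElement(int inputIndex, Object record) {
        events.add("input" + inputIndex + ":" + record);
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        BroadcastWrapper<RecordingOperator> wrapper =
                new BroadcastWrapper<>(op, new HashSet<>(Arrays.asList("bc")));
        wrapper.processElement(0, "a"); // cached: "bc" has not ended yet
        wrapper.processBroadcastElement("bc", 1);
        wrapper.processBroadcastElement("bc", 2);
        wrapper.processElement(1, "b"); // cached
        wrapper.endBroadcastInput("bc"); // sets bc=[1, 2], then replays a and b
        wrapper.processElement(0, "c"); // passed through directly
        System.out.println(op.events); // prints [set bc=[1, 2], input0:a, input1:b, input0:c]
    }
}
```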

Proposed Changes

Example Usage

This section shows how to use the proposed API to support operators that use broadcast variables.


// BroadcastUtils and HasBroadcastVariable are the classes proposed above.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = ...;
		// Create the non-broadcast inputs.
		DataStream<Integer> input1 = env.addSource(...);
		DataStream<Integer> input2 = env.addSource(...);

		// Create the inputs that need to be broadcast to other operators and put them in a map.
		DataStream<?> broadcastInput1 = env.addSource(...);
		DataStream<?> broadcastInput2 = env.addSource(...);
		Map<String, DataStream<?>> broadcastMap = new HashMap<>();
		broadcastMap.put("broadcastInput1", broadcastInput1);
		broadcastMap.put("broadcastInput2", broadcastInput2);

		// Call withBroadcastStream. The single operator created inside the function
		// receives the broadcast variables through HasBroadcastVariable#setBroadcastVariable(...).
		DataStream<Integer> output = BroadcastUtils.withBroadcastStream(
			Arrays.asList(input1, input2),
			broadcastMap,
			dataStreams -> {
				@SuppressWarnings("unchecked")
				DataStream<Integer> in1 = (DataStream<Integer>) dataStreams.get(0);
				@SuppressWarnings("unchecked")
				DataStream<Integer> in2 = (DataStream<Integer>) dataStreams.get(1);
				return in1.connect(in2)
					.transform(
						"two-input",
						BasicTypeInfo.INT_TYPE_INFO,
						new MyTwoInputOp())
					.name("broadcast");
			});

		output.addSink(...);
		env.execute();
	}

	/**
	 * A two-input StreamOperator that implements the HasBroadcastVariable interface.
	 */
	private static class MyTwoInputOp extends AbstractStreamOperator<Integer>
			implements TwoInputStreamOperator<Integer, Integer, Integer>, HasBroadcastVariable {

		// Stores the broadcast variables, keyed by the names used in broadcastMap.
		private final Map<String, List<?>> broadcastVariables = new HashMap<>();

		@Override
		public void setBroadcastVariable(String name, List<?> broadcastVariable) {
			broadcastVariables.put(name, broadcastVariable);
		}

		@Override
		public void processElement1(StreamRecord<Integer> streamRecord) throws Exception {
			List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
			List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
			// Process the element using the broadcast inputs.
			// ...
		}

		@Override
		public void processElement2(StreamRecord<Integer> streamRecord) throws Exception {
			List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
			List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
			// Process the element using the broadcast inputs.
			// ...
		}
	}
}


Compatibility, Deprecation, and Migration Plan

  • DataSet API code that uses withBroadcastSet(...) can be migrated by calling BroadcastUtils#withBroadcastStream instead.
  • The DataStream API does not support withBroadcast() yet, so there are no compatibility issues.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

  • Option 1: Another possible way to support withBroadcast is to reuse StreamingRuntimeContext#getBroadcastVariable(...). However, we cannot assume that every stream operator has a StreamingRuntimeContext.