There are two ways to make data available to all parallel instances of a function: via function parameters/closures or via broadcast sets.

Variable in Function Closure

This variant stores the data in a variable in the Flink program and then makes it part of the function closure.

The following example illustrates that:

final long numElements = ...; // some constant or command line parameter

DataSet<Tuple2<Long, Double>> idsWithValue =
        ids.map(new MapFunction<Long, Tuple2<Long, Double>>() {
                @Override
                public Tuple2<Long, Double> map(Long value) {
                        // numElements is captured in this function's closure
                        return new Tuple2<Long, Double>(value, 1.0 / numElements);
                }
        });

The variable numElements is defined in the Flink program. Because the MapFunction refers to it, the contents of this variable become part of the MapFunction's closure.

When Flink distributes the MapFunction to the TaskManagers for execution, it automatically sends the value of numElements along with it.

That means the variable's contents are distributed together with the program code, as part of the TaskDeployment messages.

Note that you cannot distribute a DataSet this way: a DataSet does not hold any elements itself; it only describes the distributed data at that point in the program.
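To make the closure mechanism concrete, here is a plain-Java sketch (no Flink dependency) that imitates shipping a function: the function object, including the captured value of numElements, is serialized and then deserialized as if on a worker. It assumes Java serialization as the transport; SerializableMapFunction is a hypothetical stand-in for Flink's MapFunction, and a Map.Entry stands in for Tuple2.

```java
import java.io.*;
import java.util.AbstractMap;
import java.util.Map;

public class ClosureShippingDemo {

    // Hypothetical stand-in for Flink's MapFunction (which is also Serializable).
    public interface SerializableMapFunction<I, O> extends Serializable {
        O map(I value);
    }

    // Serializes an object to bytes and reads it back, imitating shipping it to a worker.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // The anonymous class captures numElements, so the value becomes part of the object's state.
    public static SerializableMapFunction<Long, Map.Entry<Long, Double>> makeFunction(final long numElements) {
        return new SerializableMapFunction<Long, Map.Entry<Long, Double>>() {
            @Override
            public Map.Entry<Long, Double> map(Long value) {
                return new AbstractMap.SimpleImmutableEntry<>(value, 1.0 / numElements);
            }
        };
    }

    public static void main(String[] args) {
        long numElements = 4; // stands in for a constant or command-line parameter
        SerializableMapFunction<Long, Map.Entry<Long, Double>> shipped =
                roundTrip(makeFunction(numElements));
        System.out.println(shipped.map(7L)); // prints 7=0.25
    }
}
```

The deserialized copy never sees the original local variable; it only has the value that was serialized with it, which is why this approach suits small, fixed values.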

When to use?

Distribute a variable with the function closure when:

  • The variable is small; collections should hold a few megabytes at most
  • The data is available already in the driver program
  • Typical use case: Constants or config parameters
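Whether a closure counts as "small" can be checked by measuring its serialized size before submitting the program. The sketch below assumes the function travels via Java serialization; SerializableFunction, withLookupTable, and the 1 MB budget are hypothetical choices made for illustration.

```java
import java.io.*;
import java.util.HashMap;

public class ClosureSizeCheck {

    // Hypothetical function type; it is Serializable like Flink's function interfaces.
    public interface SerializableFunction<I, O> extends Serializable {
        O apply(I value);
    }

    // Returns the number of bytes the object occupies when Java-serialized.
    public static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Builds a function whose closure captures a lookup table with the given number of entries.
    public static SerializableFunction<Integer, String> withLookupTable(int entries) {
        HashMap<Integer, String> table = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            table.put(i, "value-" + i);
        }
        return new SerializableFunction<Integer, String>() {
            @Override
            public String apply(Integer key) {
                return table.get(key);
            }
        };
    }

    public static void main(String[] args) {
        final int maxBytes = 1 << 20; // hypothetical budget for "small": 1 MB
        for (int entries : new int[] {10, 100_000}) {
            int size = serializedSize(withLookupTable(entries));
            System.out.printf("%d entries -> %d bytes, within budget: %b%n",
                    entries, size, size <= maxBytes);
        }
    }
}
```

The captured HashMap is serialized together with the function, so the shipped size grows with the table.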

 

Broadcast Variables

This variant takes a Flink DataSet and makes it available to all parallel instances of a function that implements RichFunction.

The following example illustrates that:

 
DataSet<Point> points = ... ; // e.g. read from a CSV file with env.readCsvFile(...)

DataSet<Centroid> centroids = ... ; // some computation

DataSet<Integer> clusterIds = points.map(new RichMapFunction<Point, Integer>() {

    private List<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        // fetch the broadcast data set registered under the name "centroids"
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Integer map(Point p) {
        return selectCentroid(centroids, p); // user-defined helper, not shown
    }

}).withBroadcastSet("centroids", centroids);
 


The data flow explicitly attaches the centroids data set to the RichMapFunction via withBroadcastSet. The data is sent to all parallel instances of the function through the Flink network stack, the same one that is used for shuffling data.
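The effect of withBroadcastSet can be simulated without Flink: each parallel instance receives only its share of the points, but the complete centroid list. In this sketch Point, Centroid, and a nearest-centroid selectCentroid are hypothetical stand-ins (the example above does not show selectCentroid), and the round-robin split of points is an assumption, not Flink's actual partitioning.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastSimulation {

    // Hypothetical minimal data types standing in for Point and Centroid.
    public static final class Point {
        final double x, y;
        public Point(double x, double y) { this.x = x; this.y = y; }
    }

    public static final class Centroid {
        final int id;
        final double x, y;
        public Centroid(int id, double x, double y) { this.id = id; this.x = x; this.y = y; }
    }

    // Hypothetical selectCentroid: id of the nearest centroid by squared Euclidean distance.
    public static int selectCentroid(List<Centroid> centroids, Point p) {
        int bestId = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (Centroid c : centroids) {
            double dx = p.x - c.x, dy = p.y - c.y;
            double d = dx * dx + dy * dy;
            if (d < bestDist) { bestDist = d; bestId = c.id; }
        }
        return bestId;
    }

    // One simulated parallel instance, mirroring open() and map() of the RichMapFunction.
    static final class MapInstance {
        private List<Centroid> centroids;
        void open(List<Centroid> broadcastSet) { this.centroids = broadcastSet; }
        int map(Point p) { return selectCentroid(centroids, p); }
    }

    // Points are split round-robin across instances; the centroids go to every instance.
    public static List<Integer> run(List<Point> points, List<Centroid> centroids, int parallelism) {
        List<MapInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            MapInstance inst = new MapInstance();
            inst.open(centroids); // broadcast: every instance sees all centroids
            instances.add(inst);
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < points.size(); i++) {
            result.add(instances.get(i % parallelism).map(points.get(i))); // each point goes to one instance
        }
        return result;
    }

    public static void main(String[] args) {
        List<Centroid> centroids = List.of(new Centroid(0, 0, 0), new Centroid(1, 10, 10));
        List<Point> points = List.of(new Point(1, 1), new Point(9, 8), new Point(-2, 0), new Point(11, 12));
        System.out.println(run(points, centroids, 2)); // prints [0, 1, 0, 1]
    }
}
```

Because every instance sees all centroids, the result does not depend on how the points are split: run(points, centroids, 1) and run(points, centroids, 3) return the same list.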


Note that the collection obtained via getRuntimeContext().getBroadcastVariable("centroids") is shared among all parallel function instances on the same TaskManager, so functions should not modify it.
That way, the system needs only one copy per TaskManager and can handle larger broadcast variables efficiently.
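The sharing can be pictured as a per-TaskManager cache that materializes a broadcast variable once and hands the same list to every function instance that asks for it. This is a simplified, hypothetical model (SharedBroadcastCache and getOrMaterialize are not Flink classes); wrapping the list as unmodifiable is a choice of this sketch so that accidental writes to shared data fail fast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of one TaskManager's broadcast-variable cache (not a Flink class).
public class SharedBroadcastCache {

    private final Map<String, List<?>> cache = new HashMap<>();
    private int materializations = 0;

    // Returns the shared list for `name`, building it only on the first request.
    @SuppressWarnings("unchecked")
    public synchronized <T> List<T> getOrMaterialize(String name, Supplier<List<T>> materialize) {
        List<?> cached = cache.get(name);
        if (cached != null) {
            return (List<T>) cached;
        }
        materializations++;
        // Read-only wrapper: a choice of this sketch so shared data cannot be modified.
        List<T> copy = Collections.unmodifiableList(new ArrayList<>(materialize.get()));
        cache.put(name, copy);
        return copy;
    }

    public synchronized int materializations() {
        return materializations;
    }

    public static void main(String[] args) {
        SharedBroadcastCache taskManager = new SharedBroadcastCache();
        Supplier<List<String>> received = () -> List.of("c0", "c1", "c2");
        // Two function instances on the same TaskManager ask for "centroids".
        List<String> instanceA = taskManager.getOrMaterialize("centroids", received);
        List<String> instanceB = taskManager.getOrMaterialize("centroids", received);
        System.out.println(instanceA == instanceB);         // prints true
        System.out.println(taskManager.materializations()); // prints 1
    }
}
```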

When to use?

Distribute data with a broadcast variable when:

  • The data is too large to ship inside the function closure
  • The data has been produced by some form of computation and is already a DataSet (distributed result)
  • Typical use case: Redistribute intermediate results, such as trained models

Broadcast larger client data via Collection Data Sets and Broadcast Variables

Client data can also be distributed by turning it into a collection data set and attaching that data set as a broadcast variable.

This would be the case if, in the above example, the centroids data set came from a collection on the client, rather than from a computation.

DataSet<Centroid> centroids = env.fromCollection(somewhatLargerCollections); 