You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

 

Status

Current stateUnder Discussion

Discussion threadhttps://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3C1465386300767.94345@tu-berlin.de%3E

JIRA Unable to render Jira issues macro, execution error.

Released: as soon as possible

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Problem:

We experience some unexpected increase of data sent over the network for broadcasts with increasing number of slots per Taskmanager.

We provided a benchmark [1]. It not only increases the size of data sent over the network but also hurts performance as seen in the preliminary results below. In this results cloud-11 has 25 nodes and ibm-power has 8 nodes with scaling the number of slots per node from 1 - 16.


 

suite

name

median_time

broadcast.cloud-11

broadcast.01

8796

broadcast.cloud-11

broadcast.02

14802

broadcast.cloud-11

broadcast.04

30173

broadcast.cloud-11

broadcast.08

56936

broadcast.cloud-11

broadcast.16

117507

broadcast.ibm-power-1

broadcast.01

6807

broadcast.ibm-power-1

broadcast.02

8443

broadcast.ibm-power-1

broadcast.04

11823

broadcast.ibm-power-1

broadcast.08

21655

broadcast.ibm-power-1

broadcast.16

37426

 

After looking into the code base it, it seems that the data is de-serialized only once per TM, but the actual data is sent for all slots running the operator with broadcast vars and just gets discarded in case it’s already de-serialized.

We do not see a reason the data can't be shared among the slots of a TM and therefore just sent once.

[1] https://github.com/TU-Berlin-DIMA/flink-broadcast

Public Interfaces

Not yet clear.


Proposed Changes

There are several ways to achieve a solution. Here I will explain my thought process:

Current State

Instead of gathering the data only into one slot, we send it to each slot of the taskmanager in a “pull” fashion.

current_state.png


The RecordWriter will ask the OutputEmitter to which channel it should emit records. In the broadcast case, the OutputEmitter will return “write to all available slots”.

RecordWriter.png

General solution approach:

I first try to avoid to change connections, but instead just try to prevent sending unnecessary messages.


First Approach - Naive

Tell OutputEmitter how many taskmanagers there are and returns the same amount of channels:

naive1.png

This will lead to the following communication scheme:

naive2.png

As you see this is exactly what we want. We only send everything once. In order to make this possible we also need to chance BroadcastVariableMaterialization. Because if we start to read the wrong slot, we will skip the data. To prevent this we will start to read all slots and if we recognize that the channel is empty we wait until the data is materialized on the other channel of the taskmanager:


data = read(slot_i);

if (!data.empty()) {

 materialize();

} else {

 wait until materialized();

}


Experiments showed that this different approach to handle BroadcastVariableMaterialization alone saves 50% of the execution time for an iterative workload without reducing messages.

This approach works for a simple map().withBroadcastSet(). But it turns out that scheduling for native iterations is less predictable. So we can also end up with the following scenario:



Second Approach - Tell each task its TaskManagerID

The scheduler knows about all available Taskmanagers. When a new task is scheduled it will also be informed about its taskmanager. The ID of the taskmanger will be within 0 to N-1 (N= number of available taskmanagers). So when BroadcastVariableMaterialization is trying to read we will overwrite the slot by the taskmanager id:

Drawback: 

Maybe you already see the problem here. If Taskmanager 1 finishes before Taskmanager 2. The task 1 with slot 1 thinks that it can already start the Map operation since it thinks the materialization is completed. But the task 1 is executed on the other Taskmanager so we get an exception. 

There are multiple solutions to resolve this issue. We can wait as long as all slots are either empty or finished. This can be implemented by adding the function hasNext() in the ReaderIterator which doesn’t already pull the first record.


Third Approach - Pass each task a list of one assigned slot per taskmanager

With this approach we tell the OutputEmitter exactly which of the slots are required.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

...

Rejected Alternatives

...

  • No labels