Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

 

Status

Current stateUnder Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Problem:

We experience some unexpected increase of data sent over the network for broadcasts with increasing number of slots per Taskmanager.

...

[1] https://github.com/TU-Berlin-DIMA/flink-broadcast

Public Interfaces

Not yet clear.


Proposed Changes

There are several ways to achieve a solution. Here I will explain my thought process:

Current State

Instead of gathering the data only into one slot, we send it to each slot of the taskmanager in a “pull” fashion.

...

The RecordWriter will ask the OutputEmitter to which channel it should emit records. In the broadcast case, the OutputEmitter will return “write to all available slots”.

RecordWriter.png

General solution approach:

I first try to avoid to change connections, but instead just try to prevent sending unnecessary messages.


First Approach - Naive

Tell OutputEmitter how many taskmanagers there are and returns the same amount of channels:

...

This approach works for a simple map().withBroadcastSet(). But it turns out that scheduling for native iterations is less predictable. So we can also end up with the following scenario:



Second Approach - Tell each task its TaskManagerID

The scheduler knows about all available Taskmanagers. When a new task is scheduled it will also be informed about its taskmanager. The ID of the taskmanger will be within 0 to N-1 (N= number of available taskmanagers). So when BroadcastVariableMaterialization is trying to read we will overwrite the slot by the taskmanager id:

...

There are multiple solutions to resolve this issue. We can wait as long as all slots are either empty or finished. This can be implemented by adding the function hasNext() in the ReaderIterator which doesn’t already pull the first record.


Third Approach - Pass each task a list of one assigned slot per taskmanager

With this approach we tell the OutputEmitter exactly which of the slots are required.

...

Scheduling is done in parallel. When some of the tasks are already deployed some other tasks are not even scheduled yet. So I haven’t found a way yet to predict on which slot the task will run. This is necessary to find one unique slot per taskmanager.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

...

Rejected Alternatives

...