...

We experience an unexpected increase in the data sent over the network for broadcasts as the number of slots per task manager grows.

We provided a benchmark [1]. It shows that this not only increases the amount of data sent over the network but also hurts performance, as seen in the preliminary results below. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, with the number of slots per node scaled from 1 to 16.

...

After looking into the code base, it seems that the data is deserialized only once per TM, but the actual data is sent to all slots running the operator with broadcast variables and is simply discarded if it has already been deserialized.

We do not see a reason why the data can't be shared among the slots of a TM and therefore sent only once.

...

Instead of gathering the data into only one slot, we send it to each slot of the task manager in a “pull” fashion.

[Image: current_state2.png]

Also note that instead of having as many intermediate results per task manager as there are target tasks, we have that number multiplied by the number of slots, which means a high number of additional connections and setup overhead. Without that overhead it would look like this:

[Image: current_state.png]


The RecordWriter will ask the OutputEmitter to which channel it should emit records. In the broadcast case, the OutputEmitter will return “write to all available slots”.

[Image: RecordWriter2.png]
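To make the current behaviour concrete, here is a minimal sketch of the broadcast case. The interface and class names are simplified stand-ins, not the actual Flink signatures: the RecordWriter asks the emitter for the target channels of a record, and for broadcasts the emitter simply answers with every channel, i.e. one per receiving slot.

interface ChannelSelector<T> {
    // asked by the RecordWriter for every record
    int[] selectChannels(T record, int numberOfChannels);
}

class BroadcastOutputEmitter<T> implements ChannelSelector<T> {

    private int[] allChannels;

    @Override
    public int[] selectChannels(T record, int numberOfChannels) {
        // "write to all available slots": return every channel index
        if (allChannels == null || allChannels.length != numberOfChannels) {
            allChannels = new int[numberOfChannels];
            for (int i = 0; i < numberOfChannels; i++) {
                allChannels[i] = i;
            }
        }
        return allChannels;
    }
}

With one channel per slot, a task manager with s slots receives the broadcast data s times, which is exactly the overhead observed above.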




General solution approach:

...

Tell the OutputEmitter how many task managers there are and return the same number of channels:

[Image: naive1_2.png]


This will lead to the following communication scheme:

[Image: naive2_2.png]

As you can see, this is exactly what we want: everything is sent only once. The red dashed lines mean that the task tries to read but immediately recognizes that the input slot is empty.
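As a sketch of what the changed OutputEmitter could look like (reusing the simplified ChannelSelector interface from the sketch above; the channel-to-task-manager mapping shown here is an assumption for illustration), the emitter is told the number of task managers and selects only one channel per task manager:

class TaskManagerBroadcastEmitter<T> implements ChannelSelector<T> {

    private final int numberOfTaskManagers; // told to the emitter at setup time
    private int[] selectedChannels;

    TaskManagerBroadcastEmitter(int numberOfTaskManagers) {
        this.numberOfTaskManagers = numberOfTaskManagers;
    }

    @Override
    public int[] selectChannels(T record, int numberOfChannels) {
        if (selectedChannels == null) {
            // assumption: channels are grouped per task manager and every task
            // manager has the same number of slots; pick the first channel of
            // each group, the remaining channels stay empty
            int slotsPerTaskManager = numberOfChannels / numberOfTaskManagers;
            selectedChannels = new int[numberOfTaskManagers];
            for (int tm = 0; tm < numberOfTaskManagers; tm++) {
                selectedChannels[tm] = tm * slotsPerTaskManager;
            }
        }
        return selectedChannels;
    }
}

The channels that are never selected are the ones the red dashed lines refer to: their tasks try to read and immediately see an empty input slot.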

In order to make this possible we also need to change BroadcastVariableMaterialization, because if we start to read from the wrong slot, we will skip the data. To prevent this, we start to read from all slots, and if we recognize that a channel is empty, we wait until the data is materialized on another channel of the task manager:


data = read(slot_i);


if (!data.empty()) {

...
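To make the waiting behaviour concrete, here is a minimal, self-contained sketch (hypothetical class and method names, not the real BroadcastVariableMaterialization): all slots of a task manager share one holder, whichever slot actually received the data publishes it, and the slots that found their channel empty block until it is available.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class SharedBroadcastHolder<T> {

    private final AtomicReference<List<T>> materialized = new AtomicReference<>();
    private final CountDownLatch ready = new CountDownLatch(1);

    // called by the slot whose input channel actually carried the data
    void publish(List<T> data) {
        if (materialized.compareAndSet(null, data)) {
            ready.countDown();
        }
    }

    // called by slots whose read returned no data: wait for the publisher
    List<T> awaitMaterialization() throws InterruptedException {
        ready.await();
        return materialized.get();
    }
}

In terms of the pseudocode above, a task calls publish() after a non-empty read(slot_i) and awaitMaterialization() when its slot turned out to be empty.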

The scheduler knows about all available task managers. When a new task is scheduled, it is also informed about its task manager. The ID of the task manager will be within 0 to N-1 (N = number of available task managers). So when BroadcastVariableMaterialization is trying to read, we will override the slot to read with the task manager ID:

[Image: taskid2.png]
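As a small sketch of the override (hypothetical names): the materialization ignores the channel of its own slot and instead reads the channel indexed by the task manager ID it was given by the scheduler.

int channelToRead(int taskManagerId, int ownSlotChannel) {
    // taskManagerId is in 0..N-1; the slot's own channel is ignored
    return taskManagerId;
}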

Drawback: Maybe you already see the problem here. If task manager 1 finishes before task manager 2, task 1 with slot 1 thinks that it can already start the Map operation, since it believes the materialization is completed. But task 1 is executed on the other task manager, so we get an exception.


There are multiple solutions to resolve this issue. We can wait until all slots are either empty or finished. This can be implemented by adding a function hasNext() to the ReaderIterator which does not already pull the first record.
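A sketch of such a non-destructive hasNext() (simplified stand-in names; the real class is ReaderIterator): one way to implement it is to buffer the first record instead of dropping it, so a task can check whether a slot is empty or finished before committing to read from it.

import java.util.Iterator;
import java.util.NoSuchElementException;

class PeekingReaderIterator<T> implements Iterator<T> {

    private final Iterator<T> reader; // stands in for the underlying record reader
    private T peeked;                 // buffered record if we already looked ahead

    PeekingReaderIterator(Iterator<T> reader) {
        this.reader = reader;
    }

    // true if a record is available, without discarding it
    @Override
    public boolean hasNext() {
        if (peeked != null) {
            return true;
        }
        if (reader.hasNext()) {
            peeked = reader.next(); // buffer instead of consuming
            return true;
        }
        return false; // slot is empty or finished
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = peeked;
        peeked = null;
        return result;
    }
}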

...

Third Approach - Pass each task a list of one assigned slot per task manager

With this approach we tell the OutputEmitter exactly which of the slots are required.
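As a sketch (hypothetical names, reusing the simplified ChannelSelector interface from above), the emitter would no longer derive the target channels itself but be handed the explicit list of one assigned slot per task manager:

class AssignedSlotsBroadcastEmitter<T> implements ChannelSelector<T> {

    private final int[] assignedChannels; // exactly one channel per task manager

    AssignedSlotsBroadcastEmitter(int[] assignedChannels) {
        this.assignedChannels = assignedChannels;
    }

    @Override
    public int[] selectChannels(T record, int numberOfChannels) {
        return assignedChannels;
    }
}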


Problem: Scheduling is done in parallel. When some of the tasks are already deployed, other tasks are not even scheduled yet. So I haven’t found a way yet to predict on which slot a task will run, which is necessary to find one unique slot per task manager.

Compatibility, Deprecation, and Migration Plan

...

task manager.

Fourth Approach - Just read once

After Stephan’s interesting email (http://mail-archives.apache.org/mod_mbox/flink-dev/201607.mbox/%3CCANC1h_vtfNy6LuqjTJ3m0ZZvZQttkYO9GdTNshchhv=eO9cV8w@mail.gmail.com%3E), I implemented part of his idea. This idea mostly affects the BroadcastVariableMaterialization. Instead of reading from all input slots, we only read the first upcoming input slot and save the same Reader in the BroadcastVariableManager. So in an iterative workload, the next iteration will use the very same reader. Since we don’t know which input slot will be the one we actually read from, we write to all input slots to be safe. So there is still some overhead here, but we gain by not sending unnecessary data. The design looks like this:


[Image]
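A minimal sketch of the reader caching described above (hypothetical names; the real classes are BroadcastVariableManager and BroadcastVariableMaterialization): whichever reader delivers data first is remembered per broadcast variable, and later iterations get that very same reader back instead of reading all input slots again.

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CachingBroadcastVariableManager {

    private final ConcurrentMap<String, Iterator<?>> readers = new ConcurrentHashMap<>();

    // returns the reader to use for the given broadcast variable; the first
    // call registers the reader of the first input slot that delivered data,
    // later calls (i.e. later iterations) get the cached one back
    @SuppressWarnings("unchecked")
    <T> Iterator<T> readerFor(String broadcastVariableName, Iterator<T> firstAvailableReader) {
        return (Iterator<T>) readers.computeIfAbsent(broadcastVariableName, name -> firstAvailableReader);
    }
}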

Locally this approach already works for a small number of iterations (= 170 iterations; don’t ask me yet why it stops there). On the cluster it fails. I still need to investigate where the problem is.

Possible issues:

  1. We don’t release the non-read slots properly

  2. We need to introduce synchronization after every iteration to prevent early releasing

Compatibility, Deprecation, and Migration Plan

...

...

Test Plan

...

Rejected Alternatives

...