...
Status
...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We provided a benchmark [1]. It not only increases the size of the data sent over the network but also hurts performance, as seen in the preliminary results below. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, with the number of slots per node scaled from 1 to 16.
...
suite | name | median_time (ms) |
---|---|---|
broadcast.cloud-11 | broadcast.01 | 8796 |
broadcast.cloud-11 | broadcast.02 | 14802 |
broadcast.cloud-11 | broadcast.04 | 30173 |
broadcast.cloud-11 | broadcast.08 | 56936 |
broadcast.cloud-11 | broadcast.16 | 117507 |
broadcast.ibm-power-1 | broadcast.01 | 6807 |
broadcast.ibm-power-1 | broadcast.02 | 8443 |
broadcast.ibm-power-1 | broadcast.04 | 11823 |
broadcast.ibm-power-1 | broadcast.08 | 21655 |
broadcast.ibm-power-1 | broadcast.16 | 37426 |
After looking into the code base, it seems that the data is deserialized only once per TaskManager, but the actual data is still sent to every slot running an operator with broadcast variables and is simply discarded if it has already been deserialized.
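The deserialize-once-per-TaskManager behavior can be sketched as a cache keyed by broadcast variable name. This is a minimal illustration, not Flink's actual implementation; the class and method names here are hypothetical:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical sketch: one materialized copy of each broadcast
// variable per TaskManager, shared by all slots on that machine.
public class BroadcastVariableCache {

    private final ConcurrentMap<String, List<?>> materialized = new ConcurrentHashMap<>();

    // The first slot asking for a variable runs the deserializer; later
    // slots get the cached copy, and the bytes they received over the
    // network are discarded without being deserialized again.
    @SuppressWarnings("unchecked")
    public <T> List<T> getOrMaterialize(String name, Supplier<List<T>> deserializer) {
        return (List<T>) materialized.computeIfAbsent(name, k -> deserializer.get());
    }
}
```

Note that even with such a cache, the redundant bytes still cross the network once per slot, which is exactly the waste the approaches below try to eliminate.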
...
- We don't release the non-read slots properly
- We need to introduce synchronization after every iteration to prevent early releasing
Fifth Approach - Write broadcast data to one blocking subpartition only
This approach writes the broadcast data to only one blocking subpartition, so there is no data redundancy and no unnecessary network traffic. In order to make the broadcast subpartitions blocking, we modify the job graph generation accordingly.
This approach currently does not work for native iterations (see https://issues.apache.org/jira/browse/FLINK-1713).
In the case of a simple Map().withBroadcastSet() we achieve the following results on the IBM-cluster:
suite | name | average_time (ms) |
---|---|---|
broadcast.ibm-power-1 | broadcast.01 | 6597.33 |
broadcast.ibm-power-1 | broadcast.02 | 5997 |
broadcast.ibm-power-1 | broadcast.04 | 6576.66 |
broadcast.ibm-power-1 | broadcast.08 | 7024.33 |
broadcast.ibm-power-1 | broadcast.16 | 6933.33 |
You can see that the run time stays constant, which is exactly what we want. So the next step is to extend iterations to make it work in general.
There was, however, also a flaw in this approach: we create subpartitions which are never consumed, and since subpartitions are only released when they are consumed, this produces a memory leak.
Sixth Approach - Use one subpartition only
With this approach we create only one subpartition per task / slot. To make this possible, we have to redirect all execution edges accordingly.
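The edge redirection can be sketched as follows. `ExecutionEdge` here is a simplified stand-in for Flink's internal class, not the real one:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's execution edges: normally each
// consumer task reads "its own" subpartition index.
class ExecutionEdge {
    final int consumerIndex;
    final int subpartitionIndex;

    ExecutionEdge(int consumerIndex, int subpartitionIndex) {
        this.consumerIndex = consumerIndex;
        this.subpartitionIndex = subpartitionIndex;
    }
}

public class BroadcastEdgeRewriter {

    // For a broadcast result only subpartition 0 is produced, so every
    // consumer edge is redirected to read subpartition 0 instead of
    // the consumer's own index.
    public static List<ExecutionEdge> redirectToSingleSubpartition(int numConsumers) {
        List<ExecutionEdge> edges = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            edges.add(new ExecutionEdge(i, 0));
        }
        return edges;
    }
}
```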
Now we have to figure out when to release the blocking partition.
On each task manager, one task reads the subpartition and sends a release message to it when finished. The other tasks, which do not read the data, just send an "I am done" message to the subpartition. Since we know the number of tasks, we know when every task has finished.
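The release protocol above amounts to counting acknowledgements. The following is a minimal sketch under the assumption that both the reader's "release" message and the non-readers' "I am done" messages are treated uniformly as one acknowledgement each; the class name is hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the release protocol: the blocking
// subpartition is released once every consumer task on the
// TaskManager has reported in, whether it read the data or not.
public class SubpartitionReleaseTracker {

    private final int totalTasks;
    private final AtomicInteger finished = new AtomicInteger();
    private volatile boolean released = false;

    public SubpartitionReleaseTracker(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    // Called once per task: by the single reading task when it sends
    // its release message, and by every non-reading task when it
    // sends its "I am done" message.
    public void taskFinished() {
        if (finished.incrementAndGet() == totalTasks) {
            released = true; // now it is safe to free the subpartition
        }
    }

    public boolean isReleased() {
        return released;
    }
}
```

The atomic counter makes the decision safe even when tasks finish concurrently, and it avoids the memory leak of the fifth approach because non-reading tasks also participate in the release.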
This approach currently does not work for native iterations (see https://issues.apache.org/jira/browse/FLINK-1713).
In the case of a simple Map().withBroadcastSet() we achieve the following results on the IBM-cluster:
suite | name | average_time (ms) |
---|---|---|
broadcast.ibm-power-1 | broadcast.01 | 6433 |
broadcast.ibm-power-1 | broadcast.02 | 6162.66 |
broadcast.ibm-power-1 | broadcast.04 | 6459 |
broadcast.ibm-power-1 | broadcast.08 | 6635.66 |
broadcast.ibm-power-1 | broadcast.16 | 7002.66 |
Compatibility, Deprecation, and Migration Plan
...