...

  • POINTWISE_EDGES_PIPELINED
    • Pipelined regions: 6
      • {A1, B1, C1, C2}
      • {A2, B2, C3, C4}
      • {D1}, {D2}, {D3}, {D4}
    • Blocking logical edges: 1
    • Minimum slots required: 2
  • ALL_EDGES_PIPELINED
    • Pipelined regions: 1
      • {A1, A2, B1, B2, C1, C2, C3, C4, D1, D2, D3, D4}
    • Blocking logical edges: 0
    • Minimum slots required: 4
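
    For illustration, the grouping above can be thought of as a connected-components computation over the pipelined edges: vertices joined by a pipelined edge end up in the same region, while blocking edges separate regions. The sketch below is not Flink's actual implementation, and the edge layout of the example graph is assumed; it derives the 6 regions of the POINTWISE_EDGES_PIPELINED case with a simple union-find.

    import java.util.*;

    // Illustrative sketch only: group vertices connected by PIPELINED edges
    // into regions with union-find; BLOCKING edges do not merge regions.
    class PipelinedRegionSketch {

        record Edge(String source, String target, boolean pipelined) {}

        static Collection<Set<String>> computeRegions(List<String> vertices, List<Edge> edges) {
            Map<String, String> parent = new HashMap<>();
            vertices.forEach(v -> parent.put(v, v));

            for (Edge e : edges) {
                if (e.pipelined()) {
                    parent.put(find(parent, e.source()), find(parent, e.target()));
                }
            }

            Map<String, Set<String>> regions = new LinkedHashMap<>();
            for (String v : vertices) {
                regions.computeIfAbsent(find(parent, v), r -> new LinkedHashSet<>()).add(v);
            }
            return regions.values();
        }

        private static String find(Map<String, String> parent, String v) {
            while (!parent.get(v).equals(v)) {
                v = parent.get(v);
            }
            return v;
        }

        public static void main(String[] args) {
            // Assumed topology of the example: A -> B -> C pipelined (pointwise),
            // C -> D blocking; parallelism A=2, B=2, C=4, D=4.
            List<String> vertices =
                List.of("A1", "A2", "B1", "B2", "C1", "C2", "C3", "C4", "D1", "D2", "D3", "D4");
            List<Edge> edges = List.of(
                new Edge("A1", "B1", true), new Edge("A2", "B2", true),
                new Edge("B1", "C1", true), new Edge("B1", "C2", true),
                new Edge("B2", "C3", true), new Edge("B2", "C4", true),
                new Edge("C1", "D1", false), new Edge("C2", "D2", false),
                new Edge("C3", "D3", false), new Edge("C4", "D4", false));

            // Prints 6 regions: {A1,B1,C1,C2}, {A2,B2,C3,C4}, {D1}, {D2}, {D3}, {D4}
            computeRegions(vertices, edges).forEach(System.out::println);
        }
    }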

    ...

    Because we schedule a pipelined region as a unit, consumer tasks may be scheduled too early and hog cluster resources unnecessarily. This happens when a producing task takes a long time to emit its first record because it is computationally expensive, e.g., an aggregation or a sort.

    Resource deadlocks could still happen in certain cases

    Pipelined region scheduling does not solve all resource deadlock cases. Resource deadlocks can still happen in the following cases:

    1. There are slots of different sizes. This cannot happen in 1.11, but may happen in the future once we allow users to configure ResourceSpecs for operators.
    2. There are multiple jobs in a Flink session cluster and their slot allocations compete with each other. 

    These cases do not block this design, since case #1 cannot happen in 1.11 and case #2 is a pre-existing problem. The sections below describe these resource deadlock cases in more detail.

    Resource deadlocks when allocated slots are too small to fulfill the oldest request

    Below is an example demonstrating that it can happen that no slot allocation bulk is completely fulfilled, even though the cluster has enough resources to fulfill each bulk individually.

    (Figure: an example in which no slot allocation bulk can be completely fulfilled)

    Possible solutions:

    • Option 1: SlotPool releases unused slots to RM and waits for the pending requests in RM to be fulfilled. Slot requests related to the released slots should also be re-sent to RM.
    • Option 2: Force FIFO slot allocation in SlotManager. We can do this after the SlotManager is pluggable (FLINK-14106).
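
    To make Option 2 above a bit more concrete, here is a hypothetical sketch (class and method names are illustrative, not Flink's API) of a SlotManager-side policy that hands an available slot only to the oldest pending request, so newer requests can no longer consume slots the oldest request is waiting for.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical sketch of Option 2: strict FIFO slot allocation.
    // An available slot is only offered to the oldest pending request; it is never
    // handed to a newer request while an older one is still waiting.
    class FifoSlotAllocationSketch {

        record SlotRequest(String id, int requiredMemoryMb) {}
        record Slot(String id, int memoryMb) {}

        private final Deque<SlotRequest> pendingRequests = new ArrayDeque<>();

        void requestSlot(SlotRequest request) {
            pendingRequests.addLast(request);
        }

        void onSlotAvailable(Slot slot) {
            SlotRequest oldest = pendingRequests.peekFirst();
            if (oldest == null) {
                return; // nothing pending, keep the slot free
            }
            if (slot.memoryMb() >= oldest.requiredMemoryMb()) {
                pendingRequests.pollFirst();
                assign(slot, oldest);
            }
            // If the slot is too small for the oldest request it stays free instead of
            // being given to a newer, smaller request. This avoids the starvation shown
            // in the figure above, at the cost of potentially idle slots.
        }

        private void assign(Slot slot, SlotRequest request) {
            System.out.printf("slot %s assigned to request %s%n", slot.id(), request.id());
        }
    }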

    Resource deadlocks when slots of different sizes are improperly assigned to slot requests

    Below is an example to demonstrate this issue. Note that the 3 requests are in the same bulk.

    (Figure: an example in which slots of different sizes are improperly assigned to the slot requests of one bulk)

    Possible solutions:

    • Option 1: SlotPool can detect this issue and trigger a re-assignment of these slots.
    • Option 2: Force strict matching of slots and slot requests. This should work if the issue “Resource deadlocks when allocated slots are too small to fulfill the oldest request” is already solved.
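
    To make Option 1 above a bit more concrete, here is a hypothetical sketch (names are illustrative, not Flink's API) of such a re-assignment: the requests of the bulk are matched against the slots the SlotPool already holds, largest first, so a large slot is never bound to a small request.

    import java.util.*;

    // Hypothetical sketch of Option 1: re-assign the slots held by the SlotPool to
    // the pending requests of the same bulk, largest request to largest suitable
    // slot, so a big slot cannot end up bound to a small request.
    class SlotReassignmentSketch {

        record Slot(String id, int memoryMb) {}
        record Request(String id, int requiredMemoryMb) {}

        static Map<Request, Slot> reassign(List<Slot> slots, List<Request> requests) {
            List<Slot> freeSlots = new ArrayList<>(slots);
            freeSlots.sort(Comparator.comparingInt(Slot::memoryMb).reversed());

            List<Request> ordered = new ArrayList<>(requests);
            ordered.sort(Comparator.comparingInt(Request::requiredMemoryMb).reversed());

            Map<Request, Slot> assignment = new LinkedHashMap<>();
            for (Request r : ordered) {
                Iterator<Slot> it = freeSlots.iterator();
                while (it.hasNext()) {
                    Slot s = it.next();
                    if (s.memoryMb() >= r.requiredMemoryMb()) {
                        // Both lists are sorted in descending order, so this pairing
                        // fulfills the whole bulk whenever any assignment could.
                        assignment.put(r, s);
                        it.remove();
                        break;
                    }
                }
            }
            return assignment; // requests missing from the map could not be matched
        }
    }

    For example (illustrative numbers): with slots {2 GB, 1 GB, 1 GB} and a bulk of requests {2 GB, 1 GB, 1 GB}, an assignment that binds the 2 GB slot to a 1 GB request leaves the 2 GB request unfulfillable; after re-assigning as above, all three requests are fulfilled.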

    Resource deadlocks when slot allocation competition happens between multiple jobs in a session cluster

    With pipelined region scheduling, we can avoid slot allocation competition between the regions of one job. However, slot allocation competition can still happen between multiple jobs running in the same Flink session cluster.

    To solve this, one possible solution is to enable bulk slot allocation in the ResourceManager: the ResourceManager should try to fulfill the oldest bulk of requests first and only then move on to the next bulk. We may also need to add a slot allocation bulk confirmation interface to the ResourceManager, which the JobMaster invokes once it sees that all requests of its bulk are fulfilled. This would help in the case where a TaskManager is lost after the RM has requested slots from it but before the slots are offered to the JM.
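
    A minimal sketch of this idea, assuming hypothetical interfaces (these are not existing Flink APIs): the ResourceManager keeps its slot request bulks in a FIFO queue, hands available slots only to the oldest bulk, and only advances to the next bulk once the JobMaster has confirmed that all slots of the oldest bulk were actually offered to it.

    import java.util.*;

    // Hypothetical sketch of bulk slot allocation in the ResourceManager.
    // Slots are only handed to the oldest bulk; the JobMaster confirms a bulk once
    // all of its slots have actually been offered, and only then does the RM move on.
    class BulkAwareResourceManagerSketch {

        record SlotRequest(String requestId) {}

        static final class Bulk {
            final String jobId;
            final Deque<SlotRequest> unfulfilled;

            Bulk(String jobId, Collection<SlotRequest> requests) {
                this.jobId = jobId;
                this.unfulfilled = new ArrayDeque<>(requests);
            }
        }

        private final Deque<Bulk> bulks = new ArrayDeque<>(); // FIFO of request bulks

        void requestBulk(String jobId, Collection<SlotRequest> requests) {
            bulks.addLast(new Bulk(jobId, requests));
        }

        // A slot became available (a new TaskManager registered or a slot was freed).
        void onSlotAvailable(String slotId) {
            Bulk oldest = bulks.peekFirst();
            if (oldest == null || oldest.unfulfilled.isEmpty()) {
                return; // oldest bulk is waiting for confirmation, or nothing is pending
            }
            SlotRequest request = oldest.unfulfilled.pollFirst();
            offerSlotToJobMaster(oldest.jobId, slotId, request);
        }

        // Invoked by the JobMaster once it has received all slots of its bulk.
        // If a TaskManager is lost after slots were requested but before they were
        // offered, the bulk is simply never confirmed and its affected requests
        // would be re-added and retried (not shown here).
        void confirmBulk(String jobId) {
            Bulk oldest = bulks.peekFirst();
            if (oldest != null && oldest.jobId.equals(jobId) && oldest.unfulfilled.isEmpty()) {
                bulks.pollFirst(); // the oldest bulk is done, move on to the next one
            }
        }

        private void offerSlotToJobMaster(String jobId, String slotId, SlotRequest request) {
            System.out.printf("offer slot %s to job %s for request %s%n",
                    slotId, jobId, request.requestId());
        }
    }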

    Test Plan

    We will write additional unit and integration tests. Furthermore, the changes are already covered by E2E tests.

    ...