Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Page properties

...


Discussion thread

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html

...


Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-21925

...

Release1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-156 and FLIP-56, we proposed fine-grained resource management to improve resource utilization in some scenarios in which coarse-grained resource management does not work well.

...

Note: As neither the operator nor the slot sharing group is exposed in Table / SQL API, how to configure fine-grained resource requirements in Table / SQL API is not included in the scope of this FLIP.

Public Interfaces

In this FLIP, we proposed the following two public interface changes:

  • Introduce a public class SlotSharinGroup, which contains the name and the resource of a slot sharing group.
  • Add setters of SlotSharinGroup in StreamExecutionEnvironment and #slotSharingGroup().

Proposed Changes

SlotSharingGroup

We propose to introduce a public public immutable SlotSharingGroup with which users can describe the fine-grained resource requirements for slot sharing groups.

...

We proposed to introduce a public SlotSharingGroup, with contains the group name as well as the above resource components. User will construct it in a builder pattern.

DataStream API to configure resource requirements

We propose to introduce two DataStream API for configuring SlotSharingGroup#slotSharingGroup and StreamExecutionEnvironment.

Code Block
languagejava
titleslotSharingGroup
public class SingleOutputStreamOperator {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group which contains the name and the resource spec.
     */
    @PublicEvolving
    public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
}

public class DataStreamSink {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group which contains the name and the resource spec.
     */
    @PublicEvolving
    public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
}

Currently, user specifies the SSG of an operator through the #slotSharingGroup(String name) in SingleOutputStreamOperator and DataStreamSink, which only defines the name of SlotSharingGroup. We propose to add another #slotSharingGroup(SlotSharingGroup slotSharingGroup) to it, which can also define the resource spec along with its name.

Note that Flink will check that two SlotSharingGroup with the same name should not have different resource spec. An exception will be thown in violation.

To maintain the backward compatibility with the stale #slotSharingGroup(String name), which has been widely used in previous releases, we also propose a SlotSharingGroup setter in StreamExecutionEnvironment.

...

The StreamExecutionEnvironment is the context in which a streaming program is executed. When users want to try fine-grained resource management with their existing jobs, they do not need to touch the existing logic and just need to register a list of SlotSharingGroups into the StreamExecutionEnvironment.

Implementation Plan

Step 1. Introduce the DataStream API

Introduce the SlotSharingGroup and setters in StreamExecutionEnvironment and #slotSharingGroup as shown in section Proposed Changes.

Step 2. Document the fine-grained resource management

Document the fine-grained resource management as a beta feature. Well explains how to use it and what constraints it has.

Known Limitations

Potential resource deadlock in batch jobs

As discussed in FLIP-119, FLIP-156 and FLINK-20865, there might be a resource deadlock when applying fine-grained resource management in batch jobs with PIPELINE edges. Before we fix that issue, this case should be marked as invalid. In this case, Flink will throw an exception with a meaningful and understandable explanation, and tell them to set fine-grained.shuffle-mode.all-blocking, which is an internal config and should be removed after FLINK-20865, to true, with which Flink will force all the edges in this scenario to BLOCKING in compiling stage.

Rejected Alternatives

There is also one reject alternative, ExecutionConfig, where the resource configuration can be located.

ExecutionConfig

The ExecutionConfig includes the configs to define the behavior of the program execution, e.g. the default parallelism, retry strategy in failure, etc.

...