
Page properties

...


Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html

Vote thread:

JIRA: FLINK-21925

Release: 1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-156 and FLIP-56, we proposed fine-grained resource management to improve resource utilization in some scenarios in which coarse-grained resource management does not work well.

...

In this FLIP, we propose the DataStream API for specifying fine-grained resource requirements in StreamExecutionEnvironment and #slotSharingGroup().

Note: Neither the operator nor the slot sharing group is exposed in the Table / SQL API. Configuring fine-grained resource requirements in the Table / SQL API is therefore out of the scope of this FLIP.

Public Interfaces

In this FLIP, we propose the following two public interface changes:

  • Introduce a public class SlotSharingGroup, which contains the name and the resource requirements of a slot sharing group.
  • Add setters of SlotSharingGroup in StreamExecutionEnvironment and #slotSharingGroup().

Proposed Changes

...

SlotSharingGroup

We propose to introduce a public, immutable SlotSharingGroup class with which users can describe the fine-grained resource requirements of slot sharing groups.

```java
/** Describe the name and the different resource components of a slot sharing group. */
@PublicEvolving
public class SlotSharingGroup implements Serializable {

    /** Construct a new builder for the SlotSharingGroup with a specific name. */
    public static Builder newBuilder(String name);

    /** Builder for the {@link SlotSharingGroup}. */
    public static class Builder {

        /** Set the CPU cores. */
        public Builder setCpuCores(double cpuCores);

        /** Set the task heap memory. */
        public Builder setTaskHeapMemory(MemorySize taskHeapMemory);

        /** Set the task heap memory in MB. */
        public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB);

        /** Set the task off-heap memory. */
        public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory);

        /** Set the task off-heap memory in MB. */
        public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB);

        /** Set the task managed memory. */
        public Builder setManagedMemory(MemorySize managedMemory);

        /** Set the task managed memory in MB. */
        public Builder setManagedMemoryMB(int managedMemoryMB);

        /**
         * Add the given external resource. An existing value with the same resource name will be
         * replaced if present.
         */
        public Builder setExternalResource(String name, double value);

        /** Build the SlotSharingGroup with the current resource components. */
        public SlotSharingGroup build();
    }
}
```

Currently, the resource components needed to run a task include CPU cores, heap memory, off-heap memory, managed memory, network memory, and external resources. Except for network memory, which will be set automatically by the runtime as part of FLINK-15031, all of these components should be configured by users.

The existing ResourceSpec contains internal operations, such as merge, and special values, such as UNKNOWN, that do not need to be exposed to users. We therefore propose to introduce a public SlotSharingGroup, which contains the group name as well as the above resource components. Users construct it with the builder pattern.
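To make the builder pattern concrete, here is a minimal, dependency-free sketch of an immutable group description built the same way. The class name `SsgSketch` is hypothetical. Memory is modeled as plain MB integers instead of Flink's `MemorySize`. It illustrates the pattern only and is not Flink's implementation of SlotSharingGroup.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of an immutable slot sharing group description (not Flink's class). */
final class SsgSketch {
    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMB;
    private final int taskOffHeapMemoryMB;
    private final int managedMemoryMB;
    private final Map<String, Double> externalResources;

    private SsgSketch(Builder b) {
        this.name = b.name;
        this.cpuCores = b.cpuCores;
        this.taskHeapMemoryMB = b.taskHeapMemoryMB;
        this.taskOffHeapMemoryMB = b.taskOffHeapMemoryMB;
        this.managedMemoryMB = b.managedMemoryMB;
        // Defensive, unmodifiable copy: later builder calls cannot change a built group.
        this.externalResources = Collections.unmodifiableMap(new HashMap<>(b.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() { return name; }
    double getCpuCores() { return cpuCores; }
    int getTaskHeapMemoryMB() { return taskHeapMemoryMB; }
    int getTaskOffHeapMemoryMB() { return taskOffHeapMemoryMB; }
    int getManagedMemoryMB() { return managedMemoryMB; }
    Map<String, Double> getExternalResources() { return externalResources; }

    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMB;
        private int taskOffHeapMemoryMB;
        private int managedMemoryMB;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setTaskHeapMemoryMB(int mb) { this.taskHeapMemoryMB = mb; return this; }
        Builder setTaskOffHeapMemoryMB(int mb) { this.taskOffHeapMemoryMB = mb; return this; }
        Builder setManagedMemoryMB(int mb) { this.managedMemoryMB = mb; return this; }

        /** An external resource with the same name replaces the previous value. */
        Builder setExternalResource(String name, double value) {
            externalResources.put(name, value);
            return this;
        }

        SsgSketch build() { return new SsgSketch(this); }
    }
}
```

`build()` copies the external-resource map. Later builder calls therefore cannot alter an already built group, so a single built instance can safely be shared by several operators.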

DataStream API to configure resource requirements

We propose to introduce two DataStream APIs for configuring SlotSharingGroup: #slotSharingGroup(SlotSharingGroup) in SingleOutputStreamOperator and DataStreamSink, and a setter in StreamExecutionEnvironment.

```java
public class SingleOutputStreamOperator<T> {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link
     * StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group, which contains the name and the resource spec.
     */
    @PublicEvolving
    public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
}

public class DataStreamSink<T> {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link
     * StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group, which contains the name and the resource spec.
     */
    @PublicEvolving
    public DataStreamSink<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
}
```

Currently, users specify the slot sharing group (SSG) of an operator through #slotSharingGroup(String name) in SingleOutputStreamOperator and DataStreamSink, which only defines the name of the group. We propose to add an overload, #slotSharingGroup(SlotSharingGroup slotSharingGroup), to both classes. It defines the resource spec along with the name.
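The inheritance rule stated in the #slotSharingGroup javadoc can be sketched as a small function. `SsgInheritance` and `effectiveGroup` are hypothetical names, and group names are plain strings. The javadoc says operations start in the default group. Based on that, this sketch assumes that an operation without inputs or an explicit group falls into the default group.

```java
import java.util.List;

/** Hypothetical sketch of slot sharing group inheritance (not Flink's code). */
final class SsgInheritance {
    static final String DEFAULT_GROUP = "default";

    /**
     * @param explicitGroup the group set via #slotSharingGroup, or null if none was set
     * @param inputGroups the effective groups of all input operations
     */
    static String effectiveGroup(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting always wins
        }
        if (inputGroups.isEmpty()) {
            return DEFAULT_GROUP; // e.g. a source without an explicit group
        }
        String first = inputGroups.get(0);
        for (String group : inputGroups) {
            if (!group.equals(first)) {
                return DEFAULT_GROUP; // inputs disagree, so nothing is inherited
            }
        }
        return first; // all inputs share one group, so inherit it
    }
}
```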

Note that Flink checks that two SlotSharingGroups with the same name do not have different resource specs. If they do, an exception is thrown.
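The following sketch shows the name-conflict check, assuming resource specs can be compared by value. `SsgConflictCheck` and its `Spec` class are hypothetical, and `Spec` only tracks CPU cores and heap memory in MB. Declaring a group again with an identical spec is accepted. A different spec for the same name raises an exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: same group name must never carry two different specs. */
final class SsgConflictCheck {

    static final class Spec {
        final double cpuCores;
        final int taskHeapMemoryMB;

        Spec(double cpuCores, int taskHeapMemoryMB) {
            this.cpuCores = cpuCores;
            this.taskHeapMemoryMB = taskHeapMemoryMB;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Spec)) {
                return false;
            }
            Spec other = (Spec) o;
            return Double.compare(cpuCores, other.cpuCores) == 0
                    && taskHeapMemoryMB == other.taskHeapMemoryMB;
        }

        @Override
        public int hashCode() {
            return Objects.hash(cpuCores, taskHeapMemoryMB);
        }
    }

    private final Map<String, Spec> specsByName = new HashMap<>();

    /** Records the spec of a group; fails if the name was already seen with another spec. */
    void add(String name, Spec spec) {
        Spec previous = specsByName.putIfAbsent(name, spec);
        if (previous != null && !previous.equals(spec)) {
            throw new IllegalArgumentException(
                    "Slot sharing group '" + name + "' has conflicting resource specs.");
        }
    }
}
```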

To maintain backward compatibility with the existing #slotSharingGroup(String name), which has been widely used in previous releases, we also propose a SlotSharingGroup setter in StreamExecutionEnvironment.

```java
public class StreamExecutionEnvironment {
    /**
     * Registers a slot sharing group with its resource spec. The existing resource requirement of
     * the slot sharing group with the same name will be replaced.
     *
     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There's no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirements.
     *
     * @param slotSharingGroup The slot sharing group, which contains the name and the resource spec.
     */
    @PublicEvolving
    public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup);
}
```

The StreamExecutionEnvironment is the context in which a streaming program is executed. Defining the resources of SSGs in StreamExecutionEnvironment has the following advantages:

...

  • When users want to try fine-grained resource management with their existing jobs, they do not need to touch the existing logic. They only need to register a list of SlotSharingGroups into the StreamExecutionEnvironment.

Thus, we propose to add the above interface to StreamExecutionEnvironment.
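The following sketch shows that migration path: operators keep naming their groups, and resource specs are supplied separately at the environment level. It relies on the #slotSharingGroup javadoc, which states that a spec registered on the environment replaces one given at operator level. It also assumes that registering the same name again replaces the previous spec. `SsgResolver` and its methods are hypothetical, and specs are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of resolving a group's effective resource spec (not Flink's code). */
final class SsgResolver {
    private final Map<String, String> operatorSpecs = new HashMap<>();
    private final Map<String, String> registeredSpecs = new HashMap<>();

    /** Spec declared via #slotSharingGroup(SlotSharingGroup) on an operator. */
    void declareOnOperator(String group, String spec) {
        operatorSpecs.put(group, spec);
    }

    /** Spec registered on the environment; a later call for the same name replaces it. */
    void register(String group, String spec) {
        registeredSpecs.put(group, spec);
    }

    /** Registered specs win; otherwise fall back to the operator-level spec, if any. */
    Optional<String> effectiveSpec(String group) {
        if (registeredSpecs.containsKey(group)) {
            return Optional.of(registeredSpecs.get(group));
        }
        return Optional.ofNullable(operatorSpecs.get(group));
    }
}
```

An existing job that only calls #slotSharingGroup("ssg-1") corresponds to a group with no spec. A single `register("ssg-1", ...)` call then supplies one without touching the operators.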

Implementation Plan

Step 1. Introduce the DataStream API

Introduce SlotSharingGroup and its setters in StreamExecutionEnvironment and #slotSharingGroup, as shown in the Proposed Changes section.

Step 2. Document the fine-grained resource management

Document fine-grained resource management as a beta feature, explaining how to use it and what its constraints are.

Known Limitations

Potential resource deadlock in batch jobs

As discussed in FLIP-119, FLIP-156 and FLINK-20865, a resource deadlock may occur when fine-grained resource management is applied to batch jobs with PIPELINE edges. Until that issue is fixed, this case is treated as invalid. Flink will throw an exception with a clear explanation that tells users to set fine-grained.shuffle-mode.all-blocking to true. With this option set, Flink forces all edges in this scenario to BLOCKING at compile time. The option is internal and should be removed after FLINK-20865 is resolved.
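The guard described above could look roughly like the sketch below. `PipelinedEdgeGuard`, `ExchangeMode`, and `checkOrRewrite` are hypothetical. The `allBlocking` parameter stands in for the internal fine-grained.shuffle-mode.all-blocking option, and a job's edges are reduced to a flat list of exchange modes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the compile-time guard against PIPELINED edges (not Flink's code). */
final class PipelinedEdgeGuard {
    enum ExchangeMode { PIPELINED, BLOCKING }

    static List<ExchangeMode> checkOrRewrite(
            boolean isBatch, boolean fineGrained, boolean allBlocking, List<ExchangeMode> edges) {
        if (!isBatch || !fineGrained || !edges.contains(ExchangeMode.PIPELINED)) {
            return edges; // the deadlock-prone combination is absent, keep the edges unchanged
        }
        if (!allBlocking) {
            throw new IllegalStateException(
                    "Fine-grained resource management with PIPELINED edges in batch jobs may "
                            + "deadlock. Set fine-grained.shuffle-mode.all-blocking to true.");
        }
        List<ExchangeMode> rewritten = new ArrayList<>();
        for (int i = 0; i < edges.size(); i++) {
            rewritten.add(ExchangeMode.BLOCKING); // force every edge to BLOCKING
        }
        return rewritten;
    }
}
```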

Rejected Alternatives

There are also two rejected alternatives for where the resource configuration could be located: ExecutionConfig and #slotSharingGroup(String name, ResourceSpec resource).

ExecutionConfig

The ExecutionConfig includes the configs that define the behavior of program execution, e.g., the default parallelism and the restart strategy on failure.

Adding the API to ExecutionConfig would also offer good usability and keep the system simple. However, it would introduce unnecessary network overhead. The ExecutionConfig is serialized into the TaskDeploymentDescriptor, so the SSG-to-resource map would also be transferred to the TaskManager with the deployment of every task. This overhead is unnecessary and can further increase the startup time of large-scale jobs.
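The overhead can be modeled roughly as follows, under the stated assumption that the map travels with every deployed task. Let $V$ be the set of job vertices, $P_v$ the parallelism of vertex $v$, and $S_{\text{map}}$ the serialized size of the SSG-to-resource map. The numbers in the second line are purely hypothetical.

```latex
\text{extra bytes} \approx S_{\text{map}} \cdot \sum_{v \in V} P_v
\qquad\text{e.g.}\qquad
10\,\text{KiB} \times 10\,000 = 100\,000\,\text{KiB} \approx 97.7\,\text{MiB}
```

The cost grows with the total number of subtasks rather than with the number of slot sharing groups, which is why it matters most for large-scale jobs.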

#slotSharingGroup(String name, ResourceSpec resource)

Currently, users specify the SSG of an operator through #slotSharingGroup(String name) in SingleOutputStreamOperator and DataStreamSink, which only defines the name of the slot sharing group. We could add an interface like #slotSharingGroup(String name, ResourceSpec resource).

```java
// create two data sources, grades and salaries
DataStream<Tuple2<String, Integer>> grades =
        GradeSource.getSource(env, rate)
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .slotSharingGroup("ssg-1", resourceSpec1);

DataStream<Tuple2<String, Integer>> salaries =
        SalarySource.getSource(env, rate)
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .slotSharingGroup("ssg-2", resourceSpec2);

// run the actual window join program with the same slot sharing group as grades
DataStream<Tuple3<String, Integer, Integer>> joinedStream =
        runWindowJoin(grades, salaries, windowSize).slotSharingGroup("ssg-1", resourceSpec1);
```

However, this interface may have poor usability. In the above code snippet, the user needs to invoke the #slotSharingGroup(String name, ResourceSpec resource) with the same slot sharing group multiple times. In such cases, the user needs to ensure the configured ResourceSpec is the same each time.

We could allow users to directly define a SlotSharingGroup class for the operator, which would ensure consistency across multiple invocations of #slotSharingGroup(SlotSharingGroup ssg). However, this interface is not compatible with the existing #slotSharingGroup(String name). Keeping both slot sharing group setters while maintaining backward compatibility would complicate the system.