Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51071.html

JIRA: FLINK-21925

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-156 and FLIP-56, we proposed fine-grained resource management to improve resource utilization in some scenarios in which coarse-grained resource management does not work well.

Currently (release-1.13), the slot sharing group (SSG) based runtime interface and the slot allocation for fine-grained resource requirements have been implemented in FLIP-156 and FLIP-56, respectively. The last piece needed to deliver fine-grained resource management to users is the user API.

In this FLIP, we propose the DataStream API for specifying fine-grained resource requirements in StreamExecutionEnvironment.

Note: As neither the operator nor the slot sharing group is exposed in Table / SQL API, how to configure fine-grained resource requirements in Table / SQL API is not included in the scope of this FLIP.

Public Interfaces

In this FLIP, we propose the following two public interface changes:

  • Expose ResourceSpec, which describes the fine-grained resource requirements for slot sharing groups, to users.
  • Add setters of ResourceSpec in StreamExecutionEnvironment.

Proposed Changes

ResourceSpec

We propose to expose ResourceSpec with which users can describe the fine-grained resource requirements for slot sharing groups.

ResourceSpec
/**
 * Describes the different resource components of the fine-grained resource requirement.
 *
 * <ol>
 *   <li>CPU cores
 *   <li>Task Heap Memory
 *   <li>Task Off-Heap Memory
 *   <li>Managed Memory
 *   <li>Extended resources
 * </ol>
 */
@PublicEvolving
public class ResourceSpec {
    /**
     * Get the CPU cores.
     *
     * @return the CPU cores
     */
    CPUResource getCpuCores();

    /**
     * Get the task heap memory.
     *
     * @return the task heap memory
     */
    MemorySize getTaskHeapMemory();

    /**
     * Get the task off-heap memory.
     *
     * @return the task off-heap memory.
     */
    MemorySize getTaskOffHeapMemory();

    /**
     * Get the managed memory.
     *
     * @return the managed memory
     */
    MemorySize getManagedMemory();

    /**
     * Get the extended resource with the given name if present.
     *
     * @param name of the extended resource
     * @return an optional extended resource
     */
    Optional<ExternalResource> getExtendedResource(String name);

    /**
     * Get all the extended resources indexed by their name.
     *
     * @return all the extended resources indexed by their name
     */
    Map<String, ExternalResource> getExtendedResources();
}

Currently, the resource components required to run a task are CPU cores, heap memory, off-heap memory, managed memory, network memory, and external resources. All of them except network memory, which will be set automatically by the runtime (FLINK-15031), need to be configured by users.

The existing ResourceSpec class is the most suitable candidate. However, it contains internal operations such as merge and special values such as UNKNOWN, which do not need to be exposed to users. We therefore move that internal logic elsewhere (e.g. into ResourceSpecUtils) and expose only the read-only getters shown above, plus a Builder for constructing instances.
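As a minimal sketch of what this exposed surface amounts to, the class below is an immutable value with read-only getters and a builder, and has no merge operation or UNKNOWN sentinel. To stay self-contained it uses a hypothetical class ResourceSpecSketch with plain numbers (cores, MB) instead of Flink's CPUResource, MemorySize and ExternalResource; the builder method names and the choice of CPU cores and task heap memory as mandatory fields are illustrative assumptions, not the final Flink API. Network memory is deliberately absent because the runtime sets it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified stand-in for the user-facing ResourceSpec. Plain numbers replace
 * Flink's CPUResource / MemorySize / ExternalResource so the sketch compiles on its own.
 * Network memory is intentionally absent: the runtime sets it automatically.
 */
final class ResourceSpecSketch {
    private final double cpuCores;
    private final long taskHeapMemoryMb;
    private final long taskOffHeapMemoryMb;
    private final long managedMemoryMb;
    private final Map<String, Double> extendedResources; // e.g. "gpu" -> 1.0

    private ResourceSpecSketch(Builder builder) {
        this.cpuCores = builder.cpuCores;
        this.taskHeapMemoryMb = builder.taskHeapMemoryMb;
        this.taskOffHeapMemoryMb = builder.taskOffHeapMemoryMb;
        this.managedMemoryMb = builder.managedMemoryMb;
        // Defensive, unmodifiable copy keeps the instance read-only after build().
        this.extendedResources =
                Collections.unmodifiableMap(new HashMap<>(builder.extendedResources));
    }

    // Read-only getters only: no merge(), no UNKNOWN sentinel on the public surface.
    double getCpuCores() { return cpuCores; }
    long getTaskHeapMemoryMb() { return taskHeapMemoryMb; }
    long getTaskOffHeapMemoryMb() { return taskOffHeapMemoryMb; }
    long getManagedMemoryMb() { return managedMemoryMb; }

    Optional<Double> getExtendedResource(String name) {
        return Optional.ofNullable(extendedResources.get(name));
    }

    Map<String, Double> getExtendedResources() { return extendedResources; }

    /** Assumption: CPU cores and task heap memory are mandatory; the rest default to zero. */
    static Builder newBuilder(double cpuCores, long taskHeapMemoryMb) {
        return new Builder(cpuCores, taskHeapMemoryMb);
    }

    static final class Builder {
        private final double cpuCores;
        private final long taskHeapMemoryMb;
        private long taskOffHeapMemoryMb;
        private long managedMemoryMb;
        private final Map<String, Double> extendedResources = new HashMap<>();

        private Builder(double cpuCores, long taskHeapMemoryMb) {
            if (cpuCores <= 0 || taskHeapMemoryMb <= 0) {
                throw new IllegalArgumentException("CPU cores and task heap memory must be positive");
            }
            this.cpuCores = cpuCores;
            this.taskHeapMemoryMb = taskHeapMemoryMb;
        }

        Builder setTaskOffHeapMemoryMb(long mb) {
            this.taskOffHeapMemoryMb = requireNonNegative(mb);
            return this;
        }

        Builder setManagedMemoryMb(long mb) {
            this.managedMemoryMb = requireNonNegative(mb);
            return this;
        }

        Builder setExtendedResource(String name, double amount) {
            extendedResources.put(name, amount);
            return this;
        }

        ResourceSpecSketch build() {
            return new ResourceSpecSketch(this);
        }

        private static long requireNonNegative(long mb) {
            if (mb < 0) {
                throw new IllegalArgumentException("memory must be non-negative: " + mb);
            }
            return mb;
        }
    }
}
```

Users only ever see a finished, read-only spec; any merging of specs stays an internal concern of the runtime.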

DataStream API to configure resource requirements

We propose to introduce the DataStream API for configuring the ResourceSpec of slot sharing groups in StreamExecutionEnvironment. It allows users to configure the ResourceSpec of a single slot sharing group or of a set of slot sharing groups at once.

StreamExecutionEnvironment
public class StreamExecutionEnvironment {
    /**
     * Specify fine-grained resource requirements for slot sharing groups. For each group in the
     * map, the existing resource requirement of that group will be replaced.
     *
     * <p>Note that a slot sharing group only hints to the scheduler that the grouped operators CAN
     * be deployed into a shared slot. There is no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirements.
     *
     * @param slotSharingGroupResources map from slot sharing group names to their resource specs
     */
    @PublicEvolving
    public StreamExecutionEnvironment setSlotSharingGroupResources(
            Map<String, ResourceSpec> slotSharingGroupResources);


    /**
     * Specify the fine-grained resource requirement for a slot sharing group. The existing
     * resource requirement of the same slot sharing group will be replaced.
     *
     * <p>Note that a slot sharing group only hints to the scheduler that the grouped operators CAN
     * be deployed into a shared slot. There is no guarantee that the scheduler always deploys the
     * grouped operators together. In cases where grouped operators are deployed into separate
     * slots, the slot resources will be derived from the specified group requirements.
     *
     * @param slotSharingGroup name of the slot sharing group
     * @param resourceSpec resource requirement of the slot sharing group
     */
    @PublicEvolving
    public StreamExecutionEnvironment setSlotSharingGroupResource(
            String slotSharingGroup, ResourceSpec resourceSpec);
}
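The following hypothetical class models how we read the replacement semantics of the two setters: a single-group call overwrites that group's requirement, and the map-based call applies the same rule entry by entry, so groups that are not in the map keep their current requirement. The class name SlotSharingGroupResourceRegistry and the type parameter R (standing in for ResourceSpec) are assumptions made so the sketch runs without Flink.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical model of the two proposed setters. R stands in for ResourceSpec so that the
 * sketch runs without Flink on the classpath.
 */
final class SlotSharingGroupResourceRegistry<R> {
    private final Map<String, R> resources = new HashMap<>();

    /** Like setSlotSharingGroupResource: replaces any existing requirement of the group. */
    SlotSharingGroupResourceRegistry<R> setSlotSharingGroupResource(String group, R spec) {
        resources.put(Objects.requireNonNull(group), Objects.requireNonNull(spec));
        return this; // fluent, matching the proposed StreamExecutionEnvironment return type
    }

    /**
     * Like setSlotSharingGroupResources: applies the single-group rule to every entry, so groups
     * that are not in the map keep their current requirement.
     */
    SlotSharingGroupResourceRegistry<R> setSlotSharingGroupResources(Map<String, R> specs) {
        specs.forEach(this::setSlotSharingGroupResource);
        return this;
    }

    Optional<R> getResource(String group) {
        return Optional.ofNullable(resources.get(group));
    }

    Map<String, R> asMap() {
        return Collections.unmodifiableMap(resources);
    }
}
```

Because both setters return the environment, calls can be chained in the same fluent style as the rest of StreamExecutionEnvironment.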

The StreamExecutionEnvironment is the context in which a streaming program is executed. Defining the resources of SSGs in StreamExecutionEnvironment has the following advantages:

  • Good usability. Users who want to try fine-grained resource management do not need to touch their existing pipeline logic; they only need to pass a map from SSG names to ResourceSpecs to the StreamExecutionEnvironment. Because each SSG name appears only once in that map, every operator in a group is naturally matched to the same resources.
  • Simplicity. This design minimizes changes to the existing system.

Thus, we propose to add the above interfaces in the StreamExecutionEnvironment.
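To illustrate the consistency argument, here is a sketch in which operators only carry an SSG name, as they do today via #slotSharingGroup(String name), and resources are resolved through the single name-to-spec map on the environment. SsgResolutionSketch, the operator names, and the treatment of groups without an entry (returned as empty, i.e. left to coarse-grained handling) are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical illustration of the consistency argument. Operators reference a slot sharing
 * group by name only, and resources come from the one name-to-spec map on the environment, so
 * all operators of a group necessarily resolve to the same spec.
 */
final class SsgResolutionSketch {
    private SsgResolutionSketch() {}

    /**
     * @param operatorToGroup operator name -> SSG name, as set via slotSharingGroup(String)
     * @param groupResources SSG name -> spec set on the environment (R stands in for ResourceSpec)
     * @return operator name -> resolved spec; empty if the group has no fine-grained requirement
     */
    static <R> Map<String, Optional<R>> resolve(
            Map<String, String> operatorToGroup, Map<String, R> groupResources) {
        Map<String, Optional<R>> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : operatorToGroup.entrySet()) {
            // One lookup by group name: there is no per-operator copy of the spec that could drift.
            resolved.put(entry.getKey(), Optional.ofNullable(groupResources.get(entry.getValue())));
        }
        return resolved;
    }
}
```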

Implementation Plan

Step 1. Introduce the DataStream API

Introduce the ResourceSpec and setters in StreamExecutionEnvironment as shown in section Proposed Changes.

Step 2. Document the fine-grained resource management

Document fine-grained resource management as a beta feature, explaining clearly how to use it and what its constraints are.

Rejected Alternatives

We also considered two other places for the resource configuration, both rejected: ExecutionConfig, and an overload of #slotSharingGroup(String name).

ExecutionConfig

The ExecutionConfig includes the configs that define the behavior of the program execution, e.g. the default parallelism, the restart strategy on failure, etc.

Placing the API in ExecutionConfig would offer similar usability and simplicity. However, it introduces unnecessary network overhead: since the ExecutionConfig is serialized into the TaskDeploymentDescriptor, the SSG-resource map would be transferred to the TaskManager with the deployment of every task. This overhead is unnecessary and can further slow down the startup of large-scale jobs.
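A rough way to quantify the concern: if the SSG-resource map travels inside every TaskDeploymentDescriptor, the redundant bytes grow linearly with the number of deployed tasks. The sketch below uses plain Java serialization as a stand-in for Flink's actual serialization path, so its byte counts are only indicative; DeploymentOverheadSketch is a hypothetical helper.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/**
 * Hypothetical back-of-the-envelope model of the ExecutionConfig alternative: each task
 * deployment would carry its own serialized copy of the SSG-resource map.
 */
final class DeploymentOverheadSketch {
    private DeploymentOverheadSketch() {}

    /** Size of one serialized copy, using Java serialization as a stand-in for Flink's. */
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Redundant bytes when every one of numTasks deployments ships its own copy. */
    static long totalOverheadBytes(int perCopyBytes, int numTasks) {
        return (long) perCopyBytes * numTasks;
    }
}
```

With hypothetical figures, a map that serializes to 2 KB (2,048 bytes) shipped with 10,000 task deployments adds 20,480,000 bytes, roughly 20 MB, of redundant transfer.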

#slotSharingGroup(String name, ResourceSpec resource)

Currently, users specify the SSG of an operator through #slotSharingGroup(String name) on SingleOutputStreamOperator and DataStreamSink, which only sets the name of the slot sharing group. We could add an overload such as #slotSharingGroup(String name, ResourceSpec resource).

snippet
// create two data sources, grades and salaries
DataStream<Tuple2<String, Integer>> grades =
        GradeSource.getSource(env, rate)
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .slotSharingGroup("ssg-1", resourceSpec1);

DataStream<Tuple2<String, Integer>> salaries =
        SalarySource.getSource(env, rate)
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .slotSharingGroup("ssg-2", resourceSpec2);

// run the actual window join program with the same slot sharing group as grades;
// resourceSpec1 has to be repeated here and must match the spec used for "ssg-1" above
DataStream<Tuple3<String, Integer, Integer>> joinedStream =
        runWindowJoin(grades, salaries, windowSize).slotSharingGroup("ssg-1", resourceSpec1);

However, this interface has poor usability. In the code snippet above, the user invokes #slotSharingGroup(String name, ResourceSpec resource) for the same slot sharing group ("ssg-1") multiple times and must ensure that the configured ResourceSpec is identical each time.
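The burden also shows up on the framework side: because every call repeats the spec, something has to detect calls that disagree. Below is a minimal sketch of such a check, with a hypothetical class ConflictingSpecCheck and a type parameter R standing in for ResourceSpec; whether the rejected design would fail fast like this or silently pick one of the specs is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical check the rejected #slotSharingGroup(String name, ResourceSpec resource) overload
 * would need: since every call repeats the spec, calls for the same group must be compared.
 * R stands in for ResourceSpec and is compared with equals().
 */
final class ConflictingSpecCheck<R> {
    private final Map<String, R> firstSeen = new HashMap<>();

    /** Records one call; throws if it contradicts an earlier call for the same group. */
    void onSlotSharingGroup(String name, R spec) {
        R previous =
                firstSeen.putIfAbsent(Objects.requireNonNull(name), Objects.requireNonNull(spec));
        if (previous != null && !previous.equals(spec)) {
            throw new IllegalArgumentException(
                    "Slot sharing group '" + name + "' was already declared with " + previous
                            + " but is now declared with " + spec);
        }
    }
}
```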

Alternatively, users could define a SlotSharingGroup object that bundles the group name and its resources, which would ensure consistency across multiple invocations of #slotSharingGroup(SlotSharingGroup ssg). However, this interface is not compatible with the existing #slotSharingGroup(String name), and maintaining backward compatibility while keeping both slot sharing group setters would complicate the system.


