Status
Current state: Drafting
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-156 and FLIP-56 proposed fine-grained resource management to improve resource utilization in scenarios where coarse-grained resource management does not work well.
As of release-1.13, the slot sharing group (SSG) based runtime interface and the slot allocation for fine-grained resource requirements have been implemented in FLIP-156 and FLIP-56, respectively. The last piece needed to deliver fine-grained resource management to users is the user-facing API.
In this FLIP, we propose the DataStream API for specifying fine-grained resource requirements in StreamExecutionEnvironment and #slotSharingGroup().
Note: Because neither the operator nor the slot sharing group is exposed in the Table / SQL API, configuring fine-grained resource requirements in the Table / SQL API is out of scope for this FLIP.
Public Interfaces
In this FLIP, we propose the following two public interface changes:
- Introduce a public class SlotSharingGroup, which contains the name and the resource of a slot sharing group.
- Add setters of SlotSharingGroup in StreamExecutionEnvironment and #slotSharingGroup().
Proposed Changes
SlotSharingGroup
We propose to introduce a public immutable SlotSharingGroup with which users can describe the fine-grained resource requirements for slot sharing groups.
Code Block | ||||
---|---|---|---|---|
| ||||
/** Describes the name and the different resource factors of a slot sharing group. */
@PublicEvolving
public class SlotSharingGroup implements Serializable {
    /** Construct a new builder for the SlotSharingGroup with a specific name. */
    public static Builder newBuilder(String name);
    /** Builder for the {@link SlotSharingGroup}. */
    public static class Builder {
        /** Set the CPU cores. */
        public Builder setCpuCores(double cpuCores);
        /** Set the task heap memory. */
        public Builder setTaskHeapMemory(MemorySize taskHeapMemory);
        /** Set the task heap memory in MB. */
        public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB);
        /** Set the task off-heap memory. */
        public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory);
        /** Set the task off-heap memory in MB. */
        public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB);
        /** Set the task managed memory. */
        public Builder setManagedMemory(MemorySize managedMemory);
        /** Set the task managed memory in MB. */
        public Builder setManagedMemoryMB(int managedMemoryMB);
        /**
         * Add the given external resource. The old value with the same resource name will be
         * replaced if present.
         */
        public Builder setExternalResource(String name, double value);
        /** Build the SlotSharingGroup with current factors. */
        public SlotSharingGroup build();
    }
} |
Currently, the resource components to run a task include CPU cores, heap memory, off-heap memory, managed memory, network memory, and external resources. Except for the network memory, which will be automatically set by the runtime in FLINK-15031, all the resource components should be configured by users.
We propose to introduce a public SlotSharingGroup, which contains the group name as well as the above resource components. Users construct it with the builder pattern.
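The builder pattern described above can be sketched in plain Java as follows. This is a minimal stand-in, not Flink's implementation: the class name SlotSharingGroupSketch, the getters, and the use of plain MB integers instead of MemorySize are assumptions made to keep the example self-contained.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed SlotSharingGroup; NOT the Flink class. */
final class SlotSharingGroupSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMB;
    private final int managedMemoryMB;
    private final Map<String, Double> externalResources;

    private SlotSharingGroupSketch(Builder builder) {
        this.name = builder.name;
        this.cpuCores = builder.cpuCores;
        this.taskHeapMemoryMB = builder.taskHeapMemoryMB;
        this.managedMemoryMB = builder.managedMemoryMB;
        // Copy the map so later builder calls cannot change the built instance.
        this.externalResources =
                Collections.unmodifiableMap(new HashMap<>(builder.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() { return name; }
    double getCpuCores() { return cpuCores; }
    int getTaskHeapMemoryMB() { return taskHeapMemoryMB; }
    int getManagedMemoryMB() { return managedMemoryMB; }
    Map<String, Double> getExternalResources() { return externalResources; }

    /** Collects the resource factors; each setter returns the builder for chaining. */
    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMB;
        private int managedMemoryMB;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) { this.name = name; }

        Builder setCpuCores(double cpuCores) { this.cpuCores = cpuCores; return this; }
        Builder setTaskHeapMemoryMB(int mb) { this.taskHeapMemoryMB = mb; return this; }
        Builder setManagedMemoryMB(int mb) { this.managedMemoryMB = mb; return this; }

        /** A value already stored under the same resource name is replaced. */
        Builder setExternalResource(String resourceName, double value) {
            externalResources.put(resourceName, value);
            return this;
        }

        SlotSharingGroupSketch build() { return new SlotSharingGroupSketch(this); }
    }
}

final class SlotSharingGroupSketchDemo {
    public static void main(String[] args) {
        SlotSharingGroupSketch group = SlotSharingGroupSketch.newBuilder("ssg-a")
                .setCpuCores(2.0)
                .setTaskHeapMemoryMB(512)
                .setManagedMemoryMB(256)
                .setExternalResource("gpu", 1.0)
                .build();
        System.out.println(group.getName() + " needs " + group.getCpuCores() + " CPU cores");
    }
}
```

Keeping all fields final and copying the external-resource map in the constructor is one way to obtain the immutability the proposal asks for; the real class may achieve it differently.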
DataStream API to configure resource requirements
We propose to introduce two DataStream APIs for configuring SlotSharingGroup: one via #slotSharingGroup and one via StreamExecutionEnvironment.
Code Block | ||||
---|---|---|---|---|
| ||||
public class SingleOutputStreamOperator<T> {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link
     * StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group which contains the name and the resource spec.
     */
    @PublicEvolving
    public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
}
public class DataStreamSink<T> {
    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * <p>Note: the resource spec of the slot sharing group can be replaced by {@link
     * StreamExecutionEnvironment#registerSlotSharingGroup}.
     *
     * @param slotSharingGroup The slot sharing group which contains the name and the resource spec.
     */
    @PublicEvolving
    public DataStreamSink<T> slotSharingGroup(SlotSharingGroup slotSharingGroup);
} |
Currently, users specify the SSG of an operator through #slotSharingGroup(String name) in SingleOutputStreamOperator and DataStreamSink, which only defines the name of the SlotSharingGroup. We propose to add an overload, #slotSharingGroup(SlotSharingGroup slotSharingGroup), to both classes, which defines the resource spec along with the name.
Note that Flink will check that two SlotSharingGroup instances with the same name do not have different resource specs. An exception will be thrown if this check fails.
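The consistency check described above could look roughly like the following sketch. Everything here is hypothetical: the class names, the reduction of the resource spec to two fields, and the choice of IllegalArgumentException are assumptions, since the proposal does not specify where the check lives or which exception type is thrown.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified resource spec (CPU cores and task heap only). */
final class ResourceSpecSketch {
    final double cpuCores;
    final int taskHeapMemoryMB;

    ResourceSpecSketch(double cpuCores, int taskHeapMemoryMB) {
        this.cpuCores = cpuCores;
        this.taskHeapMemoryMB = taskHeapMemoryMB;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ResourceSpecSketch)) {
            return false;
        }
        ResourceSpecSketch other = (ResourceSpecSketch) o;
        return Double.compare(cpuCores, other.cpuCores) == 0
                && taskHeapMemoryMB == other.taskHeapMemoryMB;
    }

    @Override
    public int hashCode() {
        return 31 * Double.hashCode(cpuCores) + taskHeapMemoryMB;
    }

    @Override
    public String toString() {
        return "{cpuCores=" + cpuCores + ", taskHeapMemoryMB=" + taskHeapMemoryMB + "}";
    }
}

/** Hypothetical checker: remembers the first spec seen for each group name. */
final class SlotSharingGroupConsistencyChecker {
    private final Map<String, ResourceSpecSketch> specByName = new HashMap<>();

    /** Accepts a repeated identical declaration; rejects a conflicting one. */
    void check(String groupName, ResourceSpecSketch spec) {
        ResourceSpecSketch previous = specByName.putIfAbsent(groupName, spec);
        if (previous != null && !previous.equals(spec)) {
            throw new IllegalArgumentException(
                    "Slot sharing group '" + groupName
                            + "' is declared with conflicting resource specs: "
                            + previous + " vs. " + spec);
        }
    }
}
```

Declaring the same group twice with an identical spec passes silently in this sketch, which matches the common case of several operators sharing one SlotSharingGroup instance.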
To maintain backward compatibility with the existing #slotSharingGroup(String name), which has been widely used in previous releases, we also propose a SlotSharingGroup setter in StreamExecutionEnvironment.
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamExecutionEnvironment {
    /**
     * Specify fine-grained resource requirements for slot sharing groups. The existing resource
     * requirement of the same slot sharing group will be replaced.
     *
     * <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
     * deployed into a shared slot. There is no guarantee that the scheduler always deploys the
     * grouped operators together. In case grouped operators are deployed into separate slots, the
     * slot resources will be derived from the specified group requirements.
     */
    @PublicEvolving
    public StreamExecutionEnvironment registerSlotSharingGroup(
            String slotSharingGroup, ResourceSpec resourceSpec);
} |
The StreamExecutionEnvironment is the context in which a streaming program is executed. Users who want to try fine-grained resource management with their existing jobs do not need to touch the existing logic; they only need to register a list of SlotSharingGroups into the StreamExecutionEnvironment.
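The registration semantics can be illustrated with a small stand-alone sketch. It is not Flink code: the class SlotSharingGroupRegistrySketch, the lookup method, and the reduction of the resource spec to a single CPU value are assumptions for illustration. The point is that operators keep referring to groups by name only, while the resource requirement is attached, and possibly replaced, at the environment level.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the environment-level registry; not Flink code. */
final class SlotSharingGroupRegistrySketch {
    // Group name -> required CPU cores (one resource dimension keeps the sketch short).
    private final Map<String, Double> cpuCoresByGroup = new HashMap<>();

    /** Registers the requirement; an existing entry for the same name is replaced. */
    SlotSharingGroupRegistrySketch registerSlotSharingGroup(String name, double cpuCores) {
        cpuCoresByGroup.put(name, cpuCores);
        return this;
    }

    /** Looks up the requirement for a group that operators reference only by name. */
    Optional<Double> cpuCoresFor(String name) {
        return Optional.ofNullable(cpuCoresByGroup.get(name));
    }
}
```

A group that was never registered yields an empty result in this sketch; how Flink treats such groups is outside what the example claims.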
Implementation Plan
Step 1. Introduce the DataStream API
Introduce the SlotSharingGroup and setters in StreamExecutionEnvironment and #slotSharingGroup as shown in section Proposed Changes.
Step 2. Document the fine-grained resource management
Document fine-grained resource management as a beta feature. Explain clearly how to use it and what constraints it has.
Known Limitations
Potential resource deadlock in batch jobs
As discussed in FLIP-119, FLIP-156 and FLINK-20865, a resource deadlock might occur when fine-grained resource management is applied to batch jobs with PIPELINED edges. Until that issue is fixed, this case is treated as invalid: Flink throws an exception with a meaningful and understandable explanation and tells users to set fine-grained.shuffle-mode.all-blocking to true, which forces all edges in this scenario to BLOCKING at the compiling stage. This option is internal and should be removed after FLINK-20865.
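The validation rule for batch jobs can be sketched as below. The enum, the class, the method signature, and the exception type are all hypothetical; only the config key comes from the text. The sketch assumes the rule is: fail on any PIPELINED edge when fine-grained resources are used, unless the all-blocking option is enabled, in which case every edge is turned into BLOCKING.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical edge types for illustration only. */
enum EdgeTypeSketch { PIPELINED, BLOCKING }

/** Hypothetical compile-time check for batch jobs with fine-grained resources. */
final class BatchEdgeValidatorSketch {
    static final String ALL_BLOCKING_KEY = "fine-grained.shuffle-mode.all-blocking";

    /**
     * Returns the edge types to use. Jobs without fine-grained resources, or without
     * PIPELINED edges, are returned unchanged.
     */
    static List<EdgeTypeSketch> validate(
            List<EdgeTypeSketch> edges, boolean usesFineGrainedResources, boolean allBlocking) {
        if (!usesFineGrainedResources || !edges.contains(EdgeTypeSketch.PIPELINED)) {
            return new ArrayList<>(edges);
        }
        if (allBlocking) {
            // Force every edge to BLOCKING to avoid the potential resource deadlock.
            return new ArrayList<>(Collections.nCopies(edges.size(), EdgeTypeSketch.BLOCKING));
        }
        throw new IllegalStateException(
                "Fine-grained resource management is not supported for batch jobs with "
                        + "PIPELINED edges. Set " + ALL_BLOCKING_KEY + " to true to force "
                        + "all edges to BLOCKING.");
    }
}
```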
Rejected Alternatives
There is one rejected alternative: placing the resource configuration in ExecutionConfig.
ExecutionConfig
The ExecutionConfig includes the configs that define the behavior of the program execution, e.g. the default parallelism and the retry strategy on failure.
Adding the DataStream API in ExecutionConfig would also offer good usability and simplify the system. However, it can lead to unnecessary network overhead. Because the ExecutionConfig is serialized into the TaskDeploymentDescriptor, the SSG-resource map would also be transferred to the TaskManager with the deployment of each task. This network overhead is unnecessary and can further increase the startup time of large-scale jobs.