Status

Discussion threadhttps://lists.apache.org/thread/nwrqd5jtqwks89tbxpcrgto6r2bhdhno
Vote threadhttps://lists.apache.org/thread/xpmhpmodzlwo03n6zzovy36gox84l6zl
JIRA

Unable to render Jira issues macro, execution error.

Release1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability. In most scenarios, the local disks in the cloud-native environment are isolated at Pod granularity[1]. If users configure a large Pod local disk, the cost is relatively high, and the cost will be wasted when the disk is idle; If users configure a small Pod local disk, large-scale jobs may encounter insufficient disk space error when shuffling data, resulting in poor job stability.

There are several existing solutions to this problem.

  • Mount the cloud disk. Mounting cloud disks can solve the problem of insufficient disk space, but the access performance is usually not as good as that of a local disk. Generally, the cost of mounting the cloud disk is also higher than using the local disks.

  • Use Remote Shuffle Service. This can also solve the problem of insufficient disk space during batch processing. However, maintaining a Shuffle Service cluster independently will introduce additional deployments and maintenance costs, and at least two network transmissions for each piece of data being shuffled.

  • Directly use cloud-native remote storage systems to shuffle data, such as remote object storage. Such storages usually have lows cost and are easy to scale. However, compared to the local disk, this solution has poorer performance and can hardly meet the performance requirements of large-scale jobs. 

We propose to solve this problem with a combination of local disk and remote storage, which leverages the local disk with the given fixed space size for better performance in most scenarios and uses remote storage as a supplement when the local disk is insufficient. We propose to support dynamic switching between the local disk and Remote Storage, that is, when the local disk is full, it automatically switches to Remote Storage, and automatically switches back when the local disk has available spaces again.

Our proposal is based on Hybrid Shuffle, because: 1) the idea of dynamic switching between storages is similar to that of Hybrid Shuffle, which dynamically switches between memory and local disk; 2) we see great potential in Hybrid Shuffle to become the next-generation default Batch shuffle for its better resource adaptiveness and performance. 

The goals of this FLIP are as follows.

  • By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.

  • Improve shuffle stability for large-scale Batch jobs, by using remote storage as a supplement to avoid job failure when the local disk space is insufficient.

Public Interfaces

  1. The config option is to specify the home dir of the remote storage.

    1. Key: taskmanager.network.hybrid-shuffle.remote.path

    2. Default: no default value

    3. Description: The base home path of remote storage to store shuffle data. If the option is configured, Hybrid Shuffle will use the remote storage path as a supplement to the local disks. If not configured, the remote storage will not be used.

  2. The config option is to specify the minimum reserved space ratio per local disk.

    1. Key: taskmanager.network.hybrid-shuffle.local-disk.min-reserve-space-fraction

    2. Default: 5%

    3. Description: The minimum reserved space fraction per local disk when using Hybrid Shuffle. When using a local disk to store shuffle data, the local disk space may be exhausted if it is used without any limit, leading to job failures. This option controls the minimum reserved disk space fraction which cannot be used to store the shuffle data. When the left available disk space fraction reaches this limit, the new arriving data will be written to the remote storage, if the remote storage is not configured, an exception will be thrown.

Proposed Changes

We propose to extend Hybrid Shuffle to support remote storages. The proposed major changes are as follows.

  1. Refactor Hybrid Shuffle to introduce a generalized abstraction, Tiered Storage Architecture, for dynamically switching between different storages

  2. Adapt existing usages of local memory and disk to the new tiered architecture

  3. Introduce a new remote storage tire

Tiered Storage Architecture

We model Flink shuffle as the process of upstream tasks write data to different storages and downstream tasks read data from the storages accordingly.

Currently, Hybrid Shuffle only supports memory storage and local disk storage. To add more storages, it's necessary to introduce a unified abstraction for the storages, so that:

  1. Hybrid Shuffle can leverage as many storages as possible and dynamically switch between the storages, without having to understand the detailed differences for accessing different storages (data format, memory usage, etc.).

  2. The logics for accessing different storages are independent from each other. 

  3. Making it easy to add a new storage or to remove an existing storage. It should not require changing the hybrid shuffle common (framework) logics or logics for accessing other storages.

We propose Tiered Storage Architecture for storing shuffle data in a unified, universal, and scalable way. The new architecture abstracts different storages and provides a unified access interface. We name each storage as a Tier in the new architecture. 

An overview of the Tiered Storage Architecture is shown in the figure above, mainly includes Write Client, Read Client, and storage tiers. A storage tier is responsible for writing data to and reading data from a particular storage. Write Client is used by upstream tasks (ResultPartitionWriter) to write data to storages. It dynamically decides and switches the storage tier that data should be written to. Read Client is used by downstream tasks (InputGate) for reading data from the storages. It automatically finds the proper storage tier to read data from, respecting the order of data in each subpartition.  

The proposed architecture guarantees complete decoupling and independence between all tiers. Each tier operates independently from the others, resulting in improved flexibility that allows for the addition or removal of tiers without affecting the rest. Additionally, the independent and decoupled tiers do not require synchronization between them, which is a positive impact on performance.

In the following sections, we present two keys in the new architecture design: dynamic storage tier switching and memory management.

Dynamic Storage Tier Switching

Write Client tries to write data to storage tiers according to their priorities, and fallback to the next tier if the previous one cannot accept more data (e.g., reaching space limit). It also tries to switch back to the high-priority tiers once in a while. For simplicity, we use a fixed priority order, Local Memory > Local Disk > Remote Storage, in the first step. In the future, this can be configured or even dynamically decided in runtime if necessary.

Tier Switching Granularity

How often should we switch between storage tiers? Switching at a too high frequency should be avoided because it increases the cost of selecting a storage tier to write to / read from. Moreover, the amount of continuous data written to a storage is usually performance related (compaction, file size / amount, IO randomness, indexing, etc.). 

We introduce the concept Segment as the smallest granularity for switching between tiers. A segment is defined as a batch of continuous records within a subpartition. Data in the same segment are guaranteed written to the same storage tier. Each segment is uniquely identified by the subpartition id and an increasing segment index within that subpartition. Therefore, we can easily locate the data by looking for the segments.

The size of segments is decided by the storage tiers. This is because the proper segment size may be different for different tiers. E.g., a smaller segment size may allow more subpartitions to share the limited space in the local memory tier, while a relatively larger segment may help reduce the number of files in local disk / remote storage tiers.

After selecting a tier, the write client starts writing the segment to the corresponding tier. The client will keep writing until the tier tells the client that the current segment is full. Then the write client will start a new segment. In this way, the tier decides the segment size independently, and the framework(write client) need not be aware of the different segment sizes of each tier.

In order to simplify the segment implementation in the initial version, we opt to set a fixed value for the segment size of each tier. Specifically, the memory tier was assigned a segment size of 10 buffers (320k per segment), while the disk tier and remote storage tier were allocated 128 buffers (4m per segment) for each segment. In the future, we can make adjustments to the segment size dynamically if necessary.

Notice that an end-of-segement does not necessarily lead to a tier switching, because the write client will trigger to switch the tier only when encountering one of the two conditions. 1) The current tier can not accept the next segment anymore. 2)  The tier with higher priority can accept the next segment, so we should switch back to that tier.

Write Client

Write Client determines when to switch tier and which tier to switch. The data is written in the granularity of the Segment. When a segment is finished written, select a new tier again. The writing data process is as follows.

  1. Before writing, ask each tier whether it can accept a new Segment according to the tier priority. If yes, write data to this tier, otherwise ask the next tier.

  2. Write the Segment data to the selected tier. The selected tier will also save the SegmentId, which is used to determine whether this tier contains the corresponding Segment when reading data.

  3. After the Segment is written, write the next Segment and return to Step 1.

Read Client

Read Client is responsible for asking each storage tier whether a Segment exists and reading data from the tier where the Segment is located. The data reading process is as follows.

  1. Read Client asks whether there is a SegmentId in each storage tier according to the priority. If exists, it reads the data directly from this storage tier. If not exists, it asks for the next priority tier.

  2. Read the Segment data from the selected tier.

  3. After the Segment is read, read the next Segment and return to Step 1.

Memory Management

Writer-side Memory Management

Reasonable buffer allocation among multiple storage tiers can improve resource utilization. To allocate memory more reasonably, memory management is redesigned.

Original Hybrid Shuffle

In the current Hybrid Shuffle implementation, the total buffers in a LocalBufferPool can be divided into two parts. The first part is to accumulate buffers to accept the arriving data for each Subpartition, the second part is for the flushing IO. Assuming a job vertex has n Subpartitions, an upstream LocalBufferPool needs at least n+1 buffers: Each Subpartition needs at least one buffer to accumulate the arriving data. Otherwise, upon data arriving to a Subpartition that does not hold a buffer, we would have to flush and release one of the unfinished buffers from other Subpartitions to make space for the new data, leading to frequent flushing of small pieces of data, thus low memory utilization. In addition, at least one extra buffer is required so that the producer task will not be blocked on the flushing IO. So, for Hybrid Shuffle, the number of the first part buffers is n, the second part is 1. 

New Hybrid Shuffle

Similarly, the total buffers in the new architecture also have two parts. The first part should also guarantee at least n buffers, the reason is that each Subpartition needs at least one buffer to accumulate data for the above similar reasons. The second part buffers are exclusive for each tier to improve the IO efficiency because 1) the memory tier should reserve a relatively large number of buffers if possible, to store Segment data and allow more data to be consumed by downstream directly. 2) the disk tier or the remote storage tier should also reserve a small number of buffers, so that it can improve the IO performance by flushing data in small batches. Therefore, the minimum number of buffers required for a LocalBufferPool is as follows:

Wi is the number of exclusive buffers allocated to tier i, and m is the total number of storage tiers.

The maximum number of buffers in a LocalBufferPool is not limited to use more buffers, which is similar to Hybrid Shuffle.  The buffers more than the minimum value will be competed by all the tiers.

The real number of buffers in a LocalBufferPool is dynamically changed according to the number of total available network buffers, but the range is between the minimum and the maximum.

Because the number of exclusive buffers Wi is exposed by calling a method in a tier, so it is decided by the corresponding tier and is independent with each other. For simplifying the implementation, we set W1=100, W2=10, W3=10 in the first version(The memory tier is allocated more buffers because we want to let more data write and consume in the memory tier). In the future, this can be configurable or dynamically determined if needed.

Limitations for Memory tier

When a task is finished, all the buffers should be released, otherwise, the buffers will keep occupying the memory and other tasks may have no more available buffers. For the disk tier or the remote tier, it can release the buffers by flushing data. Generally, The memory tier can also release the buffers because of downstream consumption. But if the downstream has not been started, the buffers have to wait to be released until the downstream start. However, the upstream can not release the resources because it is not consumed, the downstream can not start because of waiting resources, then a deadlock may occur. So we add a limit for the memory tier, the memory tier can only be used when the downstream can consume data. 

Reader-side Memory Management

Original Hybrid Shuffle

In the original Hybrid Shuffle, the reader-side uses InputChannels to read upstream data.

The reader-side needs at least n buffers (Assuming a job vertex has n InputChannels) because each InputChannel needs at least one buffer to accept the reading data. Otherwise, if the data is sent to an InputChannel without buffers, we have to wait for the other InputChannels to release buffers, leading to low performance. 

New Hybrid Shuffle

With the new architecture, the memory tier and the disk tier also use InputChannels to read data, while the remote storage tier directly read data from remote storage, instead of using InputChannels.

To unify the memory management mechanism with the upstream and make each tier decide the exclusive buffers in the tier, we propose to use the formula to manage buffers, 

Ri is the number of exclusive buffers allocated to tier i, and m is the total number of storage tiers. R1 and R2 can be 0 because the n has guaranteed that each InputChannel has allocated 1 buffer, and R3 is 1 since the storage tier requires at least 1 exclusive buffer.

The maximum buffers are also not limited, the reader side can use more buffers when the total network buffers are sufficient. The buffers more than the minimum value will be competed by all the tiers.

Memory Tier and Disk Tier

Compared with the current Hybrid Shuffle, the writing data process to the memory tier and the local disk tier has changed. In the current Hybrid Shuffle, the data storage location may change, the location could move from in-memory ->flush to disk ->load back to memory. In the new architecture, the memory tier and the disk tier are decoupled. Each piece of data is only written to one of the storage tiers and the data location won't be changed once finished written.

Data location movement will cause the data buffer status to change from available ->unavailable ->available. These status changes may bring several negative effects. 1) Complex status changes require the heavy overhead of status synchronization between the memory and file data managers, which may reduce performance; 2) Buffer status changes will lead to inaccurate upstream backlog calculation and affect memory utilization[2]. After using the new architecture, the data storage location and the buffer available status will not change, so these Hybrid Shuffle issues are solved.

With the new architecture, we can still support the previous selective/full spilling strategies of Hybrid Shuffle. For selective spilling strategy, we decide whether the data should be consumed from memory or disk at the time data is written. For full spilling strategy, all data should go to the disk (or remote storage) tier, and the local memory tier is simply disabled.

Compared with the original Hybrid Shuffle,

  1. In terms of performance, it eliminates the complexity and heavy overhead of status synchronization between MemoryDataManager and FileDataManager.

  2. Fixing the inaccurate backlog calculation problem caused by data availability changes (data in memory may be flushed to the disk in the current Hybrid Shuffle).

Deciding which tier to write data upfront may have performance impacts, due to changes in chances for data to be consumed from memory directly. However, our evaluation on a PoC implementation shows that the performance differences can hardly be observed.

Remote Storage Tier

We propose that the remote storage tier read/write the shuffle data directly from the remote storage. The upstream may write the data to the remote storage directly when the remote storage is enabled, and the downstream will read for the remote storage and process it.

Supported Filesystems

Flink project has already supported various remote storages and all of them have the same FileSystem read/write interface. Based on the existing interface, the remote storage tier can support these remote storages naturally. 

Different remote storages have different characteristics, for example, OSS is more tolerant to small files[3], while HDFS can support appending to a file, etc. The design presented in the following sections are targeting object storage systems, which are good at storing massive small files / objects and do not support modifying after the written. In the future, the file organization can be pluggable and customized for more storages (e.g., HDFS) if necessary.

File Organization

We define the shuffle data files from a subpartition as a file group. The files in the group have the same specific parent path in remote storage. Downstream tasks will read the file from corresponding file groups. The downstream starts to consume data only when the segment ends, the end of the segment is determined by checking whether the segment file exists.

File Size

The shuffle data file with a smaller size will have better data visibility of downstream tasks, but a too-small file size may introduce extra index and query cost. We propose to write a separate file per Segment, for simplicity, by default use 4M as the file size in the first version. We can make it configurable if needed in the future.

File Path

We defined the path related to a file group as `HomePath/JobID/ResultPartitionID/SubpartitionID/`, which is globally unique to avoid conflicts between different tasks. The files under this path represent all shuffle data belonging to the subpartition. Each new segment will create a new file, named by segmentId, in the file group directory. The HomePath can be configured by `taskmanager.network.hybrid-shuffle.remote.path`.

Implementation Plan

Compared with the original Hybrid Shuffle, the new architecture changes a lot. To ensure that the existing Hybrid Shuffle will not be affected and the Master branch will remain in the Releaseable state, the new architecture will be developed independently with the current Hybrid Shuffle. according to the following development plan.

  1. Add a temporary config option. Users can switch between the new and old Hybrid Shuffle Mode through the added option.

  2. Implement a new Hybrid Shuffle supporting tiered storage architecture.

  3. Remove the original Hybrid Shuffle implementation and the temporary config option.

Compatibility, Deprecation, and Migration Plan

This is a compatible change and the existing Hybrid Shuffle will not be affected. For a more elegant upgrade, we will implement the new architecture according to the above implementation plan.

Test Plan

The changes will be covered by unit, integration and e2e test cases.

Appendix

[1] In some cases, Pods can also share the storage space of their hosts. For example, Volume uses the hostPath mounting method. However, this method has security, isolation, permissions, and other problems. It is rarely used in production environments, and the official does not recommend the usage way.

[2] https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control

[3] https://www.alibabacloud.com/help/en/object-storage-service/latest/oss-performance-optimization-overview OSS supports storing hundreds of millions of files. For small files, relatively good performance can also be guaranteed.