Discussion thread | https://lists.apache.org/thread/dyq1jyrj8cqb26z84bhqr7xx1pn7ctj8 |
---|---|
Vote thread | |
Issue | |
Release | - |
Motivation
Each Paimon task in a Flink/Spark job creates a TableWriteImpl for the Paimon table, which uses FileStoreWrite to write data to the file store. For a key-value file store, Paimon creates a KeyValueFileStoreWrite and initializes a memory pool for the writer. KeyValueFileStoreWrite creates a MergeTreeWriter for each bucket, according to the data partitions and the number of buckets in each partition, so one KeyValueFileStoreWrite holds multiple writers. Each MergeTreeWriter has a writer buffer pool: it writes data to the buffer pool first and flushes the data to the file system when the buffer is full. The writers in one task share the memory pool, and the memory architecture of the writers is as follows.
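As a rough illustration of the buffering behavior described above, the following sketch shows several per-bucket writers drawing pages from one shared pool and flushing when no page is left. SharedPagePool and BucketWriter are hypothetical names introduced here, each record occupies one whole page for simplicity, and a writer only flushes its own pages; this is not Paimon's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical fixed-size page pool shared by every writer in one task. */
class SharedPagePool {
    private final Deque<byte[]> freePages = new ArrayDeque<>();

    SharedPagePool(int pageCount, int pageSize) {
        for (int i = 0; i < pageCount; i++) {
            freePages.push(new byte[pageSize]);
        }
    }

    /** Returns a free page, or null when the pool is exhausted. */
    byte[] nextPage() {
        return freePages.poll();
    }

    void returnAll(List<byte[]> pages) {
        freePages.addAll(pages);
    }

    int freePageCount() {
        return freePages.size();
    }
}

/** Hypothetical per-bucket writer that buffers records in pages taken from the shared pool. */
class BucketWriter {
    private final SharedPagePool pool;
    private final List<byte[]> heldPages = new ArrayList<>();
    private int flushCount = 0;

    BucketWriter(SharedPagePool pool) {
        this.pool = pool;
    }

    /** Buffers one record; for simplicity each record occupies one whole page. */
    void write(byte[] record) {
        byte[] page = pool.nextPage();
        if (page == null) {
            // The buffer is full: flush this writer's pages and retry once.
            flush();
            page = pool.nextPage();
        }
        if (page == null) {
            throw new IllegalStateException("Other writers hold every page in the pool");
        }
        System.arraycopy(record, 0, page, 0, Math.min(record.length, page.length));
        heldPages.add(page);
    }

    /** Stands in for writing a file: the pages go back to the shared pool. */
    void flush() {
        if (heldPages.isEmpty()) {
            return;
        }
        pool.returnAll(heldPages);
        heldPages.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}
```

Because the pool is shared, the number of pages one writer can hold depends on what the other writers in the same task currently occupy.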
There are some problems in the above memory architecture for the writer buffer:
1. Currently the Paimon writer only supports heap memory, which may cause Full GC and unstable writes when it uses too much memory.
2. Each task allocates and manages its own memory pool. If there are too many tasks in one Executor, this may cause performance issues and even OOM.
To address the above issues, this PIP aims to support a writer buffer based on the Executor Memory Manager (such as Flink Managed Memory) for Paimon. The Executor in the engine manages the total memory usage of its tasks to avoid Full GC or tasks being killed due to excessive memory usage.
Public Interfaces
The Paimon sink needs to create a memory pool based on a memory segment allocator (described under Proposed Changes). The memory pool size comes from the existing option write-buffer-size or is calculated by the sink operator. A MemorySegmentPool is therefore created from the allocator and the memory pool size when the sink operator is initialized, and the memory pool is then passed to the WriterBuilder to create the Paimon writer.
    /* Add memory segment pool to table write. */
    @Public
    public interface TableWrite {
        TableWrite withMemoryPool(MemorySegmentPool pool);
    }
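A minimal sketch of how a sink might inject a pool through this method is shown below. SketchSegmentPool, SketchTableWrite and SketchTableWriteImpl are hypothetical stand-ins for MemorySegmentPool, TableWrite and its implementation, reduced to what is needed to show the fluent call; the 256 MiB default is an assumed value used only for illustration.

```java
/** Hypothetical stand-in for MemorySegmentPool: it only reports its capacity. */
interface SketchSegmentPool {
    long capacityBytes();
}

/** Hypothetical stand-in for TableWrite, reduced to the method this proposal adds. */
interface SketchTableWrite {
    SketchTableWrite withMemoryPool(SketchSegmentPool pool);

    long bufferCapacityBytes();
}

class SketchTableWriteImpl implements SketchTableWrite {
    // Pool used when the sink does not inject one (assumed size, for illustration only).
    private SketchSegmentPool pool = () -> 256L * 1024 * 1024;

    @Override
    public SketchTableWrite withMemoryPool(SketchSegmentPool pool) {
        this.pool = pool;
        // Returning this allows the call to be chained while the writer is built.
        return this;
    }

    @Override
    public long bufferCapacityBytes() {
        return pool.capacityBytes();
    }
}
```

A sink operator would then write something like `new SketchTableWriteImpl().withMemoryPool(enginePool)` after it has built the pool from its allocator.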
MemorySegment
Paimon can create a MemorySegment based on a segment allocated by the engine, for example the org.apache.flink.core.memory.MemorySegment in Flink. Paimon needs to get the off-heap buffer or heap memory from those segments, which are allocated and released by the engine. Paimon must not release this memory itself, otherwise the same memory may be released more than once. For these reasons, we remove the free and allocateOffHeapMemory methods from Paimon's MemorySegment and add a description to the wrapOffHeapMemory method.
    /**
     * Paimon can allocate heap memory itself, and when the operator exits, JVM will release the allocated heap memory.
     * But for off-heap memory, Paimon cannot release the memory itself. Engines such as Flink and Spark should allocate
     * off-heap memory for Paimon, and release the memory when the task goes to termination.
     */
    @Public
    public final class MemorySegment {
        /* Remove this method, Paimon cannot free memory itself. */
        public static MemorySegment allocateOffHeapMemory(int size);

        /* Remove this method, Paimon cannot free memory itself. */
        public void free();
    }
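The ownership rule can be sketched with a segment type that only wraps memory and exposes no way to free it. WrappedSegment is a hypothetical, reduced class written for this example, not Paimon's MemorySegment; the direct ByteBuffer stands in for memory handed over by the engine.

```java
import java.nio.ByteBuffer;

/** Hypothetical, reduced segment: it only wraps memory and offers no free() method. */
final class WrappedSegment {
    private final ByteBuffer buffer;

    private WrappedSegment(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** Wraps off-heap memory that the engine allocated and will release. */
    static WrappedSegment wrapOffHeapMemory(ByteBuffer engineBuffer) {
        if (!engineBuffer.isDirect()) {
            throw new IllegalArgumentException("Expected a direct (off-heap) buffer");
        }
        return new WrappedSegment(engineBuffer);
    }

    /** Wraps heap memory; the JVM garbage collector reclaims it. */
    static WrappedSegment wrap(byte[] heapMemory) {
        return new WrappedSegment(ByteBuffer.wrap(heapMemory));
    }

    boolean isOffHeap() {
        return buffer.isDirect();
    }

    int size() {
        return buffer.capacity();
    }

    void putInt(int index, int value) {
        buffer.putInt(index, value);
    }

    int getInt(int index) {
        return buffer.getInt(index);
    }
}
```

Since the segment never owns the underlying memory, releasing it exactly once remains the responsibility of whoever allocated it.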
Proposed Changes
Architecture
As shown in the architecture above, the Paimon task creates different memory segment allocators and memory pools for the writer, depending on the options, when it is initialized. Each writer allocates segments from the allocator and releases the memory when the task finishes.
Memory Allocator For Flink
MemorySegmentAllocator
MemorySegmentAllocator is added to allocate and release segments for MemorySegmentPool from Flink. The Paimon sink creates the allocator according to the job configuration to allocate segments from the Flink Memory Manager.
    /**
     * Memory segment allocator will allocate segments from memory manager in Flink.
     */
    public class MemorySegmentAllocator {
        /* Memory manager in Flink. */
        private final MemoryManager memoryManager;

        /**
         * Allocates a set of memory segments for memory pool.
         *
         * @param target The list into which to put the allocated memory pages.
         * @param numberOfPages The number of pages to allocate.
         * @throws MemoryAllocationException Thrown, if this memory manager does not
         *     have the requested amount of memory pages any more.
         */
        void allocate(Collection<MemorySegment> target, int numberOfPages)
                throws MemoryAllocationException;

        /* Release the segments allocated by the allocator if the task is closed. */
        void release();
    }
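The allocate/release contract above could be implemented roughly as follows. This is a self-contained sketch under stated assumptions: SketchMemoryManager is a hypothetical stand-in for Flink's MemoryManager with a fixed page budget, pages are plain direct ByteBuffers rather than Flink segments, and an IllegalStateException replaces MemoryAllocationException.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for an engine memory manager with a fixed page budget. */
class SketchMemoryManager {
    private final int pageSize;
    private int availablePages;

    SketchMemoryManager(int totalPages, int pageSize) {
        this.availablePages = totalPages;
        this.pageSize = pageSize;
    }

    List<ByteBuffer> allocatePages(int numberOfPages) {
        if (numberOfPages > availablePages) {
            throw new IllegalStateException(
                    "Requested " + numberOfPages + " pages, only " + availablePages + " left");
        }
        availablePages -= numberOfPages;
        List<ByteBuffer> pages = new ArrayList<>(numberOfPages);
        for (int i = 0; i < numberOfPages; i++) {
            pages.add(ByteBuffer.allocateDirect(pageSize));
        }
        return pages;
    }

    void releasePages(Collection<ByteBuffer> pages) {
        availablePages += pages.size();
    }

    int availablePages() {
        return availablePages;
    }
}

/** Sketch of the allocator: it remembers every page so release() can hand all of them back. */
class SketchSegmentAllocator {
    private final SketchMemoryManager memoryManager;
    private final List<ByteBuffer> allocated = new ArrayList<>();

    SketchSegmentAllocator(SketchMemoryManager memoryManager) {
        this.memoryManager = memoryManager;
    }

    void allocate(Collection<ByteBuffer> target, int numberOfPages) {
        List<ByteBuffer> pages = memoryManager.allocatePages(numberOfPages);
        allocated.addAll(pages);
        target.addAll(pages);
    }

    /** Called once when the task closes; returns every page to the manager's budget. */
    void release() {
        memoryManager.releasePages(allocated);
        allocated.clear();
    }
}
```

Tracking the allocated pages inside the allocator means the writer never has to return pages individually, which matches the rule that only the engine side releases memory.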
Options And Allocator Definitions
MemorySegmentAllocator allocates segments from Managed Memory for the Paimon writer. This mechanism has the following constraints:
1. Option write-buffer-size is ignored, and the memory pool uses the memory size calculated from the MemoryManager in Flink.
2. Option page-size is ignored, and the memory pool uses the Flink segment size.
When the task in Flink is closed, the allocator releases all the segments it allocated.
    /* Add memory allocator related options in flink connector. */
    public class FlinkConnectorOptions {
        /* Sink writer allocate segments from managed memory. */
        public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY =
                ConfigOptions.key("sink.use-managed-memory-allocator")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription(
                                "If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.");

        /**
         * Weight of writer buffer in managed memory, Flink will compute the memory size for writer according
         * to the weight, the actual memory used depends on the running environment.
         */
        public static final ConfigOption<MemorySize> SINK_MANAGED_WRITER_BUFFER_MEMORY =
                ConfigOptions.key("sink.managed.writer-buffer-memory")
                        .memoryType()
                        .defaultValue(MemorySize.ofMebiBytes(256))
                        .withDescription(
                                "Weight of writer buffer in managed memory, Flink will compute the memory size "
                                        + "for writer according to the weight, the actual memory used depends on the running environment.");
    }
Use Allocator In SQL
Users can set the memory weight in SQL for Flink Managed Memory; the Flink sink operator then gets the memory pool size and creates the allocator for the Paimon writer.
    INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
    SELECT * FROM ....;
Use Allocator In Flink
The process of using the Managed Memory Allocator in Flink is divided into two parts: compilation and runtime.
In compilation, Paimon creates the sink operator in FlinkSink. Paimon can get the Transformation from the sink operator and declare managed memory by Transformation.declareManagedMemoryUseCaseAtOperatorScope(OPERATOR, weight of buffer) according to the options sink.use-managed-memory-allocator and sink.managed.writer-buffer-memory.
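The compilation step can be sketched as below. SketchTransformation is a hypothetical stand-in for the relevant part of Flink's Transformation (the use-case argument is omitted), and deriving the integer weight from the configured size in mebibytes is an assumption made for this example; the proposal does not state how the weight is computed.

```java
/** Hypothetical stand-in for the part of Flink's Transformation used at compile time. */
class SketchTransformation {
    private Integer operatorScopeWeight; // null until a weight is declared

    void declareManagedMemoryUseCaseAtOperatorScope(int weight) {
        if (weight <= 0) {
            throw new IllegalArgumentException("Weight must be positive");
        }
        this.operatorScopeWeight = weight;
    }

    Integer operatorScopeWeight() {
        return operatorScopeWeight;
    }
}

class SinkMemoryDeclaration {
    /**
     * Declares managed memory for the sink only when the option is enabled.
     * Assumption: the weight is the configured buffer size in MiB, at least 1.
     */
    static void declare(
            SketchTransformation transformation, boolean useManagedMemory, long writerBufferBytes) {
        if (!useManagedMemory) {
            return; // the sink keeps its own heap-based pool
        }
        int weightInMebibytes = (int) Math.max(1, writerBufferBytes >> 20);
        transformation.declareManagedMemoryUseCaseAtOperatorScope(weightInMebibytes);
    }
}
```

With the default of 256 MiB for sink.managed.writer-buffer-memory, this convention would declare a weight of 256.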
In runtime, the Paimon sink operator calculates the writer buffer size based on StreamConfig and MemoryManager in AbstractStoreWriteOperator.setup, then creates FlinkMemorySegmentAllocator and MemorySegmentPool. Finally, the Paimon writer writes data to the memory pool and flushes the data to the file system.
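The runtime sizing step amounts to simple arithmetic, sketched here with plain numbers in place of StreamConfig and MemoryManager. ManagedBufferSizing and its parameters are hypothetical; the assumption is that the operator receives a fraction of the slot's managed memory and that the pool is built from whole pages of the engine's segment size, so write-buffer-size and page-size play no role.

```java
/** Hypothetical helper showing how a writer buffer could be sized from managed memory. */
class ManagedBufferSizing {
    /** Bytes available to the writer: the operator's fraction of the slot's managed memory. */
    static long bufferBytes(long managedMemoryBytes, double operatorFraction) {
        if (operatorFraction <= 0 || operatorFraction > 1) {
            throw new IllegalArgumentException("Fraction must be in (0, 1]");
        }
        return (long) Math.floor(managedMemoryBytes * operatorFraction);
    }

    /** Whole pages that fit into the buffer; a remainder smaller than one page stays unused. */
    static int pageCount(long bufferBytes, int enginePageSize) {
        return (int) (bufferBytes / enginePageSize);
    }
}
```

For example, with 512 MiB of managed memory in the slot, a fraction of 0.5 and a 32 KiB segment size, the writer would get 256 MiB, that is 8192 pages.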