...
Discussion thread | |
---|---|
Vote thread | |
Issue | |
Release | - |
Motivation
Each Paimon task in a Flink/Spark job creates a TableWriteImpl for the Paimon table, which uses FileStoreWrite to write data to the file store. For a key-value file store, Paimon creates a KeyValueFileStoreWrite and initializes a memory pool for the writer.
...
To address the above issues, this PIP proposes a writer buffer for Paimon that is backed by the Executor Memory Manager (such as Flink Managed Memory). The executor in the engine manages the total memory usage of its tasks, which avoids full GC and tasks being killed for excessive memory usage.
Public Interfaces
MemorySegmentAllocator
MemorySegmentAllocator is added to Paimon to allocate and release segments for MemorySegmentPool. The Paimon sink creates an allocator according to the job configuration, and the allocator takes segments either from the Executor Memory Manager or from physical memory, such as the JVM heap.
...
Code Block | ||
---|---|---|
| ||
/* Add memory segment pool to writer builder. */ @Public public interface WriteBuilder { WriteBuilder withMemoryPool(MemorySegmentPool pool); } |
MemorySegment
MemorySegment is currently a final class in Paimon, but a task may need to use existing segment instances allocated by the memory manager of a particular engine (Flink/Spark). Paimon therefore needs to define MemorySegment as an interface, so that memory segments from different computing engines can be adapted to Paimon's segments.
Code Block | ||
---|---|---|
| ||
@Public public interface MemorySegment { .... } |
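One way to picture the adaptation is a thin wrapper per engine. The sketch below is illustrative only and is not the interface proposed here: SegmentView, HeapSegment and EngineBufferSegment are hypothetical names, a java.nio.ByteBuffer stands in for an engine-owned segment, and the real MemorySegment interface (elided above) would have a much larger surface.

```java
import java.nio.ByteBuffer;

/** Hypothetical, cut-down stand-in for the proposed MemorySegment interface. */
interface SegmentView {
    int size();

    byte get(int index);

    void put(int index, byte value);
}

/** Heap-backed segment, roughly what a default implementation could look like. */
final class HeapSegment implements SegmentView {
    private final byte[] bytes;

    HeapSegment(int size) {
        this.bytes = new byte[size];
    }

    public int size() {
        return bytes.length;
    }

    public byte get(int index) {
        return bytes[index];
    }

    public void put(int index, byte value) {
        bytes[index] = value;
    }
}

/** Adapter over engine-owned memory; ByteBuffer is only a stand-in (assumption). */
final class EngineBufferSegment implements SegmentView {
    private final ByteBuffer buffer;

    EngineBufferSegment(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public int size() {
        return buffer.capacity();
    }

    public byte get(int index) {
        return buffer.get(index); // absolute read, does not move the position
    }

    public void put(int index, byte value) {
        buffer.put(index, value); // absolute write into the wrapped memory
    }
}
```

Writer code that depends only on the interface can then work with either implementation, and writes through the adapter land directly in the memory owned by the engine.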
Proposed Changes
Architecture
[draw.io diagram: architecture]
As shown in the architecture above, when a Paimon task is initialized it creates a memory segment allocator and a memory pool for its writer according to the configured options. Each writer allocates segments from the allocator and releases the memory when the task finishes.
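The allocate-then-release lifecycle can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposed API: PageAllocator, HeapPageAllocator and WriterPagePool are hypothetical names, and plain byte arrays stand in for memory segments.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/** Hypothetical allocator contract: hand out fixed-size pages, release them together. */
interface PageAllocator {
    void allocate(Collection<byte[]> target, int numberOfPages);

    void release();

    int allocatedPages();
}

/** Allocates pages from the JVM heap and tracks them so they can be dropped at once. */
final class HeapPageAllocator implements PageAllocator {
    private final int pageSize;
    private final List<byte[]> allocated = new ArrayList<>();

    HeapPageAllocator(int pageSize) {
        this.pageSize = pageSize;
    }

    public void allocate(Collection<byte[]> target, int numberOfPages) {
        for (int i = 0; i < numberOfPages; i++) {
            byte[] page = new byte[pageSize];
            allocated.add(page);
            target.add(page);
        }
    }

    public void release() {
        allocated.clear();
    }

    public int allocatedPages() {
        return allocated.size();
    }
}

/** Writer-side pool: pulls pages lazily from the allocator, bounded by maxPages. */
final class WriterPagePool {
    private final PageAllocator allocator;
    private final int maxPages;
    private final Deque<byte[]> free = new ArrayDeque<>();
    private int requested;

    WriterPagePool(PageAllocator allocator, int maxPages) {
        this.allocator = allocator;
        this.maxPages = maxPages;
    }

    /** Returns a page, or null when the budget is used up (a writer would then flush). */
    byte[] nextPage() {
        if (!free.isEmpty()) {
            return free.poll();
        }
        if (requested >= maxPages) {
            return null;
        }
        List<byte[]> one = new ArrayList<>(1);
        allocator.allocate(one, 1);
        requested++;
        return one.get(0);
    }

    /** Hands a page back to the pool for reuse by the same writer. */
    void returnPage(byte[] page) {
        free.push(page);
    }

    /** Called when the task finishes: give everything back to the allocator. */
    void close() {
        free.clear();
        requested = 0;
        allocator.release();
    }
}
```

Keeping the budget check in the pool and the bookkeeping in the allocator means the same pool could sit on top of a heap allocator or an engine-backed one without changes.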
Memory Allocator For Flink
Options And Allocator Definitions
FlinkMemorySegmentAllocator allocates segments from Managed Memory for the Paimon writer. This mechanism has the following constraints:
...
Code Block | ||
---|---|---|
| ||
/* Add memory allocator related options in flink connector. */ public class FlinkConnectorOptions { /* Sink writer allocates segments from managed memory. */ public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY = ConfigOptions.key("sink.use-managed-memory-allocator") .booleanType() .defaultValue(false) .withDescription( "If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator."); /** * Weight of writer buffer in managed memory, Flink will compute the memory size for writer according * to the weight, the actual memory used depends on the running environment. */ public static final ConfigOption<Integer> SINK_MANAGED_WRITER_BUFFER_WEIGHT = ConfigOptions.key("sink.managed.writer-buffer-weight") .intType() .defaultValue(256) .withDescription("Weight of writer buffer in managed memory, Flink will compute the memory size " + "for writer according to the weight, the actual memory used depends on the running environment."); } /* Memory allocator for flink task which allocates segments from MemoryManager. */ public class FlinkMemorySegmentAllocator implements MemorySegmentAllocator { /* The flink task which will be used to allocate and release segments. */ private final Object owner; /* The memory manager from flink task. */ private final MemoryManager memoryManager; /* Allocate segments from memory manager. */ void allocate( Collection<MemorySegment> target, int numberOfPages) throws MemoryAllocationException { ...; memoryManager.allocatePages(owner, target, numberOfPages); ...; } /* Release the segments allocated by the allocator if the task is closed. */ void release(); } /* Memory segment wrapper for flink. */ public class FlinkMemorySegment implements MemorySegment { org.apache.flink.core.memory.MemorySegment segment; public FlinkMemorySegment(org.apache.flink.core.memory.MemorySegment segment); } |
Use Allocator In SQL
Users can set the memory weight for Flink Managed Memory in SQL. The Flink sink operator then obtains the memory pool size and creates an allocator for the Paimon writer.
Code Block | ||
---|---|---|
| ||
INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-weight'='256') */ SELECT * FROM ....; |
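To get a feel for how a weight could translate into a buffer size, the arithmetic below assumes that the managed memory of a slot is split among its consumers in proportion to their declared weights. That is a simplification made for illustration; the size actually granted depends on the running environment, and ManagedMemoryShare with its method names is hypothetical.

```java
/** Back-of-the-envelope arithmetic for a weight-proportional split (assumption). */
final class ManagedMemoryShare {
    private ManagedMemoryShare() {}

    /**
     * Bytes available to the writer if the slot's managed memory is divided
     * in proportion to weights. otherWeights is the sum of all other weights.
     */
    static long writerBufferBytes(long slotManagedBytes, int writerWeight, int otherWeights) {
        if (slotManagedBytes < 0 || writerWeight <= 0 || otherWeights < 0) {
            throw new IllegalArgumentException("sizes and weights must be non-negative");
        }
        long totalWeight = (long) writerWeight + otherWeights;
        return slotManagedBytes * writerWeight / totalWeight;
    }

    /** Number of whole pages of the given size that fit into the budget. */
    static int pages(long budgetBytes, int pageSize) {
        return (int) (budgetBytes / pageSize);
    }
}
```

For example, with 512 MiB of managed memory in the slot, a writer weight of 256 and other weights summing to 768, the writer would get a quarter of the memory, 128 MiB, which is 4096 pages of 32 KiB each.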
Use Allocator In Flink
The process of using Managed Memory Allocator in Flink is divided into two parts: compilation and runtime.
...
[draw.io diagram]
Default Memory Segment And Allocator
If users do not configure an allocator, the Paimon writer creates a default allocator that allocates DefaultMemorySegment instances from the JVM heap, as it does today.
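The fallback can be pictured as a small selection step over the job options. In this sketch only the option key sink.use-managed-memory-allocator and its default of false come from this proposal; AllocatorKind and AllocatorSelection are hypothetical names, and a plain string map stands in for the real option classes.

```java
import java.util.Map;

/** Hypothetical marker for which kind of allocator the sink would create. */
enum AllocatorKind {
    MANAGED_MEMORY,
    JVM_HEAP
}

final class AllocatorSelection {
    /** Option key taken from this proposal; the default value is false. */
    static final String USE_MANAGED = "sink.use-managed-memory-allocator";

    private AllocatorSelection() {}

    /** Falls back to the heap allocator unless managed memory is explicitly enabled. */
    static AllocatorKind select(Map<String, String> options) {
        boolean managed = Boolean.parseBoolean(options.getOrDefault(USE_MANAGED, "false"));
        return managed ? AllocatorKind.MANAGED_MEMORY : AllocatorKind.JVM_HEAP;
    }
}
```

Because the default is false, existing jobs that set nothing keep the current heap-based behaviour.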
...