
Discussion thread
Vote thread
Issue
Release-

Motivation

Each Paimon task in a Flink/Spark job creates a TableWriteImpl for the Paimon table, which uses FileStoreWrite to write data into the file store. Paimon creates KeyValueFileStoreWrite for a key-value file store and initializes a memory pool for the writer.

KeyValueFileStoreWrite creates a MergeTreeWriter for each bucket according to the data partition and the bucket number in each partition, so there are multiple writers in one Paimon sink task. Each MergeTreeWriter has a write buffer: it writes data to the buffer first, and flushes the data to the file system when the buffer is full. The writers in one task share the same memory pool, and the memory architecture of the writers is as follows.
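The shared-pool behavior described above can be sketched as follows. This is a minimal illustration with hypothetical SharedPool and Writer classes (not Paimon's actual writer or pool types): each writer draws pages from the shared pool, and when the pool is exhausted the writer flushes and recycles its pages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: a fixed set of pages shared by all writers in one task.
class SharedPool {
    private final Queue<byte[]> freePages = new ArrayDeque<>();

    SharedPool(int pages, int pageSize) {
        for (int i = 0; i < pages; i++) freePages.add(new byte[pageSize]);
    }

    byte[] nextPage() { return freePages.poll(); } // null when exhausted

    void returnAll(List<byte[]> pages) {
        freePages.addAll(pages);
        pages.clear();
    }
}

// Hypothetical writer: buffers into pool pages, flushes when the pool is full.
class Writer {
    private final SharedPool pool;
    private final List<byte[]> owned = new ArrayList<>();
    int flushes = 0;

    Writer(SharedPool pool) { this.pool = pool; }

    void write() {
        byte[] page = pool.nextPage();
        if (page == null) { // shared pool exhausted: flush, return pages, retry
            flushes++;
            pool.returnAll(owned);
            page = pool.nextPage();
        }
        owned.add(page);
    }
}
```

Because the pool is shared, one writer filling up forces a flush even if other writers still hold pages, which is exactly the contention the shared-pool design trades for bounded memory.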

...

2. Each task allocates and manages its own memory pool. If there are too many tasks in one Executor, this may cause performance issues and even OOM.

To address the above issues, this PIP aims to support a writer buffer based on the Executor Memory Manager (such as Flink Managed Memory) for Paimon. The Executor in the engine manages the total memory usage of its tasks to avoid full GC or tasks being killed due to excessive memory usage.

...

Code Block
languagejava
/**
 * Memory segment allocator will allocate segments from physical memory or memory manager in executor.
 **/
@Public
public interface MemorySegmentAllocator {
    /**
     * Allocates a set of memory segments for the memory pool.
     *
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not
     *     have the requested amount of memory pages any more.
     */
    void allocate(
        Collection<MemorySegment> target,
        int numberOfPages) throws MemoryAllocationException;
    
    /* Release the segments allocated by the allocator if the task is closed. */
    void release();
}

The Paimon sink needs to create a memory pool based on the allocator above; the memory pool size comes from the existing option write-buffer-size or is calculated by the sink operator. A MemorySegmentPool will be created from the allocator and the memory pool size when the sink operator is initialized, and then the memory pool is added to WriterBuilder to create the Paimon writer.
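The construction step can be sketched like this. Segment, SegmentAllocator, and AllocatorBackedPool are hypothetical simplified stand-ins for Paimon's MemorySegment, MemorySegmentAllocator, and MemorySegmentPool; the point is only that the pool eagerly requests poolSize / pageSize pages from whatever allocator it is given.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical simplified segment: just exposes its size.
interface Segment {
    int size();
}

// Hypothetical simplified allocator, mirroring the interface above.
interface SegmentAllocator {
    void allocate(Collection<Segment> target, int numberOfPages);
    void release();
}

// Hypothetical pool built from an allocator and a configured pool size,
// as would happen when the sink operator is initialized.
class AllocatorBackedPool {
    private final ArrayDeque<Segment> free = new ArrayDeque<>();

    AllocatorBackedPool(SegmentAllocator allocator, long poolSize, int pageSize) {
        List<Segment> pages = new ArrayList<>();
        allocator.allocate(pages, (int) (poolSize / pageSize));
        free.addAll(pages);
    }

    Segment nextSegment() { return free.poll(); }

    int freePages() { return free.size(); }
}
```

The writer then only sees the pool; whether segments came from the heap or from an engine memory manager is hidden behind the allocator.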

...

MemorySegment is a final class in Paimon at the moment, and a task may allocate existing segment instances from the memory manager of a different engine (Flink/Spark). So Paimon needs to define MemorySegment as an interface to facilitate adapting memory segments from different computing engines to Paimon's segments.
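The adapter idea can be sketched as follows, with simplified hypothetical names (Seg, HeapSeg, WrappedSeg) rather than Paimon's real classes: one interface, a heap-backed default, and a wrapper that delegates to an engine's own segment representation.

```java
// Hypothetical simplified segment interface.
interface Seg {
    int size();
    byte get(int index);
    void put(int index, byte value);
}

// Default implementation backed by a JVM heap byte[].
class HeapSeg implements Seg {
    private final byte[] bytes;

    HeapSeg(int size) { this.bytes = new byte[size]; }

    public int size() { return bytes.length; }
    public byte get(int index) { return bytes[index]; }
    public void put(int index, byte value) { bytes[index] = value; }
}

// Adapter wrapping another representation (a byte[] stands in here for an
// engine class such as org.apache.flink.core.memory.MemorySegment).
class WrappedSeg implements Seg {
    private final byte[] engineSegment;

    WrappedSeg(byte[] engineSegment) { this.engineSegment = engineSegment; }

    public int size() { return engineSegment.length; }
    public byte get(int index) { return engineSegment[index]; }
    public void put(int index, byte value) { engineSegment[index] = value; }
}
```

Writers only program against the interface, so adding a new engine means adding one adapter class.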

...

As shown in the architecture above, a Paimon task creates different memory segment allocators and memory pools for its writers according to the options when it is initialized. Each writer allocates segments from the allocator and releases the memory when the task is finished.

...

FlinkMemorySegmentAllocator allocates segments from Flink Managed Memory for the Paimon writer. This mechanism has the following constraints:

1. Option write-buffer-size is ignored and the memory pool will use the memory size calculated from the MemoryManager in Flink.

2. Option page-size is ignored and the memory pool will use Flink segment size.

...

Code Block
languagejava
/* Add memory allocator related options in flink connector. */
public class FlinkConnectorOptions {
    /* Sink writer allocate segments from managed memory. */
    public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY =
        ConfigOptions.key("sink.use-managed-memory-allocator")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.");

    /* Weight of writer buffer in managed memory. Flink will compute the memory size for the writer according to the weight; the actual memory used depends on the running environment. */
    public static final ConfigOption<Integer> SINK_MANAGED_WRITER_BUFFER_WEIGHT =
        ConfigOptions.key("sink.managed.writer-buffer-weight")
                .intType()
                .defaultValue(256)
                .withDescription("Weight of writer buffer in managed memory. Flink will compute the memory size for the writer according to the weight; the actual memory used depends on the running environment.");
}

/* Memory allocator for flink task which allocates segments from MemoryManager. */
public class FlinkMemorySegmentAllocator implements MemorySegmentAllocator {
    /* The flink task which will be used to allocate and release segments. */
    private final Object owner;
    /* The memory manager from flink task. */
    private final MemoryManager memoryManager;
    
    /* Allocate segments from memory manager. */
    void allocate(
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
        ...;
        memoryManager.allocatePages(owner, target, numberOfPages);
        ...;
    }
    
    /* Release the segments allocated by the allocator if the task is closed. */
    void release();
}

/* Memory segment wrapper for flink. */
public class FlinkMemorySegment implements MemorySegment {
    org.apache.flink.core.memory.MemorySegment segment;
    
    public FlinkMemorySegment(org.apache.flink.core.memory.MemorySegment segment);
}

...

Users can set the memory weight in SQL for Flink Managed Memory; the Flink sink operator will then get the computed memory pool size and create the allocator for the Paimon writer.

Code Block
languagesql
INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-weight'='256') */
	SELECT * FROM ....;

The process of using a Managed Memory Allocator in Flink is divided into two parts: compilation and runtime.

During compilation, the Flink Paimon sink creates the sink operator in FlinkSink. The Paimon sink can get the Transformation from the sink operator and declare managed memory via Transformation.declareManagedMemoryUseCaseAtOperatorScope(OPERATOR, weight of buffer) according to the options sink.use-managed-memory-allocator and sink.managed.writer-buffer-weight.

At runtime, the Paimon sink operator calculates the writer buffer size based on StreamConfig and MemoryManager in AbstractStoreWriteOperator.setup, then creates the FlinkMemorySegmentAllocator and MemorySegmentPool. Finally, the Paimon writer writes data to the memory pool and flushes it to the file system.
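The sizing arithmetic of that runtime step can be sketched as follows. WriterBufferSizing is a hypothetical helper; in practice the fraction comes from StreamConfig and the total managed memory from Flink's MemoryManager, but the computation reduces to taking the operator's share and dividing it into pages of Flink's segment size.

```java
// Hypothetical sketch of the runtime sizing step: the writer buffer is the
// operator's declared fraction of the slot's managed memory, and the pool
// holds bufferBytes / pageSize pages.
final class WriterBufferSizing {
    private WriterBufferSizing() {}

    // Share of the slot's managed memory assigned to this operator.
    static long bufferBytes(long managedMemoryBytes, double operatorFraction) {
        return (long) (managedMemoryBytes * operatorFraction);
    }

    // Number of whole pages that fit in the writer buffer.
    static int pages(long bufferBytes, int pageSize) {
        return (int) (bufferBytes / pageSize);
    }
}
```

For example, with 1 MiB of managed memory, a 0.25 fraction, and 32 KiB segments, the writer buffer is 256 KiB, i.e. 8 pages.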

...

Default Memory Segment And Allocator

If users do not configure an allocator, the Paimon writer will create a default allocator that allocates DefaultMemorySegment from the JVM heap, as it does now.

Code Block
languagejava
/* Default allocator for paimon which allocates segments from heap. */
public class HeapMemorySegmentAllocator implements MemorySegmentAllocator {
}

/* Default memory segment for paimon. */
public class DefaultMemorySegment implements MemorySegment {
}
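A possible shape for the default heap path is sketched below. The class name and bodies are assumptions (the proposal above leaves them empty); the key property is that heap pages are plain byte[] chunks, so release() only has to drop references and let GC reclaim them, with no engine memory manager to return pages to.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of a default heap-backed allocator.
class HeapAllocator {
    private final int pageSize;
    private final List<byte[]> allocated = new ArrayList<>();

    HeapAllocator(int pageSize) { this.pageSize = pageSize; }

    // Hand out numberOfPages fresh heap pages, remembering them for release().
    void allocate(Collection<byte[]> target, int numberOfPages) {
        for (int i = 0; i < numberOfPages; i++) {
            byte[] page = new byte[pageSize];
            allocated.add(page);
            target.add(page);
        }
    }

    // Heap pages are reclaimed by GC once no longer referenced.
    void release() { allocated.clear(); }
}
```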

...