...

To address the above issues, this PIP proposes supporting a writer buffer in Paimon that is backed by the executor's memory manager (such as Flink managed memory). The engine's executor manages the total memory usage of its tasks, which avoids full GCs and tasks being killed for excessive memory usage.

Public Interfaces

MemorySegmentAllocator

MemorySegmentAllocator is added to Paimon to allocate and release segments for MemorySegmentPool. The Paimon sink creates an allocator according to the job configuration, which allocates segments either from the executor's memory manager or from physical memory, such as the heap.

```java
/**
 * Memory segment allocator that allocates segments from physical memory or from the
 * memory manager of the engine's executor.
 */
@Public
public interface MemorySegmentAllocator {
    /**
     * Allocates a set of memory segments for the memory pool.
     *
     * @param target The collection into which the allocated memory pages are put.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown if the underlying memory manager does not
     *     have the requested number of memory pages left.
     */
    void allocate(Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException;

    /** Releases the segments allocated by this allocator when the task is closed. */
    void release();
}
```
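To make the allocate/release contract concrete, here is a minimal sketch of the physical-memory (heap) case. It is self-contained, so `HeapSegment`, `SegmentAllocator`, `HeapSegmentAllocator` and the local `MemoryAllocationException` are hypothetical stand-ins rather than Paimon classes, and the fixed page budget is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a memory segment: a fixed-size page backed by a heap array. */
final class HeapSegment {
    final byte[] bytes;

    HeapSegment(int pageSize) { this.bytes = new byte[pageSize]; }

    int size() { return bytes.length; }
}

/** Stand-in for MemoryAllocationException, thrown when the page budget is exhausted. */
class MemoryAllocationException extends Exception {
    MemoryAllocationException(String message) { super(message); }
}

/** Same shape as the proposed MemorySegmentAllocator, over the stand-in segment type. */
interface SegmentAllocator {
    void allocate(Collection<HeapSegment> target, int numberOfPages)
            throws MemoryAllocationException;

    void release();
}

/**
 * Heap-backed allocator with a fixed page budget. Pages are plain heap arrays, so
 * release() only drops its references and restores the budget; the GC reclaims the memory.
 */
final class HeapSegmentAllocator implements SegmentAllocator {
    private final int pageSize;
    private final int maxPages;
    private final List<HeapSegment> allocated = new ArrayList<>();

    HeapSegmentAllocator(int pageSize, int maxPages) {
        this.pageSize = pageSize;
        this.maxPages = maxPages;
    }

    @Override
    public void allocate(Collection<HeapSegment> target, int numberOfPages)
            throws MemoryAllocationException {
        // Refuse the whole request up front so the target is never partially filled.
        if (allocated.size() + numberOfPages > maxPages) {
            throw new MemoryAllocationException(
                    "requested " + numberOfPages + " pages, only "
                            + (maxPages - allocated.size()) + " left");
        }
        for (int i = 0; i < numberOfPages; i++) {
            HeapSegment segment = new HeapSegment(pageSize);
            allocated.add(segment);
            target.add(segment);
        }
    }

    @Override
    public void release() {
        allocated.clear();
    }

    int allocatedPages() { return allocated.size(); }
}
```

Checking the budget before touching `target` keeps a failed request from leaving the pool half-filled; an engine-backed allocator would delegate that check to the executor's memory manager instead.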

The Paimon sink needs to create a memory pool based on the allocator above. The pool size comes from the existing option write-buffer-size or is calculated by the sink operator. When the sink operator is initialized, it creates a MemorySegmentPool from the allocator and the pool size, and then passes the pool to the table write via withMemoryPool to create the Paimon writer.

```java
/** Adds a memory segment pool to the table write. */
@Public
public interface TableWrite {
    TableWrite withMemoryPool(MemorySegmentPool pool);
}
```
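A sketch of how the sink operator might size the pool and attach it, assuming the pool capacity is simply write-buffer-size divided by the page size (for example, a 256 MB buffer with 64 KB pages gives 4,096 pages) and that pages are drawn from the allocator lazily. `PageAllocator`, `SketchMemoryPool` and `SketchTableWrite` are hypothetical stand-ins for the allocator, `MemorySegmentPool` and `TableWrite`.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

/** Same shape as the proposed allocator interface, over plain byte[] pages (stand-in). */
interface PageAllocator {
    void allocate(Collection<byte[]> target, int numberOfPages);

    void release();
}

/**
 * Hypothetical memory pool: its capacity is writeBufferSize / pageSize pages, and it
 * draws pages from the allocator one at a time until that capacity is reached.
 */
final class SketchMemoryPool {
    private final PageAllocator allocator;
    private final int maxPages;
    private final Deque<byte[]> free = new ArrayDeque<>();
    private int drawnPages;

    SketchMemoryPool(PageAllocator allocator, long writeBufferSize, int pageSize) {
        this.allocator = allocator;
        this.maxPages = (int) (writeBufferSize / pageSize);
    }

    int maxPages() { return maxPages; }

    /** Returns a page, or null when the pool is exhausted (the writer should then flush). */
    byte[] nextSegment() {
        if (free.isEmpty()) {
            if (drawnPages == maxPages) {
                return null;
            }
            allocator.allocate(free, 1);
            drawnPages++;
        }
        return free.poll();
    }

    /** Writers hand pages back after flushing; they are reused, not returned to the allocator. */
    void returnAll(Collection<byte[]> pages) { free.addAll(pages); }
}

/** Stand-in for TableWrite: withMemoryPool stores the pool and returns this for chaining. */
final class SketchTableWrite {
    private SketchMemoryPool pool;

    SketchTableWrite withMemoryPool(SketchMemoryPool pool) {
        this.pool = pool;
        return this;
    }

    SketchMemoryPool memoryPool() { return pool; }
}
```

Drawing pages lazily means a writer that buffers little data never claims the full write-buffer-size from the executor; whether the real pool does this is a design choice the sketch only assumes.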

...

Paimon can create a MemorySegment from a segment allocated by the engine, for example org.apache.flink.core.memory.MemorySegment in Flink. Paimon needs to access the off-heap buffer or heap memory of those segments, but they are allocated and released by the engine. Paimon must not release this memory itself; otherwise the same memory may be released more than once. For these reasons, we remove the free and allocateOffHeapMemory methods from Paimon's MemorySegment and document this in the wrapOffHeapMemory method.

```java
/**
 * Paimon can allocate heap memory itself; when the operator exits, the JVM reclaims that
 * heap memory through garbage collection. Off-heap memory, however, cannot be released by
 * Paimon itself. Engines such as Flink and Spark should allocate off-heap memory for Paimon
 * and release it when the task terminates.
 */
@Public
public final class MemorySegment {
    /* Remove this method: Paimon cannot free memory itself. */
    public static MemorySegment allocateOffHeapMemory(int size);

    /* Remove this method: Paimon cannot free memory itself. */
    public void free();
}
```
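The ownership rule can be illustrated with a small, self-contained sketch: the engine owns a direct buffer and is the only party allowed to free it, while the Paimon-side wrapper can read and write it but exposes no `free()`. `EnginePage` and `WrappedSegment` are hypothetical stand-ins, and the direct-buffer check in `wrapOffHeapMemory` is an assumption of the sketch, not documented Paimon behavior.

```java
import java.nio.ByteBuffer;

/**
 * Stand-in for an engine-owned off-heap page. Only the engine may free it; a second
 * free is the "released more than once" bug the proposal wants to rule out.
 */
final class EnginePage {
    private ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

    ByteBuffer buffer() {
        if (buffer == null) throw new IllegalStateException("page already released");
        return buffer;
    }

    void free() {
        if (buffer == null) throw new IllegalStateException("double release");
        // Stand-in: drop the engine's reference; a real engine returns the memory to its manager.
        buffer = null;
    }
}

/**
 * Stand-in for Paimon's MemorySegment after the change: it can wrap engine memory,
 * but offers neither free() nor allocateOffHeapMemory().
 */
final class WrappedSegment {
    private final ByteBuffer offHeap;

    private WrappedSegment(ByteBuffer offHeap) { this.offHeap = offHeap; }

    /** Wraps memory the engine owns; the engine, not Paimon, is responsible for releasing it. */
    static WrappedSegment wrapOffHeapMemory(ByteBuffer buffer) {
        if (!buffer.isDirect()) throw new IllegalArgumentException("expected a direct buffer");
        return new WrappedSegment(buffer);
    }

    void putInt(int index, int value) { offHeap.putInt(index, value); }

    int getInt(int index) { return offHeap.getInt(index); }

    int size() { return offHeap.capacity(); }
}
```

Because the wrapper has no release path, the only way memory goes back is through the engine, so a double release can only happen inside the engine's own bookkeeping.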

...

As shown in the architecture above, when a Paimon task is initialized it creates a memory segment allocator, chosen according to the options, and a memory pool for its writers. Each writer allocates segments from the allocator, and the memory is released when the task finishes.
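A sketch of that task lifecycle, under stated assumptions: the task picks one allocator from its options at open time, all writers share it, and `close()` releases everything exactly once. The option key `sink.memory-source` and its value `managed` are hypothetical placeholders (the actual option names are defined elsewhere in this PIP), and `HeapAllocator`/`ManagedAllocator` are stand-ins.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Same shape as the proposed allocator interface, over plain byte[] pages (stand-in). */
interface PageAllocator {
    void allocate(Collection<byte[]> target, int numberOfPages);

    void release();
}

/** Heap allocator: release() drops its references so the GC can reclaim the pages. */
final class HeapAllocator implements PageAllocator {
    final List<byte[]> handedOut = new ArrayList<>();

    public void allocate(Collection<byte[]> target, int n) {
        for (int i = 0; i < n; i++) {
            byte[] page = new byte[1024];
            handedOut.add(page);
            target.add(page);
        }
    }

    public void release() { handedOut.clear(); }
}

/** Stand-in for an engine-managed allocator; it only tracks how many pages it has lent. */
final class ManagedAllocator implements PageAllocator {
    int lent;

    public void allocate(Collection<byte[]> target, int n) {
        for (int i = 0; i < n; i++) {
            target.add(new byte[1024]);
            lent++;
        }
    }

    public void release() { lent = 0; }
}

/** Hypothetical sink task: one allocator per task, shared by all writers, released on close. */
final class SinkTask implements AutoCloseable {
    // Hypothetical option key, for illustration only.
    static final String MEMORY_SOURCE = "sink.memory-source";

    final PageAllocator allocator;
    final List<List<byte[]>> writerBuffers = new ArrayList<>();

    SinkTask(Map<String, String> options) {
        this.allocator = "managed".equals(options.get(MEMORY_SOURCE))
                ? new ManagedAllocator()
                : new HeapAllocator();
    }

    /** Each writer draws its pages from the task's shared allocator. */
    List<byte[]> openWriter(int pages) {
        List<byte[]> buffer = new ArrayList<>();
        allocator.allocate(buffer, pages);
        writerBuffers.add(buffer);
        return buffer;
    }

    @Override
    public void close() {
        writerBuffers.clear();
        allocator.release();
    }
}
```

Making the task `AutoCloseable` lets it be used in a try-with-resources block, so the allocator is released even if a writer fails.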

MemorySegmentAllocator

MemorySegmentAllocator is added to allocate and release segments for MemorySegmentPool from Flink. The Paimon sink creates the allocator according to the job configuration, so that segments are allocated from the Flink memory manager.

```java
/**
 * Memory segment allocator that allocates segments from the memory manager in Flink.
 */
@Public
public class MemorySegmentAllocator {
    /** Memory manager in Flink. */
    private final MemoryManager memoryManager;

    /**
     * Allocates a set of memory segments for the memory pool.
     *
     * @param target The collection into which the allocated memory pages are put.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown if the memory manager does not
     *     have the requested number of memory pages left.
     */
    void allocate(Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException;

    /** Releases the segments allocated by this allocator when the task is closed. */
    void release();
}
```
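One way the Flink-side allocator could work is to register itself as the owner of every page it requests, so `release()` becomes a single owner-wide release. The sketch below uses `FakeMemoryManager`, a hypothetical stand-in that only mimics the owner-based shape of Flink's `MemoryManager` (`allocatePages(owner, target, numberOfPages)` and `releaseAll(owner)`); it is not the Flink class, and it hands out `byte[]` pages instead of Flink segments.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for MemoryAllocationException, thrown when managed memory is exhausted. */
class MemoryAllocationException extends Exception {
    MemoryAllocationException(String message) { super(message); }
}

/** Minimal stand-in mimicking an owner-based memory manager with a fixed page budget. */
final class FakeMemoryManager {
    private final int totalPages;
    private final Map<Object, List<byte[]>> pagesByOwner = new HashMap<>();
    private int usedPages;

    FakeMemoryManager(int totalPages) { this.totalPages = totalPages; }

    void allocatePages(Object owner, Collection<byte[]> target, int numberOfPages)
            throws MemoryAllocationException {
        if (usedPages + numberOfPages > totalPages) {
            throw new MemoryAllocationException("not enough managed memory");
        }
        List<byte[]> owned = pagesByOwner.computeIfAbsent(owner, k -> new ArrayList<>());
        for (int i = 0; i < numberOfPages; i++) {
            byte[] page = new byte[1024];
            owned.add(page);
            target.add(page);
        }
        usedPages += numberOfPages;
    }

    /** Returns all pages of the owner; calling it again for the same owner is a no-op here. */
    void releaseAll(Object owner) {
        List<byte[]> owned = pagesByOwner.remove(owner);
        if (owned != null) {
            usedPages -= owned.size();
        }
    }

    int availablePages() { return totalPages - usedPages; }
}

/** Sketch of the Flink-side allocator: it is the owner, so release() is one releaseAll call. */
final class FlinkSegmentAllocator {
    private final FakeMemoryManager memoryManager;

    FlinkSegmentAllocator(FakeMemoryManager memoryManager) { this.memoryManager = memoryManager; }

    void allocate(Collection<byte[]> target, int numberOfPages) throws MemoryAllocationException {
        memoryManager.allocatePages(this, target, numberOfPages);
    }

    void release() { memoryManager.releaseAll(this); }
}
```

Using the allocator instance as the owner keeps two tasks sharing one memory manager from releasing each other's pages, and in this stand-in a repeated `release()` does nothing rather than freeing memory twice.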

Options And Allocator Definitions

...