...

```java
import org.apache.paimon.annotation.Public;
import org.apache.paimon.memory.MemorySegmentPool;

/** Add a memory segment pool to the write builder. */
@Public
public interface WriteBuilder {
    WriteBuilder withMemoryPool(MemorySegmentPool pool);
}
```

MemorySegment

MemorySegment is currently a final class in Paimon, while a task may need to use segment instances that were already allocated by the memory manager of its engine (Flink or Spark). Paimon therefore needs to define MemorySegment as an interface, so that memory segments from different computing engines can be adapted to Paimon's segments. For example, Paimon can create a MemorySegment that wraps a segment allocated by the engine, such as org.apache.flink.core.memory.MemorySegment in Flink, and access the off-heap buffer or heap memory of that segment. Because those segments are allocated and released by the engine, Paimon must not release this memory itself; otherwise the same memory may be released more than once. For these reasons, we remove the free and allocateOffHeapMemory methods from Paimon's MemorySegment and add a description to the wrapOffHeapMemory method.

```java
import org.apache.paimon.annotation.Public;

/**
 * Paimon can allocate heap memory itself, and the JVM releases the allocated heap memory after the
 * operator exits. Off-heap memory, however, cannot be released by Paimon itself. Engines such as Flink
 * and Spark should allocate off-heap memory for Paimon and release it when the task terminates.
 */
@Public
public interface MemorySegment {
    /* Removed: Paimon cannot free memory itself. */
    // static MemorySegment allocateOffHeapMemory(int size);

    /* Removed: Paimon cannot free memory itself. */
    // void free();
}
```
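The ownership rule above can be illustrated with a small, self-contained sketch. It uses no Paimon or Flink classes: `Segment`, `HeapSegment`, `EngineSegment` and `EnginePool` are hypothetical names that only model the idea that the engine allocates and drops off-heap buffers, while the Paimon-side wrapper merely reads and writes them and has no way to free them.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the MemorySegment interface: note there is no free() method.
interface Segment {
    int size();
    void put(int index, byte b);
    byte get(int index);
}

// Heap-backed segment: the JVM garbage collector reclaims the array.
final class HeapSegment implements Segment {
    private final byte[] memory;
    HeapSegment(int size) { this.memory = new byte[size]; }
    public int size() { return memory.length; }
    public void put(int index, byte b) { memory[index] = b; }
    public byte get(int index) { return memory[index]; }
}

// Wrapper around a buffer owned by the engine; it never releases the buffer.
final class EngineSegment implements Segment {
    private final ByteBuffer buffer;
    EngineSegment(ByteBuffer buffer) { this.buffer = buffer; }
    public int size() { return buffer.capacity(); }
    public void put(int index, byte b) { buffer.put(index, b); }
    public byte get(int index) { return buffer.get(index); }
}

// Hypothetical engine-side pool: the only place where off-heap buffers are allocated and dropped.
final class EnginePool implements AutoCloseable {
    private final List<ByteBuffer> owned = new ArrayList<>();
    private boolean closed;

    Segment allocate(int size) {
        if (closed) {
            throw new IllegalStateException("pool already closed");
        }
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        owned.add(buffer);
        return new EngineSegment(buffer);
    }

    int ownedCount() { return owned.size(); }

    // Called once by the engine when the task terminates. Here the direct buffers simply become
    // unreachable; a real engine would free its native memory explicitly at this point.
    @Override
    public void close() {
        owned.clear();
        closed = true;
    }
}
```

Because only `EnginePool.close()` drops buffers, a wrapper can never cause a second release of the same memory.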

Proposed Changes

Architecture

...

```java
/* Add memory allocator related options to the Flink connector. */
public class FlinkConnectorOptions {
    /* The sink writer allocates segments from managed memory. */
    public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY =
            ConfigOptions.key("sink.use-managed-memory-allocator")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "If true, flink sink will use managed memory for merge tree; otherwise, "
                                    + "it will create an independent memory allocator.");

    /**
     * Weight of the writer buffer in managed memory. Flink computes the memory size for the writer
     * according to this weight; the actual memory used depends on the running environment.
     */
    public static final ConfigOption<MemorySize> SINK_MANAGED_WRITER_BUFFER_MEMORY =
            ConfigOptions.key("sink.managed.writer-buffer-memory")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(256))
                    .withDescription(
                            "Weight of writer buffer in managed memory, Flink will compute the memory size "
                                    + "for writer according to the weight, the actual memory used depends on "
                                    + "the running environment.");
}

/* Memory allocator for a Flink task, which allocates segments from the MemoryManager. */
public class FlinkMemorySegmentAllocator implements MemoryAllocator {
    /* The Flink task that owns the segments; used to allocate and release them. */
    private final Object owner;
    /* The memory manager of the Flink task. */
    private final MemoryManager memoryManager;

    /* Allocate segments from the memory manager. */
    void allocate(
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
        // ...
        memoryManager.allocatePages(owner, target, numberOfPages);
        // ...
    }

    /* Release the segments allocated by this allocator when the task is closed. */
    void release();
}

/* Memory segment wrapper for Flink. */
public class FlinkMemorySegment implements MemorySegment {
    private final org.apache.flink.core.memory.MemorySegment segment;

    public FlinkMemorySegment(org.apache.flink.core.memory.MemorySegment segment) {
        this.segment = segment;
    }
}
```
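The allocate/release contract of FlinkMemorySegmentAllocator can be modeled without Flink. In this sketch, `PageBudget` is a hypothetical stand-in for the engine's memory manager (its `allocatePages(owner, target, numberOfPages)` shape mirrors the call above), pages are plain `byte[]` arrays, and `TaskAllocator` plays the role of the allocator: it allocates on behalf of one owner and hands everything back in `release()`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an engine memory manager with a fixed page budget.
final class PageBudget {
    private final int pageSize;
    private int freePages;
    private final Map<Object, List<byte[]>> pagesByOwner = new HashMap<>();

    PageBudget(int pageSize, int totalPages) {
        this.pageSize = pageSize;
        this.freePages = totalPages;
    }

    // Adds numberOfPages pages to target, or fails without allocating anything.
    void allocatePages(Object owner, Collection<byte[]> target, int numberOfPages) {
        if (numberOfPages > freePages) {
            throw new IllegalStateException(
                    "insufficient pages: requested " + numberOfPages + ", free " + freePages);
        }
        List<byte[]> owned = pagesByOwner.computeIfAbsent(owner, k -> new ArrayList<>());
        for (int i = 0; i < numberOfPages; i++) {
            byte[] page = new byte[pageSize];
            owned.add(page);
            target.add(page);
        }
        freePages -= numberOfPages;
    }

    // Returns every page held by the owner to the budget.
    void releaseAll(Object owner) {
        List<byte[]> owned = pagesByOwner.remove(owner);
        if (owned != null) {
            freePages += owned.size();
        }
    }

    int freePages() { return freePages; }
}

// Hypothetical allocator bound to one task, which acts as the owner.
final class TaskAllocator {
    private final Object owner;
    private final PageBudget budget;

    TaskAllocator(Object owner, PageBudget budget) {
        this.owner = owner;
        this.budget = budget;
    }

    void allocate(Collection<byte[]> target, int numberOfPages) {
        budget.allocatePages(owner, target, numberOfPages);
    }

    // Safe to call more than once: a second call finds nothing left to release.
    void release() {
        budget.releaseAll(owner);
    }
}
```

Tracking pages per owner is what lets a task close release exactly the pages it obtained, and nothing twice.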

Use Allocator In SQL

Users can set the writer buffer memory for Flink managed memory through SQL hints. The Flink sink operator then obtains the memory pool size and creates an allocator for the Paimon writer.

```sql
INSERT INTO paimon_table /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
SELECT * FROM ....;
```

...

During compilation, Paimon creates the sink operator in FlinkSink. Paimon can get the Transformation of the sink operator and declare managed memory with Transformation.declareManagedMemoryUseCaseAtOperatorScope(OPERATOR, weight of buffer), according to the options sink.use-managed-memory-allocator and sink.managed.writer-buffer-memory.
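Since declareManagedMemoryUseCaseAtOperatorScope takes an integer weight, the configured memory size has to be turned into an `int`. The sketch below assumes that the weight is the buffer size in whole mebibytes; that mapping is an assumption, and `ManagedMemoryWeight`, `parseBytes` and `weightOf` are hypothetical helpers (the parser accepts only k/m/g suffixes and is not Paimon's MemorySize parser).

```java
import java.util.Locale;

// Hypothetical helper for turning an option value such as "256M" into a managed memory weight.
final class ManagedMemoryWeight {
    private ManagedMemoryWeight() {}

    // Parses values such as "256M", "256mb" or "1g"; a bare number is read as bytes.
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1); // "256mb" -> "256m", "1024b" -> "1024"
        }
        int shift = 0;
        if (s.endsWith("k")) {
            shift = 10;
        } else if (s.endsWith("m")) {
            shift = 20;
        } else if (s.endsWith("g")) {
            shift = 30;
        }
        if (shift > 0) {
            s = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(s.trim()) << shift;
    }

    // Assumption: weight = buffer size in whole mebibytes, clamped to [1, Integer.MAX_VALUE].
    static int weightOf(String memory) {
        long mebiBytes = parseBytes(memory) >> 20;
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, mebiBytes));
    }
}
```

Under this assumption, `'sink.managed.writer-buffer-memory'='256M'` yields a weight of 256, which would be passed as the second argument together with `ManagedMemoryUseCase.OPERATOR`.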

At runtime, the Paimon sink operator calculates the writer buffer size from StreamConfig and MemoryManager in AbstractStoreWriteOperator.setup, and then creates a FlinkMemorySegmentAllocator and a MemorySegmentPool. Finally, the Paimon writer writes data to the memory pool and flushes the data to the file system.
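The runtime sizing step is essentially arithmetic: the operator receives a fraction of the slot's managed memory, turns that fraction into bytes, and the pool cuts the bytes into fixed-size pages. The sketch models only that arithmetic under a simplifying assumption (the fraction is the operator's weight divided by the total weight in the slot); in Flink the inputs would come from, for example, `StreamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(...)` and `MemoryManager.computeMemorySize(fraction)`. `WriterBufferSizing` and its methods are hypothetical.

```java
// Hypothetical, simplified model of computing the writer buffer and its page count.
final class WriterBufferSizing {
    private WriterBufferSizing() {}

    // Simplification: the operator's share of the slot is proportional to its declared weight.
    static double fraction(int operatorWeight, int totalWeightInSlot) {
        if (operatorWeight < 0 || totalWeightInSlot <= 0 || operatorWeight > totalWeightInSlot) {
            throw new IllegalArgumentException("invalid weights");
        }
        return (double) operatorWeight / totalWeightInSlot;
    }

    // Bytes granted to the operator, rounded down.
    static long bufferBytes(long slotManagedMemoryBytes, double fraction) {
        return (long) Math.floor(slotManagedMemoryBytes * fraction);
    }

    // Number of whole pages the memory pool can hand out.
    static int pages(long bufferBytes, int pageSize) {
        return (int) (bufferBytes / pageSize);
    }
}
```

With illustrative numbers (512 MiB of managed memory in the slot, the writer declaring 256 of a total weight of 512, and an assumed 32 KiB page size), the writer gets a fraction of 0.5, a 256 MiB buffer, and 8192 pages.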

[Diagram: create allocator for flink]

Default Memory Segment And Allocator

If users do not configure an allocator, the Paimon writer creates a default allocator that allocates DefaultMemorySegment instances from the JVM heap, as it does now.
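For comparison, a default heap-backed pool only needs a page size and a page budget. This is a minimal sketch with hypothetical names (`HeapPagePool`, `nextPage`, `returnAll`), not Paimon's implementation: pages are allocated lazily on the JVM heap, reused after `returnAll`, and `nextPage` returns `null` once the budget is exhausted, which in this sketch is the caller's signal to flush and return its pages.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical heap-backed page pool with a fixed budget of pages.
final class HeapPagePool {
    private final int pageSize;
    private final int maxPages;
    private final Deque<byte[]> returned = new ArrayDeque<>();
    private int allocated; // pages created on the heap so far

    HeapPagePool(int pageSize, int maxPages) {
        this.pageSize = pageSize;
        this.maxPages = maxPages;
    }

    int pageSize() { return pageSize; }

    // Reuses a returned page if possible, otherwise allocates a new one while under budget.
    byte[] nextPage() {
        byte[] page = returned.poll();
        if (page != null) {
            return page;
        }
        if (allocated < maxPages) {
            allocated++;
            return new byte[pageSize];
        }
        return null; // budget exhausted: the caller should flush and return its pages
    }

    // Gives pages back for reuse; the list must not contain nulls.
    void returnAll(List<byte[]> pages) {
        returned.addAll(pages);
    }

    int freePages() {
        return returned.size() + (maxPages - allocated);
    }
}
```

No release step is needed here: once the pool and its pages become unreachable, the JVM reclaims the heap memory.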

...


...