Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
/* Add memory allocator related options in flink connector. */
public class FlinkConnectorOptions {
    /* Sink writer allocate segments from managed memory. */
    public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY =
        ConfigOptions.key("sink.use-managed-memory-allocator")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.");

    /**
     * Weight of writer buffer in managed memory, Flink will compute the memory size for writer according 
     * to the weight, the actual memory used depends on the running environment.
     */
    public static final ConfigOption<Integer> SINK_MANAGED_WRITER_BUFFER_WEIGHT =
        ConfigOptions.key("sink.managed.writer-buffer-weight")
                .intType()
                .defaultValue(256)
                .withDescription("Weight of writer buffer in managed memory, Flink will compute the memory size " +
                     "for writer according to the weight, the actual memory used depends on the running environment.");
}

/* Memory allocator for flink task which allocates segments from MemoryManager. */
public class FlinkMemorySegmentAllocator implements MemoryAllocator {
    /* The flink task which will be used to allocate and release segments. */
    private final Object owner;
    /* The memory manager from flink task. */
    private final MemoryManager memoryManager;
    
    /* Allocate segments from memory manager. */
    void allocate(
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
        ...;
        memoryManager.allocatePages(owner, target, numOfPages);
        ...;
    }
    
    /* Release the segments allocated by the allocator if the task is closed. */
    void release();
}

/* Memory segment wrapper for flink. */
public class FlinkMemorySegment implements MemorySegment {
    org.apache.flink.core.memory.MemorySegment segment;
    
    public FlinkMemorySegment(MemorySegment segment);
}

...