THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/* Add memory allocator related options in flink connector. */ public class FlinkConnectorOptions { /* Sink writer allocate segments from managed memory. */ public static final ConfigOption<Boolean> SINK_USE_MANAGED_MEMORY = ConfigOptions.key("sink.use-managed-memory-allocator") .booleanType() .defaultValue(false) .withDescription( "If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator."); /** * Weight of writer buffer in managed memory, Flink will compute the memory size for writer according * to the weight, the actual memory used depends on the running environment. */ public static final ConfigOption<Integer> SINK_MANAGED_WRITER_BUFFER_WEIGHT = ConfigOptions.key("sink.managed.writer-buffer-weight") .intType() .defaultValue(256) .withDescription("Weight of writer buffer in managed memory, Flink will compute the memory size " + "for writer according to the weight, the actual memory used depends on the running environment."); } /* Memory allocator for flink task which allocates segments from MemoryManager. */ public class FlinkMemorySegmentAllocator implements MemoryAllocator { /* The flink task which will be used to allocate and release segments. */ private final Object owner; /* The memory manager from flink task. */ private final MemoryManager memoryManager; /* Allocate segments from memory manager. */ void allocate( Collection<MemorySegment> target, int numberOfPages) throws MemoryAllocationException { ...; memoryManager.allocatePages(owner, target, numOfPages); ...; } /* Release the segments allocated by the allocator if the task is closed. */ void release(); } /* Memory segment wrapper for flink. */ public class FlinkMemorySegment implements MemorySegment { org.apache.flink.core.memory.MemorySegment segment; public FlinkMemorySegment(MemorySegment segment); } |
...