...
Flink Memory Model | Flink configuration¹ | Effective Configuration REST API² | Metric³ | Used key | Total key
---|---|---|---|---|---
Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max
Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | | |
Framework OffHeap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | -
Task OffHeap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | | |
Network Memory | | memoryConfiguration.networkMemory | Status.Shuffle.Netty | UsedMemory | TotalMemory
Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.Flink.Memory.Managed (still up for discussion) | Used | Total (still up for discussion)
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max
JVM Overhead | | memoryConfiguration.jvmOverhead | - | - | -
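To make the table easier to read, here is a minimal sketch of how a "Metric" entry and its "Used key" or "Total key" can be combined into a full metric name, and how a utilization ratio could be derived from the two values. The class and method names (`MemoryMetricKeys`, `fullName`, `utilization`) are hypothetical helpers for illustration only and are not part of Flink; the byte values are made up.

```java
final class MemoryMetricKeys {

    /** Joins a metric scope from the table with one of its keys, e.g. "Used" or "Max". */
    static String fullName(String metric, String key) {
        return metric + "." + key;
    }

    /**
     * Fraction of the total that is in use. Returns 0 when the total is not
     * positive, which can happen when a JVM reports an undefined maximum.
     */
    static double utilization(long usedBytes, long totalBytes) {
        return totalBytes <= 0 ? 0.0 : (double) usedBytes / totalBytes;
    }

    public static void main(String[] args) {
        System.out.println(fullName("Status.JVM.Memory.Heap", "Used"));
        System.out.println(fullName("Status.Shuffle.Netty", "TotalMemory"));
        // Sample values, chosen only for illustration.
        System.out.println(utilization(256L << 20, 1024L << 20));
    }
}
```

Guarding against a non-positive total keeps the ratio well defined for rows whose total may be unavailable.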
...
Add size metrics for the shuffle memory to NetworkBufferPool:

    public long getTotalMemorySize() {
        return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }

    public long getUsedMemorySize() {
        // Widen to long before multiplying so the int product cannot overflow.
        return getTotalMemorySize()
            - ((long) getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }

Update NettyShuffleMetricFactory#registerShuffleMetrics:

    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";

    // ...

    private static void registerShuffleMetrics(
            String groupName,
            MetricGroup metricGroup,
            NetworkBufferPool networkBufferPool) {
        MetricGroup networkGroup = metricGroup.addGroup(groupName);
        networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
            networkBufferPool::getTotalNumberOfMemorySegments);
        networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
            networkBufferPool::getNumberOfAvailableMemorySegments);
        // === new ===
        networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
            networkBufferPool::getTotalMemorySize);
        networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
            networkBufferPool::getUsedMemorySize);
    }
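The byte arithmetic behind the two new gauges can be tried in isolation. The following is a minimal sketch, assuming a pool of fixed-size segments; `SegmentPoolSketch` and its `request` method are hypothetical stand-ins and do not reproduce Flink's NetworkBufferPool. The sizes are picked so that total bytes exceed `Integer.MAX_VALUE`, which is why the multiplication is widened to `long`.

```java
/** Hypothetical stand-in for a segment pool; not Flink's NetworkBufferPool. */
final class SegmentPoolSketch {
    private final int totalSegments;
    private final int segmentSizeBytes;
    private int availableSegments;

    SegmentPoolSketch(int totalSegments, int segmentSizeBytes) {
        this.totalSegments = totalSegments;
        this.segmentSizeBytes = segmentSizeBytes;
        this.availableSegments = totalSegments;
    }

    /** Takes up to n segments out of the pool and returns how many were taken. */
    int request(int n) {
        int granted = Math.max(0, Math.min(n, availableSegments));
        availableSegments -= granted;
        return granted;
    }

    long getTotalMemorySize() {
        // Widen to long before multiplying so the product cannot wrap around.
        return (long) totalSegments * segmentSizeBytes;
    }

    long getUsedMemorySize() {
        // Used bytes = total bytes minus the bytes of segments still available.
        return getTotalMemorySize() - (long) availableSegments * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // 100,000 segments of 32 KiB each: more bytes than an int can hold.
        SegmentPoolSketch pool = new SegmentPoolSketch(100_000, 32 * 1024);
        pool.request(25_000);
        System.out.println("total bytes = " + pool.getTotalMemorySize());
        System.out.println("used bytes  = " + pool.getUsedMemorySize());
    }
}
```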
Step 3: Introduce new metrics for Task's managed memory usage
...
We would have to introduce a new metric that represents the managed memory usage aggregated across all TaskSlots. The aggregation can be maintained in the TaskExecutor.
...
Add a default memory type to MemoryManager:

    public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;

Register ManagedMemoryUsage in TaskExecutor#requestSlot:

    public long getManagedMemoryUsed() {
        return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
            slot ->
                slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) -
                slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
        ).sum();
    }

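The per-slot aggregation can also be sketched without any Flink classes. The example below assumes each slot exposes a total and an available managed memory size in bytes; `ManagedMemoryUsageSketch` and `SlotMemory` are hypothetical stand-ins, and a plain `LongSupplier` plays the role of a gauge to show that the sum is recomputed on every read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

final class ManagedMemoryUsageSketch {

    /** Hypothetical per-slot view: total managed memory and the part still free, in bytes. */
    static final class SlotMemory {
        final long totalBytes;
        long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }
    }

    /** Sums (total - available) over all slots. */
    static long managedMemoryUsed(List<SlotMemory> slots) {
        return slots.stream().mapToLong(s -> s.totalBytes - s.availableBytes).sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = new ArrayList<>();
        slots.add(new SlotMemory(128L << 20, 96L << 20));
        slots.add(new SlotMemory(128L << 20, 128L << 20));

        // A gauge-like supplier: the value is computed when it is read, not when it is created.
        LongSupplier usedGauge = () -> managedMemoryUsed(slots);
        System.out.println("used bytes = " + usedGauge.getAsLong());

        // After the second slot hands out memory, the same gauge reports the new sum.
        slots.get(1).availableBytes = 64L << 20;
        System.out.println("used bytes = " + usedGauge.getAsLong());
    }
}
```

Because the value is computed on read, no bookkeeping is needed when slots are allocated or freed, at the cost of iterating over the slots each time the metric is reported.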
Add instantiateMemoryManagerMetrics to MetricUtils
...
Step 4: Add Metaspace metrics
...