Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Flink Memory ModelFlink configuration1Effective Configuration REST API2Metric3Used keyTotal key
Framework Heaptaskmanager.memory.framework.heap.sizememoryConfiguration.frameworkHeapStatus.JVM.Memory.HeapUsedMax
Task Heaptaskmanager.memory.task.heap.sizememoryConfiguration.taskHeap
Framework OffHeaptaskmanager.memory.framework.off-heap.sizememoryConfiguration.frameworkOffHeap---
Task OffHeaptaskmanager.memory.task.off-heap.sizememoryConfiguration.taskOffHeap
Network Memory

taskmanager.memory.network.min

taskmanager.memory.network.max

memoryConfiguration.networkMemory

Status.Shuffle.Netty

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14422

UsedMemoryTotalMemory
Managed Memorytaskmanager.memory.managed.sizememoryConfiguration.managedMemory

Status.ManagedMemoryFlink.Memory.Managed

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14406

(is up for discussion, still)


UsedTotal(is up for discussion, still)
JVM Metaspacetaskmanager.memory.jvm-metaspace.sizememoryConfiguration.jvmMetaspaceStatus.JVM.Memory.MetaspaceUsedMax
JVM Overhead

taskmanager.memory.jvm-overhead.min

taskmanager.memory.jvm-overhead.max

memoryConfiguration.jvmOverhead---

...

  1. add shuffle memory's size metric

    Code Block
    languagejava
    public long getTotalMemorySize() { return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize; } public long getUsedMemorySize() { return getTotalMemorySize() - (getNumberOfAvailableMemorySegments() * memorySegmentSize); }

  2. update NettyShuffleMetricFactory#registerShuffleMetrics

    Code Block
    languagejava
    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory"; private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory"; // ... private static void registerShuffleMetrics( String groupName, MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { MetricGroup networkGroup = metricGroup.addGroup(groupName); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments); // === new === networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES, networkBufferPool::getTotalMemorySize); networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES, networkBufferPool::getUsedMemorySize); }

Step 3: Introduce new metrics for Task's managed memory usage 

...

We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot . The aggregation can be maintained in the TaskExecutor.

...

add default memory type in MemoryManager

Code Block
languagejava
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;

add register ManagedMemoryUsage in TaskExecutor#requestSlot:

Code Block
public long getManagedMemoryUsed() {
	return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
		slot ->
			slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) - 
				slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
	).sum();

}

add instantiateMemoryManagerMetrics in MetricUtils

...

languagejava

...

TaskExecutor

...

.

...

Step 4: Add Metaspace metrics 

...