Status
Current state: [Under Discussion]
Discussion threads:
- FLIP-75 discussion about the initial design
- FLIP-102 discussion after splitting up FLIP-75 into sub-flips
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-49 has been accepted and merged in Flink 1.10. The metrics shown on the current task manager detail page do not correspond well to the memory model introduced by FLIP-49.
Proposed Changes
Based on FLIP-49, we can derive a table that maps each memory component to its configuration key, resource key, and metrics.
Memory area | JVM region | Flink component | User config key | Resource key | Metric (max) | Metric (used) |
JVM | JVM Heap | Framework Heap | taskmanager.memory.framework.heap.size | resource.frameworkHeap | metrics.heapMax | metrics.heapUsed |
JVM | JVM Heap | Task Heap | taskmanager.memory.task.heap.size | resource.taskHeap | metrics.heapMax | metrics.heapUsed |
JVM | JVM Non-Heap | JVM Metaspace | taskmanager.memory.jvm-metaspace.size | resource.jvmMetaSpace | metrics.nonHeapMax | metrics.nonHeapUsed |
JVM | JVM Non-Heap | JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | resource.jvmOverhead | metrics.nonHeapMax | metrics.nonHeapUsed |
JVM | JVM Non-Heap | other | - | - | metrics.nonHeapMax | metrics.nonHeapUsed |
Outside JVM | Mapped | - | - | - | metrics.mappedMax | metrics.mappedUsed |
Outside JVM | Direct | Framework OffHeap | taskmanager.memory.framework.off-heap.size | resource.frameworkOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal |
Outside JVM | Direct | Task OffHeap Memory | taskmanager.memory.task.off-heap.size | resource.taskOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal |
Outside JVM | Direct | Shuffle Memory | taskmanager.memory.shuffle.min, taskmanager.memory.shuffle.max | resource.shuffleMemory | metrics.shuffleMemoryTotal | metrics.shuffleMemoryUsed |
Outside JVM | Flink Managed | Managed Memory | taskmanager.memory.managed.size | resource.managedMemory | metrics.managedMemoryTotal | metrics.managedMemoryUsed |
Rows that list the same metric share it: the metric covers those components combined, not each one individually.
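The "Direct" rows of the table imply a small amount of arithmetic on the client side. The following is a minimal sketch of that arithmetic, not code from Flink: the class name, method names, and sample values are hypothetical, and only the two subtraction formulas come from the table.

```java
// Illustrative sketch only; class, method names, and sample values are hypothetical.
final class DirectMemoryBreakdownSketch {

    /** Max shared by Framework OffHeap and Task OffHeap: directMax - shuffleMemoryTotal. */
    static long offHeapMax(long directMax, long shuffleMemoryTotal) {
        return directMax - shuffleMemoryTotal;
    }

    /** Used value shared by Framework OffHeap and Task OffHeap: directUsed - shuffleMemoryTotal. */
    static long offHeapUsed(long directUsed, long shuffleMemoryTotal) {
        return directUsed - shuffleMemoryTotal;
    }

    /** Fraction of max in use; returns 0.0 when max is not a positive number. */
    static double utilization(long used, long max) {
        return max > 0 ? (double) used / max : 0.0;
    }

    public static void main(String[] args) {
        long directMax = 1024L;          // hypothetical metrics.directMax
        long directUsed = 512L;          // hypothetical metrics.directUsed
        long shuffleMemoryTotal = 256L;  // hypothetical metrics.shuffleMemoryTotal

        long max = offHeapMax(directMax, shuffleMemoryTotal);
        long used = offHeapUsed(directUsed, shuffleMemoryTotal);
        System.out.println("off-heap max  = " + max);
        System.out.println("off-heap used = " + used);
        System.out.println("utilization   = " + utilization(used, max));
    }
}
```

The guard in `utilization` is a design choice of this sketch, so that a missing or undefined maximum does not produce a division by zero in a UI.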
Frontend Design
Redesign the task manager metric page so that users can more clearly understand the relationship between these metrics.
REST API Design
- use TaskExecutorRegistration to expose the task manager resource information that matches the memory composition
- the task manager's resource contains this information; expose it in:
- url: /taskmanagers/:taskmanagerid
- response
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : { "type" : "any" },
    "path" : { "type" : "string" },
    "dataPort" : { "type" : "integer" },
    "timeSinceLastHeartbeat" : { "type" : "integer" },
    "slotsNumber" : { "type" : "integer" },
    "freeSlots" : { "type" : "integer" },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : { "type" : "integer" },
        "physicalMemory" : { "type" : "integer" },
        "freeMemory" : { "type" : "integer" },
        "managedMemory" : { "type" : "integer" }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : { "type" : "integer" },
        "heapCommitted" : { "type" : "integer" },
        "heapMax" : { "type" : "integer" },
        "nonHeapUsed" : { "type" : "integer" },
        "nonHeapCommitted" : { "type" : "integer" },
        "nonHeapMax" : { "type" : "integer" },
        "directCount" : { "type" : "integer" },
        "directUsed" : { "type" : "integer" },
        "directMax" : { "type" : "integer" },
        "mappedCount" : { "type" : "integer" },
        "mappedUsed" : { "type" : "integer" },
        "mappedMax" : { "type" : "integer" },
        "memorySegmentsAvailable" : { "type" : "integer" },
        "memorySegmentsTotal" : { "type" : "integer" },
        "managedMemoryUsed" : { "type" : "long" },
        "managedMemoryTotal" : { "type" : "long" },
        "networkMemoryUsed" : { "type" : "long" },
        "networkMemoryTotal" : { "type" : "long" },
        "resource" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",
          "properties" : {
            "frameworkHeap" : { "type" : "long" },
            "frameworkOffHeap" : { "type" : "long" },
            "taskHeap" : { "type" : "long" },
            "taskOffHeap" : { "type" : "long" },
            "networkMemory" : { "type" : "long" },
            "managedMemory" : { "type" : "long" },
            "jvmMetaSpace" : { "type" : "long" },
            "jvmOverhead" : { "type" : "long" },
            "totalProcessMemory" : { "type" : "long" }
          }
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : { "type" : "string" },
              "count" : { "type" : "integer" },
              "time" : { "type" : "integer" }
            }
          }
        }
      }
    }
  }
}
- add a metric for the shuffle memory size
- add getTotalMemorySize and getAvaliableMemorySize in NetworkBufferPool
// Total bytes managed by the pool; 1L widens the product to long to avoid int overflow.
public long getTotalMemorySize() {
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

// Bytes held by the segments that are currently available.
public long getAvaliableMemorySize() {
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}
- update NettyShuffleMetricFactory#registerShuffleMetrics
private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);
    // Existing segment-count gauges.
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
        networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
        networkBufferPool::getNumberOfAvailableMemorySegments);
    // New byte-size gauges backed by the getters added to NetworkBufferPool.
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY,
        networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY,
        networkBufferPool::getAvaliableMemorySize);
}
- add Status.Shuffle.Netty.TotalMemoryCapacity and Status.Shuffle.Netty.AvaliableMemory to the task manager's metrics
- url: /taskmanagers/:taskmanagerid
- response: add shuffleMemoryTotal and shuffleMemoryUsed to metrics
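The size getters above multiply by 1L so that the product is computed in long arithmetic. The sketch below illustrates why that matters and how a used value could be derived from the two new metrics. It is a stand-in, not Flink's NetworkBufferPool: the class name, method names, segment counts, and the 32 KiB segment size are hypothetical, and deriving shuffleMemoryUsed as total minus available is an assumption, since the proposal does not spell out that step.

```java
// Illustrative stand-in only; this is not Flink's NetworkBufferPool.
final class ShuffleMemoryMathSketch {

    /** Bytes held by segmentCount segments; 1L forces the product into long arithmetic. */
    static long sizeInBytes(int segmentCount, int segmentSizeBytes) {
        return 1L * segmentCount * segmentSizeBytes;
    }

    /** Same product computed in int arithmetic; wraps around past Integer.MAX_VALUE. */
    static long sizeInBytesIntMath(int segmentCount, int segmentSizeBytes) {
        return segmentCount * segmentSizeBytes;
    }

    /** Assumed derivation of shuffleMemoryUsed: total capacity minus available memory. */
    static long usedBytes(long totalBytes, long availableBytes) {
        return totalBytes - availableBytes;
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed 32 KiB segments
        long total = sizeInBytes(100_000, segmentSize);
        long available = sizeInBytes(25_000, segmentSize);
        System.out.println("total     = " + total);
        System.out.println("available = " + available);
        System.out.println("used      = " + usedBytes(total, available));
        // Negative here: the int product overflowed before being widened to long.
        System.out.println("int math  = " + sizeInBytesIntMath(100_000, segmentSize));
    }
}
```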
- add a managed memory metric and expose it in the REST API. Because managed memory is now allocated per slot, the usage has to be summed over all allocated slots in TaskExecutor.
- add a default memory type in MemoryManager
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
- register ManagedMemoryUsage in TaskExecutor#requestSlot:
// Sums (total - available) managed memory over all allocated slots.
public long getManagedMemoryUsed() {
    return this.taskSlotTable.getAllocatedSlots().stream()
        .mapToLong(slot ->
            slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
        .sum();
}
- add instantiateMemoryManagerMetrics in MetricUtils
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);

    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}
- register it in TaskManagerRunner#startTaskManager
- add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the task manager's metrics
- url: /taskmanagers/:taskmanagerid
- response: add managedMemoryTotal and managedMemoryUsed to metrics
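The per-slot summation behind the managed memory usage metric can be sketched in isolation. This is a minimal, self-contained illustration under stated assumptions: SlotMemory is a hypothetical stand-in for a slot's MemoryManager, and the byte values are made up. Only the "total minus available, summed over allocated slots" logic mirrors the proposal.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; SlotMemory is a hypothetical stand-in for a slot's MemoryManager.
final class ManagedMemoryUsageSketch {

    /** Total and currently available managed memory of one slot, in bytes. */
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        /** Memory currently reserved in this slot. */
        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Sums the used managed memory over all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = Arrays.asList(
            new SlotMemory(100L, 40L),    // 60 bytes in use
            new SlotMemory(200L, 200L));  // idle slot, nothing in use
        System.out.println("managed memory used = " + managedMemoryUsed(slots));
    }
}
```

An empty slot list yields 0, which is what a task manager with no allocated slots would be expected to report.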
Test Plan
Existing tests will be updated to verify the feature.