Status
Current state: [Under Discussion]
Discussion threads:
- FLIP-75 discussion about the initial design
- FLIP-102 discussion after splitting up FLIP-75 into sub-flips
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-49 was accepted and merged in Flink 1.10. The metrics shown on the current task manager detail page no longer correspond well to the memory model introduced by FLIP-49.
Proposed Changes
Based on FLIP-49, the following table maps each memory component to its configuration key, its resource key and its metrics.
Location | Memory type | Flink component | Configuration key | Resource key | Metric (max) | Metric (used) |
JVM | JVM Heap | Framework Heap | taskmanager.memory.framework.heap.size | resource.frameworkHeap | metrics.heapMax | metrics.heapUsed |
JVM | JVM Heap | Task Heap | taskmanager.memory.task.heap.size | resource.taskHeap | metrics.heapMax | metrics.heapUsed |
JVM | JVM Non-Heap | JVM Metaspace | taskmanager.memory.jvm-metaspace.size | resource.jvmMetaSpace | metrics.nonHeapMax | metrics.nonHeapUsed |
JVM | JVM Non-Heap | JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | resource.jvmOverhead | metrics.nonHeapMax | metrics.nonHeapUsed |
JVM | JVM Non-Heap | other | - | - | metrics.nonHeapMax | metrics.nonHeapUsed |
Outside JVM | Mapped | - | - | - | metrics.mappedMax | metrics.mappedUsed |
Outside JVM | Direct | Framework Off-Heap | taskmanager.memory.framework.off-heap.size | resource.frameworkOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal |
Outside JVM | Direct | Task Off-Heap Memory | taskmanager.memory.task.off-heap.size | resource.taskOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal |
Outside JVM | Direct | Shuffle Memory | taskmanager.memory.shuffle.min, taskmanager.memory.shuffle.max | resource.shuffleMemory | metrics.shuffleMemoryTotal | metrics.shuffleMemoryUsed |
Outside JVM | Flink Managed | Managed Memory | taskmanager.memory.managed.size | resource.managedMemory | metrics.managedMemoryTotal | metrics.managedMemoryUsed |
Where several rows list the same metric, that metric covers those components together.
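The Direct rows of the table derive their values by subtracting the shuffle memory from the direct memory metrics. The following is a minimal sketch of that arithmetic only. The class OffHeapMetrics, its method names and the sample values are hypothetical and not part of Flink; the inputs stand for metrics.directMax, metrics.directUsed and metrics.shuffleMemoryTotal.

```java
/** Illustrative only: OffHeapMetrics and its methods are hypothetical, not Flink API. */
final class OffHeapMetrics {

    /** Upper bound shared by framework and task off-heap memory: directMax - shuffleMemoryTotal. */
    static long offHeapMax(long directMax, long shuffleMemoryTotal) {
        return directMax - shuffleMemoryTotal;
    }

    /** Usage shared by framework and task off-heap memory: directUsed - shuffleMemoryTotal. */
    static long offHeapUsed(long directUsed, long shuffleMemoryTotal) {
        return directUsed - shuffleMemoryTotal;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Made-up sample values, in bytes.
        long directMax = 512 * mib;
        long directUsed = 200 * mib;
        long shuffleMemoryTotal = 128 * mib;

        System.out.println("off-heap max (MiB):  " + offHeapMax(directMax, shuffleMemoryTotal) / mib);
        System.out.println("off-heap used (MiB): " + offHeapUsed(directUsed, shuffleMemoryTotal) / mib);
    }
}
```

Whether this subtraction happens in the REST handler or in the frontend is left open here; the sketch only fixes the formula from the table.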
Frontend Design
Redesign the task manager metric page so that users can understand the relationship between these metrics more clearly.
REST API Design
- The task manager's resource already contains this information, so it is exposed through the task manager details endpoint.
- url: /taskmanagers/:taskmanagerid
- response:
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : { "type" : "any" },
    "path" : { "type" : "string" },
    "dataPort" : { "type" : "integer" },
    "timeSinceLastHeartbeat" : { "type" : "integer" },
    "slotsNumber" : { "type" : "integer" },
    "freeSlots" : { "type" : "integer" },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : { "type" : "integer" },
        "physicalMemory" : { "type" : "integer" },
        "freeMemory" : { "type" : "integer" },
        "managedMemory" : { "type" : "integer" }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : { "type" : "integer" },
        "heapCommitted" : { "type" : "integer" },
        "heapMax" : { "type" : "integer" },
        "nonHeapUsed" : { "type" : "integer" },
        "nonHeapCommitted" : { "type" : "integer" },
        "nonHeapMax" : { "type" : "integer" },
        "directCount" : { "type" : "integer" },
        "directUsed" : { "type" : "integer" },
        "directMax" : { "type" : "integer" },
        "mappedCount" : { "type" : "integer" },
        "mappedUsed" : { "type" : "integer" },
        "mappedMax" : { "type" : "integer" },
        "memorySegmentsAvailable" : { "type" : "integer" },
        "memorySegmentsTotal" : { "type" : "integer" },
        "managedMemoryUsed" : { "type" : "long" },
        "managedMemoryTotal" : { "type" : "long" },
        "networkMemoryUsed" : { "type" : "long" },
        "networkMemoryTotal" : { "type" : "long" },
        "resource" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",
          "properties" : {
            "frameworkHeap" : { "type" : "long" },
            "frameworkOffHeap" : { "type" : "long" },
            "taskHeap" : { "type" : "long" },
            "taskOffHeap" : { "type" : "long" },
            "networkMemory" : { "type" : "long" },
            "managedMemory" : { "type" : "long" },
            "jvmMetaSpace" : { "type" : "long" },
            "jvmOverhead" : { "type" : "long" },
            "totalProcessMemory" : { "type" : "long" }
          }
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : { "type" : "string" },
              "count" : { "type" : "integer" },
              "time" : { "type" : "integer" }
            }
          }
        }
      }
    }
  }
}
Implementation Proposal
Step 1: Introduce new metric for memory usage of NetworkBufferPool
Add a metric for the size of the shuffle memory. Keep in mind that we have to convert available memory into used memory somewhere later on!
Add network memory members to NetworkBufferPool:
public long getTotalMemorySize() {
    // 1L forces long arithmetic so that the product cannot overflow int.
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getAvailableMemorySize() {
    // Same widening as above; a plain int * int product could overflow for large pools.
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}
Update NettyShuffleMetricFactory#registerShuffleMetrics:
private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
private static final String METRIC_AVAILABLE_MEMORY_IN_BYTES = "AvailableMemory";

// ...

private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
        networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
        networkBufferPool::getNumberOfAvailableMemorySegments);

    // === new ===
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
        networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_AVAILABLE_MEMORY_IN_BYTES,
        networkBufferPool::getAvailableMemorySize);
}
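Step 1 notes that available memory still has to be converted into used memory later on. A minimal sketch of that conversion, assuming the used size is simply total minus available, could look like the following. NetworkMemoryUsage and its methods are hypothetical stand-ins that take the segment counts and segment size as plain arguments; they are not part of NetworkBufferPool.

```java
/** Illustrative only: a hypothetical helper, not part of Flink's NetworkBufferPool. */
final class NetworkMemoryUsage {

    /** Total pool size in bytes; 1L widens to long before multiplying. */
    static long totalBytes(int totalSegments, int segmentSize) {
        return 1L * totalSegments * segmentSize;
    }

    /** Currently available pool size in bytes. */
    static long availableBytes(int availableSegments, int segmentSize) {
        return 1L * availableSegments * segmentSize;
    }

    /** Used size in bytes, derived as total minus available (an assumption of this sketch). */
    static long usedBytes(int totalSegments, int availableSegments, int segmentSize) {
        return totalBytes(totalSegments, segmentSize) - availableBytes(availableSegments, segmentSize);
    }

    public static void main(String[] args) {
        // Made-up sample: 100,000 segments of 32 KiB, 25,000 of them free.
        // The total exceeds Integer.MAX_VALUE, which is why long arithmetic matters.
        int totalSegments = 100_000;
        int availableSegments = 25_000;
        int segmentSize = 32 * 1024;

        System.out.println("total bytes: " + totalBytes(totalSegments, segmentSize));
        System.out.println("used bytes:  " + usedBytes(totalSegments, availableSegments, segmentSize));
    }
}
```

The conversion could equally be done in the REST handler when the metrics are read; the sketch does not take a position on where it belongs.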
Step 2: Introduce new metrics for Task's managed memory usage
We still have to discuss how to implement this in the right way. A brief proposal follows.
We would have to introduce a new metric that represents the memory usage aggregated over all TaskSlots. The aggregation can be maintained in the TaskExecutor.
- Add a managed memory metric and expose it in the REST API. Because managed memory is now allocated per slot, we need to sum it up in the TaskExecutor.
Add a default memory type to MemoryManager:
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;

Add and register ManagedMemoryUsage in TaskExecutor#requestSlot:
public long getManagedMemoryUsed() {
    // Used memory per slot = total size of the default memory type minus what is still available.
    return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
        slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
            - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
    ).sum();
}

Add instantiateMemoryManagerMetrics to MetricUtils:
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);

    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}
- Register it in TaskManagerRunner#startTaskManager.
- Add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the task manager's metrics.
- url: /taskmanagers/:taskmanagerid
- response: the metrics object gains managedMemoryTotal and managedMemoryUsed.
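The per-slot aggregation proposed in Step 2 can be illustrated in isolation. The sketch below assumes each allocated slot reports a total and an available managed memory size; SlotMemory and ManagedMemoryAggregation are hypothetical stand-ins for the slot's MemoryManager and for the TaskExecutor, not Flink classes.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical stand-ins, not Flink's TaskExecutor or MemoryManager. */
final class ManagedMemoryAggregation {

    /** Hypothetical view of one slot's managed memory, in bytes. */
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Sum of used managed memory over all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    /** Sum of managed memory capacity over all allocated slots. */
    static long managedMemoryTotal(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(slot -> slot.totalBytes).sum();
    }

    public static void main(String[] args) {
        // Made-up sample: one partially used slot and one idle slot.
        List<SlotMemory> slots = Arrays.asList(
            new SlotMemory(100L, 40L),
            new SlotMemory(200L, 200L));

        System.out.println("used:  " + managedMemoryUsed(slots));
        System.out.println("total: " + managedMemoryTotal(slots));
    }
}
```

Note that summing only over allocated slots means the total changes as slots are allocated and freed. Whether the total should instead reflect the configured managed memory of the whole task manager is one of the points still open for discussion.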
Step 3: Expose new metrics to REST interface
TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics.
TaskManagerMetricsInfo needs to be extended with members for the newly added fields.
TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.
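How the new fields might be initialized from the metric store can be sketched without Flink on the classpath. The example below models the store as a plain map from metric name to string value and reads the two managed memory metrics named in Step 2. ManagedMemoryFields, fromStore and the fallback to 0 for missing metrics are assumptions of this sketch, not the actual TaskManagerMetricsInfo or MetricStore API.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a hypothetical value class, not Flink's TaskManagerMetricsInfo. */
final class ManagedMemoryFields {

    // Metric names as proposed in Step 2.
    static final String TOTAL = "Status.Managed.Memory.TotalCapacity";
    static final String USED = "Status.Managed.Memory.MemoryUsed";

    final long managedMemoryTotal;
    final long managedMemoryUsed;

    ManagedMemoryFields(long managedMemoryTotal, long managedMemoryUsed) {
        this.managedMemoryTotal = managedMemoryTotal;
        this.managedMemoryUsed = managedMemoryUsed;
    }

    /** Builds the fields from a name-to-value map that stands in for the metric store. */
    static ManagedMemoryFields fromStore(Map<String, String> store) {
        return new ManagedMemoryFields(asLong(store, TOTAL), asLong(store, USED));
    }

    /** Missing metrics fall back to 0 (an assumption of this sketch). */
    private static long asLong(Map<String, String> store, String name) {
        String value = store.get(name);
        return value == null ? 0L : Long.parseLong(value);
    }

    public static void main(String[] args) {
        // Made-up sample values, in bytes.
        Map<String, String> store = new HashMap<>();
        store.put(TOTAL, "1073741824");
        store.put(USED, "268435456");

        ManagedMemoryFields fields = fromStore(store);
        System.out.println("managedMemoryTotal: " + fields.managedMemoryTotal);
        System.out.println("managedMemoryUsed:  " + fields.managedMemoryUsed);
    }
}
```

The network memory fields could be filled the same way once the metric group name used in registerShuffleMetrics is settled.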
Test Plan
Existing tests will be updated to verify the feature.