Status
Current state: [Under Discussion]
Discussion threads:
- FLIP-75 discussion about the initial design
- FLIP-102 discussion after splitting up FLIP-75 into sub-flips
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-49 was accepted and merged in Flink 1.10. The metrics on the current TaskManager detail page do not correspond well to the memory model introduced by FLIP-49.
Proposed Changes
Based on FLIP-49, the following table maps each memory component to its configuration keys and the corresponding metrics.
JVM memory | Flink component (0) | User configuration key (1) | REST configuration key (2) | Metric: max (3) | Metric: used (3)
Heap | Framework Heap |  | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap.Max (covers Framework Heap and Task Heap) | Status.JVM.Memory.Heap.Used (covers Framework Heap and Task Heap)
Heap | Task Heap |  | memoryConfiguration.taskHeap | (see Framework Heap) | (see Framework Heap)
Off-Heap / Native | Managed Memory |  | memoryConfiguration.managedMemory | Status.ManagedMemory.Total | Status.ManagedMemory.Used
Off-Heap / Direct | Framework Off-Heap |  | memoryConfiguration.frameworkOffHeap | Status.JVM.Memory.Direct.TotalCapacity - Status.Shuffle.Netty.TotalMemory (covers Framework Off-Heap and Task Off-Heap) | Status.JVM.Memory.Direct.MemoryUsed - Status.Shuffle.Netty.UsedMemory (covers Framework Off-Heap and Task Off-Heap)
Off-Heap / Direct | Task Off-Heap |  | memoryConfiguration.taskOffHeap | (see Framework Off-Heap) | (see Framework Off-Heap)
Off-Heap / Direct | Network Memory |  | memoryConfiguration.networkMemory | Status.Shuffle.Netty.TotalMemory | Status.Shuffle.Netty.UsedMemory
Off-Heap / Mapped | - | - | - | Status.JVM.Memory.Mapped.TotalCapacity | Status.JVM.Memory.Mapped.MemoryUsed
Metaspace | JVM Metaspace |  | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace.Max | Status.JVM.Memory.Metaspace.Used
Overhead | JVM Overhead |  | memoryConfiguration.jvmOverhead | Status.JVM.Memory.NonHeap.Max | Status.JVM.Memory.NonHeap.Used
(0) The partitioning as described in org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec.
(1) The configuration parameters used in the Flink configuration.
(2) The JSON paths that address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory and totalProcessMemory are exposed through the REST API.
(3) The metrics exposed through the metrics endpoint.
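The Framework Off-Heap and Task Off-Heap rows have no dedicated metric; the table derives their values by subtracting the Netty shuffle memory from the JVM direct memory. The Java sketch below only illustrates that arithmetic. The class and method names (DerivedOffHeapMetrics, offHeapMax, offHeapUsed) are hypothetical, and the example byte values are made up.

```java
// Hypothetical helper (not part of Flink) illustrating how the Framework/Task
// Off-Heap values in the table are derived from existing metrics.
final class DerivedOffHeapMetrics {

    private DerivedOffHeapMetrics() {}

    // Status.JVM.Memory.Direct.TotalCapacity - Status.Shuffle.Netty.TotalMemory
    static long offHeapMax(long directTotalCapacity, long nettyTotalMemory) {
        return directTotalCapacity - nettyTotalMemory;
    }

    // Status.JVM.Memory.Direct.MemoryUsed - Status.Shuffle.Netty.UsedMemory
    static long offHeapUsed(long directMemoryUsed, long nettyUsedMemory) {
        return directMemoryUsed - nettyUsedMemory;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Made-up example: 256 MiB direct capacity, 128 MiB of it network memory.
        System.out.println(offHeapMax(256 * mib, 128 * mib) / mib + " MiB");  // 128 MiB
        // 200 MiB direct memory in use, 128 MiB of it by network buffers.
        System.out.println(offHeapUsed(200 * mib, 128 * mib) / mib + " MiB"); // 72 MiB
    }
}
```

The subtraction assumes, as the table does, that network buffers are allocated from JVM direct memory and are therefore included in the direct memory metrics.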
Frontend Design (outdated)
Redesign the TaskManager metrics page so that users can more clearly understand the relationship between these metrics.
REST API Design
- The TaskManager's resource information contains these values; expose it in the TaskManager details response:
url: /taskmanagers/:taskmanagerid
Implementation Proposal
Step 1: Expose configuration parameters of TaskExecutor
- TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed for the REST response.
- The TaskManagerResourceInfo is initialized when initializing the TaskExecutor, in the same way as we do it with the HardwareDescription. It will be handed over in the same way through TaskExecutorRegistry → WorkerRegistration.
- The TaskManagerResourceInfo will be added along with the HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).
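A minimal sketch of what the TaskManagerResourceInfo POJO could hold, assuming one field per memoryConfiguration.* REST key from the table, all in bytes. The constructor layout and the two derived totals are assumptions; the totals follow the FLIP-49 memory model (total Flink memory is heap plus off-heap plus network plus managed memory; total process memory adds JVM metaspace and JVM overhead).

```java
// Hypothetical sketch of the TaskManagerResourceInfo POJO proposed in Step 1.
// Field names mirror the memoryConfiguration.* REST keys; all values are bytes.
// The constructor and the derived totals are assumptions, not the final design.
final class TaskManagerResourceInfo {

    private final long frameworkHeap;
    private final long taskHeap;
    private final long frameworkOffHeap;
    private final long taskOffHeap;
    private final long networkMemory;
    private final long managedMemory;
    private final long jvmMetaspace;
    private final long jvmOverhead;

    TaskManagerResourceInfo(
            long frameworkHeap, long taskHeap,
            long frameworkOffHeap, long taskOffHeap,
            long networkMemory, long managedMemory,
            long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    // FLIP-49: total Flink memory = framework/task heap + framework/task off-heap
    // + network memory + managed memory.
    long getTotalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    // FLIP-49: total process memory = total Flink memory + JVM metaspace + JVM overhead.
    long getTotalProcessMemory() {
        return getTotalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }
}
```

Usage: new TaskManagerResourceInfo(128, 384, 128, 0, 128, 512, 96, 192) yields a total Flink memory of 1280 and a total process memory of 1568 (in whatever unit the inputs use).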
Step 2: Introduce new metric for memory usage of NetworkBufferPool
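Add methods to NetworkBufferPool that report the shuffle memory size in bytes:
// Total size of all memory segments managed by this pool, in bytes.
// Widen to long before multiplying so that large pools do not overflow int.
public long getTotalMemorySize() {
    return (long) getTotalNumberOfMemorySegments() * memorySegmentSize;
}

// Used size in bytes: the total minus the segments still available in the pool.
public long getUsedMemorySize() {
    return getTotalMemorySize() - (long) getNumberOfAvailableMemorySegments() * memorySegmentSize;
}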
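The byte-size methods multiply two int values, so the widening to long matters for large pools. The self-contained Java sketch below uses a hypothetical stand-in class (SimpleBufferPool, not Flink's NetworkBufferPool) with the same method names to show the used/total arithmetic and the overflow that the cast avoids.

```java
// Hypothetical stand-in for NetworkBufferPool, reduced to what is needed to
// illustrate the byte-size arithmetic of Step 2. Not Flink code.
final class SimpleBufferPool {

    private final int totalSegments;
    private final int memorySegmentSize; // bytes per segment
    private int availableSegments;

    SimpleBufferPool(int totalSegments, int memorySegmentSize) {
        this.totalSegments = totalSegments;
        this.memorySegmentSize = memorySegmentSize;
        this.availableSegments = totalSegments;
    }

    // Simulates handing out segments to consumers.
    void requestSegments(int count) {
        if (count > availableSegments) {
            throw new IllegalStateException("not enough segments");
        }
        availableSegments -= count;
    }

    int getTotalNumberOfMemorySegments() { return totalSegments; }

    int getNumberOfAvailableMemorySegments() { return availableSegments; }

    long getTotalMemorySize() {
        // Widen to long before multiplying so the product cannot overflow int.
        return (long) getTotalNumberOfMemorySegments() * memorySegmentSize;
    }

    long getUsedMemorySize() {
        return getTotalMemorySize() - (long) getNumberOfAvailableMemorySegments() * memorySegmentSize;
    }

    public static void main(String[] args) {
        // 70,000 segments of 32 KiB = 2,293,760,000 bytes, more than Integer.MAX_VALUE.
        SimpleBufferPool pool = new SimpleBufferPool(70_000, 32 * 1024);
        pool.requestSegments(1_000);
        System.out.println(pool.getTotalMemorySize()); // 2293760000
        System.out.println(pool.getUsedMemorySize());  // 32768000
        // Pure int arithmetic overflows for the same pool size.
        System.out.println(70_000 * 32 * 1024);        // -2001207296
    }
}
```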
Update NettyShuffleMetricFactory#registerShuffleMetrics to register the new gauges:
private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";

// ...

private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);

    networkGroup.<Integer, Gauge<Integer>>gauge(
        METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(
        METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments);

    // === new ===
    networkGroup.<Long, Gauge<Long>>gauge(
        METRIC_TOTAL_MEMORY_IN_BYTES, networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(
        METRIC_USED_MEMORY_IN_BYTES, networkBufferPool::getUsedMemorySize);
}
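The registration above relies on two properties: nested groups produce dotted metric names such as Status.Shuffle.Netty.UsedMemory, and gauges are evaluated lazily through method references, so each read reflects the pool's current state. The Java sketch below models both with a hypothetical MiniMetricGroup class; it is deliberately not Flink's MetricGroup API, only a simplified illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, heavily simplified metric group (NOT Flink's MetricGroup API).
// Nested groups share one registry and prefix metric names with dots.
final class MiniMetricGroup {

    private final String prefix;
    private final Map<String, Supplier<Long>> registry;

    MiniMetricGroup(String name) {
        this(name, new LinkedHashMap<>());
    }

    private MiniMetricGroup(String prefix, Map<String, Supplier<Long>> registry) {
        this.prefix = prefix;
        this.registry = registry;
    }

    MiniMetricGroup addGroup(String name) {
        return new MiniMetricGroup(prefix + "." + name, registry);
    }

    // Stores the supplier itself; the value is only computed when read.
    void gauge(String name, Supplier<Long> gauge) {
        registry.put(prefix + "." + name, gauge);
    }

    long read(String fullName) {
        Supplier<Long> gauge = registry.get(fullName);
        if (gauge == null) {
            throw new IllegalArgumentException("unknown metric: " + fullName);
        }
        return gauge.get();
    }

    public static void main(String[] args) {
        final int segmentSize = 32 * 1024;
        final int[] available = {100}; // mutable stand-in for the pool's free segment count
        MiniMetricGroup netty = new MiniMetricGroup("Status").addGroup("Shuffle").addGroup("Netty");
        netty.gauge("TotalMemory", () -> 100L * segmentSize);
        netty.gauge("UsedMemory", () -> (100L - available[0]) * segmentSize);

        System.out.println(netty.read("Status.Shuffle.Netty.UsedMemory")); // 0
        available[0] = 40; // 60 segments handed out
        System.out.println(netty.read("Status.Shuffle.Netty.UsedMemory")); // 1966080
    }
}
```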
Step 3: Introduce new metrics for Task's managed memory usage
The right way to implement this still has to be discussed. A brief proposal follows:
We would have to introduce a new metric that represents the managed memory usage aggregated over all TaskSlots. The aggregation can be maintained in the TaskExecutor.
- Add a managed memory metric and expose it in the REST API. Since managed memory is now allocated per slot, the TaskExecutor needs to sum it across slots.
Add a default memory type to MemoryManager:
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
Register the managed memory usage (ManagedMemoryUsage) in TaskExecutor#requestSlot, computed over all allocated slots:
public long getManagedMemoryUsed() {
    // For each allocated slot: configured managed memory minus what is still available.
    return this.taskSlotTable.getAllocatedSlots().stream()
        .mapToLong(slot ->
            slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
        .sum();
}
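The aggregation in getManagedMemoryUsed() can be illustrated without Flink's classes. The Java sketch below assumes a hypothetical SlotMemory type that stands in for a slot's MemoryManager (configured size and currently available bytes); it is not the TaskSlotTable or MemoryManager API. It only shows the per-slot subtraction and the sum over all allocated slots.

```java
import java.util.List;

// Hypothetical model of the Step 3 aggregation (not Flink's TaskSlotTable or
// MemoryManager API): the TaskExecutor-level value is the sum over all slots.
final class ManagedMemoryAggregation {

    // Stand-in for a slot's MemoryManager: configured size and currently free bytes.
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            if (availableBytes < 0 || availableBytes > totalBytes) {
                throw new IllegalArgumentException("available must be within [0, total]");
            }
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }
    }

    // Mirrors getManagedMemoryUsed(): per slot, configured minus available, summed.
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream()
                .mapToLong(s -> s.totalBytes - s.availableBytes)
                .sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = List.of(
                new SlotMemory(256, 56),   // 200 bytes in use
                new SlotMemory(256, 256)); // idle slot
        System.out.println(managedMemoryUsed(slots)); // 200
    }
}
```

Summing only over allocated slots means free slots contribute nothing to the used value, which matches the proposal's stream over getAllocatedSlots().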
Add instantiateMemoryManagerMetrics to MetricUtils:
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);

    // Registers Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");

    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}
- Register it in TaskManagerRunner#startTaskManager.
- The TaskManager metrics gain Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
- url: /taskmanagers/:taskmanagerid
- response: the metrics section gains managedMemoryTotal and managedMemoryUsed.
Step 4: Expose new metrics to REST interface
TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics:
- TaskManagerMetricsInfo needs to be extended with members for the newly added fields.
- TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.
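A minimal sketch of how the extended metrics info could be filled from the metric store, assuming the store is modelled as a plain map from metric name to string value (Flink's MetricsStore.TaskManagerMetricStore is not used here). The class name TaskManagerMetricsInfoSketch and the networkMemoryTotal/networkMemoryUsed field names are hypothetical; managedMemoryTotal and managedMemoryUsed follow the response fields listed in Step 3. Missing metrics default to 0.

```java
import java.util.Map;

// Hypothetical sketch of the extended TaskManagerMetricsInfo from Step 4.
// The metric store is modelled as a Map from metric name to string value.
final class TaskManagerMetricsInfoSketch {

    final long managedMemoryTotal;
    final long managedMemoryUsed;
    final long networkMemoryTotal; // hypothetical field name
    final long networkMemoryUsed;  // hypothetical field name

    private TaskManagerMetricsInfoSketch(
            long managedMemoryTotal, long managedMemoryUsed,
            long networkMemoryTotal, long networkMemoryUsed) {
        this.managedMemoryTotal = managedMemoryTotal;
        this.managedMemoryUsed = managedMemoryUsed;
        this.networkMemoryTotal = networkMemoryTotal;
        this.networkMemoryUsed = networkMemoryUsed;
    }

    // Counterpart of createTaskManagerMetricsInfo(...): read each new metric,
    // defaulting to 0 when the TaskManager has not reported it yet.
    static TaskManagerMetricsInfoSketch fromMetricStore(Map<String, String> metrics) {
        return new TaskManagerMetricsInfoSketch(
                readLong(metrics, "Status.Managed.Memory.TotalCapacity"),
                readLong(metrics, "Status.Managed.Memory.MemoryUsed"),
                readLong(metrics, "Status.Shuffle.Netty.TotalMemory"),
                readLong(metrics, "Status.Shuffle.Netty.UsedMemory"));
    }

    private static long readLong(Map<String, String> metrics, String name) {
        return Long.parseLong(metrics.getOrDefault(name, "0"));
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of(
                "Status.Managed.Memory.TotalCapacity", "536870912",
                "Status.Managed.Memory.MemoryUsed", "134217728",
                "Status.Shuffle.Netty.TotalMemory", "134217728");
        TaskManagerMetricsInfoSketch info = fromMetricStore(store);
        System.out.println(info.managedMemoryUsed); // 134217728
        System.out.println(info.networkMemoryUsed); // 0 (not reported)
    }
}
```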
Follow-Ups
- The /metrics endpoint of each TaskManager could be split up into multiple endpoints:
  - /configuration, containing the static values such as the configuration and the hardware description
  - /metrics, containing the volatile values such as the metrics and slot information
Test Plan
Existing tests will be updated to verify the feature.