Status
Current state: [Under Discussion]
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
FLIP-49 has been accepted and merged into Flink 1.10. However, the metrics on the current TaskManager details page do not correspond well to the memory model designed in FLIP-49.
The memory model, which is exposed through the configuration parameters, should be visualized in the same way on the TaskManager's details page.
Proposed Changes
Based on FLIP-49, we can build a table that partially correlates the configuration parameters with the metrics.
JVM Metrics
These JVM metrics are exposed and can be used through the TaskManager's metrics REST API.
JVM Memory Pool | Metric | Used key | Total key |
---|---|---|---|
Heap | Status.JVM.Memory.Heap | Used | Max |
Direct | Status.JVM.Memory.Direct | MemoryUsed | TotalCapacity |
Metaspace | Status.JVM.Memory.Metaspace | Used | Max |
Mapped | Status.JVM.Memory.Mapped | MemoryUsed | TotalCapacity |
NonHeap | Status.JVM.Memory.NonHeap | Used | Max |
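The values behind these JVM metrics can be derived from what the JDK already exposes through its management beans. The sketch below, assuming a HotSpot-style JVM, reads heap and non-heap usage from the MemoryMXBean and the direct/mapped buffer pools from the BufferPoolMXBeans. The class name and map keys are illustrative only and are not Flink's actual metric registration code.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative snapshot of JDK memory values (all sizes in bytes). */
final class JvmMemorySnapshot {

    /** Heap and non-heap usage from the platform MemoryMXBean; a max of -1 means "undefined". */
    static Map<String, Long> heapAndNonHeap() {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memoryBean.getHeapMemoryUsage();
        MemoryUsage nonHeap = memoryBean.getNonHeapMemoryUsage();
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("heapUsed", heap.getUsed());
        values.put("heapMax", heap.getMax());
        values.put("nonHeapUsed", nonHeap.getUsed());
        values.put("nonHeapMax", nonHeap.getMax());
        return values;
    }

    /** Buffer pools from the platform BufferPoolMXBeans (on HotSpot these include "direct" and "mapped"). */
    static Map<String, Long> bufferPools() {
        Map<String, Long> values = new LinkedHashMap<>();
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            values.put(pool.getName() + ".MemoryUsed", pool.getMemoryUsed());
            values.put(pool.getName() + ".TotalCapacity", pool.getTotalCapacity());
        }
        return values;
    }

    public static void main(String[] args) {
        heapAndNonHeap().forEach((name, bytes) -> System.out.println(name + " = " + bytes));
        bufferPools().forEach((name, bytes) -> System.out.println(name + " = " + bytes));
    }
}
```

A reported max of -1 means the JVM has no defined limit for that area, which is common for non-heap memory.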
Memory Configuration
Flink's memory model (as described in org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec) can be mapped to the following Flink configuration parameters. Some of them have a corresponding Flink metric.
Flink Memory Model | Flink Configuration¹ | Effective Configuration REST API² | Metric³ | Used Key | Total Key |
---|---|---|---|---|---|
Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max |
Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | Status.JVM.Memory.Heap | Used | Max |
Framework Off-Heap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | - |
Task Off-Heap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | - | - | - |
Network Memory | | memoryConfiguration.networkMemory | Status.Shuffle.Netty | UsedMemory | TotalMemory |
Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.Flink.Memory.Managed | Used | Total |
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max |
JVM Overhead | | memoryConfiguration.jvmOverhead | - | - | - |
¹ These are the configuration parameters used in the Flink configuration.
² These are the JSON paths that address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory and memoryConfiguration.totalProcessMemory are exposed through the REST API.
³ These are the metrics exposed through the TaskManager's metrics REST API.
Frontend Design
The TaskManager metrics page is redesigned so that users can more clearly understand the relationship between these metrics.
The previous metrics are moved into an 'Advanced' section, since some users may still need them.
Detail view
REST API Design
...
Memory Configuration
JIRA: FLINK-14435
The TaskManager's memory configuration will be exposed through /taskmanagers/:taskmanagerid. The proposed REST response schema is shown below:
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : { "type" : "any" },
    "path" : { "type" : "string" },
    "dataPort" : { "type" : "integer" },
    "timeSinceLastHeartbeat" : { "type" : "integer" },
    "slotsNumber" : { "type" : "integer" },
    "freeSlots" : { "type" : "integer" },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : { "type" : "integer" },
        "physicalMemory" : { "type" : "integer" },
        "freeMemory" : { "type" : "integer" },
        "managedMemory" : { "type" : "integer" }
      }
    },
    "memoryConfiguration" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
      "properties" : {
        "frameworkHeap" : { "type" : "long" },
        "frameworkOffHeap" : { "type" : "long" },
        "taskHeap" : { "type" : "long" },
        "taskOffHeap" : { "type" : "long" },
        "networkMemory" : { "type" : "long" },
        "managedMemory" : { "type" : "long" },
        "jvmMetaspace" : { "type" : "long" },
        "jvmOverhead" : { "type" : "long" },
        "totalFlinkMemory" : { "type" : "long" },
        "totalProcessMemory" : { "type" : "long" }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : { "type" : "integer" },
        "heapCommitted" : { "type" : "integer" },
        "heapMax" : { "type" : "integer" },
        "metaspaceUsed" : { "type" : "integer" },
        "metaspaceCommitted" : { "type" : "integer" },
        "metaspaceMax" : { "type" : "integer" },
        "nonHeapUsed" : { "type" : "integer" },
        "nonHeapCommitted" : { "type" : "integer" },
        "nonHeapMax" : { "type" : "integer" },
        "directCount" : { "type" : "integer" },
        "directUsed" : { "type" : "integer" },
        "directMax" : { "type" : "integer" },
        "mappedCount" : { "type" : "integer" },
        "mappedUsed" : { "type" : "integer" },
        "mappedMax" : { "type" : "integer" },
        "memorySegmentsAvailable" : { "type" : "integer" },
        "memorySegmentsTotal" : { "type" : "integer" },
        "managedMemoryUsed" : { "type" : "long" },
        "managedMemoryTotal" : { "type" : "long" },
        "networkMemoryUsed" : { "type" : "long" },
        "networkMemoryTotal" : { "type" : "long" },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : { "type" : "string" },
              "count" : { "type" : "integer" },
              "time" : { "type" : "integer" }
            }
          }
        }
      }
    }
  }
}
- Add a shuffle memory size metric.
- Add getTotalMemorySize and getAvailableMemorySize to NetworkBufferPool:
    // Multiplying by 1L first widens to long, so segments * segment size cannot overflow int.
    public long getTotalMemorySize() {
        return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }

    public long getAvailableMemorySize() {
        return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
    }
- Update NettyShuffleMetricFactory#registerShuffleMetrics:
    private static void registerShuffleMetrics(
            String groupName, MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
        MetricGroup networkGroup = metricGroup.addGroup(groupName);
        // segment-count gauges
        networkGroup.<Integer, Gauge<Integer>>gauge(
                METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments);
        networkGroup.<Integer, Gauge<Integer>>gauge(
                METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments);
        // new byte-size gauges
        networkGroup.<Long, Gauge<Long>>gauge(
                METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY, networkBufferPool::getTotalMemorySize);
        networkGroup.<Long, Gauge<Long>>gauge(
                METRIC_TOTAL_MEMORY_SEGMENT_AVAILABLEMEMORY, networkBufferPool::getAvailableMemorySize);
    }
- Add the TaskManager metrics Status.Shuffle.Netty.TotalMemoryCapacity and Status.Shuffle.Netty.AvailableMemory.
- REST URL: /taskmanagers/:taskmanagerid; response: add shuffleMemoryTotal and shuffleMemoryUsed to the metrics.
- Add a managed memory metric and expose it through the REST API. Since managed memory is now allocated per slot, it has to be summed up in the TaskExecutor.
- Add a default memory type to MemoryManager:
    public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
- Add getManagedMemoryTotal to TaskExecutor:
    public long getManagedMemoryTotal() {
        // Sum the managed memory reserved by every allocated slot.
        return this.taskSlotTable.getAllocatedSlots().stream()
                .mapToLong(slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE))
                .sum();
    }
- Add getManagedMemoryUsed to TaskExecutor:
    public long getManagedMemoryUsed() {
        // Per slot: reserved size minus what is still available, summed over all allocated slots.
        return this.taskSlotTable.getAllocatedSlots().stream()
                .mapToLong(slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                        - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
                .sum();
    }
- Add instantiateMemoryManagerMetrics to MetricUtils:
    public static void instantiateMemoryManagerMetrics(
            MetricGroup statusMetricGroup,
            TaskExecutor taskExecutor) {
        checkNotNull(statusMetricGroup);
        checkNotNull(taskExecutor);
        // Registers Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
        MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
        memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
        memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
    }
Metrics exposure
The newly introduced metrics can be accessed through the metrics REST endpoint.
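For example, metric values can be fetched with GET /taskmanagers/:taskmanagerid/metrics, passing a comma-separated list of metric names in the get query parameter. The sketch below (JDK 11+ java.net.http) builds and sends such a request. The base URL http://localhost:8081 (Flink's default REST port), the TaskManager ID tm-1 and the class name are placeholders for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client for GET /taskmanagers/:taskmanagerid/metrics?get=... */
final class TaskManagerMetricsQuery {

    /** Builds the request URI; metric names are URL-encoded and joined with commas. */
    static URI metricsUri(String restBaseUrl, String taskManagerId, List<String> metricNames) {
        String get = metricNames.stream()
                .map(name -> URLEncoder.encode(name, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return URI.create(restBaseUrl + "/taskmanagers/" + taskManagerId + "/metrics?get=" + get);
    }

    /** Sends the request and returns the raw JSON body; needs a reachable Flink REST endpoint. */
    static String fetch(URI uri) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

Calling metricsUri("http://localhost:8081", "tm-1", List.of("Status.Flink.Memory.Managed.Used", "Status.Shuffle.Netty.UsedMemory")) yields http://localhost:8081/taskmanagers/tm-1/metrics?get=Status.Flink.Memory.Managed.Used,Status.Shuffle.Netty.UsedMemory. Without the get parameter the endpoint lists the available metric names; with it, Flink answers with a JSON array of id/value pairs.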
Implementation Proposal
Step 1: Expose effective configuration parameters of TaskExecutor
JIRA: FLINK-14435
- TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
- The TaskManagerResourceInfo is initialized when the TaskExecutor is initialized, in the same way as the HardwareDescription. It is handed over the same way, through TaskExecutorRegistry → WorkerRegistration.
- The TaskManagerResourceInfo will be added along with the HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).
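A minimal sketch of such a POJO, assuming it carries the eight memory components listed in the memoryConfiguration part of the REST response and derives the two totals with the FLIP-49 composition: total Flink memory is framework heap + task heap + framework off-heap + task off-heap + network + managed memory, and total process memory adds JVM Metaspace and JVM Overhead. Making it Serializable is an assumption based on it being shipped with the TaskExecutor's registration; the constructor and getter names are hypothetical.

```java
import java.io.Serializable;

/** Hypothetical sketch of the proposed POJO; all sizes are in bytes. */
final class TaskManagerResourceInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long frameworkHeap;
    private final long frameworkOffHeap;
    private final long taskHeap;
    private final long taskOffHeap;
    private final long networkMemory;
    private final long managedMemory;
    private final long jvmMetaspace;
    private final long jvmOverhead;

    TaskManagerResourceInfo(
            long frameworkHeap, long frameworkOffHeap, long taskHeap, long taskOffHeap,
            long networkMemory, long managedMemory, long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    long getFrameworkHeap() { return frameworkHeap; }
    long getFrameworkOffHeap() { return frameworkOffHeap; }
    long getTaskHeap() { return taskHeap; }
    long getTaskOffHeap() { return taskOffHeap; }
    long getNetworkMemory() { return networkMemory; }
    long getManagedMemory() { return managedMemory; }
    long getJvmMetaspace() { return jvmMetaspace; }
    long getJvmOverhead() { return jvmOverhead; }

    /** FLIP-49: total Flink memory = heap + off-heap (framework and task) + network + managed. */
    long getTotalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap + networkMemory + managedMemory;
    }

    /** FLIP-49: total process memory = total Flink memory + JVM Metaspace + JVM Overhead. */
    long getTotalProcessMemory() {
        return getTotalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }
}
```

For example, components of 128, 128, 256, 0, 64, 512, 256 and 192 MiB (in constructor order) give a total Flink memory of 1088 MiB and a total process memory of 1536 MiB. Deriving the totals instead of storing them keeps them consistent with the components by construction.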
Step 2: Introduce a new metric for the memory usage of NetworkBufferPool
JIRA: FLINK-14422
- Add a shuffle memory size metric.
- Update NettyShuffleMetricFactory#registerShuffleMetrics.
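A dependency-free sketch of the gauges from Step 2: byte sizes are derived from segment counts, and the used memory is the difference between total and available memory. Gauge below is a local stand-in for Flink's org.apache.flink.metrics.Gauge; SimpleBufferPool, ShuffleMemoryGauges and the metric names (following the Status.Shuffle.Netty group with UsedMemory and TotalMemory from the memory table, plus AvailableMemory) are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-in for Flink's Gauge interface so the sketch compiles without Flink on the classpath. */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical, simplified buffer pool: a fixed number of equally sized memory segments. */
final class SimpleBufferPool {
    private final int totalSegments;
    private final int segmentSize;
    private int availableSegments;

    SimpleBufferPool(int totalSegments, int segmentSize) {
        this.totalSegments = totalSegments;
        this.segmentSize = segmentSize;
        this.availableSegments = totalSegments;
    }

    /** Hands out n segments; fails if the pool does not have enough free segments. */
    void requestSegments(int n) {
        if (n < 0 || n > availableSegments) {
            throw new IllegalArgumentException("cannot request " + n + " of " + availableSegments + " free segments");
        }
        availableSegments -= n;
    }

    // Cast to long before multiplying: segments * size can overflow int for large pools.
    long getTotalMemory() { return (long) totalSegments * segmentSize; }
    long getAvailableMemory() { return (long) availableSegments * segmentSize; }
    long getUsedMemory() { return getTotalMemory() - getAvailableMemory(); }
}

final class ShuffleMemoryGauges {
    /** Returns byte-size gauges keyed by their fully qualified metric name. */
    static Map<String, Gauge<Long>> create(String group, SimpleBufferPool pool) {
        Map<String, Gauge<Long>> gauges = new LinkedHashMap<>();
        gauges.put(group + ".TotalMemory", pool::getTotalMemory);
        gauges.put(group + ".AvailableMemory", pool::getAvailableMemory);
        gauges.put(group + ".UsedMemory", pool::getUsedMemory);
        return gauges;
    }
}
```

Because the gauges are method references, they are evaluated on every read and always reflect the pool's current state; nothing has to be updated when segments are requested or recycled.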
Step 3: Introduce new metrics for the managed memory usage of tasks
JIRA: FLINK-14406
We still have to discuss how to implement this properly. A brief proposal:
We introduce a new metric that represents the aggregated memory usage across all TaskSlots. The aggregation can be maintained in the TaskExecutor.
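A minimal sketch of the proposed aggregation, assuming each allocated slot can report the managed memory it reserved and the part that is still available (mirroring getMemorySizeByType and availableMemory in the proposal above). SlotManagedMemory and ManagedMemoryAggregator are hypothetical names, and a plain list stands in for the TaskSlotTable.

```java
import java.util.List;

/** Hypothetical per-slot view of managed memory: what the slot reserved and what is still free. */
record SlotManagedMemory(long totalBytes, long availableBytes) {
    SlotManagedMemory {
        if (totalBytes < 0 || availableBytes < 0 || availableBytes > totalBytes) {
            throw new IllegalArgumentException(
                    "invalid slot memory: total=" + totalBytes + ", available=" + availableBytes);
        }
    }

    long usedBytes() {
        return totalBytes - availableBytes;
    }
}

/** Aggregates managed memory over the currently allocated slots, as a TaskExecutor-level metric would. */
final class ManagedMemoryAggregator {
    private ManagedMemoryAggregator() {}

    static long total(List<SlotManagedMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotManagedMemory::totalBytes).sum();
    }

    static long used(List<SlotManagedMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotManagedMemory::usedBytes).sum();
    }
}
```

Computing the sums on every read keeps no extra state in the TaskExecutor. Since only allocated slots are counted, the total grows and shrinks as slots are allocated and freed.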
Step 4: Add Metaspace metrics
JIRA: FLINK-19617
There are no metrics monitoring the JVM's Metaspace pool yet. The newly introduced metrics will be exposed through the /taskmanagers/metrics REST API.
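The JDK exposes the Metaspace pool as a MemoryPoolMXBean, so the Used, Committed and Max values (metaspaceUsed, metaspaceCommitted and metaspaceMax in the REST schema) could be read as sketched below. This assumes a HotSpot-based JVM, where the pool is named "Metaspace"; other JVMs may not expose such a pool, which is why the lookup returns an Optional. The class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

/** Hypothetical helper that looks up the JVM's Metaspace memory pool. */
final class MetaspaceUsage {

    /** Returns the usage of the pool named "Metaspace", if this JVM exposes one. */
    static Optional<MemoryUsage> metaspace() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                // getUsage() returns null only for pools that are no longer valid.
                return Optional.ofNullable(pool.getUsage());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        metaspace().ifPresentOrElse(
                u -> System.out.printf("Used=%d Committed=%d Max=%d%n", u.getUsed(), u.getCommitted(), u.getMax()),
                () -> System.out.println("No Metaspace pool on this JVM"));
    }
}
```

getMax() returns -1 when no limit is set; with -XX:MaxMetaspaceSize configured it reports that limit.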
Step 5: Update TaskManager's details page
JIRA: FLINK-19764
The web UI has to be updated as proposed above.
Follow-Ups
- Create a separate, independent endpoint for the effective memory configuration.
- Deprecate the metrics sub-record returned by /taskmanagers/:taskmanagerid. The metrics endpoint can be used instead, which would simplify the TaskManagerDetailsHandler.
- Register instantiateMemoryManagerMetrics in TaskManagerRunner#startTaskManager.
- Add the TaskManager metrics Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
- REST URL: /taskmanagers/:taskmanagerid; response: add managedMemoryTotal and managedMemoryUsed to the metrics.
Test Plan
Existing tests will be updated to verify the feature.