...
FLIP-49 has been accepted and merged into Flink 1.10. However, the metrics shown on the current TaskManager details page do not correspond well to the memory model designed in FLIP-49.
The memory model exposed through the configuration parameters should be visualized in the same way on the TaskManager details page.
Proposed Changes
Based on FLIP-49, the following table partially correlates the configuration parameters with the metrics.
JVM Metrics
These JVM metrics are exposed and can be used through the TaskManager's metrics REST API.
...
Flink Memory Model | Flink Configuration [1] | Effective Configuration REST API [2] | Metric [3] | Used Key | Total Key |
---|---|---|---|---|---|
Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max |
Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | Status.JVM.Memory.Heap (shared with Framework Heap) | Used | Max |
Framework Off-Heap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | - |
Task Off-Heap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | - | - | - |
Network Memory | | memoryConfiguration.networkMemory | Status.Shuffle.Netty | UsedMemory | TotalMemory |
Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.ManagedMemory | (still up for discussion) | (still up for discussion) |
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max |
JVM Overhead | | memoryConfiguration.jvmOverhead | - | - | - |
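To make the correspondence concrete, the rows of the table can be held as data from which the full metric names are derived (metric group + "." + key). This is a minimal sketch, not part of the proposal: the enum name MemoryComponent is hypothetical, it assumes Task Heap shares the heap metric with Framework Heap (as the merged table cells suggest), and it leaves out Managed Memory because its metric is still up for discussion.

```java
import java.util.Optional;

/**
 * Hypothetical: rows of the mapping table as data. Components without a
 * metric ("-" in the table) have a null metric group and yield Optional.empty().
 */
enum MemoryComponent {
    FRAMEWORK_HEAP("memoryConfiguration.frameworkHeap", "Status.JVM.Memory.Heap", "Used", "Max"),
    // Assumption: Task Heap is covered by the same JVM heap metric as Framework Heap.
    TASK_HEAP("memoryConfiguration.taskHeap", "Status.JVM.Memory.Heap", "Used", "Max"),
    FRAMEWORK_OFF_HEAP("memoryConfiguration.frameworkOffHeap", null, null, null),
    TASK_OFF_HEAP("memoryConfiguration.taskOffHeap", null, null, null),
    NETWORK_MEMORY("memoryConfiguration.networkMemory", "Status.Shuffle.Netty", "UsedMemory", "TotalMemory"),
    JVM_METASPACE("memoryConfiguration.jvmMetaspace", "Status.JVM.Memory.Metaspace", "Used", "Max"),
    JVM_OVERHEAD("memoryConfiguration.jvmOverhead", null, null, null);

    /** Field path in the effective configuration REST response. */
    final String restField;
    /** Metric group, or null if no metric exists for this component. */
    private final String metricGroup;
    private final String usedKey;
    private final String totalKey;

    MemoryComponent(String restField, String metricGroup, String usedKey, String totalKey) {
        this.restField = restField;
        this.metricGroup = metricGroup;
        this.usedKey = usedKey;
        this.totalKey = totalKey;
    }

    /** Full name of the "used" metric, e.g. Status.JVM.Memory.Heap.Used. */
    Optional<String> usedMetric() {
        return metricName(usedKey);
    }

    /** Full name of the "total" metric, e.g. Status.Shuffle.Netty.TotalMemory. */
    Optional<String> totalMetric() {
        return metricName(totalKey);
    }

    private Optional<String> metricName(String key) {
        return metricGroup == null ? Optional.empty() : Optional.of(metricGroup + "." + key);
    }
}
```

Keeping the table as data in one place would let the REST handler and any UI-side code derive metric names consistently instead of hard-coding strings in several places.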
...
- The TaskManager's resource information contains these values; show it in the response of:
url: /taskmanagers/:taskmanagerid
JSON Schema of the response:

```json
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : { "type" : "any" },
    "path" : { "type" : "string" },
    "dataPort" : { "type" : "integer" },
    "timeSinceLastHeartbeat" : { "type" : "integer" },
    "slotsNumber" : { "type" : "integer" },
    "freeSlots" : { "type" : "integer" },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : { "type" : "integer" },
        "physicalMemory" : { "type" : "integer" },
        "freeMemory" : { "type" : "integer" },
        "managedMemory" : { "type" : "integer" }
      }
    },
    "memoryConfiguration" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
      "properties" : {
        "frameworkHeap" : { "type" : "long" },
        "frameworkOffHeap" : { "type" : "long" },
        "taskHeap" : { "type" : "long" },
        "taskOffHeap" : { "type" : "long" },
        "networkMemory" : { "type" : "long" },
        "managedMemory" : { "type" : "long" },
        "jvmMetaspace" : { "type" : "long" },
        "jvmOverhead" : { "type" : "long" },
        "totalFlinkMemory" : { "type" : "long" },
        "totalProcessMemory" : { "type" : "long" }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : { "type" : "integer" },
        "heapCommitted" : { "type" : "integer" },
        "heapMax" : { "type" : "integer" },
        "metaspaceUsed" : { "type" : "integer" },
        "metaspaceCommitted" : { "type" : "integer" },
        "metaspaceMax" : { "type" : "integer" },
        "nonHeapUsed" : { "type" : "integer" },
        "nonHeapCommitted" : { "type" : "integer" },
        "nonHeapMax" : { "type" : "integer" },
        "directCount" : { "type" : "integer" },
        "directUsed" : { "type" : "integer" },
        "directMax" : { "type" : "integer" },
        "mappedCount" : { "type" : "integer" },
        "mappedUsed" : { "type" : "integer" },
        "mappedMax" : { "type" : "integer" },
        "memorySegmentsAvailable" : { "type" : "integer" },
        "memorySegmentsTotal" : { "type" : "integer" },
        "managedMemoryUsed" : { "type" : "long" },
        "managedMemoryTotal" : { "type" : "long" },
        "networkMemoryUsed" : { "type" : "long" },
        "networkMemoryTotal" : { "type" : "long" },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : { "type" : "string" },
              "count" : { "type" : "integer" },
              "time" : { "type" : "integer" }
            }
          }
        }
      }
    }
  }
}
```
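The two totals in memoryConfiguration are derived values. Following the FLIP-49 model, total Flink memory is the sum of framework heap, task heap, framework off-heap, task off-heap, network and managed memory, and total process memory adds JVM metaspace and JVM overhead on top. The following is a minimal sketch of a client-side consistency check over the response fields; the class MemoryConfiguration and its isConsistent method are hypothetical and only mirror the JSON field names.

```java
/**
 * Hypothetical client-side view of the memoryConfiguration object.
 * Field names mirror the JSON schema; all sizes are in bytes.
 */
final class MemoryConfiguration {
    final long frameworkHeap, frameworkOffHeap, taskHeap, taskOffHeap;
    final long networkMemory, managedMemory, jvmMetaspace, jvmOverhead;

    MemoryConfiguration(long frameworkHeap, long frameworkOffHeap, long taskHeap, long taskOffHeap,
                        long networkMemory, long managedMemory, long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** FLIP-49: heap + off-heap (framework and task) + network + managed. */
    long totalFlinkMemory() {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap + networkMemory + managedMemory;
    }

    /** FLIP-49: total Flink memory + JVM metaspace + JVM overhead. */
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    /** True if the reported totals agree with the sum of the reported components. */
    boolean isConsistent(long reportedTotalFlinkMemory, long reportedTotalProcessMemory) {
        return reportedTotalFlinkMemory == totalFlinkMemory()
                && reportedTotalProcessMemory == totalProcessMemory();
    }
}
```

For example, with 128/128/384/0 MiB of framework heap, framework off-heap, task heap and task off-heap, 128 MiB network, 512 MiB managed, 96 MiB metaspace and 192 MiB overhead, the totals come out at 1280 MiB and 1568 MiB respectively.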
Implementation Proposal
Step 1: Expose effective configuration parameters of
...
TaskExecutor (FLINK-14435)
...
- TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
- TaskManagerResourceInfo is initialized when the TaskExecutor is initialized, in the same way as HardwareDescription. It is handed over the same way, through TaskExecutorRegistry → WorkerRegistration.
- TaskManagerResourceInfo will be added alongside HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).
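A minimal sketch of what TaskManagerResourceInfo could look like, assuming it only needs to be an immutable, Serializable value object so that it can travel with the TaskExecutor's registration to the ResourceManager. Only four of the proposed fields are shown for brevity, and roundTrip is a hypothetical helper that approximates an RPC hop with plain Java serialization; neither is existing Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical, trimmed-down TaskManagerResourceInfo: immutable and Serializable. */
final class TaskManagerResourceInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long taskHeap;
    private final long taskOffHeap;
    private final long networkMemory;
    private final long managedMemory;

    TaskManagerResourceInfo(long taskHeap, long taskOffHeap, long networkMemory, long managedMemory) {
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
    }

    long getTaskHeap() { return taskHeap; }
    long getTaskOffHeap() { return taskOffHeap; }
    long getNetworkMemory() { return networkMemory; }
    long getManagedMemory() { return managedMemory; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskManagerResourceInfo)) return false;
        TaskManagerResourceInfo that = (TaskManagerResourceInfo) o;
        return taskHeap == that.taskHeap && taskOffHeap == that.taskOffHeap
                && networkMemory == that.networkMemory && managedMemory == that.managedMemory;
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskHeap, taskOffHeap, networkMemory, managedMemory);
    }

    /** Hypothetical helper: serialize and deserialize, as a remote RPC call would. */
    static TaskManagerResourceInfo roundTrip(TaskManagerResourceInfo info)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(info);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TaskManagerResourceInfo) in.readObject();
        }
    }
}
```

Making the object immutable keeps it safe to share between the registration path and the REST handler without copying.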
Step 2: Introduce new metric for memory usage of
...
NetworkBufferPool (FLINK-14422)
Add methods exposing the shuffle memory size in bytes to NetworkBufferPool:
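```java
// Added to NetworkBufferPool (memorySegmentSize is the pool's segment size in bytes).
public long getTotalMemorySize() {
    // 1L * ... promotes the product to long before it can overflow an int.
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getUsedMemorySize() {
    // Used = total - available; the cast avoids an int overflow in the product.
    return getTotalMemorySize() - (long) getNumberOfAvailableMemorySegments() * memorySegmentSize;
}
```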
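The byte arithmetic has to be done in long: with 65536 segments of 32 KiB (Flink's default segment size), i.e. 2 GiB of network memory, the int product of segment count and segment size overflows. The standalone sketch below reproduces the two size calculations with plain fields standing in for the pool's segment counters (NetworkMemorySizes is a hypothetical illustration class, not Flink code) and also shows what an un-widened int product would report.

```java
/** Hypothetical stand-in for the NetworkBufferPool size arithmetic. */
final class NetworkMemorySizes {
    private final int totalSegments;
    private final int availableSegments;
    private final int memorySegmentSize;

    NetworkMemorySizes(int totalSegments, int availableSegments, int memorySegmentSize) {
        this.totalSegments = totalSegments;
        this.availableSegments = availableSegments;
        this.memorySegmentSize = memorySegmentSize;
    }

    long getTotalMemorySize() {
        // (1L * totalSegments) is long, so the whole product is computed in long.
        return 1L * totalSegments * memorySegmentSize;
    }

    long getUsedMemorySize() {
        // Widen before multiplying: availableSegments * memorySegmentSize alone is an int product.
        return getTotalMemorySize() - (long) availableSegments * memorySegmentSize;
    }

    /** For comparison only: the same subtraction with an int product that can overflow. */
    long usedMemorySizeWithIntProduct() {
        return getTotalMemorySize() - (availableSegments * memorySegmentSize);
    }
}
```

With all 65536 segments free, getUsedMemorySize() returns 0, while the int-product variant returns 4294967296 because 65536 * 32768 wraps around to Integer.MIN_VALUE.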
Update NettyShuffleMetricFactory#registerShuffleMetrics:
```java
private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";

// ...

private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
            networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
            networkBufferPool::getNumberOfAvailableMemorySegments);

    // === new ===
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
            networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
            networkBufferPool::getUsedMemorySize);
}
```
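Registering method references works because a gauge is polled: its value is computed each time a reporter or the REST handler reads it, so UsedMemory always reflects the pool's current state. The sketch below illustrates this without Flink on the classpath. Gauge is a local stand-in with the same single getValue() method as org.apache.flink.metrics.Gauge, while FakeBufferPool and GaugeRegistry are hypothetical simplifications of NetworkBufferPool and MetricGroup.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Local stand-in for org.apache.flink.metrics.Gauge (single getValue() method). */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical, simplified buffer pool that only tracks free segments. */
final class FakeBufferPool {
    private final int totalSegments;
    private final int segmentSize;
    private int availableSegments;

    FakeBufferPool(int totalSegments, int segmentSize) {
        this.totalSegments = totalSegments;
        this.segmentSize = segmentSize;
        this.availableSegments = totalSegments;
    }

    void requestSegments(int n) {
        if (n < 0 || n > availableSegments) {
            throw new IllegalStateException("not enough free segments");
        }
        availableSegments -= n;
    }

    long getTotalMemorySize() {
        return 1L * totalSegments * segmentSize;
    }

    long getUsedMemorySize() {
        return getTotalMemorySize() - (long) availableSegments * segmentSize;
    }
}

/** Hypothetical registry keyed by the full metric name (group + "." + name). */
final class GaugeRegistry {
    private final Map<String, Gauge<?>> gauges = new LinkedHashMap<>();

    void gauge(String group, String name, Gauge<?> gauge) {
        gauges.put(group + "." + name, gauge);
    }

    /** Reads the gauge now; nothing is cached at registration time. */
    Object read(String fullName) {
        return gauges.get(fullName).getValue();
    }
}
```

After registering pool::getUsedMemorySize under "Status.Shuffle.Netty.UsedMemory", requesting three 32 KiB segments changes the value read from 0 to 98304 without re-registering anything.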
Step 3: Introduce new metrics for Task's managed memory
...
usage (FLINK-14406)
...
We still have to discuss how to implement this properly. A brief proposal follows:
...
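Add instantiateMemoryManagerMetrics to MetricUtils:
```java
// In MetricUtils; requires: import static org.apache.flink.util.Preconditions.checkNotNull;
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);

    // Registers Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    // getManagedMemoryTotal/getManagedMemoryUsed are new TaskExecutor methods proposed here.
    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}
```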
- Register it in TaskManagerRunner#startTaskManager.
- Add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the TaskManager's metrics.
- url: /taskmanagers/:taskmanagerid
- response: add managedMemoryTotal and managedMemoryUsed to metrics.
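Since the implementation is still open, here is one possible shape for the two proposed TaskExecutor accessors, written as a standalone sketch. It assumes managed memory is budgeted per slot and that the TaskManager-level values are simply the sums over all slots; SlotManagedMemory and ManagedMemoryMetrics are hypothetical names, not existing Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-slot managed-memory budget: a fixed size and the bytes reserved from it. */
final class SlotManagedMemory {
    final long size;
    private long reserved;

    SlotManagedMemory(long size) {
        this.size = size;
    }

    void reserve(long bytes) {
        if (bytes < 0 || reserved + bytes > size) {
            throw new IllegalArgumentException("reservation exceeds the slot's managed memory");
        }
        reserved += bytes;
    }

    long reserved() {
        return reserved;
    }
}

/** Hypothetical aggregation behind getManagedMemoryTotal() and getManagedMemoryUsed(). */
final class ManagedMemoryMetrics {
    private final List<SlotManagedMemory> slots = new ArrayList<>();

    void addSlot(SlotManagedMemory slot) {
        slots.add(slot);
    }

    /** Backs Status.Managed.Memory.TotalCapacity / managedMemoryTotal. */
    long getManagedMemoryTotal() {
        long total = 0;
        for (SlotManagedMemory slot : slots) {
            total += slot.size;
        }
        return total;
    }

    /** Backs Status.Managed.Memory.MemoryUsed / managedMemoryUsed. */
    long getManagedMemoryUsed() {
        long used = 0;
        for (SlotManagedMemory slot : slots) {
            used += slot.reserved();
        }
        return used;
    }
}
```

Summing on every read keeps the gauges lock-free in this sketch; a real implementation would need to consider how per-slot reservations are published across threads.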
Step 4:
...
Add Metaspace metrics (FLINK-19617)
No metrics monitor the JVM's Metaspace pool yet. The newly introduced metrics will be exposed through the /taskmanagers/metrics REST API.
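The JVM already exposes the Metaspace pool through the standard java.lang.management API, so the Used and Max values (and Committed, for metaspaceCommitted) can be read from the pool's MemoryUsage. This is a sketch of the lookup, not Flink's implementation; MetaspaceProbe is a hypothetical name, and the pool name "Metaspace" is what HotSpot-based JVMs report, so other JVMs may need a different name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

/** Hypothetical helper that reads the JVM's Metaspace memory pool. */
final class MetaspaceProbe {
    /** Pool name on HotSpot-based JVMs (assumption for other JVM vendors). */
    static final String METASPACE_POOL_NAME = "Metaspace";

    /** Current Metaspace usage, or empty if no such pool exists or it is no longer valid. */
    static Optional<MemoryUsage> metaspaceUsage() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (METASPACE_POOL_NAME.equals(pool.getName())) {
                // getUsage() returns null if the pool is no longer valid.
                return Optional.ofNullable(pool.getUsage());
            }
        }
        return Optional.empty();
    }
}
```

Gauges for Status.JVM.Memory.Metaspace.Used and .Max could then read getUsed() and getMax() from the returned MemoryUsage on each poll.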
Step 5: Update TaskManager details pages
The web UI has to be updated as proposed above.
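When the details page renders "used / total" bars, a total may be unknown: java.lang.management.MemoryUsage reports -1 for an undefined maximum, and components marked "-" in the table have no metric at all. A minimal sketch of a formatting helper that treats a non-positive total as unknown instead of dividing by it; UsageBar is a hypothetical name, and the actual web UI is not written in Java, so this only illustrates the rule.

```java
import java.util.Locale;
import java.util.OptionalDouble;

/** Hypothetical helper for rendering used/total values on the details page. */
final class UsageBar {
    /** Percentage used, or empty if the total is unknown (e.g. -1) or the input is invalid. */
    static OptionalDouble percentUsed(long used, long total) {
        if (total <= 0 || used < 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(100.0 * used / total);
    }

    /** Label such as "512 MiB (50.0%)"; Locale.ROOT keeps the decimal point stable. */
    static String label(long used, long total) {
        OptionalDouble pct = percentUsed(used, total);
        long usedMib = used >> 20;
        return pct.isPresent()
                ? String.format(Locale.ROOT, "%d MiB (%.1f%%)", usedMib, pct.getAsDouble())
                : String.format(Locale.ROOT, "%d MiB (max unknown)", usedMib);
    }
}
```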
Follow-Ups
- Create a separate endpoint for the effective memory configuration.
...
TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics:
- TaskManagerMetricsInfo needs to be extended with members for the newly added fields.
- TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.
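Initializing the new fields amounts to looking up each metric by its full name and parsing it, with a default when the metric has not been reported yet. The sketch below uses a plain Map<String, String> as a stand-in for the TaskManager's metric store; NewMemoryMetricsInfo and MetricsInfoFactory are hypothetical names, and the default of 0 for missing metrics is an assumption.

```java
import java.util.Map;

/** Hypothetical subset of the extended TaskManagerMetricsInfo (new fields only). */
final class NewMemoryMetricsInfo {
    final long networkMemoryUsed;
    final long networkMemoryTotal;
    final long managedMemoryUsed;
    final long managedMemoryTotal;

    NewMemoryMetricsInfo(long networkMemoryUsed, long networkMemoryTotal,
                         long managedMemoryUsed, long managedMemoryTotal) {
        this.networkMemoryUsed = networkMemoryUsed;
        this.networkMemoryTotal = networkMemoryTotal;
        this.managedMemoryUsed = managedMemoryUsed;
        this.managedMemoryTotal = managedMemoryTotal;
    }
}

final class MetricsInfoFactory {
    /** Metrics not reported yet default to "0" (assumption). */
    private static long getLong(Map<String, String> metrics, String name) {
        return Long.parseLong(metrics.getOrDefault(name, "0"));
    }

    /** Maps the metric names from Steps 2 and 3 onto the new REST fields. */
    static NewMemoryMetricsInfo create(Map<String, String> metrics) {
        return new NewMemoryMetricsInfo(
                getLong(metrics, "Status.Shuffle.Netty.UsedMemory"),
                getLong(metrics, "Status.Shuffle.Netty.TotalMemory"),
                getLong(metrics, "Status.Managed.Memory.MemoryUsed"),
                getLong(metrics, "Status.Managed.Memory.TotalCapacity"));
    }
}
```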
Follow-Ups
- The /metrics endpoint of each TaskManager could be split up into multiple endpoints:
  - /configuration containing the static values, such as the configuration and the hardware description
  - /metrics containing the volatile values, such as the metrics and slot information

Test Plan
Existing tests are updated to verify the feature.