Status
...
...
...
...
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Flink Memory Model | Flink configuration¹ | Effective Configuration REST API² | Metric³ | Used key | Total key
---|---|---|---|---|---
Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max
Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | Status.JVM.Memory.Heap (shared with Framework Heap) | Used | Max
Framework OffHeap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | -
Task OffHeap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | - | - | -
Network Memory | | memoryConfiguration.networkMemory | Status.Shuffle.Netty | UsedMemory | TotalMemory
Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.Flink.Memory.Managed (still up for discussion) | Used | Total (still up for discussion)
JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max
JVM Overhead | | memoryConfiguration.jvmOverhead | - | - | -
¹ The configuration parameters used in the Flink configuration.
² The JSON paths that address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory and memoryConfiguration.totalProcessMemory are exposed through the REST API.
³ The metrics exposed through the TaskManager's metrics REST API.
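The Metric, Used key and Total key columns of the table can be read as a lookup from memory component to metric identifiers, where an identifier is the metric group and the key joined by a dot (e.g. Status.JVM.Memory.Heap.Used). The sketch below encodes that mapping so that a client knows which metrics to request for each component; the class, enum and method names are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical encoding of the memory-model-to-metric table. */
public class MemoryMetricNames {

    enum Component {
        // metric group, "used" key, "total" key; null where no metric is exposed
        FRAMEWORK_HEAP("Status.JVM.Memory.Heap", "Used", "Max"),
        TASK_HEAP("Status.JVM.Memory.Heap", "Used", "Max"),
        FRAMEWORK_OFF_HEAP(null, null, null),
        TASK_OFF_HEAP(null, null, null),
        NETWORK("Status.Shuffle.Netty", "UsedMemory", "TotalMemory"),
        MANAGED("Status.Flink.Memory.Managed", "Used", "Total"),
        JVM_METASPACE("Status.JVM.Memory.Metaspace", "Used", "Max"),
        JVM_OVERHEAD(null, null, null);

        final String group;
        final String usedKey;
        final String totalKey;

        Component(String group, String usedKey, String totalKey) {
            this.group = group;
            this.usedKey = usedKey;
            this.totalKey = totalKey;
        }

        boolean hasMetric() {
            return group != null;
        }

        String usedMetric() {
            return group + "." + usedKey;
        }

        String totalMetric() {
            return group + "." + totalKey;
        }
    }

    /** Collects the distinct metric identifiers needed for all components (heap is shared). */
    static List<String> allMetricIds() {
        List<String> ids = new ArrayList<>();
        for (Component c : Component.values()) {
            if (!c.hasMetric()) {
                continue;
            }
            for (String id : List.of(c.usedMetric(), c.totalMetric())) {
                if (!ids.contains(id)) {
                    ids.add(id);
                }
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(allMetricIds());
    }
}
```

Because Framework Heap and Task Heap share one JVM heap metric, the de-duplicated list contains eight identifiers rather than ten.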
Frontend Design
...
The TaskManager metrics page is redesigned so that users can more clearly understand how these metrics relate to each other.
The previous metrics are moved into an 'Advanced' section, since some users may still need them.
Detail view
REST API Design
...
Memory Configuration
Jira: FLINK-14435
The TaskManager's memory configuration will be exposed through {{
...
/taskmanagers/:taskmanagerid}}. A proposed REST response is shown in the code block below:
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
"properties" : {
"id" : {
"type" : "any"
},
"path" : {
"type" : "string"
},
"dataPort" : {
"type" : "integer"
},
"timeSinceLastHeartbeat" : {
"type" : "integer"
},
"slotsNumber" : {
"type" : "integer"
},
"freeSlots" : {
"type" : "integer"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
"properties" : {
"cpuCores" : {
"type" : "integer"
},
"physicalMemory" : {
"type" : "integer"
},
"freeMemory" : {
"type" : "integer"
},
"managedMemory" : {
"type" : "integer"
}
}
},
"memoryConfiguration" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
"properties" : {
"frameworkHeap" : {
"type" : "long"
},
"frameworkOffHeap" : {
"type" : "long"
},
"taskHeap" : {
"type" : "long"
},
"taskOffHeap" : {
"type" : "long"
},
"networkMemory" : {
"type" : "long"
},
"managedMemory" : {
"type" : "long"
},
"jvmMetaspace" : {
"type" : "long"
},
"jvmOverhead" : {
"type" : "long"
},
"totalFlinkMemory" : {
"type" : "long"
},
"totalProcessMemory" : {
"type" : "long"
}
}
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
"properties" : {
"heapUsed" : {
"type" : "integer"
},
"heapCommitted" : {
"type" : "integer"
},
"heapMax" : {
"type" : "integer"
},
"metaspaceUsed" : {
"type" : "integer"
},
"metaspaceCommitted" : {
"type" : "integer"
},
"metaspaceMax" : {
"type" : "integer"
},
"nonHeapUsed" : {
"type" : "integer"
},
"nonHeapCommitted" : {
"type" : "integer"
},
"nonHeapMax" : {
"type" : "integer"
},
"directCount" : {
"type" : "integer"
},
"directUsed" : {
"type" : "integer"
},
"directMax" : {
"type" : "integer"
},
"mappedCount" : {
"type" : "integer"
},
"mappedUsed" : {
"type" : "integer"
},
"mappedMax" : {
"type" : "integer"
},
"memorySegmentsAvailable" : {
"type" : "integer"
},
"memorySegmentsTotal" : {
"type" : "integer"
},
"managedMemoryUsed" : {
"type" : "long"
},
"managedMemoryTotal" : {
"type" : "long"
},
"networkMemoryUsed" : {
"type" : "long"
},
"networkMemoryTotal" : {
"type" : "long"
},
"garbageCollectors" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
"properties" : {
"name" : {
"type" : "string"
},
"count" : {
"type" : "integer"
},
"time" : {
"type" : "integer"
}
}
}
}
}
}
}
}
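The two aggregate fields in memoryConfiguration follow from the component fields under the FLIP-49 memory model: total Flink memory is framework heap + task heap + framework off-heap + task off-heap + network + managed memory, and total process memory adds JVM metaspace and JVM overhead. The sketch below spells out that arithmetic, for example as a consistency check on a REST response; the class name is hypothetical and the sizes in the usage are illustrative, not Flink defaults.

```java
/**
 * Hypothetical holder mirroring the memoryConfiguration fields of the proposed
 * REST response. All sizes are in bytes.
 */
public final class MemoryConfigurationSketch {
    final long frameworkHeap;
    final long frameworkOffHeap;
    final long taskHeap;
    final long taskOffHeap;
    final long networkMemory;
    final long managedMemory;
    final long jvmMetaspace;
    final long jvmOverhead;

    MemoryConfigurationSketch(long frameworkHeap, long frameworkOffHeap, long taskHeap,
            long taskOffHeap, long networkMemory, long managedMemory,
            long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** Total Flink memory: heap, off-heap, network and managed memory. */
    long totalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    /** Total process memory: total Flink memory plus JVM metaspace and JVM overhead. */
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        MemoryConfigurationSketch conf = new MemoryConfigurationSketch(
                128 * mib, 128 * mib, 384 * mib, 0, 128 * mib, 512 * mib, 256 * mib, 192 * mib);
        System.out.println(conf.totalFlinkMemory() / mib);   // 1280
        System.out.println(conf.totalProcessMemory() / mib); // 1728
    }
}
```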
Metrics exposure
The newly introduced metrics can be accessed through the metrics REST endpoint.
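Flink's REST API serves individual TaskManager metrics via GET /taskmanagers/:taskmanagerid/metrics, selecting metrics with a comma-separated get query parameter (the response is, to our understanding, a JSON array of id/value pairs). The sketch below only builds such a request URL; the class and method names are hypothetical, and URLEncoder's form encoding is a simplification that is adequate for identifiers without spaces.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds a query against the TaskManager metrics endpoint. */
public final class TaskManagerMetricsQuery {

    static String buildUrl(String restBase, String taskManagerId, List<String> metricIds) {
        // Encode each identifier separately so that the separating commas stay literal.
        String get = metricIds.stream()
                .map(id -> URLEncoder.encode(id, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return restBase + "/taskmanagers/"
                + URLEncoder.encode(taskManagerId, StandardCharsets.UTF_8)
                + "/metrics?get=" + get;
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("http://localhost:8081", "tm-1",
                List.of("Status.Shuffle.Netty.UsedMemory", "Status.Shuffle.Netty.TotalMemory")));
    }
}
```

The resulting URL can be fetched with any HTTP client, e.g. java.net.http.HttpClient on Java 11+.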
Implementation Proposal
Step 1: Expose effective configuration parameters of TaskExecutor
...
Add size metrics for the shuffle memory to NetworkBufferPool:
public long getTotalMemorySize() {
    // 1L forces long arithmetic so the product cannot overflow int
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getUsedMemorySize() {
    // used = total - bytes held by segments that are still available
    return getTotalMemorySize() - (1L * getNumberOfAvailableMemorySegments() * memorySegmentSize);
}
Update NettyShuffleMetricFactory#registerShuffleMetrics:
private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";

// ...

private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);

    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
            networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
            networkBufferPool::getNumberOfAvailableMemorySegments);

    // === new: shuffle memory sizes in bytes ===
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
            networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
            networkBufferPool::getUsedMemorySize);
}
Step 3: Introduce new metrics for tasks' managed memory usage
...
We would have to introduce a new metric that represents the aggregated managed memory usage of all TaskSlots. The aggregation can be maintained in the TaskExecutor.
...
Add a default memory type to MemoryManager:
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
Add and register the managed memory usage in TaskExecutor#requestSlot:
public long getManagedMemoryUsed() {
    // For each allocated slot: managed memory size minus what is still available, summed up
    return this.taskSlotTable.getAllocatedSlots().stream()
            .mapToLong(slot ->
                    slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                            - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
            .sum();
}
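The aggregation in getManagedMemoryUsed() can be checked in isolation: each allocated slot contributes its reserved managed memory minus the part still available, and the TaskExecutor-level value is the sum. The sketch below replaces the slot and MemoryManager objects with a hypothetical value class, so it illustrates the arithmetic only and does not reflect Flink's actual slot types.

```java
import java.util.List;

/** Hypothetical standalone version of the per-slot managed memory aggregation. */
public final class ManagedMemoryAggregationSketch {

    /** Hypothetical per-slot view: managed memory reserved for the slot and the part still free. */
    static final class SlotManagedMemory {
        final long totalBytes;
        final long availableBytes;

        SlotManagedMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Managed memory used by the TaskExecutor: sum of per-slot usage over allocated slots. */
    static long managedMemoryUsed(List<SlotManagedMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotManagedMemory::usedBytes).sum();
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        List<SlotManagedMemory> slots = List.of(
                new SlotManagedMemory(256 * mib, 64 * mib),   // 192 MiB used
                new SlotManagedMemory(256 * mib, 256 * mib),  // idle slot, 0 used
                new SlotManagedMemory(512 * mib, 128 * mib)); // 384 MiB used
        System.out.println(managedMemoryUsed(slots) / mib);   // 576
    }
}
```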
Add instantiateMemoryManagerMetrics to MetricUtils
...
...
TaskExecutor
...
.
...
Step 4: Add Metaspace metrics
...
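The Metaspace Used and Max values in the table are JVM-level figures. One way to read them on a HotSpot-based JVM is the standard java.lang.management API, which exposes a memory pool named "Metaspace"; whether Flink's implementation performs exactly this lookup is an assumption, and the class name is hypothetical. Note that MemoryUsage.getMax() returns -1 when the JVM has no metaspace limit configured.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

/** Hypothetical reader for the JVM's Metaspace pool usage. */
public final class MetaspaceUsageSketch {

    /** Returns the current usage of the "Metaspace" pool, if this JVM has one. */
    static Optional<MemoryUsage> metaspaceUsage() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                // getUsage() may return null if the pool is no longer valid
                return Optional.ofNullable(pool.getUsage());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        metaspaceUsage().ifPresentOrElse(
                u -> System.out.println("used=" + u.getUsed() + " max=" + u.getMax()),
                () -> System.out.println("no Metaspace pool on this JVM"));
    }
}
```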
Step 5: Update TaskManager's details page
Jira: FLINK-1532819764
The web UI has to be updated as proposed above.
...
- Create a separate endpoint for the effective memory configuration.
- Deprecate the metrics sub-record returned by /taskmanagers/:taskmanagerid. The metrics endpoint can be used instead, which would simplify the TaskManagerDetailsHandler.
Test Plan
Existing tests will be updated to verify the feature.