...
Redesign the task manager metrics page so that users can more clearly understand the relationship between these metrics.
REST API Design
- use TaskExecutorRegistration to expose task manager resource information that matches the memory composition. The task manager's resource contains this information; show it in
- url: /taskmanagers/:taskmanagerid
response
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : { "type" : "any" },
    "path" : { "type" : "string" },
    "dataPort" : { "type" : "integer" },
    "timeSinceLastHeartbeat" : { "type" : "integer" },
    "slotsNumber" : { "type" : "integer" },
    "freeSlots" : { "type" : "integer" },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : { "type" : "integer" },
        "physicalMemory" : { "type" : "integer" },
        "freeMemory" : { "type" : "integer" },
        "managedMemory" : { "type" : "integer" }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : { "type" : "integer" },
        "heapCommitted" : { "type" : "integer" },
        "heapMax" : { "type" : "integer" },
        "nonHeapUsed" : { "type" : "integer" },
        "nonHeapCommitted" : { "type" : "integer" },
        "nonHeapMax" : { "type" : "integer" },
        "directCount" : { "type" : "integer" },
        "directUsed" : { "type" : "integer" },
        "directMax" : { "type" : "integer" },
        "mappedCount" : { "type" : "integer" },
        "mappedUsed" : { "type" : "integer" },
        "mappedMax" : { "type" : "integer" },
        "memorySegmentsAvailable" : { "type" : "integer" },
        "memorySegmentsTotal" : { "type" : "integer" },
        "managedMemoryUsed" : { "type" : "long" },
        "managedMemoryTotal" : { "type" : "long" },
        "networkMemoryUsed" : { "type" : "long" },
        "networkMemoryTotal" : { "type" : "long" },
        "resource" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",
          "properties" : {
            "frameworkHeap" : { "type" : "long" },
            "frameworkOffHeap" : { "type" : "long" },
            "taskHeap" : { "type" : "long" },
            "taskOffHeap" : { "type" : "long" },
            "networkMemory" : { "type" : "long" },
            "managedMemory" : { "type" : "long" },
            "jvmMetaSpace" : { "type" : "long" },
            "jvmOverhead" : { "type" : "long" },
            "totalProcessMemory" : { "type" : "long" }
          }
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : { "type" : "string" },
              "count" : { "type" : "integer" },
              "time" : { "type" : "integer" }
            }
          }
        }
      }
    }
  }
}
Implementation Proposal
Step 1: Introduce new metric for memory usage of NetworkBufferPool
add shuffle memory's size metric
- add getTotalMemorySize and getAvailableMemorySize in NetworkBufferPool
Keep in mind that we have to convert available memory into used memory somewhere later on!
Code Block language java title Add network memory members
public long getTotalMemorySize() {
    // Widen to long before multiplying to avoid int overflow for large pools.
    return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

public long getAvailableMemorySize() {
    // Same widening as above; int * int could overflow here as well.
    return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}
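The conversion from available memory to used memory mentioned above can be sketched as follows. This is a minimal, self-contained illustration and not Flink code: `NetworkMemorySketch`, its method names, and the sample values are hypothetical stand-ins for the NetworkBufferPool counters. It also shows why the multiplication is widened to long.

```java
final class NetworkMemorySketch {

    // Widen to long before multiplying; an int * int product overflows above 2 GiB.
    static long totalBytes(int totalSegments, int segmentSize) {
        return 1L * totalSegments * segmentSize;
    }

    static long availableBytes(int availableSegments, int segmentSize) {
        return 1L * availableSegments * segmentSize;
    }

    // Used memory is derived rather than measured: total minus available.
    static long usedBytes(int totalSegments, int availableSegments, int segmentSize) {
        return totalBytes(totalSegments, segmentSize)
                - availableBytes(availableSegments, segmentSize);
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // sample segment size in bytes (assumed value)
        int total = 100_000;         // sample pool size, more than 2 GiB in total
        int available = 25_000;

        System.out.println("used bytes  = " + usedBytes(total, available, segmentSize));
        // For comparison: the plain int product wraps around for this pool size.
        System.out.println("int product = " + (total * segmentSize));
    }
}
```

Where the subtraction happens (in the pool, in the metric registration, or in the REST handler) is left open here, as in the proposal.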
update NettyShuffleMetricFactory#registerShuffleMetrics
Code Block language java
private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
private static final String METRIC_AVAILABLE_MEMORY_IN_BYTES = "AvailableMemory";

// ...

private static void registerShuffleMetrics(
        String groupName,
        MetricGroup metricGroup,
        NetworkBufferPool networkBufferPool) {
    MetricGroup networkGroup = metricGroup.addGroup(groupName);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
        networkBufferPool::getTotalNumberOfMemorySegments);
    networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
        networkBufferPool::getNumberOfAvailableMemorySegments);

    // === new ===
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
        networkBufferPool::getTotalMemorySize);
    networkGroup.<Long, Gauge<Long>>gauge(METRIC_AVAILABLE_MEMORY_IN_BYTES,
        networkBufferPool::getAvailableMemorySize);
}
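To illustrate why registering a method reference as a gauge is sufficient, the following sketch uses plain Java stand-ins. `SimpleMetricGroup` and `FakePool` are hypothetical and only mimic the shape of MetricGroup and NetworkBufferPool; the point is that a gauge is evaluated on each read, so it reflects the pool state at query time rather than at registration time.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

final class GaugeRegistrationSketch {

    // Hypothetical stand-in for a metric group: name -> lazily evaluated gauge.
    static final class SimpleMetricGroup {
        private final Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();

        void gauge(String name, Supplier<Long> gauge) {
            gauges.put(name, gauge);
        }

        long read(String name) {
            return gauges.get(name).get();
        }
    }

    // Hypothetical stand-in for the buffer pool; only the two size getters matter.
    static final class FakePool {
        int totalSegments = 8;
        int availableSegments = 8;
        final int segmentSize = 1024;

        long getTotalMemorySize() {
            return 1L * totalSegments * segmentSize;
        }

        long getAvailableMemorySize() {
            return 1L * availableSegments * segmentSize;
        }
    }

    static SimpleMetricGroup register(FakePool pool) {
        SimpleMetricGroup group = new SimpleMetricGroup();
        group.gauge("TotalMemory", pool::getTotalMemorySize);
        group.gauge("AvailableMemory", pool::getAvailableMemorySize);
        return group;
    }

    public static void main(String[] args) {
        FakePool pool = new FakePool();
        SimpleMetricGroup group = register(pool);
        pool.availableSegments = 3; // buffers handed out after registration
        System.out.println("AvailableMemory = " + group.read("AvailableMemory"));
    }
}
```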
Step 2: Introduce new metrics for Task's managed memory usage
We still have to discuss how to implement that in the right way. A brief proposal is the following one:
We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot. The aggregation can be maintained in the TaskExecutor.
...
- add a managed memory metric and expose it in the REST API. Because managed memory is now allocated per slot, the per-slot values have to be summed in TaskExecutor.
add default memory type in MemoryManager
Code Block language java
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
register ManagedMemoryUsage in TaskExecutor#requestSlot:
Code Block language java
public long getManagedMemoryUsed() {
    // Used memory per slot = total size minus what is still available, summed over all allocated slots.
    return this.taskSlotTable.getAllocatedSlots().stream()
        .mapToLong(slot ->
            slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
                - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
        .sum();
}
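The per-slot aggregation can be tried out in isolation with the sketch below. It assumes a hypothetical `SlotMemory` value class in place of the slot's MemoryManager; only the arithmetic (total minus available, summed over the allocated slots) is taken from the proposal.

```java
import java.util.Arrays;
import java.util.List;

final class ManagedMemoryAggregationSketch {

    // Hypothetical view of one slot's memory manager: total and still-available bytes.
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    // Sums the used managed memory over all allocated slots.
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = Arrays.asList(
                new SlotMemory(100, 40),   // partially used slot
                new SlotMemory(200, 200),  // idle slot
                new SlotMemory(50, 0));    // fully used slot
        System.out.println("managed memory used = " + managedMemoryUsed(slots));
    }
}
```

Slots that are not allocated contribute nothing to the sum, so a task manager without allocated slots reports 0.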
add instantiateMemoryManagerMetrics in MetricUtils
Code Block language java
public static void instantiateMemoryManagerMetrics(
        MetricGroup statusMetricGroup,
        TaskExecutor taskExecutor) {
    checkNotNull(statusMetricGroup);

    MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
    memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
    memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}
- register it in TaskManagerRunner#startTaskManager
- add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the task manager's metrics
- url: /taskmanagers/:taskmanagerid
- response: add managedMemoryTotal and managedMemoryUsed to metrics
Step 3: Expose new metrics to REST interface
TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics.
TaskManagerMetricsInfo needs to be extended with members for the newly added fields.
TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.
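A minimal sketch of how the new response fields could be filled from the metric store is shown below. It is not the actual handler code: a plain `Map<String, String>` stands in for the task manager metric store, `MemoryFields` is a hypothetical trimmed-down response object, the `Status.Shuffle.Netty.` prefix is an assumed group name for the network metrics, and falling back to 0 for a missing metric is likewise an assumption. The managed memory metric names are the ones introduced in Step 2.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskManagerMetricsInfoSketch {

    // Hypothetical, trimmed-down response object holding only the new fields.
    static final class MemoryFields {
        final long managedMemoryUsed;
        final long managedMemoryTotal;
        final long networkMemoryUsed;
        final long networkMemoryTotal;

        MemoryFields(long managedMemoryUsed, long managedMemoryTotal,
                     long networkMemoryUsed, long networkMemoryTotal) {
            this.managedMemoryUsed = managedMemoryUsed;
            this.managedMemoryTotal = managedMemoryTotal;
            this.networkMemoryUsed = networkMemoryUsed;
            this.networkMemoryTotal = networkMemoryTotal;
        }
    }

    // Assumed key prefix; the real group name is whatever is passed to registerShuffleMetrics.
    static final String NETWORK_PREFIX = "Status.Shuffle.Netty.";

    // Reads a metric as long; a missing metric falls back to 0 (an assumption).
    static long readLong(Map<String, String> store, String name) {
        return Long.parseLong(store.getOrDefault(name, "0"));
    }

    static MemoryFields createMemoryFields(Map<String, String> store) {
        long managedTotal = readLong(store, "Status.Managed.Memory.TotalCapacity");
        long managedUsed = readLong(store, "Status.Managed.Memory.MemoryUsed");
        long networkTotal = readLong(store, NETWORK_PREFIX + "TotalMemory");
        long networkAvailable = readLong(store, NETWORK_PREFIX + "AvailableMemory");
        // The pool reports available bytes, so used bytes are derived here.
        return new MemoryFields(
                managedUsed, managedTotal, networkTotal - networkAvailable, networkTotal);
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("Status.Managed.Memory.TotalCapacity", "1000");
        store.put("Status.Managed.Memory.MemoryUsed", "250");
        store.put(NETWORK_PREFIX + "TotalMemory", "800");
        store.put(NETWORK_PREFIX + "AvailableMemory", "600");

        MemoryFields fields = createMemoryFields(store);
        System.out.println("networkMemoryUsed = " + fields.networkMemoryUsed);
        System.out.println("managedMemoryUsed = " + fields.managedMemoryUsed);
    }
}
```

Deriving networkMemoryUsed in the handler is one possible place for the available-to-used conversion noted in Step 1; doing it closer to the pool would work equally well.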
Test Plan
Existing tests will be updated to verify the feature.