Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion

...

thread

FLIP-75 discussion about the initial design

FLIP-102 discussion after splitting up FLIP-75 into sub-flips

Vote thread

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14431

...

Release1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

FLIP-49 has been accepted and merged in Flink 1.10, the metric in current task manager detail page could not correspond well to the design of FLIP-49.

 

The memory model which is exposed through the configuration parameters should be visualized in the same way in the TaskManager's details.

Proposed Changes

According to FLIP-49, we can sort out a table containing the correspondence between configuration and metriccorrelate the configuration parameters and the metrics partially.

JVM Metrics

These JVM metrics are exposed and can be used through the TaskManager's metrics REST API.

Heap

FrameWork Heap

taskmanager.memory.framework.heap.size

memoryConfiguration.frameworkHeap

JVM

Flink Compose0

user conf key1

configuration key2

metric max3

metric used3

MetricUsed keyTotal key
Heap
Status.JVM.Memory.Heap
.
UsedMax
DirectStatus.JVM.Memory.
Heap.
DirectUsedMax
Task Heap
Metaspace
taskmanagerNative

Managed Memory

taskmanager.memory.managed.size

memoryConfiguration.managedMemory

Status.ManagedMemory.Total 

Status.

memory.task.heap.size

memoryConfiguration.taskHeap

Off-Heap

JVM.Memory.Metaspace 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-

14406

19617

UsedMax
MappedStatus.JVM.Memory.MappedMemoryUsedTotalCapacity
NonHeapStatus.JVM.
ManagedMemory.Used 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14406
Direct
Memory.NonHeapMemoryUsedTotalCapacity

Memory Configuration

Flink's memory model (as described in org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec) can be mapped to the following Flink configuration parameters. There are a few that have a correlating Flink metric.

FrameWork OffHeap

Flink Memory ModelFlink configuration1Effective Configuration REST API2Metric3Used keyTotal key
Framework Heap
taskmanager.memory.framework.
off-
heap.sizememoryConfiguration.
frameworkOffHeap
frameworkHeapStatus.JVM.Memory.
Direct.TotalCapacity - Status.Shuffle.Netty.TotalMemory
HeapUsedMax
Task Heaptaskmanager.memory.task.heap.sizememoryConfiguration.taskHeap
Framework OffHeaptaskmanager.memory.framework.off-heap.sizememoryConfiguration.frameworkOffHeap---
Task OffHeap

Status.JVM.Memory.Direct.MemoryUsed - Status.Shuffle.Netty.UsedMemory

Task OffHeap Memory
taskmanager.memory.task.off-heap.sizememoryConfiguration.taskOffHeap
Network Memory

taskmanager.memory.network.min

memoryConfiguration

taskmanager.

networkMemoryStatus

memory.

Shuffle

network.

Netty.TotalMemory 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14422

max

memoryConfiguration.networkMemory

Status.Shuffle.Netty

.UsedMemory 


Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14422

taskmanager.memory.network.max

Mapped

-

-

-

Status.JVM.Memory.Mapped.TotalCapacity

Status.JVM.Memory.Mapped.MemoryUsed

Metaspace

JVM Metaspace


UsedMemoryTotalMemory
Managed Memorytaskmanager.memory.
jvm-metaspace
managed.sizememoryConfiguration.
jvmMetaspace
managedMemory

Status.

JVM

Flink.Memory.

Metaspace.Max

Managed

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-

19617

Status.JVM.Memory.Metaspace.Used

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-19617

14406

UsedTotal
JVM Metaspace
OverheadJVM Overhead
taskmanager.memory.jvm-
overhead
metaspace.
min
sizememoryConfiguration.
jvmOverhead
jvmMetaspaceStatus.JVM.Memory.
NonHeap.
MetaspaceUsedMax
Status.
JVM Overhead

taskmanager.

Memory

memory.

NonHeap

jvm-overhead.

Used

min

taskmanager.memory.jvm-overhead.max

...

memoryConfiguration.jvmOverhead---

1 These are the configuration parameters used in the Flink configuration.
2 These are the Json paths to address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory  and totalProcessMemory are exposed through the REST API.
3 The metrics which are exposed through the TaskManager's metrics

...

REST API.

Frontend Design

...

Redesign the task manager metric page, this would allow users to more clearly understand the relationship between these metrics..

The previous metrics are moved into 'Advanced' since maybe some users still need them.

Image Added

Detail view

Image AddedImage Removed

REST API Design

...

Memory Configuration

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14435

The TaskManager's memory configuration will be exposed through {{

...

/taskmanagers/:taskmanagerid}}. A proposed REST respond is shown in the code block below:

Code Block
languagejs
titleJSON Schema of response
collapsetrue
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : {
      "type" : "any"
    },
    "path" : {
      "type" : "string"
    },
    "dataPort" : {
      "type" : "integer"
    },
    "timeSinceLastHeartbeat" : {
      "type" : "integer"
    },
    "slotsNumber" : {
      "type" : "integer"
    },
    "freeSlots" : {
      "type" : "integer"
    },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : {
          "type" : "integer"
        },
        "physicalMemory" : {
          "type" : "integer"
        },
        "freeMemory" : {
          "type" : "integer"
        },
        "managedMemory" : {
          "type" : "integer"
        }
      }
    },
    "memoryConfiguration" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
      "properties" : {
        "frameworkHeap" : {
          "type" : "long"
        },
        "frameworkOffHeap" : {
          "type" : "long"
        },
        "taskHeap" : {
          "type" : "long"
        },
        "taskOffHeap" : {
          "type" : "long"
        },
        "networkMemory" : {
          "type" : "long"
        },
        "managedMemory" : {
          "type" : "long"
        },
        "jvmMetaspace" : {
          "type" : "long"
        },
        "jvmOverhead" : {
          "type" : "long"
        },
        "totalFlinkMemory" : {
          "type" : "long"
        }
        "totalProcessMemory" : {
          "type" : "long"
        }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : {
          "type" : "integer"
        },
        "heapCommitted" : {
          "type" : "integer"
        },
        "heapMax" : {
          "type" : "integer"
        },
        "metaspaceUsed" : {
          "type" : "integer"
        },
        "metaspaceCommitted" : {
          "type" : "integer"
        },
        "metaspaceMax" : {
          "type" : "integer"
        },      
        "nonHeapUsed" : {
          "type" : "integer"
        },
        "nonHeapCommitted" : {
          "type" : "integer"
        },
        "nonHeapMax" : {
          "type" : "integer"
        },
        "directCount" : {
          "type" : "integer"
        },
        "directUsed" : {
          "type" : "integer"
        },
        "directMax" : {
          "type" : "integer"
        },
        "mappedCount" : {
          "type" : "integer"
        },
        "mappedUsed" : {
          "type" : "integer"
        },
        "mappedMax" : {
          "type" : "integer"
        },
        "memorySegmentsAvailable" : {
          "type" : "integer"
        },
        "memorySegmentsTotal" : {
          "type" : "integer"
        },
        "managedMemoryUsed" : {
          "type" : "long"
        },
        "managedMemoryTotal" : {
          "type" : "long"
        },
        "networkMemoryUsed" : {
          "type" : "long"
        },
        "networkMemoryTotal" : {
          "type" : "long"
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : {
                "type" : "string"
              },
              "count" : {
                "type" : "integer"
              },
              "time" : {
                "type" : "integer"
              }
            }
          }
        }
      }
    }
  }
}

Metrics exposure

The newly introduced metrics can be accessed through the metrics REST endpoint.

Implementation Proposal

Step 1: Expose effective configuration parameters of

...

TaskExecutorn

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14435

  1. TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
  2. The TaskManagerResourceInfo is initialized when initializing the TaskExecutor in the same way as we do it with the HardwareDescription. It will be handed over in the same way through TaskExecutorRegistry → WorkerRegistration.
  3. The TaskManagerResourceInfo will be added along with HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).

Step 2: Introduce new metric for memory usage of NetworkBufferPoolNetworkBufferPool 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14422

  1. add shuffle memory's size metric

    Code Block
    languagejava
    public long getTotalMemorySize() { return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize; } public long getUsedMemorySize() { return getTotalMemorySize() - (getNumberOfAvailableMemorySegments() * memorySegmentSize); }

  2. update NettyShuffleMetricFactory#registerShuffleMetrics

    Code Block
    languagejava
    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory"; private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory"; // ... private static void registerShuffleMetrics( String groupName, MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { MetricGroup networkGroup = metricGroup.addGroup(groupName); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments); networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments); // === new === networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES, networkBufferPool::getTotalMemorySize); networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES, networkBufferPool::getUsedMemorySize); }

Step 3: Introduce new metrics for Task's managed memory usageusage 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14406

We still have to discuss how to implement that in the right way. A brief proposal is the following one:

We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot . The aggregation can be maintained in the TaskExecutor.

...

add default memory type in MemoryManager

Code Block
languagejava
public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;

add register ManagedMemoryUsage in TaskExecutor#requestSlot:

Code Block
public long getManagedMemoryUsed() {
	return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
		slot ->
			slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) - 
				slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
	).sum();

}

add instantiateMemoryManagerMetrics in MetricUtils

Code Block
languagejava
public static void instantiateMemoryManagerMetrics(
	
	MetricGroup statusMetricGroup, 
		TaskExecutor taskExecutor) {
	checkNotNull(statusMetricGroup);

	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity",  taskExecutor::getManagedMemoryTotal);
	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}

Step 4: Add Metaspace metrics 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-19617

There are no metrics present, yet, monitoring the JVM's Metaspace pool. The newly introduced metrics are going to be exposed through the /taskmanagers/metrics REST API.

Step 5: Update TaskManager's details page 

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-19764

The web UI has to be updated as proposed above.

Follow-Ups

  • Create a separate independent endpoint for the effective memory configuration.
  • Deprecate the metrics sub-record returned by /taskmanagers/:taskmanagerid . The metrics endpoint can be used instead. This would simplify the TaskManagerDetailsHandler .

...

Step 4: Expose new metrics to REST interface

TaskManagerDetailsHandler#handleRequest  can be extended to cover also the newly added metrics:

  1. TaskManagerMetricsInfo  needs to be extended adding members for the newly added fields:
  2. TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.

Follow-Ups

  • The /metrics endpoint of each TaskManager could be split up into multiple endpoints:
  • /configuration containing the static values like configuration and hardware description
  • /metrics containing the volatile values like the metrics and slot information

Test Plan

Existing tests are updated to verify feature.