...

FLIP-49 was accepted and merged in Flink 1.10. The metrics shown on the current TaskManager details page do not correspond well to the memory model introduced by FLIP-49.

 

The memory model which is exposed through the configuration parameters should be visualized in the same way in the TaskManager's details.

Proposed Changes

According to FLIP-49, we can partially correlate the configuration parameters and the metrics.

JVM Metrics

These JVM metrics are exposed and can be used through the TaskManager's metrics REST API.

...

| Flink Memory Model | Flink configuration [1] | Effective Configuration REST API [2] | Metric [3] | Used key | Total key |
|---|---|---|---|---|---|
| Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max |
| Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | (shared with Framework Heap) | (shared) | (shared) |
| Framework OffHeap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | - |
| Task OffHeap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | (shared with Framework OffHeap) | (shared) | (shared) |
| Network Memory | taskmanager.memory.network.min, taskmanager.memory.network.max | memoryConfiguration.networkMemory | Status.Shuffle.Netty (FLINK-14422) | UsedMemory | TotalMemory |
| Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.ManagedMemory (FLINK-14406) | (still up for discussion) | (still up for discussion) |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max |
| JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | memoryConfiguration.jvmOverhead | - | - | - |
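
One way to read the heap rows of the table: a single JVM heap metric covers the framework heap and the task heap together, so a details page would compare that metric against the sum of the two configured sizes. The following is a minimal sketch of that comparison. The class and method names (`HeapUsageSketch`, `usageRatio`) and the sample byte values are hypothetical and not part of Flink.

```java
final class HeapUsageSketch {

    /** Returns used/total as a fraction in [0, 1], or 0 if total is not positive. */
    static double usageRatio(long usedBytes, long totalBytes) {
        if (totalBytes <= 0) {
            return 0.0;
        }
        return Math.min(1.0, (double) usedBytes / totalBytes);
    }

    public static void main(String[] args) {
        // Hypothetical sample values, in bytes.
        long frameworkHeap = 128L << 20; // memoryConfiguration.frameworkHeap
        long taskHeap = 384L << 20;      // memoryConfiguration.taskHeap
        long heapUsed = 256L << 20;      // Status.JVM.Memory.Heap.Used

        // The heap metric spans both heap components, so sum them first.
        long configuredHeap = frameworkHeap + taskHeap;
        System.out.printf("heap usage: %.0f%%%n", 100 * usageRatio(heapUsed, configuredHeap));
    }
}
```

Components without a metric (the rows marked `-`) would simply show the configured value without a usage figure.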

...

  • task manager's resource contains this information, show it in 
  • url: /taskmanagers/:taskmanagerid

    JSON Schema of response:
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
      "properties" : {
        "id" : {
          "type" : "any"
        },
        "path" : {
          "type" : "string"
        },
        "dataPort" : {
          "type" : "integer"
        },
        "timeSinceLastHeartbeat" : {
          "type" : "integer"
        },
        "slotsNumber" : {
          "type" : "integer"
        },
        "freeSlots" : {
          "type" : "integer"
        },
        "hardware" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
          "properties" : {
            "cpuCores" : {
              "type" : "integer"
            },
            "physicalMemory" : {
              "type" : "integer"
            },
            "freeMemory" : {
              "type" : "integer"
            },
            "managedMemory" : {
              "type" : "integer"
            }
          }
        },
        "memoryConfiguration" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
          "properties" : {
            "frameworkHeap" : {
              "type" : "long"
            },
            "frameworkOffHeap" : {
              "type" : "long"
            },
            "taskHeap" : {
              "type" : "long"
            },
            "taskOffHeap" : {
              "type" : "long"
            },
            "networkMemory" : {
              "type" : "long"
            },
            "managedMemory" : {
              "type" : "long"
            },
            "jvmMetaspace" : {
              "type" : "long"
            },
            "jvmOverhead" : {
              "type" : "long"
            },
            "totalFlinkMemory" : {
              "type" : "long"
            },
            "totalProcessMemory" : {
              "type" : "long"
            }
          }
        },
        "metrics" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
          "properties" : {
            "heapUsed" : {
              "type" : "integer"
            },
            "heapCommitted" : {
              "type" : "integer"
            },
            "heapMax" : {
              "type" : "integer"
            },
            "metaspaceUsed" : {
              "type" : "integer"
            },
            "metaspaceCommitted" : {
              "type" : "integer"
            },
            "metaspaceMax" : {
              "type" : "integer"
            },      
            "nonHeapUsed" : {
              "type" : "integer"
            },
            "nonHeapCommitted" : {
              "type" : "integer"
            },
            "nonHeapMax" : {
              "type" : "integer"
            },
            "directCount" : {
              "type" : "integer"
            },
            "directUsed" : {
              "type" : "integer"
            },
            "directMax" : {
              "type" : "integer"
            },
            "mappedCount" : {
              "type" : "integer"
            },
            "mappedUsed" : {
              "type" : "integer"
            },
            "mappedMax" : {
              "type" : "integer"
            },
            "memorySegmentsAvailable" : {
              "type" : "integer"
            },
            "memorySegmentsTotal" : {
              "type" : "integer"
            },
            "managedMemoryUsed" : {
              "type" : "long"
            },
            "managedMemoryTotal" : {
              "type" : "long"
            },
            "networkMemoryUsed" : {
              "type" : "long"
            },
            "networkMemoryTotal" : {
              "type" : "long"
            },
            "garbageCollectors" : {
              "type" : "array",
              "items" : {
                "type" : "object",
                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
                "properties" : {
                  "name" : {
                    "type" : "string"
                  },
                  "count" : {
                    "type" : "integer"
                  },
                  "time" : {
                    "type" : "integer"
                  }
                }
              }
            }
          }
        }
      }
    }


Implementation Proposal

...


  1. TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
  2. The TaskManagerResourceInfo is initialized when the TaskExecutor is initialized, in the same way as the HardwareDescription. It is handed over along the same path: TaskExecutorRegistration → WorkerRegistration.
  3. The TaskManagerResourceInfo is returned along with the HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceID, Time).
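
As an illustration of step 1, the POJO could look roughly like the sketch below. The field names follow the `memoryConfiguration` object of the proposed REST response. The class name `TaskManagerResourceInfoSketch`, the constructor shape and the derived getters are assumptions made for illustration. The totals follow the FLIP-49 memory model: total Flink memory is the sum of the heap, off-heap, network and managed components, and total process memory adds JVM Metaspace and JVM Overhead.

```java
import java.io.Serializable;

/** Hypothetical sketch of the proposed POJO; all sizes are in bytes. */
final class TaskManagerResourceInfoSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final long frameworkHeap;
    final long taskHeap;
    final long frameworkOffHeap;
    final long taskOffHeap;
    final long networkMemory;
    final long managedMemory;
    final long jvmMetaspace;
    final long jvmOverhead;

    TaskManagerResourceInfoSketch(
            long frameworkHeap, long taskHeap,
            long frameworkOffHeap, long taskOffHeap,
            long networkMemory, long managedMemory,
            long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** Sum of all components that FLIP-49 counts as Flink memory. */
    long getTotalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    /** Flink memory plus the JVM-specific components. */
    long getTotalProcessMemory() {
        return getTotalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }
}
```

Deriving the totals from the components, rather than storing them separately, keeps the two views from drifting apart; whether the real implementation stores or derives them is left open here.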

...

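  1. Add size metrics for the shuffle memory to NetworkBufferPool:

    // Total size of the pool in bytes; the 1L factor widens the product to long.
    public long getTotalMemorySize() {
    	return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }
    
    // Used size in bytes; the subtrahend is widened to long as well so it cannot overflow int.
    public long getUsedMemorySize() {
    	return getTotalMemorySize() - (1L * getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }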


  2. Update NettyShuffleMetricFactory#registerShuffleMetrics:

    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";
    
    // ...
    
    private static void registerShuffleMetrics(
    		String groupName,
    		MetricGroup metricGroup,
    		NetworkBufferPool networkBufferPool) {
    	MetricGroup networkGroup = metricGroup.addGroup(groupName);
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
    		networkBufferPool::getTotalNumberOfMemorySegments);	
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
    		networkBufferPool::getNumberOfAvailableMemorySegments);
    
    	// === new ===
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
    		networkBufferPool::getTotalMemorySize);
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
    		networkBufferPool::getUsedMemorySize);
    }
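
The byte sizes in step 1 are products of two `int` values, so the multiplication has to be widened to `long` before it is carried out. The sketch below isolates that arithmetic in a stand-in class. `BufferPoolSketch` is hypothetical and only mimics the accessors used above; it is not Flink's `NetworkBufferPool`.

```java
/** Hypothetical stand-in for a buffer pool, used only to show the size arithmetic. */
final class BufferPoolSketch {
    private final int totalSegments;
    private final int availableSegments;
    private final int memorySegmentSize;

    BufferPoolSketch(int totalSegments, int availableSegments, int memorySegmentSize) {
        this.totalSegments = totalSegments;
        this.availableSegments = availableSegments;
        this.memorySegmentSize = memorySegmentSize;
    }

    /** Total pool size in bytes; the cast widens before multiplying. */
    long getTotalMemorySize() {
        return (long) totalSegments * memorySegmentSize;
    }

    /** Used bytes = total bytes minus the bytes of the segments that are still available. */
    long getUsedMemorySize() {
        return getTotalMemorySize() - (long) availableSegments * memorySegmentSize;
    }

    public static void main(String[] args) {
        // 100,000 segments of 32 KiB exceed Integer.MAX_VALUE when expressed in bytes.
        BufferPoolSketch pool = new BufferPoolSketch(100_000, 25_000, 32 * 1024);
        System.out.println("total bytes: " + pool.getTotalMemorySize());
        System.out.println("used bytes:  " + pool.getUsedMemorySize());
    }
}
```

Without the widening, `100_000 * 32_768` would wrap around as an `int` and the gauge would report a negative size.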


...


How to implement this in the right way is still up for discussion. A brief proposal follows:

...

    • Add instantiateMemoryManagerMetrics to MetricUtils:

      public static void instantiateMemoryManagerMetrics(
      		MetricGroup statusMetricGroup,
      		TaskExecutor taskExecutor) {
      	checkNotNull(statusMetricGroup);
      
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
      }



    • Register it in TaskManagerRunner#startTaskManager.
    • The TaskManager's metrics gain Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    • url: /taskmanagers/:taskmanagerid
    • response: the metrics object gains managedMemoryTotal and managedMemoryUsed
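
The effect of the steps above on the metric namespace can be sketched without Flink on the classpath. In the sketch below, `MetricRegistrySketch` and `ManagedMemorySource` are hypothetical stand-ins for Flink's `MetricGroup` and for whichever component ends up reporting managed memory. Only the two metric names are taken from the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical, flat stand-in for a metric group: full metric name -> gauge. */
final class MetricRegistrySketch {

    /** Hypothetical source of the two managed memory values, in bytes. */
    interface ManagedMemorySource {
        long getManagedMemoryTotal();
        long getManagedMemoryUsed();
    }

    private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

    /** Registers a gauge; the supplier is evaluated on every read, not at registration. */
    void gauge(String name, LongSupplier supplier) {
        gauges.put(name, supplier);
    }

    /** Reads the current value of a registered gauge. */
    long read(String name) {
        LongSupplier gauge = gauges.get(name);
        if (gauge == null) {
            throw new IllegalArgumentException("unknown metric: " + name);
        }
        return gauge.getAsLong();
    }

    /** Mirrors the proposed registration with the metric names from this proposal. */
    static void instantiateMemoryManagerMetrics(
            MetricRegistrySketch registry, ManagedMemorySource source) {
        registry.gauge("Status.Managed.Memory.TotalCapacity", source::getManagedMemoryTotal);
        registry.gauge("Status.Managed.Memory.MemoryUsed", source::getManagedMemoryUsed);
    }
}
```

Because gauges are read lazily, the REST response fields `managedMemoryTotal` and `managedMemoryUsed` would reflect the values at the time the metrics are fetched.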

...

There are no metrics yet that monitor the JVM's Metaspace pool. The newly introduced metrics will be exposed through the /taskmanagers/metrics REST API.
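
For orientation, the JDK already exposes per-pool memory usage through `java.lang.management`, which is one possible source for such metrics. The sketch below looks a pool up by name and reads its usage. It assumes the pool is called "Metaspace", which is the name HotSpot uses; other JVMs may name it differently or not have it at all. `MetaspaceProbe` and `findPoolUsage` are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

final class MetaspaceProbe {

    /** Returns the current usage of the memory pool with the given name, if such a pool exists. */
    static Optional<MemoryUsage> findPoolUsage(String poolName) {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (poolName.equals(pool.getName())) {
                // getUsage() may return null if the pool is no longer valid.
                return Optional.ofNullable(pool.getUsage());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // "Metaspace" is an assumption about the pool name (HotSpot).
        Optional<MemoryUsage> usage = findPoolUsage("Metaspace");
        if (usage.isPresent()) {
            MemoryUsage u = usage.get();
            // getMax() returns -1 when no maximum is defined.
            System.out.println("used=" + u.getUsed()
                    + " committed=" + u.getCommitted()
                    + " max=" + u.getMax());
        } else {
            System.out.println("No memory pool named Metaspace on this JVM");
        }
    }
}
```

The three values map naturally onto the `metaspaceUsed`, `metaspaceCommitted` and `metaspaceMax` fields of the proposed response.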

Step 5: Update TaskManager details pages

The web UI has to be updated as proposed above.

Follow-Ups

  • Create a separate independent endpoint for the effective memory configuration.

...

TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics:

  1. TaskManagerMetricsInfo needs to be extended with members for the newly added fields.
  2. TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricStore.TaskManagerMetricStore) can be used to initialize the fields.
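
Step 2 amounts to reading named values from the metric store and converting them into typed fields. The following is a minimal sketch, assuming the store hands out metric values as strings and that a missing or unparsable metric should fall back to a default. `MetricsInfoSketch`, `readLong`, `fromStore` and the plain `Map` are hypothetical stand-ins; only the metric names come from this proposal.

```java
import java.util.Map;

/** Hypothetical sketch of two of the newly added fields and how they could be filled. */
final class MetricsInfoSketch {
    final long networkMemoryUsed;
    final long networkMemoryTotal;

    MetricsInfoSketch(long networkMemoryUsed, long networkMemoryTotal) {
        this.networkMemoryUsed = networkMemoryUsed;
        this.networkMemoryTotal = networkMemoryTotal;
    }

    /** Parses a metric as long, falling back to the default if it is missing or malformed. */
    static long readLong(Map<String, String> store, String name, long defaultValue) {
        String raw = store.get(name);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Builds the info object from a store that maps metric names to string values. */
    static MetricsInfoSketch fromStore(Map<String, String> store) {
        return new MetricsInfoSketch(
                readLong(store, "Status.Shuffle.Netty.UsedMemory", 0L),
                readLong(store, "Status.Shuffle.Netty.TotalMemory", 0L));
    }
}
```

Falling back to a default keeps the details page usable while a TaskManager has not reported the new metrics yet; whether 0 is the right default is a design choice left open here.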

Follow-Ups

  • The /metrics endpoint of each TaskManager could be split up into multiple endpoints:
    • /configuration containing the static values like configuration and hardware description
    • /metrics containing the volatile values like the metrics and slot information

Test Plan

Existing tests will be updated to verify the feature.