Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state[Under Discussion]

Discussion threads: 

JIRA:   

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14431

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-49 has been accepted and merged in Flink 1.10, the metric in current task manager detail page could not correspond well to the design of FLIP-49

 

Proposed Changes

According to FLIP-49, we can sort out a table containing the correspondence between configuration and metric.


JVM

VM

Flink Compose

user conf key1

configuration key2

metric max3

metric used3

JVM

JVM Heap

FrameWork Heap

taskmanager.memory.framework.heap.size

memoryConfiguration.frameworkHeap

Status.JVM.Memory.Heap.Max

Status.JVM.Memory.Heap.Used

Task Heap

taskmanager.memory.task.heap.size

memoryConfiguration.taskHeap

JVM None-Heap

JVM MetaSpace

taskmanager.memory.jvm-metaspace.size

memoryConfiguration.jvmMetaspace

Status.JVM.Memory.Metaspace.Max (FLINK-19617)

Status.JVM.Memory.Metaspace.Used (FLINK-19617)

JVM Overhead

taskmanager.memory.jvm-overhead.min

memoryConfiguration.jvmOverhead

Status.JVM.Memory.NonHeap.Max



Status.JVM.Memory.NonHeap.Used

taskmanager.memory.jvm-overhead.max

other

-

-

Outside JVM

Mapped

-

-

-

Status.JVM.Memory.Mapped.TotalCapacity

Status.JVM.Memory.Mapped.MemoryUsed

Direct

FrameWork OffHeap

taskmanager.memory.framework.off-heap.size

memoryConfiguration.frameworkOffHeap

Status.JVM.Memory.Direct.TotalCapacity - Status.Shuffle.Netty.TotalMemory

Status.JVM.Memory.Direct.MemoryUsed - Status.Shuffle.Netty.UsedMemory

Task OffHeap Memory

taskmanager.memory.task.off-heap.size

memoryConfiguration.taskOffHeap

Network Memory

taskmanager.memory.network.min

memoryConfiguration.networkMemory

Status.Shuffle.Netty.TotalMemory (FLINK-14422)

Status.Shuffle.Netty.UsedMemory (FLINK-14422)

taskmanager.memory.network.max

Flink Managed

Managed Memory

taskmanager.memory.managed.size

memoryConfiguration.managedMemory

Status.ManagedMemory.Total (FLINK-14406)

Status.ManagedMemory.Used (FLINK-14406)

1 These are the configuration parameters used in the Flink configuration.
2 Additionally, memoryConfiguration.totalFlinkMemory  and totalProcessMemory are exposed through the REST API.
3 The metrics which are exposed through the metrics endpoint.

Frontend Design (out-dated)

Redesign the task manager metric page, this would allow users to more clearly understand the relationship between these metrics.

REST API Design

  • task manager's resource contains this information, show it in 
  • url: /taskmanagers/:taskmanagerid

    Code Block
    languagejs
    titleJSON Schema of response
    collapsetrue
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
      "properties" : {
        "id" : {
          "type" : "any"
        },
        "path" : {
          "type" : "string"
        },
        "dataPort" : {
          "type" : "integer"
        },
        "timeSinceLastHeartbeat" : {
          "type" : "integer"
        },
        "slotsNumber" : {
          "type" : "integer"
        },
        "freeSlots" : {
          "type" : "integer"
        },
        "hardware" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
          "properties" : {
            "cpuCores" : {
              "type" : "integer"
            },
            "physicalMemory" : {
              "type" : "integer"
            },
            "freeMemory" : {
              "type" : "integer"
            },
            "managedMemory" : {
              "type" : "integer"
            }
          }
        },
        "memoryConfiguration" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
          "properties" : {
            "frameworkHeap" : {
              "type" : "long"
            },
            "frameworkOffHeap" : {
              "type" : "long"
            },
            "taskHeap" : {
              "type" : "long"
            },
            "taskOffHeap" : {
              "type" : "long"
            },
            "networkMemory" : {
              "type" : "long"
            },
            "managedMemory" : {
              "type" : "long"
            },
            "jvmMetaspace" : {
              "type" : "long"
            },
            "jvmOverhead" : {
              "type" : "long"
            },
            "totalFlinkMemory" : {
              "type" : "long"
            }
            "totalProcessMemory" : {
              "type" : "long"
            }
          }
        },
        "metrics" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
          "properties" : {
            "heapUsed" : {
              "type" : "integer"
            },
            "heapCommitted" : {
              "type" : "integer"
            },
            "heapMax" : {
              "type" : "integer"
            },
            "metaspaceUsed" : {
              "type" : "integer"
            },
            "metaspaceCommitted" : {
              "type" : "integer"
            },
            "metaspaceMax" : {
              "type" : "integer"
            },      
            "nonHeapUsed" : {
              "type" : "integer"
            },
            "nonHeapCommitted" : {
              "type" : "integer"
            },
            "nonHeapMax" : {
              "type" : "integer"
            },
            "directCount" : {
              "type" : "integer"
            },
            "directUsed" : {
              "type" : "integer"
            },
            "directMax" : {
              "type" : "integer"
            },
            "mappedCount" : {
              "type" : "integer"
            },
            "mappedUsed" : {
              "type" : "integer"
            },
            "mappedMax" : {
              "type" : "integer"
            },
            "memorySegmentsAvailable" : {
              "type" : "integer"
            },
            "memorySegmentsTotal" : {
              "type" : "integer"
            },
            "managedMemoryUsed" : {
              "type" : "long"
            },
            "managedMemoryTotal" : {
              "type" : "long"
            },
            "networkMemoryUsed" : {
              "type" : "long"
            },
            "networkMemoryTotal" : {
              "type" : "long"
            },
            "garbageCollectors" : {
              "type" : "array",
              "items" : {
                "type" : "object",
                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
                "properties" : {
                  "name" : {
                    "type" : "string"
                  },
                  "count" : {
                    "type" : "integer"
                  },
                  "time" : {
                    "type" : "integer"
                  }
                }
              }
            }
          }
        }
      }
    }


Implementation Proposal

Step 1: Expose configuration parameters of TaskExecutor

  1. TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
  2. The TaskManagerResourceInfo is initialized when initializing the TaskExecutor in the same way as we do it with the HardwareDescription. It will be handed over in the same way through TaskExecutorRegistry → WorkerRegistration.
  3. The TaskManagerResourceInfo will be added along with HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).

Step 2: Introduce new metric for memory usage of NetworkBufferPool

  1. add shuffle memory's size metric

    Code Block
    languagejava
    public long getTotalMemorySize() {
    	return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }
    
    public long getUsedMemorySize() {
    	return getTotalMemorySize() - (getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }


  2. update NettyShuffleMetricFactory#registerShuffleMetrics

    Code Block
    languagejava
    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";
    
    // ...
    
    private static void registerShuffleMetrics(
    		String groupName,
    		MetricGroup metricGroup,
    		NetworkBufferPool networkBufferPool) {
    	MetricGroup networkGroup = metricGroup.addGroup(groupName);
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
    		networkBufferPool::getTotalNumberOfMemorySegments);	
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
    		networkBufferPool::getNumberOfAvailableMemorySegments);
    
    	// === new ===
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
    		networkBufferPool::getTotalMemorySize);
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
    		networkBufferPool::getUsedMemorySize);
    }


Step 3: Introduce new metrics for Task's managed memory usage

We still have to discuss how to implement that in the right way. A brief proposal is the following one:

We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot . The aggregation can be maintained in the TaskExecutor.

  1. add managed memory metric and expose it in rest API, as now managed memory is allocated in the slot, we need to sum it in TaskExecutor.
    • add default memory type in MemoryManager

      Code Block
      languagejava
      public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;


    • add register ManagedMemoryUsage in TaskExecutor#requestSlot:

      Code Block
      public long getManagedMemoryUsed() {
      	return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
      		slot ->
      			slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) - 
      				slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
      	).sum();
      
      }


    • add instantiateMemoryManagerMetrics in MetricUtils

      Code Block
      languagejava
      public static void instantiateMemoryManagerMetrics(
      	
      	MetricGroup statusMetricGroup, 
      		TaskExecutor taskExecutor) {
      	checkNotNull(statusMetricGroup);
      
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity",  taskExecutor::getManagedMemoryTotal);
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
      }



    • register it in TaskManagerRunner#startTaskManager
    • taskmanager’metrics add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed
    • url: /taskmanagers/:taskmanagerid
    • response: metric add managedMemoryTotal and managedMemoryUsed

Step 4: Expose new metrics to REST interface

TaskManagerDetailsHandler#handleRequest  can be extended to cover also the newly added metrics:

  1. TaskManagerMetricsInfo  needs to be extended adding members for the newly added fields:
  2. TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore) can be used to initialize the fields.

Follow-Ups

  • The /metrics endpoint of each TaskManager could be split up into multiple endpoints:
    • /configuration containing the static values like configuration and hardware description
    • /metrics containing the volatile values like the metrics and slot information

Test Plan

Existing tests are updated to verify feature.