Status

Current state: Under Discussion

Discussion threads: 

JIRA:

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-49 was accepted and merged in Flink 1.10. The metrics shown on the current TaskManager details page do not correspond well to the memory model introduced by FLIP-49.

 

The memory model which is exposed through the configuration parameters should be visualized in the same way in the TaskManager's details.

Proposed Changes

According to FLIP-49, we can partially correlate the configuration parameters with the metrics.

JVM Metrics

These JVM metrics are exposed and can be used through the TaskManager's metrics REST API.

| JVM       | Metric                      | Used key   | Total key     |
|-----------|-----------------------------|------------|---------------|
| Heap      | Status.JVM.Memory.Heap      | Used       | Max           |
| Direct    | Status.JVM.Memory.Direct    | Used       | Max           |
| Metaspace | Status.JVM.Memory.Metaspace | Used       | Max           |
| Mapped    | Status.JVM.Memory.Mapped    | MemoryUsed | TotalCapacity |
| NonHeap   | Status.JVM.Memory.NonHeap   | MemoryUsed | TotalCapacity |
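
For orientation, the following is a minimal sketch of where values of this kind can be read inside a JVM. It assumes that the Status.JVM.Memory.* metrics originate from the JVM's standard platform MXBeans; the class name JvmMemoryProbe and the -1 fallback are illustrative and not part of Flink.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

/** Illustrative only: reads JVM memory figures from the standard platform MXBeans. */
final class JvmMemoryProbe {

    /** Bytes of heap currently in use, as reported by the JVM. */
    static long heapUsed() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    /** Maximum heap size in bytes, or -1 if the JVM does not define one. */
    static long heapMax() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    }

    /** Bytes of non-heap memory currently in use. */
    static long nonHeapUsed() {
        return ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage().getUsed();
    }

    /**
     * Bytes used by the named buffer pool, or -1 if the JVM exposes no such pool.
     * Assumption: HotSpot-based JVMs name these pools "direct" and "mapped".
     */
    static long bufferPoolUsed(String poolName) {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if (pool.getName().equals(poolName)) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }
}
```

BufferPoolMXBean reports getMemoryUsed() and getTotalCapacity(), whereas MemoryUsage reports getUsed() and getMax(), which may explain why the key names differ between the rows of the table.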

Memory Configuration

Flink's memory model (as described in org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec) can be mapped to the following Flink configuration parameters. Only a few of them have a corresponding Flink metric.

| Flink Memory Model | Flink configuration (1) | Effective Configuration REST API (2) | Metric (3) | Used key | Total key |
|--------------------|-------------------------|--------------------------------------|------------|----------|-----------|
| Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max |
| Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | (shared with Framework Heap) | (shared) | (shared) |
| Framework OffHeap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | - |
| Task OffHeap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | - | - | - |
| Network Memory | taskmanager.memory.network.min, taskmanager.memory.network.max | memoryConfiguration.networkMemory | Status.Shuffle.Netty | UsedMemory | TotalMemory |
| Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.ManagedMemory | (is up for discussion, still) | (is up for discussion, still) |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max |
| JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | memoryConfiguration.jvmOverhead | - | - | - |

1 These are the configuration parameters used in the Flink configuration.
2 These are the JSON paths that address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory and memoryConfiguration.totalProcessMemory are exposed through the REST API.
3 These are the metrics exposed through the TaskManager's metrics REST API.
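
To make the relationship between the individual components and the two totals concrete, here is a minimal sketch following the FLIP-49 model: total Flink memory is the sum of the heap, off-heap, network and managed components, and total process memory adds JVM Metaspace and JVM Overhead on top. The class MemoryBreakdown is hypothetical and only mirrors the field names of memoryConfiguration; it is not the proposed implementation.

```java
/** Hypothetical holder mirroring the memoryConfiguration fields (all sizes in bytes). */
final class MemoryBreakdown {
    final long frameworkHeap;
    final long taskHeap;
    final long frameworkOffHeap;
    final long taskOffHeap;
    final long networkMemory;
    final long managedMemory;
    final long jvmMetaspace;
    final long jvmOverhead;

    MemoryBreakdown(long frameworkHeap, long taskHeap, long frameworkOffHeap, long taskOffHeap,
                    long networkMemory, long managedMemory, long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** Total Flink memory: everything except the JVM's own Metaspace and Overhead. */
    long totalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    /** Total process memory: total Flink memory plus JVM Metaspace and JVM Overhead. */
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }
}
```

With illustrative sizes of 128, 384, 128, 0, 128 and 512 MiB for the Flink components and 256 and 192 MiB for Metaspace and Overhead, this yields 1280 MiB of total Flink memory and 1728 MiB of total process memory.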

Frontend Design (out-dated)

Redesigning the TaskManager metrics page would allow users to understand the relationship between these metrics more clearly.

REST API Design

  • task manager's resource contains this information, show it in 
  • url: /taskmanagers/:taskmanagerid

    JSON Schema of response
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
      "properties" : {
        "id" : {
          "type" : "any"
        },
        "path" : {
          "type" : "string"
        },
        "dataPort" : {
          "type" : "integer"
        },
        "timeSinceLastHeartbeat" : {
          "type" : "integer"
        },
        "slotsNumber" : {
          "type" : "integer"
        },
        "freeSlots" : {
          "type" : "integer"
        },
        "hardware" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
          "properties" : {
            "cpuCores" : {
              "type" : "integer"
            },
            "physicalMemory" : {
              "type" : "integer"
            },
            "freeMemory" : {
              "type" : "integer"
            },
            "managedMemory" : {
              "type" : "integer"
            }
          }
        },
        "memoryConfiguration" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
          "properties" : {
            "frameworkHeap" : {
              "type" : "long"
            },
            "frameworkOffHeap" : {
              "type" : "long"
            },
            "taskHeap" : {
              "type" : "long"
            },
            "taskOffHeap" : {
              "type" : "long"
            },
            "networkMemory" : {
              "type" : "long"
            },
            "managedMemory" : {
              "type" : "long"
            },
            "jvmMetaspace" : {
              "type" : "long"
            },
            "jvmOverhead" : {
              "type" : "long"
            },
            "totalFlinkMemory" : {
              "type" : "long"
            },
            "totalProcessMemory" : {
              "type" : "long"
            }
          }
        },
        "metrics" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
          "properties" : {
            "heapUsed" : {
              "type" : "integer"
            },
            "heapCommitted" : {
              "type" : "integer"
            },
            "heapMax" : {
              "type" : "integer"
            },
            "metaspaceUsed" : {
              "type" : "integer"
            },
            "metaspaceCommitted" : {
              "type" : "integer"
            },
            "metaspaceMax" : {
              "type" : "integer"
            },
            "nonHeapUsed" : {
              "type" : "integer"
            },
            "nonHeapCommitted" : {
              "type" : "integer"
            },
            "nonHeapMax" : {
              "type" : "integer"
            },
            "directCount" : {
              "type" : "integer"
            },
            "directUsed" : {
              "type" : "integer"
            },
            "directMax" : {
              "type" : "integer"
            },
            "mappedCount" : {
              "type" : "integer"
            },
            "mappedUsed" : {
              "type" : "integer"
            },
            "mappedMax" : {
              "type" : "integer"
            },
            "memorySegmentsAvailable" : {
              "type" : "integer"
            },
            "memorySegmentsTotal" : {
              "type" : "integer"
            },
            "managedMemoryUsed" : {
              "type" : "long"
            },
            "managedMemoryTotal" : {
              "type" : "long"
            },
            "networkMemoryUsed" : {
              "type" : "long"
            },
            "networkMemoryTotal" : {
              "type" : "long"
            },
            "garbageCollectors" : {
              "type" : "array",
              "items" : {
                "type" : "object",
                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
                "properties" : {
                  "name" : {
                    "type" : "string"
                  },
                  "count" : {
                    "type" : "integer"
                  },
                  "time" : {
                    "type" : "integer"
                  }
                }
              }
            }
          }
        }
      }
    }
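
As a rough illustration of how a client such as the web UI might use this endpoint, the sketch below builds the request URI and relates the heap metric to the configured heap sizes. The class and method names are hypothetical, the base URL is an assumption about the deployment, and the TaskManager ID is assumed to contain only URI-safe characters. Comparing heapUsed against frameworkHeap + taskHeap is one possible presentation, following the mapping table in which both heap components share the Status.JVM.Memory.Heap metric.

```java
import java.net.URI;

/** Illustrative helper for consumers of /taskmanagers/:taskmanagerid; not part of Flink. */
final class TaskManagerDetailsSketch {

    /** Builds the details URI, tolerating a trailing slash on the base URL. */
    static URI detailsUri(String restBaseUrl, String taskManagerId) {
        String base = restBaseUrl.endsWith("/")
                ? restBaseUrl.substring(0, restBaseUrl.length() - 1)
                : restBaseUrl;
        return URI.create(base + "/taskmanagers/" + taskManagerId);
    }

    /**
     * Fraction of the configured heap (framework + task) that is currently used.
     * Returns 0.0 if no heap is configured, to avoid dividing by zero.
     */
    static double heapUtilization(long heapUsed, long frameworkHeap, long taskHeap) {
        long configuredHeap = frameworkHeap + taskHeap;
        return configuredHeap == 0 ? 0.0 : (double) heapUsed / configuredHeap;
    }
}
```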

Implementation Proposal

  1. TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed in the REST response.
  2. The TaskManagerResourceInfo is initialized when initializing the TaskExecutor in the same way as we do it with the HardwareDescription. It will be handed over in the same way through TaskExecutorRegistry → WorkerRegistration.
  3. The TaskManagerResourceInfo will be added along with HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).
  1. Add getters for the shuffle memory size (in bytes) to NetworkBufferPool:

    // Widen to long before multiplying so that large pools do not overflow int.
    public long getTotalMemorySize() {
    	return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }
    
    public long getUsedMemorySize() {
    	return getTotalMemorySize() - (1L * getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }
  2. Update NettyShuffleMetricFactory#registerShuffleMetrics:

    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";
    
    // ...
    
    private static void registerShuffleMetrics(
    		String groupName,
    		MetricGroup metricGroup,
    		NetworkBufferPool networkBufferPool) {
    	MetricGroup networkGroup = metricGroup.addGroup(groupName);
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
    		networkBufferPool::getTotalNumberOfMemorySegments);	
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
    		networkBufferPool::getNumberOfAvailableMemorySegments);
    
    	// === new ===
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
    		networkBufferPool::getTotalMemorySize);
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
    		networkBufferPool::getUsedMemorySize);
    }
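
The two steps above can be tried out in isolation with the following sketch. PoolStandIn is a hypothetical stand-in for NetworkBufferPool, and a plain map of suppliers stands in for Flink's MetricGroup and Gauge, so none of these types are Flink's own. The sketch shows two things: the byte sizes have to be computed in long arithmetic because segment count times segment size can exceed Integer.MAX_VALUE, and a gauge registered as a method reference reports the current value each time it is read.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative only: mimics the proposed byte-size gauges with stand-in types. */
final class ShuffleMetricsSketch {

    /** Hypothetical stand-in for NetworkBufferPool; only the size arithmetic matters. */
    static final class PoolStandIn {
        private final int totalSegments;
        private final int segmentSize;
        private int availableSegments;

        PoolStandIn(int totalSegments, int segmentSize) {
            this.totalSegments = totalSegments;
            this.segmentSize = segmentSize;
            this.availableSegments = totalSegments;
        }

        /** Marks segments as in use (no bounds checking in this sketch). */
        void requestSegments(int count) {
            availableSegments -= count;
        }

        long getTotalMemorySize() {
            // Cast first: int * int would overflow for large pools.
            return (long) totalSegments * segmentSize;
        }

        long getUsedMemorySize() {
            return getTotalMemorySize() - (long) availableSegments * segmentSize;
        }
    }

    /** Registers both gauges under the proposed metric names. */
    static Map<String, Supplier<Long>> registerShuffleMetrics(PoolStandIn pool) {
        Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();
        gauges.put("TotalMemory", pool::getTotalMemorySize);
        gauges.put("UsedMemory", pool::getUsedMemorySize);
        return gauges;
    }
}
```

For example, a pool of 100,000 segments of 32 KiB each holds 3,276,800,000 bytes, which no longer fits into an int.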

How to implement the managed memory metric correctly is still under discussion. A brief proposal follows:

We would have to introduce a new metric that represents the managed memory usage aggregated over all TaskSlots. The aggregation can be maintained in the TaskExecutor.

  1. Add a managed memory metric and expose it in the REST API. Because managed memory is now allocated per slot, the usage has to be summed up in the TaskExecutor.
    • Add a default memory type to MemoryManager:

      public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
    • Register the managed memory usage in TaskExecutor#requestSlot:

      public long getManagedMemoryUsed() {
      	// Used memory per slot = total size - available size; sum over all allocated slots.
      	return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
      		slot ->
      			slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) -
      				slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
      	).sum();
      }
    • Add instantiateMemoryManagerMetrics to MetricUtils:

      public static void instantiateMemoryManagerMetrics(
      		MetricGroup statusMetricGroup,
      		TaskExecutor taskExecutor) {
      	checkNotNull(statusMetricGroup);
      
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
      }

    • Register it in TaskManagerRunner#startTaskManager.
    • The TaskManager's metrics gain Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    • url: /taskmanagers/:taskmanagerid
    • response: the metrics object gains managedMemoryTotal and managedMemoryUsed.
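
The per-slot aggregation proposed above can be illustrated with a small self-contained sketch. SlotMemory is a hypothetical stand-in for the memory manager of a single slot; the real TaskSlotTable and MemoryManager types are not used here.

```java
import java.util.List;

/** Illustrative only: sums managed memory across slots the way the proposal describes. */
final class ManagedMemoryAggregation {

    /** Hypothetical per-slot view of managed memory (sizes in bytes). */
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        /** Used memory of this slot: total size minus what is still available. */
        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Managed memory in use, summed over all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    /** Managed memory capacity, summed over all allocated slots. */
    static long managedMemoryTotal(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(slot -> slot.totalBytes).sum();
    }
}
```

Note that this sketch only covers allocated slots, as the proposed getManagedMemoryUsed does; whether the total should also include memory of free slots is not settled by the text above.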

No metrics monitoring the JVM's Metaspace pool exist yet. The newly introduced metrics will be exposed through the /taskmanagers/metrics REST API.
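
One way such Metaspace metrics could be obtained is through the JVM's memory pool MXBeans, sketched below. This assumes that the JVM exposes a memory pool named "Metaspace", which holds for HotSpot-based JVMs but is not guaranteed elsewhere. The class MetaspaceProbe and the -1 fallback are illustrative, not the proposed implementation.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.util.Optional;

/** Illustrative only: looks up the Metaspace memory pool via the platform MXBeans. */
final class MetaspaceProbe {

    /** Finds a memory pool by its exact name, if the JVM exposes one. */
    static Optional<MemoryPoolMXBean> findPool(String name) {
        return ManagementFactory.getMemoryPoolMXBeans().stream()
                .filter(pool -> name.equals(pool.getName()))
                .findFirst();
    }

    /** Bytes of Metaspace in use, or -1 if no pool named "Metaspace" exists. */
    static long metaspaceUsed() {
        return findPool("Metaspace").map(pool -> pool.getUsage().getUsed()).orElse(-1L);
    }

    /** Maximum Metaspace size in bytes; -1 if undefined or if the pool is missing. */
    static long metaspaceMax() {
        return findPool("Metaspace").map(pool -> pool.getUsage().getMax()).orElse(-1L);
    }
}
```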

Step 5: Update TaskManager details pages

The web UI has to be updated as proposed above.

Follow-Ups

  • Create a separate independent endpoint for the effective memory configuration.

Test Plan

Existing tests will be updated to verify the feature.