
Status

Current state: Under Discussion

Discussion threads: 

JIRA:

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-49 was accepted and merged in Flink 1.10, but the metrics shown on the current task manager detail page do not correspond well to the memory model that FLIP-49 introduced.

 

Proposed Changes

Based on FLIP-49, we can draw up a table that maps each memory component to its configuration key, its resource key, and its metrics.


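JVM | VM | Flink Component | user conf key | resource key | metric max | metric used
----|----|-----------------|---------------|--------------|------------|------------
JVM | JVM Heap | Framework Heap | taskmanager.memory.framework.heap.size | resource.frameworkHeap | metrics.heapMax | metrics.heapUsed
JVM | JVM Heap | Task Heap | taskmanager.memory.task.heap.size | resource.taskHeap | metrics.heapMax | metrics.heapUsed
JVM | JVM Non-Heap | JVM Metaspace | taskmanager.memory.jvm-metaspace.size | resource.jvmMetaSpace | metrics.nonHeapMax | metrics.nonHeapUsed
JVM | JVM Non-Heap | JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | resource.jvmOverhead | metrics.nonHeapMax | metrics.nonHeapUsed
JVM | JVM Non-Heap | other | - | - | metrics.nonHeapMax | metrics.nonHeapUsed
Outside JVM | Mapped | - | - | - | metrics.mappedMax | metrics.mappedUsed
Outside JVM | Direct | Framework OffHeap | taskmanager.memory.framework.off-heap.size | resource.frameworkOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal
Outside JVM | Direct | Task OffHeap Memory | taskmanager.memory.task.off-heap.size | resource.taskOffHeap | metrics.directMax - metrics.shuffleMemoryTotal | metrics.directUsed - metrics.shuffleMemoryTotal
Outside JVM | Direct | Shuffle Memory | taskmanager.memory.shuffle.min, taskmanager.memory.shuffle.max | resource.shuffleMemory | metrics.shuffleMemoryTotal | metrics.shuffleMemoryUsed
Outside JVM | Flink Managed | Managed Memory | taskmanager.memory.managed.size | resource.managedMemory | metrics.managedMemoryTotal | metrics.managedMemoryUsed

Rows that repeat the same metric share it: the metric covers those rows combined, not each row separately.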
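The Direct rows of the table derive their values by subtraction rather than from a dedicated metric. A minimal sketch of that arithmetic follows; the class name, method names, and sample values are hypothetical and only illustrate the expressions metrics.directMax - metrics.shuffleMemoryTotal and metrics.directUsed - metrics.shuffleMemoryTotal.

```java
/** Hypothetical helper, not a Flink class: it only mirrors the table's arithmetic. */
final class DirectMemoryBreakdown {

    /** Maximum for framework + task off-heap: directMax minus the shuffle total. */
    static long offHeapMax(long directMax, long shuffleMemoryTotal) {
        return directMax - shuffleMemoryTotal;
    }

    /** Used framework + task off-heap: directUsed minus the shuffle total. */
    static long offHeapUsed(long directUsed, long shuffleMemoryTotal) {
        return directUsed - shuffleMemoryTotal;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Made-up sample values standing in for the REST response fields.
        long directMax = 384 * mib;
        long directUsed = 200 * mib;
        long shuffleMemoryTotal = 128 * mib;

        System.out.println("off-heap max (MiB):  " + offHeapMax(directMax, shuffleMemoryTotal) / mib);
        System.out.println("off-heap used (MiB): " + offHeapUsed(directUsed, shuffleMemoryTotal) / mib);
    }
}
```

The subtraction uses the shuffle total in both cases, as the table states, so the result is shared by the Framework OffHeap and Task OffHeap rows and cannot be split between them.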

Frontend Design

Redesign the task manager metric page so that users can understand the relationship between these metrics more clearly.

REST API Design

  • use TaskExecutorRegistration to expose the task manager's resource information, structured to match the memory composition
  • the task manager's resource contains this information; show it in
    • url: /taskmanagers/:taskmanagerid
    • response:

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : {
      "type" : "any"
    },
    "path" : {
      "type" : "string"
    },
    "dataPort" : {
      "type" : "integer"
    },
    "timeSinceLastHeartbeat" : {
      "type" : "integer"
    },
    "slotsNumber" : {
      "type" : "integer"
    },
    "freeSlots" : {
      "type" : "integer"
    },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : {
          "type" : "integer"
        },
        "physicalMemory" : {
          "type" : "integer"
        },
        "freeMemory" : {
          "type" : "integer"
        },
        "managedMemory" : {
          "type" : "integer"
        }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : {
          "type" : "integer"
        },
        "heapCommitted" : {
          "type" : "integer"
        },
        "heapMax" : {
          "type" : "integer"
        },
        "nonHeapUsed" : {
          "type" : "integer"
        },
        "nonHeapCommitted" : {
          "type" : "integer"
        },
        "nonHeapMax" : {
          "type" : "integer"
        },
        "directCount" : {
          "type" : "integer"
        },
        "directUsed" : {
          "type" : "integer"
        },
        "directMax" : {
          "type" : "integer"
        },
        "mappedCount" : {
          "type" : "integer"
        },
        "mappedUsed" : {
          "type" : "integer"
        },
        "mappedMax" : {
          "type" : "integer"
        },
        "memorySegmentsAvailable" : {
          "type" : "integer"
        },
        "memorySegmentsTotal" : {
          "type" : "integer"
        },
        "managedMemoryUsed" : {
          "type" : "long"
        },
        "managedMemoryTotal" : {
          "type" : "long"
        },
        "networkMemoryUsed" : {
          "type" : "long"
        },
        "networkMemoryTotal" : {
          "type" : "long"
        },
        "resource" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",
          "properties" : {
            "frameworkHeap" : {
              "type" : "long"
            },
            "frameworkOffHeap" : {
              "type" : "long"
            },
            "taskHeap" : {
              "type" : "long"
            },
            "taskOffHeap" : {
              "type" : "long"
            },
            "networkMemory" : {
              "type" : "long"
            },
            "managedMemory" : {
              "type" : "long"
            },
            "jvmMetaSpace" : {
              "type" : "long"
            },
            "jvmOverhead" : {
              "type" : "long"
            },
            "totalProcessMemory" : {
              "type" : "long"
            }
          }
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : {
                "type" : "string"
              },
              "count" : {
                "type" : "integer"
              },
              "time" : {
                "type" : "integer"
              }
            }
          }
        }
      }
    }
  }
}


  • add metrics for the size of the shuffle memory
    • add getTotalMemorySize and getAvaliableMemorySize to NetworkBufferPool

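// Total size in bytes of all memory segments managed by this pool.
// The leading 1L promotes the multiplication to long before it can overflow.
public long getTotalMemorySize() {
  return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
}

// Size in bytes of the memory segments that are currently available.
public long getAvaliableMemorySize() {
  return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
}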
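The leading 1L in both getters matters because the segment count and the segment size are int values. The sketch below is a standalone illustration, not Flink code: the class name, method names, and sample numbers are assumptions chosen to show how a plain int multiplication would overflow for a large pool.

```java
/** Standalone illustration; none of these names exist in Flink. */
final class SegmentSizeOverflow {

    /** Same pattern as the proposed getters: promote to long before multiplying. */
    static long sizeWithLongPromotion(int segments, int segmentSize) {
        return 1L * segments * segmentSize;
    }

    /** Multiplies in int arithmetic first, so the product can wrap before widening. */
    static long sizeWithIntArithmetic(int segments, int segmentSize) {
        return segments * segmentSize;
    }

    public static void main(String[] args) {
        int segments = 100_000;      // sample segment count
        int segmentSize = 32 * 1024; // sample segment size in bytes (32 KiB)

        System.out.println("long arithmetic: " + sizeWithLongPromotion(segments, segmentSize));
        System.out.println("int arithmetic:  " + sizeWithIntArithmetic(segments, segmentSize));
    }
}
```

With these sample values the true product is larger than Integer.MAX_VALUE, so only the promoted variant reports a usable byte count.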


    • update NettyShuffleMetricFactory#registerShuffleMetrics

private static void registerShuffleMetrics(
    String groupName,
    MetricGroup metricGroup,
    NetworkBufferPool networkBufferPool) {
  MetricGroup networkGroup = metricGroup.addGroup(groupName);

  // existing gauges: segment counts
  networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
    networkBufferPool::getTotalNumberOfMemorySegments);
  networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
    networkBufferPool::getNumberOfAvailableMemorySegments);

  // new gauges: sizes in bytes
  networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY,
    networkBufferPool::getTotalMemorySize);
  networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_AVALIABLEMEMORY,
    networkBufferPool::getAvaliableMemorySize);
}


    • add Status.Shuffle.Netty.TotalMemoryCapacity and Status.Shuffle.Netty.AvaliableMemory to the task manager's metrics
    • url: /taskmanagers/:taskmanagerid
    • response: add shuffleMemoryTotal and shuffleMemoryUsed to metrics
  • add managed memory metrics and expose them in the REST API. Because managed memory is now allocated per slot, the values have to be summed in TaskExecutor.
    • add a default memory type to MemoryManager

public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;


    • register ManagedMemoryUsage in TaskExecutor#requestSlot and add a getter that sums the used managed memory over all allocated slots:

// Used managed memory = sum over all allocated slots of (total - available).
public long getManagedMemoryUsed() {
  return this.taskSlotTable.getAllocatedSlots().stream()
    .mapToLong(slot ->
      slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
        - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE))
    .sum();
}


    • add instantiateMemoryManagerMetrics in MetricUtils

public static void instantiateMemoryManagerMetrics(
    MetricGroup statusMetricGroup,
    TaskExecutor taskExecutor) {
  checkNotNull(statusMetricGroup);

  // metrics are registered under Status.Managed.Memory
  MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
  memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
  memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
}


    • register these metrics in TaskManagerRunner#startTaskManager
    • add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the task manager's metrics
    • url: /taskmanagers/:taskmanagerid
    • response: add managedMemoryTotal and managedMemoryUsed to metrics
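The per-slot summation behind managedMemoryTotal and managedMemoryUsed can be sketched without any Flink dependency. In the sketch below, SlotMemory is a hypothetical stand-in for a slot's MemoryManager, and the byte values are made up; it only illustrates that used memory is computed per slot as total minus available and then summed.

```java
import java.util.Arrays;
import java.util.List;

/** Standalone sketch; SlotMemory is a hypothetical stand-in for a slot's MemoryManager. */
final class ManagedMemorySum {

    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        /** Used managed memory of one slot: total minus what is still available. */
        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Sum of used managed memory over all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    /** Sum of total managed memory over all allocated slots. */
    static long managedMemoryTotal(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(slot -> slot.totalBytes).sum();
    }

    public static void main(String[] args) {
        // Made-up values: one partly used slot and one idle slot.
        List<SlotMemory> slots = Arrays.asList(
            new SlotMemory(100, 40),
            new SlotMemory(200, 200));

        System.out.println("total: " + managedMemoryTotal(slots));
        System.out.println("used:  " + managedMemoryUsed(slots));
    }
}
```

Note that this sketch sums only over allocated slots, following getManagedMemoryUsed in the proposal; whether the total should also count unallocated slots is not specified in the text above.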

Test Plan

Existing tests will be updated to verify the feature.
