Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: I restructured the implementation proposal provided before.

...

Redesign the task manager metric page so that users can more clearly understand the relationship between these metrics.

REST API Design

  • use TaskExecutorRegistration to expose task manager resource information that matches the memory composition; the task manager's resource contains this information, so show it in:
    • url: /taskmanagers/:taskmanagerid
    • response

{

  "type" : "object",

  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",

  "properties" : {

    "id" : {

      "type" : "any"

    },

    "path" : {

      "type" : "string"

    },

    "dataPort" : {

      "type" : "integer"

    },

    "timeSinceLastHeartbeat" : {

      "type" : "integer"

    },

    "slotsNumber" : {

      "type" : "integer"

    },

    "freeSlots" : {

      "type" : "integer"

    },

    "hardware" : {

      "type" : "object",

      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",

      "properties" : {

        "cpuCores" : {

          "type" : "integer"

        },

        "physicalMemory" : {

          "type" : "integer"

        },

        "freeMemory" : {

          "type" : "integer"

        },

        "managedMemory" : {

          "type" : "integer"

        }

      }

    },

    "metrics" : {

      "type" : "object",

      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",

      "properties" : {

        "heapUsed" : {

          "type" : "integer"

        },

        "heapCommitted" : {

          "type" : "integer"

        },

        "heapMax" : {

          "type" : "integer"

        },

        "nonHeapUsed" : {

          "type" : "integer"

        },

        "nonHeapCommitted" : {

          "type" : "integer"

        },

        "nonHeapMax" : {

          "type" : "integer"

        },

        "directCount" : {

          "type" : "integer"

        },

        "directUsed" : {

          "type" : "integer"

        },

        "directMax" : {

          "type" : "integer"

        },

        "mappedCount" : {

          "type" : "integer"

        },

        "mappedUsed" : {

          "type" : "integer"

        },

        "mappedMax" : {

          "type" : "integer"

        },

        "memorySegmentsAvailable" : {

          "type" : "integer"

        },

        "memorySegmentsTotal" : {

          "type" : "integer"

        },

        "managedMemoryUsed" : {

          "type" : "long"

        },

        "managedMemoryTotal" : {

          "type" : "long"

        },

        "networkMemoryUsed" : {

          "type" : "long"

        },

        "networkMemoryTotal" : {

          "type" : "long"

        },

        "resource" : {

          "type" : "object",

          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",

          "properties" : {

            "frameworkHeap" : {

              "type" : "long"

            },

            "frameworkOffHeap" : {

              "type" : "long"

            },

            "taskHeap" : {

              "type" : "long"

            },

            "taskOffHeap" : {

              "type" : "long"

            },

            "networkMemory" : {

              "type" : "long"

            },

            "managedMemory" : {

              "type" : "long"

            },

            "jvmMetaSpace" : {

              "type" : "long"

            },

            "jvmOverhead" : {

              "type" : "long"

            },

            "totalProcessMemory" : {

              "type" : "long"

            }

          }

        },

        "garbageCollectors" : {

          "type" : "array",

          "items" : {

            "type" : "object",

            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",

            "properties" : {

              "name" : {

                "type" : "string"

              },

              "count" : {

                "type" : "integer"

              },

              "time" : {

                "type" : "integer"

              }

            }

          }

        }

      }

    }

  }

}

Implementation Proposal

Step 1: Introduce new metric for memory usage of NetworkBufferPool

  1. add shuffle memory's size metric

  • add getTotalMemorySize and getAvailableMemorySize in NetworkBufferPool
  1. Keep in mind that we have to convert available memory into used memory somewhere later on!

    Code Block
    languagejava
    titleAdd network memory members
    public long 

...

  1. getTotalMemorySize()

...

  1.  {
    	return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;

...

  1. 
    }

...

  1. 
    
    public long

...

  1.  getAvailableMemorySize()

...

  1.  {
    	return getNumberOfAvailableMemorySegments() * memorySegmentSize;

...

  1. 
    }



  2. update NettyShuffleMetricFactory#registerShuffleMetrics

    Code Block
    languagejava
    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_AVAILABLE_MEMORY_IN_BYTES = "AvailableMemory";
    
    // ...
    
    private static void registerShuffleMetrics(

...

  1. 
    		String groupName,

...

  1. 
    		MetricGroup metricGroup,

...

  1. 
    		NetworkBufferPool networkBufferPool)

...

  1.  {
    	MetricGroup networkGroup = metricGroup.addGroup(groupName);

...

  1. 
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,

...

  1. 
    		networkBufferPool::getTotalNumberOfMemorySegments);

...

  1. 	
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,

...

  1. 
    		networkBufferPool::getNumberOfAvailableMemorySegments);

...

  1. 
    
    	// === new ===
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_

...

  1. IN_

...

  1. BYTES,
    		networkBufferPool::getTotalMemorySize);

...

  1. 
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_

...

  1. AVAILABLE_MEMORY_

...

  1. IN_

...

  1. BYTES,
    		networkBufferPool::

...

  1. getAvailableMemorySize);

...

  1. 
    }

...


Step 2: Introduce new metrics for Task's managed memory usage

We still have to discuss how to implement that in the right way. A brief proposal is the following one:

We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot. The aggregation can be maintained in the TaskExecutor.

...

  1. add a managed memory metric and expose it in the REST API. Since managed memory is now allocated per slot, we need to sum it in TaskExecutor.
    • add default memory type in MemoryManager

      Code Block
      languagejava
      public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;


    • add register ManagedMemoryUsage in TaskExecutor#requestSlot:


      Code Block
      public long getManagedMemoryUsed()

...

    •  {
      	return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(

...

    • 
      		slot ->

...

    • 
      			slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) 

...

    • 
      				slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)

...

    • 
      	).sum();

...

    • 
      
      }


    • add instantiateMemoryManagerMetrics in MetricUtils

      Code Block
      languagejava
      public static void instantiateMemoryManagerMetrics(

...

    • 
      	
      	MetricGroup statusMetricGroup, 

...

    • 
      		TaskExecutor taskExecutor) {

...

    • 
      	checkNotNull(statusMetricGroup);

...

    • 
      
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");

...

    • 
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity",  taskExecutor::getManagedMemoryTotal);

...

    • 
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);

...

    • 
      }



    • register it in TaskManagerRunner#startTaskManager
    • add Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed to the task manager's metrics
    • url: /taskmanagers/:taskmanagerid
    • response: add managedMemoryTotal and managedMemoryUsed to the metrics object

Step 3: Expose new metrics to REST interface

TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics:

  1. TaskManagerMetricsInfo needs to be extended with members for the newly added fields:
  2. TaskManagerDetailsHandler::createTaskManagerMetricsInfo(MetricsStore.TaskManagerMetricStore)  can be used to initialize the fields:

Test Plan

Existing tests will be updated to verify the feature.