Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

{

  "type" : "object",

  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",

  "properties" : {

    "id" : {

      "type" : "any"

    },

    "path" : {

      "type" : "string"

    },

    "dataPort" : {

      "type" : "integer"

    },

    "timeSinceLastHeartbeat" : {

      "type" : "integer"

    },

    "slotsNumber" : {

      "type" : "integer"

    },

    "freeSlots" : {

      "type" : "integer"

    },

    "hardware" : {

      "type" : "object",

      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",

      "properties" : {

        "cpuCores" : {

          "type" : "integer"

        },

        "physicalMemory" : {

          "type" : "integer"

        },

        "freeMemory" : {

          "type" : "integer"

        },

        "managedMemory" : {

          "type" : "integer"

        }

      }

    },

    "metrics" : {

      "type" : "object",

      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",

      "properties" : {

        "heapUsed" : {

          "type" : "integer"

        },

        "heapCommitted" : {

          "type" : "integer"

        },

        "heapMax" : {

          "type" : "integer"

        },

        "nonHeapUsed" : {

          "type" : "integer"

        },

        "nonHeapCommitted" : {

          "type" : "integer"

        },

        "nonHeapMax" : {

          "type" : "integer"

        },

        "directCount" : {

          "type" : "integer"

        },

        "directUsed" : {

          "type" : "integer"

        },

        "directMax" : {

          "type" : "integer"

        },

        "mappedCount" : {

          "type" : "integer"

        },

        "mappedUsed" : {

          "type" : "integer"

        },

        "mappedMax" : {

          "type" : "integer"

        },

        "memorySegmentsAvailable" : {

          "type" : "integer"

        },

        "memorySegmentsTotal" : {

          "type" : "integer"

        },

        "managedMemoryUsed" : {

          "type" : "integer"

        },

        "managedMemoryTotal" : {

          "type" : "integer"

        },

        "networkMemoryUsed" : {

          "type" : "integer"

        },

        "networkMemoryTotal" : {

          "type" : "integer"

        },

        "resource" : {

          "type" : "object",

          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerResourceInfo",

          "properties" : {

            "frameworkHeap" : {

              "type" : "integer"

            },

            "frameworkOffHeap" : {

              "type" : "integer"

            },

            "taskHeap" : {

              "type" : "integer"

            },

            "taskOffHeap" : {

              "type" : "integer"

            },

            "networkMemory" : {

              "type" : "integer"

            },

            "managedMemory" : {

              "type" : "integer"

            },

            "jvmMetaSpace" : {

              "type" : "integer"

            },

            "jvmOverhead" : {

              "type" : "integer"

            },

            "totalProcessMemory" : {

              "type" : "integer"

            }

          }

        },

        "garbageCollectors" : {

          "type" : "array",

          "items" : {

            "type" : "object",

            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",

            "properties" : {

              "name" : {

                "type" : "string"

              },

              "count" : {

                "type" : "integer"

              },

              "time" : {

                "type" : "integer"

              }

            }

          }

        }

      }

    }

  }

}

Implementation Proposal

Step 1: Expose configuration parameters of TaskExecutor

  1. TaskManagerResourceInfo is introduced as a POJO holding the configuration values proposed for the REST response.
  2. The TaskManagerResourceInfo is initialized when the TaskExecutor is initialized, in the same way as the HardwareDescription. It is passed along the same path: TaskExecutorRegistry → WorkerRegistration.
  3. The TaskManagerResourceInfo is returned alongside the HardwareDescription in ResourceManager::requestTaskManagerInfo(ResourceId, Time).

Step 2: Introduce new metrics for the memory usage of NetworkBufferPool

  1. Add methods to NetworkBufferPool that report the shuffle memory size in bytes:

    Code Block
    languagejava
    /**
     * Returns the total amount of memory held by this pool, in bytes.
     *
     * <p>The segment count is widened to {@code long} before multiplying by the
     * segment size, so the product is computed in 64-bit arithmetic.
     */
    public long getTotalMemorySize() {
    	final long totalSegments = getTotalNumberOfMemorySegments();
    	return totalSegments * memorySegmentSize;
    }
    
    /**
     * Returns the amount of memory currently in use, in bytes. This is the total pool
     * size minus the memory represented by the available segments.
     *
     * <p>The available-memory product is computed in {@code long} arithmetic, matching
     * {@link #getTotalMemorySize()}. Multiplying two {@code int}s here would overflow
     * once the available memory reaches 2 GiB and yield a wrong (too large) result.
     */
    public long getUsedMemorySize() {
    	return getTotalMemorySize() - 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
    }


  2. Update NettyShuffleMetricFactory#registerShuffleMetrics to register gauges for the new methods:

    Code Block
    languagejava
    /** Gauge name for the pool's total memory size in bytes. */
    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    /** Gauge name for the pool's used memory size in bytes. */
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";
    
    // ...
    
    /**
     * Registers the gauges of {@code networkBufferPool} in the sub-group of
     * {@code metricGroup} named {@code groupName}.
     *
     * <p>Alongside the segment-count gauges, byte-size gauges are registered for the
     * pool's total and used memory.
     */
    private static void registerShuffleMetrics(
    		String groupName,
    		MetricGroup metricGroup,
    		NetworkBufferPool networkBufferPool) {
    	final MetricGroup shuffleGroup = metricGroup.addGroup(groupName);
    
    	// Segment counts.
    	shuffleGroup.<Integer, Gauge<Integer>>gauge(
    		METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments);
    	shuffleGroup.<Integer, Gauge<Integer>>gauge(
    		METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments);
    
    	// Memory sizes in bytes (newly added).
    	shuffleGroup.<Long, Gauge<Long>>gauge(
    		METRIC_TOTAL_MEMORY_IN_BYTES, networkBufferPool::getTotalMemorySize);
    	shuffleGroup.<Long, Gauge<Long>>gauge(
    		METRIC_USED_MEMORY_IN_BYTES, networkBufferPool::getUsedMemorySize);
    }


Step 3: Introduce new metrics for the task's managed memory usage

The right way to implement this is still under discussion. A brief proposal follows:

...

    • Add instantiateMemoryManagerMetrics to MetricUtils:

      Code Block
      languagejava
      public static void instantiateMemoryManagerMetrics(
      	
      	MetricGroup statusMetricGroup, 
      		TaskExecutor taskExecutor) {
      	checkNotNull(statusMetricGroup);
      
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity",  taskExecutor::getManagedMemoryTotal);
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
      }



    • Register it in TaskManagerRunner#startTaskManager.
    • The TaskManager's metrics gain Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    • URL: /taskmanagers/:taskmanagerid
    • Response: the metrics object gains managedMemoryTotal and managedMemoryUsed.

Step 4: Expose the new metrics through the REST interface

TaskManagerDetailsHandler#handleRequest can be extended to also cover the newly added metrics:

...