...

  • The TaskManager's resource contains this information; show it in
  • URL: /taskmanagers/:taskmanagerid

    JSON schema of the response:
    {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
      "properties" : {
        "id" : {
          "type" : "any"
        },
        "path" : {
          "type" : "string"
        },
        "dataPort" : {
          "type" : "integer"
        },
        "timeSinceLastHeartbeat" : {
          "type" : "integer"
        },
        "slotsNumber" : {
          "type" : "integer"
        },
        "freeSlots" : {
          "type" : "integer"
        },
        "hardware" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
          "properties" : {
            "cpuCores" : {
              "type" : "integer"
            },
            "physicalMemory" : {
              "type" : "integer"
            },
            "freeMemory" : {
              "type" : "integer"
            },
            "managedMemory" : {
              "type" : "integer"
            }
          }
        },
        "memoryConfiguration" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
          "properties" : {
            "frameworkHeap" : {
              "type" : "integer"
            },
            "frameworkOffHeap" : {
              "type" : "integer"
            },
            "taskHeap" : {
              "type" : "integer"
            },
            "taskOffHeap" : {
              "type" : "integer"
            },
            "networkMemory" : {
              "type" : "integer"
            },
            "managedMemory" : {
              "type" : "integer"
            },
            "jvmMetaspace" : {
              "type" : "integer"
            },
            "jvmOverhead" : {
              "type" : "integer"
            },
            "totalFlinkMemory" : {
              "type" : "integer"
            },
            "totalProcessMemory" : {
              "type" : "integer"
            }
          }
        },
        "metrics" : {
          "type" : "object",
          "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
          "properties" : {
            "heapUsed" : {
              "type" : "integer"
            },
            "heapCommitted" : {
              "type" : "integer"
            },
            "heapMax" : {
              "type" : "integer"
            },
            "metaspaceUsed" : {
              "type" : "integer"
            },
            "metaspaceCommitted" : {
              "type" : "integer"
            },
            "metaspaceMax" : {
              "type" : "integer"
            },
            "nonHeapUsed" : {
              "type" : "integer"
            },
            "nonHeapCommitted" : {
              "type" : "integer"
            },
            "nonHeapMax" : {
              "type" : "integer"
            },
            "directCount" : {
              "type" : "integer"
            },
            "directUsed" : {
              "type" : "integer"
            },
            "directMax" : {
              "type" : "integer"
            },
            "mappedCount" : {
              "type" : "integer"
            },
            "mappedUsed" : {
              "type" : "integer"
            },
            "mappedMax" : {
              "type" : "integer"
            },
            "memorySegmentsAvailable" : {
              "type" : "integer"
            },
            "memorySegmentsTotal" : {
              "type" : "integer"
            },
            "managedMemoryUsed" : {
              "type" : "integer"
            },
            "managedMemoryTotal" : {
              "type" : "integer"
            },
            "networkMemoryUsed" : {
              "type" : "integer"
            },
            "networkMemoryTotal" : {
              "type" : "integer"
            },
            "garbageCollectors" : {
              "type" : "array",
              "items" : {
                "type" : "object",
                "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
                "properties" : {
                  "name" : {
                    "type" : "string"
                  },
                  "count" : {
                    "type" : "integer"
                  },
                  "time" : {
                    "type" : "integer"
                  }
                }
              }
            }
          }
        }
      }
    }
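The memoryConfiguration fields are not independent of each other. Assuming the FLIP-49 TaskExecutor memory model (total Flink memory is the sum of framework heap and off-heap, task heap and off-heap, network and managed memory; total process memory adds JVM Metaspace and JVM overhead), a consumer of this response could sanity-check the reported totals as sketched below. The class MemoryConfigurationView is hypothetical and not part of Flink; all values are bytes.

```java
/**
 * Hypothetical read-only view of the "memoryConfiguration" object (all values in bytes).
 * Assumes the FLIP-49 memory model:
 *   totalFlinkMemory   = frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
 *                        + networkMemory + managedMemory
 *   totalProcessMemory = totalFlinkMemory + jvmMetaspace + jvmOverhead
 */
final class MemoryConfigurationView {
    final long frameworkHeap;
    final long frameworkOffHeap;
    final long taskHeap;
    final long taskOffHeap;
    final long networkMemory;
    final long managedMemory;
    final long jvmMetaspace;
    final long jvmOverhead;
    final long totalFlinkMemory;
    final long totalProcessMemory;

    MemoryConfigurationView(long frameworkHeap, long frameworkOffHeap, long taskHeap,
            long taskOffHeap, long networkMemory, long managedMemory, long jvmMetaspace,
            long jvmOverhead, long totalFlinkMemory, long totalProcessMemory) {
        this.frameworkHeap = frameworkHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskHeap = taskHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
        this.totalFlinkMemory = totalFlinkMemory;
        this.totalProcessMemory = totalProcessMemory;
    }

    /** Sum of the components that make up total Flink memory. */
    long expectedTotalFlinkMemory() {
        return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    /** Total Flink memory plus the JVM-specific components. */
    long expectedTotalProcessMemory() {
        return expectedTotalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    /** True if both reported totals match the sums of their components. */
    boolean isConsistent() {
        return totalFlinkMemory == expectedTotalFlinkMemory()
                && totalProcessMemory == expectedTotalProcessMemory();
    }
}
```

For example, components of 128, 128, 384, 0, 128 and 512 units plus 96 units of Metaspace and 192 units of overhead are consistent with reported totals of 1280 and 1568.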


Implementation Proposal

Step 1: Expose effective configuration parameters of

...

TaskExecutor

Jira: FLINK-14435

  1. TaskManagerResourceInfo is introduced as a POJO containing the relevant values proposed for the REST response.
  2. The TaskManagerResourceInfo is initialized together with the TaskExecutor, just as the HardwareDescription is, and is handed over along the same path: TaskExecutorRegistration → WorkerRegistration.
  3. ResourceManager::requestTaskManagerInfo(ResourceID, Time) returns the TaskManagerResourceInfo along with the HardwareDescription.
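A minimal sketch of the hand-over described in the three steps above. It uses hypothetical stand-in classes (HardwareInfo, a three-field TaskManagerResourceInfo, Registration, ResourceManagerSketch) rather than Flink's real HardwareDescription, TaskExecutorRegistration, WorkerRegistration and ResourceManager, whose signatures differ; it only shows that the resource info travels next to the hardware info and is returned with it.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// All classes below are hypothetical stand-ins that only illustrate the data flow.

/** Stand-in for HardwareDescription. */
final class HardwareInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final int cpuCores;
    final long physicalMemory;

    HardwareInfo(int cpuCores, long physicalMemory) {
        this.cpuCores = cpuCores;
        this.physicalMemory = physicalMemory;
    }
}

/** Immutable POJO holding a subset of the effective memory settings, in bytes. */
final class TaskManagerResourceInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final long taskHeap;
    final long networkMemory;
    final long managedMemory;

    TaskManagerResourceInfo(long taskHeap, long networkMemory, long managedMemory) {
        this.taskHeap = taskHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
    }
}

/** Stand-in for the registration message: both descriptions travel side by side. */
final class Registration implements Serializable {
    private static final long serialVersionUID = 1L;
    final String resourceId;
    final HardwareInfo hardware;
    final TaskManagerResourceInfo resources;

    Registration(String resourceId, HardwareInfo hardware, TaskManagerResourceInfo resources) {
        this.resourceId = resourceId;
        this.hardware = hardware;
        this.resources = resources;
    }
}

/** Stand-in for the ResourceManager side of the hand-over. */
final class ResourceManagerSketch {
    private final Map<String, Registration> workers = new HashMap<>();

    /** Called when a TaskExecutor registers; keeps both descriptions (cf. WorkerRegistration). */
    void registerTaskExecutor(Registration registration) {
        workers.put(registration.resourceId, registration);
    }

    /** Returns the stored resource info together with the hardware info. */
    Registration requestTaskManagerInfo(String resourceId) {
        Registration registration = workers.get(resourceId);
        if (registration == null) {
            throw new IllegalArgumentException("Unknown TaskExecutor: " + resourceId);
        }
        return registration;
    }
}
```

The POJO is immutable and Serializable because, in the real system, it would cross an RPC boundary in the same message as the hardware description.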

Step 2: Introduce new metrics for the memory usage of NetworkBufferPool

Jira: FLINK-14422

  1. Add methods to NetworkBufferPool that report the shuffle memory size in bytes

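    /** Total size of all memory segments in this pool, in bytes. */
    public long getTotalMemorySize() {
    	return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }
    
    /** Size of the memory segments not currently available in this pool, in bytes. */
    public long getUsedMemorySize() {
    	// widen to long before multiplying; the int product overflows once the available memory exceeds Integer.MAX_VALUE bytes
    	return getTotalMemorySize() - (long) getNumberOfAvailableMemorySegments() * memorySegmentSize;
    }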


  2. Register the new gauges in NettyShuffleMetricFactory#registerShuffleMetrics

    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";
    
    // ...
    
    private static void registerShuffleMetrics(
    		String groupName,
    		MetricGroup metricGroup,
    		NetworkBufferPool networkBufferPool) {
    	MetricGroup networkGroup = metricGroup.addGroup(groupName);
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
    		networkBufferPool::getTotalNumberOfMemorySegments);
    	networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
    		networkBufferPool::getNumberOfAvailableMemorySegments);
    
    	// === new ===
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
    		networkBufferPool::getTotalMemorySize);
    	networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
    		networkBufferPool::getUsedMemorySize);
    }
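The byte-size gauges depend on widening to long before multiplying: 70,000 available segments of 32 KiB already exceed Integer.MAX_VALUE bytes. The stand-in below (SegmentPoolSketch is hypothetical, not NetworkBufferPool) contrasts the widened computation with the same expression evaluated in int arithmetic.

```java
/**
 * Hypothetical stand-in for the NetworkBufferPool accessors used by the new gauges.
 * Segment counts are ints, matching the existing Gauge<Integer> segment-count metrics.
 */
final class SegmentPoolSketch {
    private final int totalSegments;
    private final int availableSegments;
    private final int memorySegmentSize; // bytes per segment

    SegmentPoolSketch(int totalSegments, int availableSegments, int memorySegmentSize) {
        if (availableSegments < 0 || availableSegments > totalSegments) {
            throw new IllegalArgumentException("availableSegments must be in [0, totalSegments]");
        }
        this.totalSegments = totalSegments;
        this.availableSegments = availableSegments;
        this.memorySegmentSize = memorySegmentSize;
    }

    int getTotalNumberOfMemorySegments() {
        return totalSegments;
    }

    int getNumberOfAvailableMemorySegments() {
        return availableSegments;
    }

    /** Total pool size in bytes; the cast widens before multiplying. */
    long getTotalMemorySize() {
        return (long) getTotalNumberOfMemorySegments() * memorySegmentSize;
    }

    /** Bytes held in segments that are not currently available in the pool. */
    long getUsedMemorySize() {
        return getTotalMemorySize() - (long) getNumberOfAvailableMemorySegments() * memorySegmentSize;
    }

    /** Same formula, but the product is computed in int and can silently overflow. */
    long getUsedMemorySizeIntArithmetic() {
        return getTotalMemorySize() - (getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }
}
```

With 100,000 segments of 32,768 bytes and 70,000 of them available, getUsedMemorySize() yields 983,040,000 bytes, while the int-arithmetic variant yields a wrong, larger value because 70,000 × 32,768 wraps around.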


Step 3: Introduce new metrics for Task's managed memory usage 

Jira: FLINK-14406

How to implement this properly is still under discussion. A brief proposal:

...

    • Add instantiateMemoryManagerMetrics to MetricUtils

      public static void instantiateMemoryManagerMetrics(
      		MetricGroup statusMetricGroup,
      		TaskExecutor taskExecutor) {
      	checkNotNull(statusMetricGroup);
      	checkNotNull(taskExecutor);
      
      	// exposed as Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed;
      	// getManagedMemoryTotal/getManagedMemoryUsed are accessors this proposal assumes on TaskExecutor
      	MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
      	memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
      }



    • Register it in TaskManagerRunner#startTaskManager.
    • The TaskManager metrics gain Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.
    • URL: /taskmanagers/:taskmanagerid
    • Response: the metrics object gains managedMemoryTotal and managedMemoryUsed.
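The dotted metric names above follow from nesting metric groups. As a sketch, the stand-in below (ScopedGroupSketch is hypothetical and only mimics MetricGroup#addGroup and MetricGroup#gauge, assuming "." as the scope delimiter) joins scope components with dots, and ManagedMemoryMetricsSketch mirrors the proposed instantiateMemoryManagerMetrics using plain suppliers in place of the TaskExecutor accessors.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for MetricGroup: addGroup appends one scope component and
 * gauge registers a value supplier under the dot-joined scope plus metric name.
 */
final class ScopedGroupSketch {
    private final String scope;
    private final Map<String, Supplier<?>> registry;

    ScopedGroupSketch(String scope, Map<String, Supplier<?>> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    /** Returns a child group whose scope is this scope plus one component. */
    ScopedGroupSketch addGroup(String name) {
        return new ScopedGroupSketch(scope + "." + name, registry);
    }

    /** Registers a gauge; the registry is shared by all groups derived from the root. */
    <T> void gauge(String name, Supplier<T> gauge) {
        registry.put(scope + "." + name, gauge);
    }
}

/** Mirrors the proposed instantiateMemoryManagerMetrics with plain suppliers. */
final class ManagedMemoryMetricsSketch {
    private ManagedMemoryMetricsSketch() {}

    static void instantiate(ScopedGroupSketch status, Supplier<Long> total, Supplier<Long> used) {
        ScopedGroupSketch memory = status.addGroup("Managed").addGroup("Memory");
        memory.gauge("TotalCapacity", total);
        memory.gauge("MemoryUsed", used);
    }
}
```

Starting from a root group named "Status", the two gauges end up under Status.Managed.Memory.TotalCapacity and Status.Managed.Memory.MemoryUsed.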

Step 4: Add Metaspace metrics 

Jira: FLINK-19617

No metrics monitor the JVM's Metaspace pool yet. The new metrics will be exposed through the /taskmanagers/metrics REST API.
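One way to obtain Metaspace values is the standard java.lang.management API: look up the MemoryPoolMXBean named "Metaspace" (the pool name used by HotSpot/OpenJDK on Java 8 and later; other JVMs may name it differently) and read its MemoryUsage. This is a sketch under that assumption; the class MetaspaceProbe is hypothetical, and its used, committed and max values are what metaspaceUsed, metaspaceCommitted and metaspaceMax would report.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

/** Hypothetical helper that reads the JVM's Metaspace pool via the platform MXBeans. */
final class MetaspaceProbe {
    // Pool name used by HotSpot/OpenJDK since Java 8; other JVMs may use another name.
    static final String METASPACE_POOL_NAME = "Metaspace";

    private MetaspaceProbe() {}

    /** Finds the Metaspace memory pool, if this JVM exposes one under that name. */
    static Optional<MemoryPoolMXBean> findMetaspacePool() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (METASPACE_POOL_NAME.equals(pool.getName())) {
                return Optional.of(pool);
            }
        }
        return Optional.empty();
    }

    /**
     * Current usage (used, committed and max bytes). Empty if the pool is missing, or if
     * the pool is no longer valid, in which case getUsage() returns null.
     */
    static Optional<MemoryUsage> readUsage() {
        return findMetaspacePool().map(MemoryPoolMXBean::getUsage);
    }
}
```

Each of MemoryUsage#getUsed, #getCommitted and #getMax could back one gauge; getMax returns -1 when no maximum is defined for the pool.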

Step 5: Update TaskManager's details page 

Jira: FLINK-15328

The web UI has to be updated as proposed above.

...