Status

...

Discussion thread

FLIP-75 discussion about the initial design

FLIP-102 discussion after splitting up FLIP-75 into sub-flips

Vote thread

...

...

Jira: FLINK-14431

...

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

| Flink Memory Model | Flink configuration [1] | Effective Configuration REST API [2] | Metric [3] | Used key | Total key |
|---|---|---|---|---|---|
| Framework Heap | taskmanager.memory.framework.heap.size | memoryConfiguration.frameworkHeap | Status.JVM.Memory.Heap | Used | Max |
| Task Heap | taskmanager.memory.task.heap.size | memoryConfiguration.taskHeap | (shared with Framework Heap) | (shared) | (shared) |
| Framework OffHeap | taskmanager.memory.framework.off-heap.size | memoryConfiguration.frameworkOffHeap | - | - | - |
| Task OffHeap | taskmanager.memory.task.off-heap.size | memoryConfiguration.taskOffHeap | - | - | - |
| Network Memory | taskmanager.memory.network.min, taskmanager.memory.network.max | memoryConfiguration.networkMemory | Status.Shuffle.Netty (Jira: FLINK-14422) | UsedMemory | TotalMemory |
| Managed Memory | taskmanager.memory.managed.size | memoryConfiguration.managedMemory | Status.Flink.Memory.Managed (Jira: FLINK-14406; is up for discussion, still) | Used | Total |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | memoryConfiguration.jvmMetaspace | Status.JVM.Memory.Metaspace | Used | Max |
| JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max | memoryConfiguration.jvmOverhead | - | - | - |

1 These are the configuration parameters used in the Flink configuration.
2 These are the JSON paths that address the properties in the HTTP REST API response. Additionally, memoryConfiguration.totalFlinkMemory and memoryConfiguration.totalProcessMemory are exposed through the REST API.
3 These are the metrics exposed through the TaskManager's metrics REST API.
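
How the two totals mentioned in footnote 2 relate to the individual components can be sketched as follows. The sketch assumes the composition used by Flink's TaskManager memory model: total Flink memory is the sum of the heap, off-heap, network, and managed components, and total process memory adds JVM Metaspace and JVM Overhead. The class `MemoryTotals` is hypothetical; its field names only mirror the `memoryConfiguration` keys, and the sizes in `main` are illustrative values, not defaults from this proposal.

```java
/** Hypothetical holder mirroring the memoryConfiguration keys (all sizes in bytes). */
final class MemoryTotals {
    final long frameworkHeap;
    final long taskHeap;
    final long frameworkOffHeap;
    final long taskOffHeap;
    final long networkMemory;
    final long managedMemory;
    final long jvmMetaspace;
    final long jvmOverhead;

    MemoryTotals(long frameworkHeap, long taskHeap, long frameworkOffHeap, long taskOffHeap,
                 long networkMemory, long managedMemory, long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskOffHeap = taskOffHeap;
        this.networkMemory = networkMemory;
        this.managedMemory = managedMemory;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** Sum of all components that Flink itself manages. */
    long totalFlinkMemory() {
        return frameworkHeap + taskHeap + frameworkOffHeap + taskOffHeap
                + networkMemory + managedMemory;
    }

    /** Total Flink memory plus the JVM-level components. */
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        // Illustrative sizes only.
        MemoryTotals c = new MemoryTotals(
                128 * mb, 384 * mb, 128 * mb, 0, 128 * mb, 512 * mb, 256 * mb, 192 * mb);
        System.out.println("totalFlinkMemory   = " + c.totalFlinkMemory());
        System.out.println("totalProcessMemory = " + c.totalProcessMemory());
    }
}
```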

Frontend Design

...

Redesign the TaskManager metrics page so that users can more clearly understand the relationship between these metrics.

The previous metrics are moved into 'Advanced', since some users may still need them.

[Image: redesigned TaskManager metrics page]

Detail view

[Image: detail view]
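
One relationship such a page can make visible is that Framework Heap and Task Heap are configured separately but are measured by a single shared JVM heap metric. The following is a minimal sketch of that calculation, not the actual frontend code; `HeapUsageView`, `configuredHeap`, and `percentUsed` are hypothetical names.

```java
/** Hypothetical helper relating the shared heap metric to the two configured heap sizes. */
final class HeapUsageView {

    /** The heap budget shown to the user: framework heap plus task heap, in bytes. */
    static long configuredHeap(long frameworkHeap, long taskHeap) {
        return frameworkHeap + taskHeap;
    }

    /** Returns used/total as a percentage, or -1 if the total is not positive. */
    static double percentUsed(long usedBytes, long totalBytes) {
        if (totalBytes <= 0) {
            return -1.0;
        }
        return 100.0 * usedBytes / totalBytes;
    }

    public static void main(String[] args) {
        long total = configuredHeap(128L << 20, 384L << 20); // illustrative sizes
        long used = 256L << 20;                               // e.g. the heap "Used" metric
        System.out.println("Heap used: " + percentUsed(used, total) + " %");
    }
}
```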

REST API Design

...

Memory Configuration

Jira: FLINK-14435

The TaskManager's memory configuration will be exposed through /taskmanagers/:taskmanagerid. A proposed REST response is shown in the code block below:

JSON Schema of response
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
  "properties" : {
    "id" : {
      "type" : "any"
    },
    "path" : {
      "type" : "string"
    },
    "dataPort" : {
      "type" : "integer"
    },
    "timeSinceLastHeartbeat" : {
      "type" : "integer"
    },
    "slotsNumber" : {
      "type" : "integer"
    },
    "freeSlots" : {
      "type" : "integer"
    },
    "hardware" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
      "properties" : {
        "cpuCores" : {
          "type" : "integer"
        },
        "physicalMemory" : {
          "type" : "integer"
        },
        "freeMemory" : {
          "type" : "integer"
        },
        "managedMemory" : {
          "type" : "integer"
        }
      }
    },
    "memoryConfiguration" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskExecutorMemoryConfiguration",
      "properties" : {
        "frameworkHeap" : {
          "type" : "long"
        },
        "frameworkOffHeap" : {
          "type" : "long"
        },
        "taskHeap" : {
          "type" : "long"
        },
        "taskOffHeap" : {
          "type" : "long"
        },
        "networkMemory" : {
          "type" : "long"
        },
        "managedMemory" : {
          "type" : "long"
        },
        "jvmMetaspace" : {
          "type" : "long"
        },
        "jvmOverhead" : {
          "type" : "long"
        },
        "totalFlinkMemory" : {
          "type" : "long"
        },
        "totalProcessMemory" : {
          "type" : "long"
        }
      }
    },
    "metrics" : {
      "type" : "object",
      "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo",
      "properties" : {
        "heapUsed" : {
          "type" : "integer"
        },
        "heapCommitted" : {
          "type" : "integer"
        },
        "heapMax" : {
          "type" : "integer"
        },
        "metaspaceUsed" : {
          "type" : "integer"
        },
        "metaspaceCommitted" : {
          "type" : "integer"
        },
        "metaspaceMax" : {
          "type" : "integer"
        },
        "nonHeapUsed" : {
          "type" : "integer"
        },
        "nonHeapCommitted" : {
          "type" : "integer"
        },
        "nonHeapMax" : {
          "type" : "integer"
        },
        "directCount" : {
          "type" : "integer"
        },
        "directUsed" : {
          "type" : "integer"
        },
        "directMax" : {
          "type" : "integer"
        },
        "mappedCount" : {
          "type" : "integer"
        },
        "mappedUsed" : {
          "type" : "integer"
        },
        "mappedMax" : {
          "type" : "integer"
        },
        "memorySegmentsAvailable" : {
          "type" : "integer"
        },
        "memorySegmentsTotal" : {
          "type" : "integer"
        },
        "managedMemoryUsed" : {
          "type" : "long"
        },
        "managedMemoryTotal" : {
          "type" : "long"
        },
        "networkMemoryUsed" : {
          "type" : "long"
        },
        "networkMemoryTotal" : {
          "type" : "long"
        },
        "garbageCollectors" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerMetricsInfo:GarbageCollectorInfo",
            "properties" : {
              "name" : {
                "type" : "string"
              },
              "count" : {
                "type" : "integer"
              },
              "time" : {
                "type" : "integer"
              }
            }
          }
        }
      }
    }
  }
}
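
A client could request this response with the JDK's built-in HTTP client (Java 11 or later). The sketch below only builds the request; the class name `TaskManagerDetailsRequest`, the base URL, and the TaskManager ID are placeholders chosen for illustration, and the ID is assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the request for the TaskManager details endpoint. */
final class TaskManagerDetailsRequest {

    /** Builds a GET request for /taskmanagers/:taskmanagerid below the given REST base URL. */
    static HttpRequest build(String baseUrl, String taskManagerId) {
        URI uri = URI.create(baseUrl + "/taskmanagers/" + taskManagerId);
        return HttpRequest.newBuilder(uri)
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder base URL and ID; adjust to the actual cluster.
        HttpRequest request = build("http://localhost:8081", "tm-1");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the `memoryConfiguration` object read from the returned JSON body.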

Metrics exposure

The newly introduced metrics can be accessed through the metrics REST endpoint.
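
As a sketch of such a lookup, the helper below builds the path for querying selected metrics of one TaskManager. It assumes the existing `/taskmanagers/:taskmanagerid/metrics` endpoint with its comma-separated `get` query parameter, and it assumes that the full metric name is the metric scope joined to the key with a dot (for example `Status.Shuffle.Netty.UsedMemory`). The class name and the TaskManager ID are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds a metrics query path for one TaskManager. */
final class TaskManagerMetricsQuery {

    /** Returns the request path, listing the requested metric names in the "get" parameter. */
    static String path(String taskManagerId, List<String> metricNames) {
        return "/taskmanagers/" + taskManagerId + "/metrics?get=" + String.join(",", metricNames);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "Status.Shuffle.Netty.UsedMemory",
                "Status.Shuffle.Netty.TotalMemory");
        System.out.println(path("tm-1", names)); // "tm-1" is a placeholder ID
    }
}
```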

Implementation Proposal

Step 1: Expose effective configuration parameters of TaskExecutor

...

  1. Add size metrics for the shuffle memory to NetworkBufferPool:

    public long getTotalMemorySize() {
        // 1L widens the product to long so that it cannot overflow an int.
        return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
    }

    public long getUsedMemorySize() {
        return getTotalMemorySize() - (1L * getNumberOfAvailableMemorySegments() * memorySegmentSize);
    }

  2. Update NettyShuffleMetricFactory#registerShuffleMetrics:

    private static final String METRIC_TOTAL_MEMORY_IN_BYTES = "TotalMemory";
    private static final String METRIC_USED_MEMORY_IN_BYTES = "UsedMemory";

    // ...

    private static void registerShuffleMetrics(
            String groupName,
            MetricGroup metricGroup,
            NetworkBufferPool networkBufferPool) {
        MetricGroup networkGroup = metricGroup.addGroup(groupName);
        networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
            networkBufferPool::getTotalNumberOfMemorySegments);
        networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
            networkBufferPool::getNumberOfAvailableMemorySegments);
        // === new ===
        networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_IN_BYTES,
            networkBufferPool::getTotalMemorySize);
        networkGroup.<Long, Gauge<Long>>gauge(METRIC_USED_MEMORY_IN_BYTES,
            networkBufferPool::getUsedMemorySize);
    }
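
The `1L *` factor in `getTotalMemorySize()` matters because segment counts and segment sizes are `int` values whose product can exceed `Integer.MAX_VALUE`. The sketch below illustrates this with a hypothetical stand-in class, `BufferPoolSizes`, which is not the real `NetworkBufferPool`; the segment count and segment size are illustrative.

```java
/** Hypothetical stand-in for a buffer pool, used only to illustrate the size arithmetic. */
final class BufferPoolSizes {
    private final int totalSegments;
    private final int availableSegments;
    private final int memorySegmentSize;

    BufferPoolSizes(int totalSegments, int availableSegments, int memorySegmentSize) {
        this.totalSegments = totalSegments;
        this.availableSegments = availableSegments;
        this.memorySegmentSize = memorySegmentSize;
    }

    /** Total pool size in bytes; 1L forces the multiplication to be done in long. */
    long getTotalMemorySize() {
        return 1L * totalSegments * memorySegmentSize;
    }

    /** Bytes currently in use: total minus the bytes of the available segments. */
    long getUsedMemorySize() {
        return getTotalMemorySize() - 1L * availableSegments * memorySegmentSize;
    }

    /** The same product computed in int arithmetic, which wraps around for large pools. */
    int totalMemorySizeAsInt() {
        return totalSegments * memorySegmentSize;
    }

    public static void main(String[] args) {
        // 100,000 segments of 32 KiB each is about 3.05 GiB, more than an int can hold.
        BufferPoolSizes pool = new BufferPoolSizes(100_000, 25_000, 32 * 1024);
        System.out.println("total (long): " + pool.getTotalMemorySize());
        System.out.println("used  (long): " + pool.getUsedMemorySize());
        System.out.println("total (int) : " + pool.totalMemorySizeAsInt());
    }
}
```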

Step 3: Introduce new metrics for Task's managed memory usage 

...

We would have to introduce a new metric that represents the aggregated memory usage of each TaskSlot. The aggregation can be maintained in the TaskExecutor.

...

Add a default memory type to MemoryManager:

    public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;

Register the managed memory usage in TaskExecutor#requestSlot:

    public long getManagedMemoryUsed() {
        return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
            slot ->
                slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE) -
                    slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
        ).sum();
    }
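
The aggregation itself can be tried out in isolation. The sketch below reproduces the "total minus available, summed over all allocated slots" logic with a hypothetical `SlotMemory` class standing in for a slot's memory manager; it does not use any Flink classes.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone illustration of summing used managed memory across slots. */
final class ManagedMemoryAggregation {

    /** Hypothetical stand-in for one slot's memory manager (sizes in bytes). */
    static final class SlotMemory {
        final long totalBytes;
        final long availableBytes;

        SlotMemory(long totalBytes, long availableBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = availableBytes;
        }

        long usedBytes() {
            return totalBytes - availableBytes;
        }
    }

    /** Sums the used bytes of all allocated slots. */
    static long managedMemoryUsed(List<SlotMemory> allocatedSlots) {
        return allocatedSlots.stream().mapToLong(SlotMemory::usedBytes).sum();
    }

    public static void main(String[] args) {
        List<SlotMemory> slots = Arrays.asList(
                new SlotMemory(100, 40),   // 60 bytes in use
                new SlotMemory(200, 200),  // idle slot
                new SlotMemory(50, 0));    // fully used slot
        System.out.println("managed memory used: " + managedMemoryUsed(slots));
    }
}
```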

add instantiateMemoryManagerMetrics in MetricUtils

...

Step 4: Add Metaspace metrics 

...
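
One way to obtain Metaspace usage from inside a JVM is the standard `java.lang.management` API. The sketch below assumes a HotSpot JVM, where the relevant memory pool is named "Metaspace"; it is an illustration of where the Used and Max values can come from, not necessarily how the metric is implemented in Flink. The class name `MetaspaceProbe` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.Optional;

/** Hypothetical probe that looks up the Metaspace memory pool of the running JVM. */
final class MetaspaceProbe {

    /** Returns the usage of the pool named "Metaspace", or empty if no such pool exists. */
    static Optional<MemoryUsage> metaspaceUsage() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                // getUsage() may return null if the pool is no longer valid.
                return Optional.ofNullable(pool.getUsage());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<MemoryUsage> usage = metaspaceUsage();
        if (usage.isPresent()) {
            MemoryUsage u = usage.get();
            // getMax() returns -1 when no maximum is defined for the pool.
            System.out.println("Used=" + u.getUsed()
                    + " Committed=" + u.getCommitted()
                    + " Max=" + u.getMax());
        } else {
            System.out.println("No Metaspace pool found on this JVM");
        }
    }
}
```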

Step 5: Update TaskManager's details page 

Jira: FLINK-15328, FLINK-19764

The web UI has to be updated as proposed above.

...

  • Create a separate independent endpoint for the effective memory configuration.
  • Deprecate the metrics sub-record returned by /taskmanagers/:taskmanagerid. The metrics endpoint can be used instead. This would simplify the TaskManagerDetailsHandler.

Test Plan

Existing tests will be updated to verify the feature.