Status

Current state: "Under Discussion"

Discussion thread: -

...

JIRA: FLINK-4389
Release: 1.2

...



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Create a data structure on the Job-/TaskManager containing a metrics snapshot
  2. Transfer this snapshot to the WebInterface back-end
  3. Store the snapshot in the WebRuntimeMonitor in an easily accessible way
  4. Expose the stored metrics to the WebInterface via REST API

...

The MetricRegistry will contain a MetricQueryService, which acts like an unscheduled reporter.
The service is a separate actor that creates and returns a key-value representation of the entire metric space when queried by the Manager.

The keys represent the name of the metric, formatted according to the following scope format strings:

metrics.scope.jm            0:<user_scope>.<name>
metrics.scope.tm            1:<tm_id>:<user_scope>.<name>
metrics.scope.jm.job        2:<job_id>:<user_scope>.<name>
metrics.scope.tm.job        2:<job_id>:<user_scope>.<name>
metrics.scope.tm.task       3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name>
metrics.scope.tm.operator   4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name>

The initial number serves as a category for the WebInterface, and allows for faster handling as we don't have to parse the entire string before deciding what category it belongs to.
  0 = JobManager
  1 = TaskManager
  2 = Job
  3 = Task
  4 = Operator
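
As a rough illustration of the key layout above, the following sketch builds a key from a category and its identifiers, and reads the category back from the leading digit alone. The class MetricKeys and its method names are hypothetical and not part of this proposal; the sketch also assumes single-digit categories and identifiers that contain no ':' character.

```java
import java.util.StringJoiner;

/** Hypothetical helper for illustration only; not part of the FLIP. */
final class MetricKeys {
    static final int JOB_MANAGER = 0;
    static final int TASK_MANAGER = 1;
    static final int JOB = 2;
    static final int TASK = 3;
    static final int OPERATOR = 4;

    /** Builds "<category>:<id_1>:...:<id_n>:<user_scope>.<name>". */
    static String key(int category, String userScope, String name, String... ids) {
        StringJoiner joiner = new StringJoiner(":");
        joiner.add(Integer.toString(category));
        for (String id : ids) {
            joiner.add(id);
        }
        joiner.add(userScope + "." + name);
        return joiner.toString();
    }

    /** Reads the category from the first character without parsing the rest of the key. */
    static int category(String key) {
        return key.charAt(0) - '0';
    }
}
```

A TaskManager metric would then be built with MetricKeys.key(MetricKeys.TASK_MANAGER, userScope, name, tmId), and a consumer can dispatch on MetricKeys.category(key) before looking at any identifier.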

For this to work we need to be able to use a format different from the one set in the configuration, and also cache the resulting strings.
For now we can hard-code a separate scopeString field in the AbstractMetricGroup; a more general solution would be to allow separate ScopeFormat configurations for each reporter, which is a natural follow-up to FLINK-4246.

The scope generation will be hard-coded into the separate metric groups, as ScopeFormats are a bit overkill for this. The created scopes are cached to avoid frequent re-computation.

The value is the value returned by the metric, or by one of its methods (Histograms, for example, expose multiple methods).
Whether the value is stringified is TBD. Using strings would solve the serialization problem for Gauge metrics, but would require the generation of many short-lived objects on the JM/TM and additional parsing if we want to aggregate metrics in the WebInterface.

The key-value pairs can be stored in a simple list-like data structure, such as an Object array.
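
A minimal sketch of such a flat dump, assuming keys and values are stored at alternating positions of one Object array. The class name MetricDump and the alternating layout are illustrative assumptions; the proposal does not fix the exact layout.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical flat dump format: [key0, value0, key1, value1, ...]. */
final class MetricDump {

    /** Flattens a metric map into an Object array with alternating keys and values. */
    static Object[] toArray(Map<String, Object> metrics) {
        Object[] dump = new Object[metrics.size() * 2];
        int i = 0;
        for (Map.Entry<String, Object> entry : metrics.entrySet()) {
            dump[i++] = entry.getKey();
            dump[i++] = entry.getValue();
        }
        return dump;
    }

    /** Rebuilds the metric map on the receiving side. */
    static Map<String, Object> fromArray(Object[] dump) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        for (int i = 0; i + 1 < dump.length; i += 2) {
            metrics.put((String) dump[i], dump[i + 1]);
        }
        return metrics;
    }
}
```

If values are stringified, the same layout could become a plain String array, which would avoid the Gauge serialization problem at the cost of parsing on the receiving side.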

Transfer to the WebInterface

...

As there aren't any details yet on how the separation will work, specifically whether a TaskManager -> WebInterface heartbeat will exist, I will assume that there is no message that we can piggyback on.

The WebRuntimeMonitor will contain a MetricFetcher, which queries the JobManager for all available TaskManagers and then queries each of them for a metric dump.

This will be done in a separate thread inside the WebRuntimeMonitor, which is also responsible for merging the returned dumps.

Metrics are only fetched if they are actually accessed via REST calls, with a minimum interval of 10 seconds between updates.

The fetched metrics are merged and kept in a central location inside the MetricFetcher, available to the different handlers.
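
The on-demand fetching with a minimum interval could be sketched roughly as follows. ThrottledFetcher, update, and the Runnable standing in for the actual query round-trip are hypothetical names; the current time is passed in explicitly so the behavior is easy to test.

```java
/** Hypothetical sketch of the "fetch on access, at most once per interval" rule. */
final class ThrottledFetcher {
    private final long minIntervalMillis;
    private final Runnable fetch;      // stands in for querying the JobManager and TaskManagers
    private long lastFetchMillis;
    private boolean fetchedOnce;

    ThrottledFetcher(long minIntervalMillis, Runnable fetch) {
        this.minIntervalMillis = minIntervalMillis;
        this.fetch = fetch;
    }

    /**
     * Called by a REST handler whenever metrics are accessed.
     * Returns true if a fetch was triggered, false if the last fetch is still recent enough.
     */
    synchronized boolean update(long nowMillis) {
        if (fetchedOnce && nowMillis - lastFetchMillis < minIntervalMillis) {
            return false;
        }
        fetchedOnce = true;
        lastFetchMillis = nowMillis;
        fetch.run();
        return true;
    }
}
```

A handler would call update(System.currentTimeMillis()) before reading the stored metrics, so nothing is fetched while nobody is looking at them.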

Storage in the WebRuntimeMonitor

My (rough) proposal for a data structure is the following:

class MetricStore {
	void addMetric(String name, Object value);

	JobManagerMetricStore jobManager;
	Map<String, TaskManagerMetricStore> taskmanagers;
	Map<String, JobMetricStore> jobs;

	class JobManagerMetricStore {
		Map<String, Object> metrics;
	}

	class TaskManagerMetricStore {
		Map<String, Object> metrics;
	}

	class JobMetricStore {
		Map<String, Object> metrics;
		Map<String, TaskMetricStore> tasks;
	}

	class TaskMetricStore {
		Map<String, Object> metrics;
		Map<String, SubtaskMetricStore> subtasks;
	}

	class SubtaskMetricStore {
		Map<String, Object> metrics;
	}
}

 


Note that at any given time only one of these objects will exist.

...

Access to the structure should be guarded by a lock, so that not every map has to be a concurrent one.
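
To make the routing and locking concrete, here is a simplified sketch of how addMetric could dispatch on the category prefix while a single lock guards plain HashMaps. SketchMetricStore is a hypothetical, flattened stand-in for the proposed structure; it only handles the JobManager, TaskManager and Job categories, and it assumes identifiers contain no ':' character.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, reduced variant of the proposed store; Task and Operator levels are omitted. */
final class SketchMetricStore {
    final Map<String, Object> jobManager = new HashMap<>();
    final Map<String, Map<String, Object>> taskManagers = new HashMap<>();
    final Map<String, Map<String, Object>> jobs = new HashMap<>();

    /** Routes a metric into the matching map; synchronized so plain HashMaps suffice. */
    synchronized void addMetric(String key, Object value) {
        String[] parts = key.split(":");
        String name = parts[parts.length - 1];   // "<user_scope>.<name>"
        switch (parts[0]) {
            case "0":
                jobManager.put(name, value);
                break;
            case "1":
                taskManagers.computeIfAbsent(parts[1], id -> new HashMap<>()).put(name, value);
                break;
            case "2":
                jobs.computeIfAbsent(parts[1], id -> new HashMap<>()).put(name, value);
                break;
            default:
                throw new IllegalArgumentException("Category not covered by this sketch: " + key);
        }
    }

    /** Reads go through the same lock as writes. */
    synchronized Object getTaskManagerMetric(String tmId, String name) {
        Map<String, Object> metrics = taskManagers.get(tmId);
        return metrics == null ? null : metrics.get(name);
    }
}
```

The Task and Operator categories would follow the same pattern with one more level of nesting per identifier.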

Data Retirement

  • JobManager metrics are kept indefinitely, as they have a limited size
  • TaskManager metrics are kept as long as the given TaskManager is registered on the JobManager
  • Job and Task metrics are kept as long as they are running or archived. In other words, if the job is listed in the WebInterface as either running or completed, the metrics are still available.

Access from the WebInterface

Several new handlers will be added (one for each category) that will access the central MetricStore.

The REST calls for the list of all available metrics could look like this:

  • JobManager: "/jobmanager/metrics"
  • TaskManager: "/taskmanagers/:taskmanagerid/metrics"
  • Job: "/jobs/:jobid/metrics"
  • Task: "/jobs/:jobid/vertices/:vertexid/metrics"

This will return a JSON array containing the names of all available metrics.

The values for a list of metrics can be requested by appending "?get=[<metric_name1>[,<metric_nameX>]]"

This will return a JSON array containing "id":"values" pairs.

The REST call for specific metrics of a task could look like this: /jobs/JOB_ID/vertices/TASK_ID/metrics?get=elements-in,elements-out,bytes-per-second-out
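
The handling of the "get" parameter could look roughly like the sketch below. MetricsQuery and its methods are hypothetical, the exact JSON field names ("id" and "value") are an assumption, and skipping unknown metric names is one possible choice that the proposal does not prescribe.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical handler logic for one category's metric map. */
final class MetricsQuery {

    /** Response without "get": a JSON array of all available metric names. */
    static String listNames(Map<String, Object> metrics) {
        StringJoiner json = new StringJoiner(",", "[", "]");
        for (String id : metrics.keySet()) {
            json.add("\"" + escape(id) + "\"");
        }
        return json.toString();
    }

    /** Response with "get=a,b,c": a JSON array of id/value pairs for the known names. */
    static String getValues(Map<String, Object> metrics, String getParam) {
        StringJoiner json = new StringJoiner(",", "[", "]");
        for (String id : getParam.split(",")) {
            Object value = metrics.get(id);
            if (value != null) {   // unknown names are skipped in this sketch
                json.add("{\"id\":\"" + escape(id) + "\",\"value\":\"" + escape(value.toString()) + "\"}");
            }
        }
        return json.toString();
    }

    /** Minimal escaping of backslashes and quotes; a real handler would use a JSON library. */
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Values are emitted as strings here, which matches the stringified-value option discussed earlier; numeric output would require knowing the metric type.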

Prototype

A working prototype that follows this FLIP can be found here.
    
It allows the WebInterface to display task/operator metrics. Credits to Piotr Godek, who provided the code to display task metrics in the WebInterface.

...

Everything can be tested with unit tests.

Rejected Alternatives

-