Status
Current state: "Under Discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-112-Support-User-Defined-Metrics-for-Python-UDF-td38609.html
JIRA:
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
A metric that reports the latest of the reported values. The current value can be set using set(n: long). You can register a gauge by calling gauge(name: str, obj: Callable[[], int]) on a MetricGroup; the Callable object is invoked to report the value. Gauge metrics are restricted to integer values.
class Gauge(Metric):
    """Gauge Metric interface.

    Allows tracking of the latest value of a variable during pipeline
    execution."""

    def __init__(self, inner_gauge):
        ...

    def set(self, value):
        ...
class MyUDF(ScalarFunction):

    def __init__(self):
        self.gauge = None
        self.length = 0

    def open(self, function_context):
        super().open(function_context)
        # The lambda is called whenever the gauge value is reported.
        self.gauge = function_context.get_metric_group().gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        # Update the state that the gauge's callable reads.
        self.length = i
        return i - 1
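To illustrate the callable-based registration described above without a running Flink job, here is a minimal sketch built on a hypothetical in-memory stand-in for the metric group. `FakeMetricGroup`, its `report()` method, and `LengthTracker` are assumptions made for illustration and are not part of the proposed API; only the `gauge(name, obj)` shape is taken from the text.

```python
from typing import Callable, Dict


class FakeMetricGroup:
    """Hypothetical in-memory stand-in for a MetricGroup (not the Flink class)."""

    def __init__(self) -> None:
        self._gauges: Dict[str, Callable[[], int]] = {}

    def gauge(self, name: str, obj: Callable[[], int]) -> None:
        # Store the callable; it is invoked only when values are reported.
        self._gauges[name] = obj

    def report(self) -> Dict[str, int]:
        # Assumed reporting hook: poll each callable for its current value.
        return {name: fn() for name, fn in self._gauges.items()}


class LengthTracker:
    """Follows the UDF pattern: eval() updates the state the gauge reads."""

    def __init__(self) -> None:
        self.length = 0

    def open(self, metric_group: FakeMetricGroup) -> None:
        metric_group.gauge("my_gauge", lambda: self.length)

    def eval(self, i: int) -> int:
        self.length = i
        return i - 1


group = FakeMetricGroup()
udf = LengthTracker()
udf.open(group)
udf.eval(5)
udf.eval(8)
snapshot = group.report()  # the gauge exposes only the latest value
```

Because the gauge holds a callable rather than a value, intermediate values written between two reports are never observed; only the state at report time is.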
Meter
...
import abc
from typing import Callable, Dict, List, Optional


class MetricGroup(abc.ABC):

    def counter(self, name: str) -> 'Counter':
        """
        Registers a new `Counter` with Flink.
        """
        pass

    def gauge(self, name: str, obj: Callable[[], int]) -> None:
        """
        Registers a new `Gauge` with Flink.
        """
        pass

    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
        """
        Registers a new `Meter` with Flink.
        """
        pass

    def distribution(self, name: str) -> 'Distribution':
        """
        Registers a new `Distribution` with Flink.
        """
        pass

    def add_group(self, name: str, extra: Optional[str] = None) -> 'MetricGroup':
        """
        Creates a new MetricGroup and adds it to this group's sub-groups.

        If extra is not None, creates a new key-value MetricGroup pair. The
        key group is added to this group's sub-groups, while the value group
        is added to the key group's sub-groups. This method returns the value
        group.

        The only difference between calling this method and
        `group.add_group(key).add_group(value)` is that get_all_variables()
        of the value group returns an additional `"<key>"="value"` pair.
        """
        pass

    def get_scope_components(self) -> List[str]:
        """
        Gets the scope as an array of the scope components, for example
        `["host-7", "taskmanager-2", "window_word_count", "my-mapper"]`
        """
        pass

    def get_all_variables(self) -> Dict[str, str]:
        """
        Returns a map of all variables and their associated value, for example
        `{"<host>"="host-7", "<tm_id>"="taskmanager-2"}`
        """
        pass

    def get_metric_identifier(self, metric_name: str) -> str:
        """
        Returns the fully qualified metric name, for example
        `host-7.taskmanager-2.window_word_count.my-mapper.metricName`
        """
        pass
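The difference between `add_group(key, value)` and chained `add_group(key).add_group(value)` calls is easier to see in a small model. The following is a sketch only: `SketchMetricGroup` is a hypothetical in-memory class, not the Flink implementation, and it assumes "." as the identifier delimiter, as in the docstring example.

```python
from typing import Dict, List, Optional


class SketchMetricGroup:
    """Hypothetical in-memory model of the scope methods (not the Flink class)."""

    def __init__(self, scope: List[str], variables: Dict[str, str]) -> None:
        self._scope = scope
        self._variables = variables

    def add_group(self, name: str, extra: Optional[str] = None) -> "SketchMetricGroup":
        if extra is None:
            # Plain sub-group: extends the scope, variables are unchanged.
            return SketchMetricGroup(self._scope + [name], dict(self._variables))
        # Key-value pair: the scope gains both components, and the value
        # group additionally exposes a "<key>" -> value variable.
        variables = dict(self._variables)
        variables["<" + name + ">"] = extra
        return SketchMetricGroup(self._scope + [name, extra], variables)

    def get_scope_components(self) -> List[str]:
        return list(self._scope)

    def get_all_variables(self) -> Dict[str, str]:
        return dict(self._variables)

    def get_metric_identifier(self, metric_name: str) -> str:
        # Assumes "." as the delimiter, as in the docstring example.
        return ".".join(self._scope + [metric_name])


root = SketchMetricGroup(["host-7", "taskmanager-2"], {"<host>": "host-7"})
chained = root.add_group("region").add_group("eu")
paired = root.add_group("region", "eu")
```

In this model both `chained` and `paired` end up with the same scope components, but only `paired` carries the extra `"<region>"` variable.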
FunctionContext
A `get_metric_group` method will be added to the `FunctionContext` class.
class FunctionContext(object):
    """
    Used to obtain global runtime information about the context in which the
    user-defined function is executed. The information includes the metric
    group, global job parameters, etc.
    """

    def get_metric_group(self) -> MetricGroup:
        pass
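As a rough sketch of how a function might use this accessor, the example below wires a counter through stub classes. Every `Stub*` class and `CountingFunction` is hypothetical, and the counter's `inc()` method is an assumption here, since this section does not spell out the counter interface; only `get_metric_group()` and `counter(name)` come from the text.

```python
from typing import Dict, List, Optional


class StubCounter:
    """Hypothetical counter; inc() is assumed, not specified in this section."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n


class StubMetricGroup:
    """Hypothetical metric group that keeps counters in a dict."""

    def __init__(self) -> None:
        self.counters: Dict[str, StubCounter] = {}

    def counter(self, name: str) -> StubCounter:
        # Returning the same counter for a repeated name is an assumption.
        return self.counters.setdefault(name, StubCounter())


class StubFunctionContext:
    """Hypothetical context exposing only get_metric_group()."""

    def __init__(self, metric_group: StubMetricGroup) -> None:
        self._metric_group = metric_group

    def get_metric_group(self) -> StubMetricGroup:
        return self._metric_group


class CountingFunction:
    """Registers its metric once in open() and updates it in eval()."""

    def __init__(self) -> None:
        self.calls: Optional[StubCounter] = None

    def open(self, function_context: StubFunctionContext) -> None:
        self.calls = function_context.get_metric_group().counter("calls")

    def eval(self, i: int) -> int:
        self.calls.inc()
        return i + 1


context = StubFunctionContext(StubMetricGroup())
fn = CountingFunction()
fn.open(context)
results: List[int] = [fn.eval(x) for x in (1, 2, 3)]
```

Registering in `open()` rather than in `eval()` keeps the per-record path limited to a single method call on an already-created metric object.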
Implementation Plan
...