...
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-112-Support-User-Defined-Metrics-for-Python-UDF-td38609.html
JIRA:
Released: 1.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
You can access the metric system from any Python UDF that extends `pyflink.table.udf.UserDefinedFunction` by calling `function_context.get_metric_group()` in its `open` method. This method returns a `MetricGroup` object on which you can create and register new metrics.

```python
def open(self, function_context):
    super().open(function_context)
    self.counter = function_context.get_metric_group().counter("my_counter")
```
Metric Types
In this FLIP, we are going to support Counters, Gauges, Meters and Distributions. More details below.
...
A Counter is used to count something. The current value can be incremented or decremented using `inc()`/`inc(n: int)` or `dec()`/`dec(n: int)`. You can create and register a Counter by calling `counter(name: str)` on a MetricGroup.

```python
class Counter(Metric):
    """Counter metric interface. Allows a count to be incremented/decremented
    during pipeline execution."""

    def __init__(self, inner_counter):
        self._inner_counter = inner_counter

    def inc(self, n=1):
        self._inner_counter.inc(n)

    def dec(self, n=1):
        self.inc(-n)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        super().open(function_context)
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc(3)
        return i - 1
```
Gauge
A Gauge reports the latest value out of the reported values. The current value can be set using `set(n: int)`. You can register a gauge by calling `gauge(name: str)` on a MetricGroup. Gauge metrics are restricted to integer-only values.

```python
class Gauge(Metric):
    """Gauge Metric interface. Allows tracking of the latest value of a
    variable during pipeline execution."""

    def __init__(self, inner_gauge):
        self._inner_gauge = inner_gauge

    def set(self, value):
        self._inner_gauge.set(value)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.gauge = None

    def open(self, function_context):
        super().open(function_context)
        self.gauge = function_context.get_metric_group().gauge("my_gauge")

    def eval(self, i):
        self.gauge.set(i)
        return i - 1
```
Meter
A Meter measures an average throughput. An occurrence of an event can be registered with the `mark_event()` method. The occurrence of multiple events at the same time can be registered with the `mark_event(n: int)` method. You can register a meter by calling `meter(name: str, time_span_in_seconds: int)` on a MetricGroup. The default value of `time_span_in_seconds` is 60.
```python
class Meter(Metric):
    """Meter Metric interface. Metric for measuring throughput."""

    def __init__(self, inner_counter, time_span_in_seconds=60):
        self._inner_counter = inner_counter
        self._time_span_in_seconds = time_span_in_seconds

    def mark_event(self, value=1):
        self._inner_counter.inc(value)

    def get_time_span_in_seconds(self):
        return self._time_span_in_seconds


class MyUDF(ScalarFunction):

    def __init__(self):
        self.meter = None

    def open(self, function_context):
        super().open(function_context)
        # An average rate of events per second over 120s; the default is 60s.
        self.meter = function_context.get_metric_group().meter(
            "my_meter", time_span_in_seconds=120)

    def eval(self, i):
        self.meter.mark_event(1)
        return i - 1
```
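To make "average throughput over a time span" concrete, the following is a minimal pure-Python sketch. It is not the Flink or PyFlink implementation: the class name `SlidingWindowMeter`, its `update()` and `get_rate()` methods, and the assumption that a timer samples the count once per second are all hypothetical and introduced only for illustration. The rate is taken as the difference between the newest and oldest cumulative count, divided by the time span.

```python
from collections import deque


class SlidingWindowMeter:
    """Hypothetical stand-in for a meter; not part of PyFlink."""

    def __init__(self, time_span_in_seconds=60):
        self._time_span_in_seconds = time_span_in_seconds
        self._count = 0
        # Cumulative counts, assumed to be sampled once per second. One extra
        # slot keeps the oldest and newest samples exactly one span apart.
        self._samples = deque([0], maxlen=time_span_in_seconds + 1)

    def mark_event(self, value=1):
        # Register `value` events that occurred at the same time.
        self._count += value

    def update(self):
        # Assumption: some timer calls this once per second.
        self._samples.append(self._count)

    def get_rate(self):
        # Events per second averaged over the configured time span. Until the
        # window has filled, this underestimates the true rate.
        return (self._samples[-1] - self._samples[0]) / self._time_span_in_seconds


meter = SlidingWindowMeter(time_span_in_seconds=4)
for _ in range(4):       # simulate four seconds
    meter.mark_event(2)  # two events arrive during each second
    meter.update()
rate = meter.get_rate()
print(rate)
```

A longer time span smooths out bursts at the cost of reacting more slowly to changes in throughput, which is the trade-off behind the `time_span_in_seconds` parameter.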
Distribution
A Distribution reports information (sum/count/min/max/mean) about the distribution of reported values. The value can be updated using `update(n: int)`. You can register a distribution by calling `distribution(name: str)` on a MetricGroup. Distribution metrics are restricted to integer-only distributions. Internally, the distribution is implemented on top of the Flink Gauge, as a Gauge can report a value of any type.

```python
class Distribution(Metric):
    """Distribution Metric interface. Allows statistics about the distribution
    of a variable to be collected during pipeline execution."""

    def __init__(self, inner_distribution):
        self._inner_distribution = inner_distribution

    def update(self, value):
        self._inner_distribution.update(value)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        super().open(function_context)
        self.distribution = function_context.get_metric_group().distribution(
            "my_distribution")

    def eval(self, i):
        self.distribution.update(i)
        return i - 1
```
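As an illustration of the statistics a distribution reports, here is a small pure-Python sketch of an accumulator that tracks sum, count, min, max and mean over integer updates. The class `DistributionStats` and its `report()` method are hypothetical names, not part of the proposed API, and computing the mean with true division is an assumption of this sketch.

```python
class DistributionStats:
    """Hypothetical accumulator; not part of PyFlink."""

    def __init__(self):
        self.sum = 0
        self.count = 0
        self.min = None
        self.max = None

    def update(self, value):
        # The proposal restricts distributions to integer values.
        if not isinstance(value, int):
            raise TypeError("distribution values are restricted to integers")
        self.sum += value
        self.count += 1
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    def report(self):
        # Assumption: the mean is sum / count, or None before any update.
        mean = self.sum / self.count if self.count else None
        return {"sum": self.sum, "count": self.count,
                "min": self.min, "max": self.max, "mean": mean}


stats = DistributionStats()
for value in [3, 7, 5]:
    stats.update(value)
summary = stats.report()
print(summary)
```

Only four running values need to be kept, so the accumulator uses constant memory no matter how many values are reported.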
Scope
The definition of scope is the same as in Java; the Flink metrics documentation describes scope in more detail. The API for defining a user scope is as follows:

```python
# User scope with a single group name.
(function_context
    .get_metric_group()
    .add_group("my_metrics")
    .counter("my_counter"))

# User scope with a key-value group pair.
(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))
```
User Variables
You can define a user variable by calling `MetricGroup.add_group(key: str, value: str)`. This method affects what `MetricGroup.get_metric_identifier()`, `MetricGroup.get_scope_components()` and `MetricGroup.get_all_variables()` return.

```python
(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))
```
MetricGroup
A MetricGroup is a named container for Metrics and further metric subgroups. Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy based on the group names. A MetricGroup is uniquely identified by its place in the hierarchy and its name.

```python
import abc
from typing import Dict, List


class MetricGroup(abc.ABC):

    def counter(self, name: str) -> 'Counter':
        """
        Registers a new `Counter` with Flink.
        """
        pass

    def gauge(self, name: str) -> 'Gauge':
        """
        Registers a new `Gauge` with Flink.
        """
        pass

    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
        """
        Registers a new `Meter` with Flink.
        """
        pass

    def distribution(self, name: str) -> 'Distribution':
        """
        Registers a new `Distribution` with Flink.
        """
        pass

    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
        """
        If extra is not None, creates a new key-value MetricGroup pair. The key
        group is added to this group's sub-groups, while the value group is
        added to the key group's sub-groups. This method returns the value group.

        The only difference between calling this method and
        `group.add_group(key).add_group(value)` is that get_all_variables() of
        the value group returns an additional `"<key>"="value"` pair.
        """
        pass

    def get_scope_components(self) -> List[str]:
        """
        Gets the scope as an array of the scope components, for example
        `["host-7", "taskmanager-2", "window_word_count", "my-mapper"]`
        """
        pass

    def get_all_variables(self) -> Dict[str, str]:
        """
        Returns a map of all variables and their associated value, for example
        `{"<host>"="host-7", "<tm_id>"="taskmanager-2"}`
        """
        pass

    def get_metric_identifier(self, metric_name: str) -> str:
        """
        Returns the fully qualified metric name, for example
        `host-7.taskmanager-2.window_word_count.my-mapper.metricName`
        """
        pass
```
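The difference between `add_group(key, value)` and `add_group(key).add_group(value)` described in the docstring above can be seen in a toy in-memory model. This is only a sketch of the documented semantics, not the proposed implementation: `ToyMetricGroup` is a hypothetical class, it does not register anything with Flink, and joining scope components with `.` is assumed from the identifier example in the docstring.

```python
class ToyMetricGroup:
    """Hypothetical in-memory group; not part of PyFlink."""

    def __init__(self, scope_components=(), variables=None):
        self._scope_components = list(scope_components)
        self._variables = dict(variables or {})

    def add_group(self, name, extra=None):
        if extra is None:
            # Plain sub-group: extends the scope, leaves variables unchanged.
            return ToyMetricGroup(self._scope_components + [name],
                                  self._variables)
        # Key-value pair: nest the value group under the key group and
        # record an additional "<key>" -> value variable.
        variables = dict(self._variables)
        variables["<%s>" % name] = extra
        return ToyMetricGroup(self._scope_components + [name, extra], variables)

    def get_scope_components(self):
        return list(self._scope_components)

    def get_all_variables(self):
        return dict(self._variables)

    def get_metric_identifier(self, metric_name):
        # Assumption: components are joined with "." as in the docstring example.
        return ".".join(self._scope_components + [metric_name])


root = ToyMetricGroup(["host-7", "taskmanager-2"],
                      {"<host>": "host-7", "<tm_id>": "taskmanager-2"})
plain = root.add_group("my_metrics_key").add_group("my_metrics_value")
keyed = root.add_group("my_metrics_key", "my_metrics_value")

print(keyed.get_metric_identifier("my_counter"))
print(plain.get_all_variables())
print(keyed.get_all_variables())
```

In this model both groups produce the same scope components and metric identifier; only the key-value form adds the `<my_metrics_key>` entry to the variables.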
Implementation Plan
- Transmit metric group information from Java to Python
- Support defining scopes and variables on Python metric group
- Support Counter/Gauge/Distribution metric type for Python UDF
- Support Meter metric type for Python UDF
- Support Metrics for UDTF