Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-112-Support-User-Defined-Metrics-for-Python-UDF-td38609.html
JIRA:
Released: 1.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-58 added support for Python UDFs, but user-defined metrics are not supported yet. With metrics, users can report and monitor the status of their UDFs and gain a deeper understanding of their execution. This FLIP adds metric support for Python UDFs.
Goal
- Support user-defined metrics, including Counters, Gauges, Meters and Distributions, in Python UDFs. (Note: Histograms are not supported in this FLIP. Instead, Distributions are supported to report statistics about the distribution of values. See the Distribution section for details.)
- Support defining user scopes.
- Support defining user variables.
Architecture
The high-level workflow could be summarized as follows:
- During initialization (in the open method), the base metric group information is sent to the Python operator. This information includes the metric group variables, the scope components and the delimiter used to construct metric identifiers.
- Based on this information, the base metric group is reconstructed on the Python side, where metrics are registered.
- Process input elements.
- Update metrics in the Python UDFs.
- Transmit metrics from Python to Java in finishBundle(). Each bundle is bounded by both a time limit and a size limit. Since the time limit is usually small (1s by default), bundles can be used to transmit metrics from Python to Java without much delay. This also improves performance, since metrics do not have to be reported for every record.
- Update the metrics on the Java side.
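To illustrate why per-bundle transmission saves work, here is a minimal sketch in plain Python. It assumes a simplified model in which only counters exist and the Java operator just adds up the deltas it receives. `BundleMetricBuffer` and `JavaSideCounters` are hypothetical names, and the real transport between the Python worker and the Java operator is not shown.

```python
from typing import Dict


class BundleMetricBuffer:
    """Hypothetical Python-side buffer that accumulates counter deltas per bundle."""

    def __init__(self) -> None:
        self._pending: Dict[str, int] = {}

    def inc(self, name: str, n: int = 1) -> None:
        # Updates stay local to the Python worker until the bundle finishes.
        self._pending[name] = self._pending.get(name, 0) + n

    def finish_bundle(self) -> Dict[str, int]:
        # Hand over all accumulated deltas at once, then start a fresh bundle.
        deltas, self._pending = self._pending, {}
        return deltas


class JavaSideCounters:
    """Hypothetical stand-in for the Java operator applying deltas to its counters."""

    def __init__(self) -> None:
        self.values: Dict[str, int] = {}
        self.transmissions = 0

    def apply(self, deltas: Dict[str, int]) -> None:
        self.transmissions += 1
        for name, delta in deltas.items():
            self.values[name] = self.values.get(name, 0) + delta


buffer = BundleMetricBuffer()
java_side = JavaSideCounters()

for bundle in ([10, 20, 30], [40, 50]):
    for record in bundle:
        buffer.inc("my_counter")             # one local update per record
    java_side.apply(buffer.finish_bundle())  # one transmission per bundle

print(java_side.values["my_counter"], java_side.transmissions)  # 5 2
```

Five records produce five local updates but only two transmissions, one per bundle.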
Public Interfaces
Registering metrics
You can access the metric system from any Python UDF that extends `pyflink.table.udf.UserDefinedFunction` by calling function_context.get_metric_group() in the open method. This method returns a MetricGroup object on which you can create and register new metrics.
```python
def open(self, function_context):
    super().open(function_context)
    self.counter = function_context.get_metric_group().counter("my_counter")
```
Metric Types
In this FLIP, we are going to support Counters, Gauges, Meters and Distributions. More details below.
Counter
A Counter is used to count something. The current value can be incremented or decremented using inc()/inc(n: int) or dec()/dec(n: int). You can create and register a Counter by calling counter(name: str) on a MetricGroup.
```python
class Counter(Metric):
    """Counter metric interface. Allows a count to be incremented/decremented
    during pipeline execution."""

    def __init__(self, inner_counter):
        self._inner_counter = inner_counter

    def inc(self, n=1):
        self._inner_counter.inc(n)

    def dec(self, n=1):
        self.inc(-n)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        super().open(function_context)
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc(3)
        return i - 1
```
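The Counter wrapper simply delegates to an inner counter supplied by the runtime. The sketch below plugs in a hypothetical `SimpleInnerCounter` (not part of PyFlink) and drops the `Metric` base class so that it runs standalone; it shows that dec(n) is nothing more than inc(-n).

```python
class SimpleInnerCounter:
    """Hypothetical stand-in for the inner counter the Python worker would provide."""

    def __init__(self) -> None:
        self.value = 0

    def inc(self, n: int = 1) -> None:
        self.value += n


class Counter:
    """Same wrapper logic as the proposed Counter, without the Metric base class."""

    def __init__(self, inner_counter) -> None:
        self._inner_counter = inner_counter

    def inc(self, n: int = 1) -> None:
        self._inner_counter.inc(n)

    def dec(self, n: int = 1) -> None:
        # Decrementing is just incrementing by a negative amount.
        self.inc(-n)


inner = SimpleInnerCounter()
counter = Counter(inner)
for _ in range(4):  # simulate four eval() calls, each adding 3 as in MyUDF
    counter.inc(3)
counter.dec()
print(inner.value)  # 11
```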
Gauge
A Gauge reports the most recently set value. The current value can be set using set(n: int). You can register a gauge by calling gauge(name: str) on a MetricGroup. Gauge metrics are restricted to integer values.
```python
class Gauge(Metric):
    """Gauge Metric interface. Allows tracking of the latest value of a variable
    during pipeline execution."""

    def __init__(self, inner_gauge):
        self._inner_gauge = inner_gauge

    def set(self, value):
        self._inner_gauge.set(value)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.gauge = None

    def open(self, function_context):
        super().open(function_context)
        self.gauge = function_context.get_metric_group().gauge("my_gauge")

    def eval(self, i):
        self.gauge.set(i)
        return i - 1
```
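Unlike a Counter, a Gauge does not aggregate: each set() overwrites the previous value. The sketch below uses a hypothetical `SimpleInnerGauge`, and its type check is only an assumption about how the integer-only restriction could be enforced; it is not taken from PyFlink.

```python
class SimpleInnerGauge:
    """Hypothetical stand-in for the inner gauge: keeps only the latest value."""

    def __init__(self) -> None:
        self.value = None

    def set(self, value: int) -> None:
        # Gauges in this FLIP are integer-only; bool is rejected explicitly
        # because it is a subclass of int in Python.
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError("gauge values must be integers")
        self.value = value


class Gauge:
    """Same wrapper logic as the proposed Gauge, without the Metric base class."""

    def __init__(self, inner_gauge) -> None:
        self._inner_gauge = inner_gauge

    def set(self, value: int) -> None:
        self._inner_gauge.set(value)


inner = SimpleInnerGauge()
gauge = Gauge(inner)
for i in (7, 3, 42):  # simulate three eval() calls
    gauge.set(i)
print(inner.value)  # 42: earlier values are overwritten, not summed
```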
Meter
A Meter measures an average throughput. The occurrence of an event can be registered with the mark_event() method, and the occurrence of multiple events at the same time with mark_event(n: int). You can register a meter by calling meter(name: str, time_span_in_seconds: int) on a MetricGroup. time_span_in_seconds defaults to 60.
```python
class Meter(Metric):
    """Meter Metric interface. Metric for measuring throughput."""

    def __init__(self, inner_counter, time_span_in_seconds=60):
        self._inner_counter = inner_counter
        self._time_span_in_seconds = time_span_in_seconds

    def mark_event(self, value=1):
        self._inner_counter.inc(value)

    def get_time_span_in_seconds(self):
        return self._time_span_in_seconds


class MyUDF(ScalarFunction):

    def __init__(self):
        self.meter = None

    def open(self, function_context):
        super().open(function_context)
        # An average rate of events per second over 120s (the default is 60s).
        self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        self.meter.mark_event(1)
        return i - 1
```
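To make "average rate over time_span_in_seconds" concrete, here is a simplified sliding-window meter. It is not Flink's meter implementation: `SlidingWindowMeter` is a hypothetical class, and its mark_event()/get_rate() take an explicit timestamp `now` (unlike the proposed API) so that the result is deterministic.

```python
from collections import deque
from typing import Deque, Tuple


class SlidingWindowMeter:
    """Hypothetical meter: average events per second over the last time span."""

    def __init__(self, time_span_in_seconds: int = 60) -> None:
        self.time_span_in_seconds = time_span_in_seconds
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, count)

    def mark_event(self, now: float, n: int = 1) -> None:
        self._events.append((now, n))

    def get_rate(self, now: float) -> float:
        # Drop events that fell out of the window, then average over the full span.
        cutoff = now - self.time_span_in_seconds
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()
        return sum(n for _, n in self._events) / self.time_span_in_seconds


meter = SlidingWindowMeter(time_span_in_seconds=120)
for t in range(120):  # two events per second for 120 seconds
    meter.mark_event(now=float(t), n=2)

print(meter.get_rate(now=119.0))  # 2.0
print(meter.get_rate(now=179.0))  # 1.0: only the last 60 seconds of events remain
```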
Distribution
A Distribution reports statistics (sum/count/min/max/mean) about the distribution of the reported values. The value can be updated using update(n: int). You can register a distribution by calling distribution(name: str) on a MetricGroup. Distribution metrics are restricted to integer values. Internally, a distribution is reported through a Flink Gauge, since a Gauge can report a value of any type.
```python
class Distribution(Metric):
    """Distribution Metric interface. Allows statistics about the distribution
    of a variable to be collected during pipeline execution."""

    def __init__(self, inner_distribution):
        self._inner_distribution = inner_distribution

    def update(self, value):
        self._inner_distribution.update(value)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        super().open(function_context)
        self.distribution = function_context.get_metric_group().distribution("my_distribution")

    def eval(self, i):
        self.distribution.update(i)
        return i - 1
```
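The statistics a Distribution reports can be kept in constant space: sum, count, min and max are updated per value, and the mean is derived from sum and count. The sketch below uses a hypothetical `DistributionData` class. Its merge() method shows how two partial results (for example, from consecutive bundles) could be combined; whether the implementation merges them in exactly this way is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Optional


def _combine(a: Optional[int], b: Optional[int],
             pick: Callable[[int, int], int]) -> Optional[int]:
    # None means "no value seen yet".
    if a is None:
        return b
    if b is None:
        return a
    return pick(a, b)


@dataclass
class DistributionData:
    """Hypothetical accumulator for the statistics a Distribution reports."""
    total: int = 0
    count: int = 0
    minimum: Optional[int] = None
    maximum: Optional[int] = None

    def update(self, value: int) -> None:
        self.total += value
        self.count += 1
        self.minimum = _combine(self.minimum, value, min)
        self.maximum = _combine(self.maximum, value, max)

    def merge(self, other: "DistributionData") -> "DistributionData":
        # Combine two partial results without needing the individual values.
        return DistributionData(
            total=self.total + other.total,
            count=self.count + other.count,
            minimum=_combine(self.minimum, other.minimum, min),
            maximum=_combine(self.maximum, other.maximum, max),
        )

    @property
    def mean(self) -> Optional[float]:
        return self.total / self.count if self.count else None


bundle_1, bundle_2 = DistributionData(), DistributionData()
for i in (3, 7, 5):
    bundle_1.update(i)
for i in (1, 9):
    bundle_2.update(i)

merged = bundle_1.merge(bundle_2)
print(merged.total, merged.count, merged.minimum, merged.maximum, merged.mean)  # 25 5 1 9 5.0
```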
Scope
Scopes are defined the same way as in the Java API; see the scope section of the Flink metrics documentation for details. A user scope is defined as follows:
```python
(function_context
    .get_metric_group()
    .add_group("my_metrics")
    .counter("my_counter"))

(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))
```
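The effect of add_group() on a metric's identifier can be sketched as joining the scope components and the metric name with the delimiter that is sent from Java during initialization. Here get_metric_identifier is a hypothetical free function (in the proposed API it is a MetricGroup method), the delimiter defaults to ".", Flink's default, and the base scope is the example from the MetricGroup docstrings.

```python
from typing import List


def get_metric_identifier(scope_components: List[str], metric_name: str,
                          delimiter: str = ".") -> str:
    """Hypothetical helper: joins the scope components and the metric name."""
    return delimiter.join(scope_components + [metric_name])


base_scope = ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]

# add_group("my_metrics") appends a single component to the scope.
print(get_metric_identifier(base_scope + ["my_metrics"], "my_counter"))
# host-7.taskmanager-2.window_word_count.my-mapper.my_metrics.my_counter

# add_group("my_metrics_key", "my_metrics_value") appends both the key and the value.
print(get_metric_identifier(base_scope + ["my_metrics_key", "my_metrics_value"], "my_counter"))
# host-7.taskmanager-2.window_word_count.my-mapper.my_metrics_key.my_metrics_value.my_counter
```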
User Variables
You can define a user variable by calling MetricGroup.add_group(key: str, value: str). This affects what MetricGroup.get_metric_identifier(), MetricGroup.get_scope_components() and MetricGroup.get_all_variables() return.
```python
(function_context
    .get_metric_group()
    .add_group("my_metrics_key", "my_metrics_value")
    .counter("my_counter"))
```
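The difference between add_group(key, value) and two chained add_group() calls can be shown with a small in-memory group. `SketchMetricGroup` is hypothetical and models only scope components and variables; it assumes, as the add_group docstring of MetricGroup states, that the key-value form adds a `"<key>": "value"` variable and otherwise produces the same scope.

```python
from typing import Dict, List, Optional


class SketchMetricGroup:
    """Hypothetical in-memory group mirroring the add_group semantics of this FLIP."""

    def __init__(self, components: List[str], variables: Dict[str, str]) -> None:
        self._components = list(components)
        self._variables = dict(variables)

    def add_group(self, name: str, extra: Optional[str] = None) -> "SketchMetricGroup":
        if extra is None:
            # Plain subgroup: one more scope component, no new variable.
            return SketchMetricGroup(self._components + [name], self._variables)
        # Key-value subgroup: key and value both join the scope, and the
        # value group additionally exposes "<key>" -> value as a variable.
        variables = dict(self._variables)
        variables["<" + name + ">"] = extra
        return SketchMetricGroup(self._components + [name, extra], variables)

    def get_scope_components(self) -> List[str]:
        return list(self._components)

    def get_all_variables(self) -> Dict[str, str]:
        return dict(self._variables)


base = SketchMetricGroup(["host-7", "taskmanager-2"],
                         {"<host>": "host-7", "<tm_id>": "taskmanager-2"})
kv = base.add_group("my_metrics_key", "my_metrics_value")
nested = base.add_group("my_metrics_key").add_group("my_metrics_value")

# Same scope components either way ...
print(kv.get_scope_components() == nested.get_scope_components())  # True
# ... but only the key-value form adds the user variable.
print(kv.get_all_variables()["<my_metrics_key>"])        # my_metrics_value
print("<my_metrics_key>" in nested.get_all_variables())  # False
```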
MetricGroup
A MetricGroup is a named container for metrics and further metric subgroups. Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy based on the group names. A MetricGroup is uniquely identified by its place in the hierarchy and its name.
```python
import abc
from typing import Dict, List, Optional


class MetricGroup(abc.ABC):

    @abc.abstractmethod
    def counter(self, name: str) -> 'Counter':
        """
        Registers a new `Counter` with Flink.
        """
        pass

    @abc.abstractmethod
    def gauge(self, name: str) -> 'Gauge':
        """
        Registers a new `Gauge` with Flink.
        """
        pass

    @abc.abstractmethod
    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
        """
        Registers a new `Meter` with Flink.
        """
        pass

    @abc.abstractmethod
    def distribution(self, name: str) -> 'Distribution':
        """
        Registers a new `Distribution` with Flink.
        """
        pass

    @abc.abstractmethod
    def add_group(self, name: str, extra: Optional[str] = None) -> 'MetricGroup':
        """
        If extra is None, creates a new MetricGroup named `name`, adds it to
        this group's sub-groups and returns it.

        If extra is not None, creates a new key-value MetricGroup pair with
        `name` as the key and `extra` as the value. The key group is added to
        this group's sub-groups, while the value group is added to the key
        group's sub-groups. This method returns the value group.

        The only difference between calling this method with a key and a value
        and calling `group.add_group(key).add_group(value)` is that
        get_all_variables() of the value group returns an additional
        `"<key>": "value"` pair.
        """
        pass

    @abc.abstractmethod
    def get_scope_components(self) -> List[str]:
        """
        Gets the scope as a list of the scope components, for example
        `["host-7", "taskmanager-2", "window_word_count", "my-mapper"]`.
        """
        pass

    @abc.abstractmethod
    def get_all_variables(self) -> Dict[str, str]:
        """
        Returns a map of all variables and their associated values, for example
        `{"<host>": "host-7", "<tm_id>": "taskmanager-2"}`.
        """
        pass

    @abc.abstractmethod
    def get_metric_identifier(self, metric_name: str) -> str:
        """
        Returns the fully qualified metric name, for example
        `host-7.taskmanager-2.window_word_count.my-mapper.metricName`.
        """
        pass
```
Implementation Plan
- Transmit metric group information from Java to Python
- Support defining scopes and variables on the Python metric group
- Support the Counter/Gauge/Distribution metric types for Python UDFs
- Support the Meter metric type for Python UDFs
- Support metrics for UDTFs