Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

A metric that reports the latest of the reported values. The current value can be set using set(n: long). You can register a gauge by calling gauge(name: str, method: Callable[[], int]) on a MetricGroup; the Callable object will be used to report the values. Gauge metrics are restricted to integer-only values.

class Gauge(Metric):
    """Gauge metric: keeps track of the most recent value of a variable
    while the pipeline is executing.

    This class is a thin wrapper; every update is handed straight to the
    wrapped gauge object.
    """

    def __init__(self, inner_gauge):
        """Wrap ``inner_gauge``, the object that receives all updates."""
        self._inner_gauge = inner_gauge

    def set(self, value):
        """Forward ``value`` to the wrapped gauge's ``set`` method."""
        delegate = self._inner_gauge
        delegate.set(value)

class MyUDF(ScalarFunction):
    """Example scalar function that publishes its latest input as a gauge.

    The gauge is registered with a callable, so nothing is pushed into it:
    the callable reads ``self.length`` whenever it is invoked, and ``eval``
    only has to keep that attribute up to date.
    """

    def __init__(self):
        # Value reported by the "my_gauge" callable; 0 until eval() runs.
        self.length = 0

    def open(self, function_context):
        """Register the ``my_gauge`` gauge on the function's metric group.

        :param function_context: context object providing
            ``get_metric_group()``.
        """
        super().open(function_context)
        # The return value is deliberately not stored: the gauge is driven
        # entirely by the callable, which looks up self.length on each call.
        function_context.get_metric_group().gauge(
            "my_gauge", lambda: self.length)

    def eval(self, i):
        """Record ``i`` as the current gauge value and return ``i - 1``."""
        self.length = i
        return i - 1

Meter

...


class MetricGroup(abc.ABC):
    """Interface of a named container for metrics and metric sub-groups.

    Every method in this class is an empty stub that returns ``None``; the
    docstrings describe what an implementation is expected to provide.
    """

    def counter(self, name: str) -> 'Counter':
        """
        Registers a new `Counter` with Flink.

        :param name: name of the counter.
        :return: the registered `Counter`.
        """

    def gauge(self, name: str, method: Callable[[], int]) -> None:
        """
        Registers a new `Gauge` with Flink.

        :param name: name of the gauge.
        :param method: zero-argument callable returning an ``int``; it is
            the object used to report the gauge's values.
        """

    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
        """
        Registers a new `Meter` with Flink.

        :param name: name of the meter.
        :param time_span_in_seconds: time span of the meter in seconds
            (defaults to 60).
        :return: the registered `Meter`.
        """

    def distribution(self, name: str) -> 'Distribution':
        """
        Registers a new `Distribution` with Flink.

        :param name: name of the distribution.
        :return: the registered `Distribution`.
        """

    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
        """
        If `extra` is not None, creates a new key-value MetricGroup pair.
        The key group is added to this group's sub-groups, while the value
        group is added to the key group's sub-groups. This method returns
        the value group.

        The only difference between calling this method and
        `group.add_group(key).add_group(value)` is that get_all_variables()
        of the value group returns an additional `"<key>": "value"` pair.

        :param name: name of the (key) group.
        :param extra: optional name of the value group.
        """

    def get_scope_components(self) -> list:
        """
        Gets the scope as a list of the scope components, for example
        `["host-7", "taskmanager-2", "window_word_count", "my-mapper"]`
        """

    def get_all_variables(self) -> dict:
        """
        Returns a dict of all variables and their associated value, for
        example `{"<host>": "host-7", "<tm_id>": "taskmanager-2"}`
        """

    def get_metric_identifier(self, metric_name: str) -> str:
        """
        Returns the fully qualified metric name, for example
        `host-7.taskmanager-2.window_word_count.my-mapper.metricName`

        :param metric_name: name of the metric to qualify.
        """

...