Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

class Meter(Metric):
    """Meter Metric interface.

    Metric for measuring throughput. Events are recorded by incrementing the
    wrapped counter; the time span is only stored on the meter and handed back
    through :meth:`get_time_span_in_seconds`.
    """

    def __init__(self, inner_counter, time_span_in_seconds=60):
        """Wrap ``inner_counter`` and remember the meter's time span.

        :param inner_counter: object exposing ``inc(value)``; every marked
            event is forwarded to it.
        :param time_span_in_seconds: time span, in seconds, associated with
            this meter. Defaults to 60.
        """
        self._inner_counter = inner_counter
        self._time_span_in_seconds = time_span_in_seconds

    def mark_event(self, value=1):
        """Mark the occurrence of ``value`` events (one by default).

        :param value: amount passed to the inner counter's ``inc``.
        """
        self._inner_counter.inc(value)

    # NOTE(review): the name ``makemark_event`` looks like a fused token from
    # the rendered diff this snippet was taken from (presumably the intended
    # name is ``mark_event``) -- confirm against the published API. It is kept
    # as an alias so callers using the old spelling keep working.
    def makemark_event(self, value=1):
        """Backward-compatible alias of :meth:`mark_event`."""
        self.mark_event(value)

    def get_time_span_in_seconds(self):
        """Return the time span, in seconds, given at construction time."""
        return self._time_span_in_seconds


class MyUDF(ScalarFunction):
    """Example scalar function that marks a meter on every evaluation."""

    def __init__(self):
        # The meter is only available once open() has run; start without one.
        self.meter = None

    def open(self, function_context):
        """Run the base-class ``open`` and register the ``"my_meter"`` meter.

        :param function_context: context object providing
            ``get_metric_group()``.
        """
        super().open(function_context)
        metric_group = function_context.get_metric_group()
        # Average rate of events per second over 120s; the default is 60s.
        self.meter = metric_group.meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        """Mark one event on the meter and return ``i - 1``."""
        self.meter.makemark_event(1)
        return i - 1

...