...
class Meter(Metric):
    """Throughput metric backed by a counter.

    Events are recorded by incrementing the wrapped counter; the time span
    is only stored here and exposed through ``get_time_span_in_seconds``.
    """

    def __init__(self, inner_counter, time_span_in_seconds=60):
        """Store the backing counter and the time span.

        :param inner_counter: object whose ``inc(value)`` is called for
            every marked event.
        :param time_span_in_seconds: span, in seconds, returned by
            ``get_time_span_in_seconds`` (defaults to 60).
        """
        self._time_span_in_seconds = time_span_in_seconds
        self._inner_counter = inner_counter

    def makemark_event(self, value=1):
        """Record ``value`` events (one by default) on the backing counter."""
        counter = self._inner_counter
        counter.inc(value)

    def get_time_span_in_seconds(self):
        """Return the time span, in seconds, given at construction."""
        return self._time_span_in_seconds


class MyUDF(ScalarFunction):
    """Scalar function that marks one meter event per evaluated value."""

    def __init__(self):
        # Populated in open(); stays None until then.
        self.meter = None

    def open(self, function_context):
        """Run the base-class ``open``, then acquire the ``my_meter`` meter.

        :param function_context: context object providing
            ``get_metric_group()``.
        """
        super().open(function_context)
        metric_group = function_context.get_metric_group()
        # Ask explicitly for a 120-second span instead of relying on the
        # default (Meter's own constructor default is 60 seconds).
        self.meter = metric_group.meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        """Mark a single event on the meter and return ``i - 1``."""
        self.meter.makemark_event(1)
        decremented = i - 1
        return decremented
...