For a monitoring system, users may ask questions like:

  • Is the system healthy?
  • What are the hot spots of the system?
  • How many events are processed per second?
  • Is the data well partitioned, or is it skewed in the data source/topology?
  • What's the average latency of a RESTful API?

To answer these questions, an easy-to-use metric framework is needed for quickly onboarding various metrics. Following are some common use cases for data sources, topologies, services, etc.:

  • Data source: Kafka is the default data source of Eagle, and users may be curious about the data distribution of a certain topic, such as the latest offsets and the trend of the log count of each partition.
  • Topology: How many events are processed in a certain interval, what's the trend, and what's the peak #events/minute?
  • Service: What's the average response time of a RESTful API?

Dropwizard (https://github.com/dropwizard/metrics) is a popular metric framework used by many projects such as Spark, Storm and Druid.

Eagle also leverages the Dropwizard metric framework, with some extensions for our specific needs.

In the Dropwizard metric framework, a separate thread scans the metric map and flushes metrics at a fixed interval.

A typical Dropwizard use case is that all metrics are registered at startup, there are no frequent register/unregister operations, and statistics are computed using the system time.
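The pull model described above can be sketched in plain Java as follows. This is a simplified illustration, not Dropwizard's actual implementation; PullModeRegistry and its methods are hypothetical names. A scheduled thread periodically takes the registry lock and scans every metric, so each register/unregister call competes with the reporter for that same lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical pull-mode registry: a reporter thread scans all metrics on a timer.
class PullModeRegistry {
    private final Map<String, AtomicLong> metrics = new HashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Registration takes the same lock as the reporter's scan.
    synchronized AtomicLong register(String name) {
        return metrics.computeIfAbsent(name, k -> new AtomicLong());
    }

    synchronized void unregister(String name) {
        metrics.remove(name);
    }

    // One reporting pass: the lock is held for the whole scan of the map.
    synchronized void report(BiConsumer<String, Long> sink) {
        for (Map.Entry<String, AtomicLong> e : metrics.entrySet()) {
            sink.accept(e.getKey(), e.getValue().get());
        }
    }

    // Pull mode: flush at a fixed interval, regardless of when values change.
    void start(long periodMillis, BiConsumer<String, Long> sink) {
        scheduler.scheduleAtFixedRate(() -> report(sink), periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

With a few long-lived metrics, as in the typical Dropwizard setup, this lock is rarely contended; the cost appears when metrics are registered and unregistered continuously.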

But for Eagle's metric use case, things are a bit different:

  • Frequent register/unregister operations
    Metrics are time series, so if we adopted Dropwizard directly, we would frequently register/unregister metrics as time passes.
    Since the reporting thread needs to lock the metric map to check which metrics are ready to flush, frequent register/unregister operations may cause lock contention.
  • User time clock
    Sometimes we need a user time clock so that statistics are based on message time instead of system time; this is especially useful when analyzing historical data.
  • Tagged metric name
    In Eagle, tags are part of the metric key (metricKey = metricName + tags + timestamp), whereas in Dropwizard the metric key type is fixed as String, so we need to do some encoding/decoding.
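Because Dropwizard keys metrics by a plain String, tags and the timestamp have to be packed into the name. The sketch below shows one possible encoding; the name{k=v,...}@timestamp layout and the MetricKeyCodec class are hypothetical and are not the format used by Eagle's MetricKeyCodeDecoder. Tags are sorted so that the same tag set always yields the same key, and the sketch assumes names and tag values contain none of the delimiter characters.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical codec: metricKey = metricName + tags + timestamp, packed into one String.
final class MetricKeyCodec {
    private MetricKeyCodec() {}

    static String encode(String name, Map<String, String> tags, long timestamp) {
        // TreeMap sorts tags by key so equal tag sets produce identical keys.
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : new TreeMap<>(tags).entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        return name + joiner + "@" + timestamp;
    }

    static String decodeName(String key) {
        return key.substring(0, key.indexOf('{'));
    }

    static Map<String, String> decodeTags(String key) {
        Map<String, String> tags = new TreeMap<>();
        String body = key.substring(key.indexOf('{') + 1, key.lastIndexOf('}'));
        if (!body.isEmpty()) {
            for (String pair : body.split(",")) {
                int eq = pair.indexOf('=');
                tags.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return tags;
    }

    static long decodeTimestamp(String key) {
        return Long.parseLong(key.substring(key.lastIndexOf('@') + 1));
    }
}
```

A reporter would call encode when building the flush key, and the decode methods when turning a key back into a name, tag set and timestamp for storage.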

To solve the above problems, we extended the Dropwizard metric framework. The main change is to flush metrics in push mode instead of pull mode, which avoids the contention issue (in the Dropwizard metric framework, a separate thread runs the metric reporter that checks metric status and flushes metrics).
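Push mode can be sketched as follows: there is no reporter thread scanning a shared map. Instead, each update checks, using the event's own (user) time, whether the current window has closed, and if so pushes the finished value straight to the listeners. PushCounter and WindowListener are hypothetical names; the Eagle classes below follow the same idea with EagleMetric and EagleMetricListener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener: receives a finished window (aligned bucket start, aggregated value).
interface WindowListener {
    void onFlushed(long bucketTimestamp, double value);
}

// Hypothetical push-mode counter: the update call itself flushes, no background thread.
class PushCounter {
    private final long granularity;   // window length in ms of user time
    private final List<WindowListener> listeners = new ArrayList<>();
    private long windowStart;         // user time of the first event in the current window
    private double value;

    PushCounter(long firstUserTime, long granularity) {
        this.windowStart = firstUserTime;
        this.granularity = granularity;
    }

    synchronized void addListener(WindowListener l) {
        listeners.add(l);
    }

    // Locks only this counter, not a global registry.
    // Returns true if this update closed (and pushed) the previous window.
    synchronized boolean update(double delta, long userTime) {
        boolean flush = userTime - windowStart > granularity;
        if (flush) {
            long bucket = windowStart / granularity * granularity; // align to bucket start
            for (WindowListener l : listeners) {
                l.onFlushed(bucket, value);
            }
            value = delta;         // the new window starts with this event
            windowStart = userTime;
        } else {
            value += delta;
        }
        return flush;
    }
}
```

Because the clock comes from the events, replaying historical data produces the same windows as live processing.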

Following are the main classes of the Eagle metric framework:

IEagleMetric
import com.codahale.metrics.Gauge;

/**
 * It's just a workaround to extend Gauge instead of the Metric interface,
 * because MetricRegistry's notifyListenerOfRemovedMetric method throws an exception on unknown metric types.
 */
public interface IEagleMetric extends Gauge<Double> {

    void registerListener(EagleMetricListener listener);

    /**
     * Updates the metric with a value observed at the given user time.
     * @param value the new value
     * @param userTimeClock the user (message) time of the value, in milliseconds
     * @return true if the metric needs to be flushed, otherwise false
     */
    boolean update(double value, long userTimeClock);
}
EagleMetric
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public abstract class EagleMetric implements IEagleMetric {

    public long latestUserTimeClock;   // in milliseconds
    public AtomicDouble value;
    public String name;
    public long granularity;           // window length in milliseconds
    public List<EagleMetricListener> metricListeners = new ArrayList<>();
    private static final Logger LOG = LoggerFactory.getLogger(EagleMetric.class);

    // Copy constructor used to snapshot a metric; listeners are not copied.
    public EagleMetric(EagleMetric metric) {
        this.latestUserTimeClock = metric.latestUserTimeClock;
        this.name = metric.name;
        this.value = new AtomicDouble(metric.value.doubleValue());
        this.granularity = metric.granularity;
    }

    public EagleMetric(long latestUserTimeClock, String name, double value, long granularity) {
        this.latestUserTimeClock = latestUserTimeClock;
        this.name = name;
        this.value = new AtomicDouble(value);
        this.granularity = granularity;
    }

    // Defaults to a 5-minute granularity.
    public EagleMetric(long latestUserTimeClock, String metricName, double value) {
        this(latestUserTimeClock, metricName, value, 5 * DateUtils.MILLIS_PER_MINUTE);
    }

    public void registerListener(EagleMetricListener listener) {
        metricListeners.add(listener);
    }

    public Double getValue() {
        return value.doubleValue();
    }
}
EagleCounterMetric
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
 * Counter that accumulates values within a window of user time and pushes
 * the window's total to its listeners once an event falls outside the window.
 */
public class EagleCounterMetric extends EagleMetric {

    private static final Logger LOG = LoggerFactory.getLogger(EagleCounterMetric.class);

    public EagleCounterMetric(long latestUserTimeClock, String name, double value, long granularity) {
        super(latestUserTimeClock, name, value, granularity);
    }

    public EagleCounterMetric(EagleCounterMetric metric) {
        super(metric);
    }

    // Aligns a timestamp to the start of its granularity bucket.
    public long trim(long latestUserTimeClock) {
        return latestUserTimeClock / granularity * granularity;
    }

    // Pushes a snapshot of the current window, keyed by its aligned timestamp, to every listener.
    public void flush(long latestUserTimeClock) {
        for (EagleMetricListener listener : metricListeners) {
            EagleCounterMetric newEagleMetric = new EagleCounterMetric(this);
            newEagleMetric.name = MetricKeyCodeDecoder.addTimestampToMetricKey(trim(latestUserTimeClock), newEagleMetric.name);
            listener.onMetricFlushed(Arrays.asList((EagleMetric) newEagleMetric));
        }
    }

    public boolean checkIfNeedFlush(long currentUserTimeClock) {
        return currentUserTimeClock - latestUserTimeClock > granularity;
    }

    public boolean update(double d, long currentUserTimeClock) {
        boolean readyToFlush = checkIfNeedFlush(currentUserTimeClock);
        if (!readyToFlush) {
            if (currentUserTimeClock < latestUserTimeClock) {
                LOG.warn("Something must be wrong: events should arrive in order of userTimeClock");
            }
            value.addAndGet(d);
        } else {
            flush(latestUserTimeClock);
            // The new window starts with the current event's value.
            value.set(d);
            latestUserTimeClock = currentUserTimeClock;
        }
        return readyToFlush;
    }
}
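A worked example of the flush check and the trim alignment in EagleCounterMetric, assuming a 5-minute granularity g (the default of EagleMetric's three-argument constructor) and illustrative millisecond timestamps:

```latex
\begin{aligned}
g &= 5 \times 60\,000 = 300\,000 \text{ ms} \\
t_{\text{latest}} &= 1\,460\,000\,123\,456, \qquad t_{\text{cur}} = 1\,460\,000\,500\,000 \\
t_{\text{cur}} - t_{\text{latest}} &= 376\,544 > g \;\Rightarrow\; \text{flush} \\
\operatorname{trim}(t_{\text{latest}}) &= \left\lfloor \frac{t_{\text{latest}}}{g} \right\rfloor g = 4\,866\,667 \times 300\,000 = 1\,460\,000\,100\,000
\end{aligned}
```

Note that the window is measured from latestUserTimeClock, the time of the window's first event, while the flushed key carries the aligned bucket start.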
EagleGaugeMetric
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gauge that keeps only the latest value. Granularity is 0 and update() always
 * returns true, signalling that a flush is needed.
 */
public class EagleGaugeMetric extends EagleMetric {

    private static final Logger LOG = LoggerFactory.getLogger(EagleGaugeMetric.class);

    public EagleGaugeMetric(long latestUserTimeClock, String name, double value) {
        super(latestUserTimeClock, name, value, 0);
    }

    public EagleGaugeMetric(EagleGaugeMetric metric) {
        super(metric);
    }

    public boolean update(double d, long currentUserTimeClock) {
        value.getAndSet(d);
        this.latestUserTimeClock = currentUserTimeClock;
        return true;
    }
}

EagleMetricListener
import java.util.List;

public interface EagleMetricListener {

    /**
     * Called when metrics are flushed. This method should be invoked in a thread-safe manner.
     * @param metrics the flushed metric snapshots
     */
    void onMetricFlushed(List<EagleMetric> metrics);
}
EagleServiceReporterMetricListener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class EagleServiceReporterMetricListener implements EagleMetricListener {

    private EagleServiceClientImpl client;
    private static final Logger LOG = LoggerFactory.getLogger(EagleServiceReporterMetricListener.class);

    public EagleServiceReporterMetricListener(String host, int port, String username, String password) {
        client = new EagleServiceClientImpl(host, port, username, password);
    }

    public EagleServiceReporterMetricListener(String host, int port) {
        client = new EagleServiceClientImpl(host, port, null, null);
    }

    // Converts each flushed metric into a GenericMetricEntity and writes the batch to the Eagle service.
    public void onMetricFlushed(List<EagleMetric> metrics) {
        List<GenericMetricEntity> entities = new ArrayList<>();
        for (EagleMetric metric : metrics) {
            entities.add(MetricEntityAdaptor.convert(metric.name, metric));
        }
        int total = entities.size();
        try {
            GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
            if (response.isSuccess()) {
                LOG.info("Wrote {} entities to service", total);
            } else {
                LOG.error("Failed to write {} entities to service, due to server exception: {}", total, response.getException());
            }
        } catch (Exception ex) {
            LOG.error("Got exception while writing entities: ", ex);
        }
    }
}
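For local testing, the service reporter could be swapped for an in-memory listener. To compile on its own, the sketch below uses a hypothetical FlushedMetric stand-in for EagleMetric and a hypothetical InMemoryMetricListener class; with the real classes, the method would implement EagleMetricListener.onMetricFlushed(List<EagleMetric>). Both methods are synchronized, following the thread-safety note on the listener interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for EagleMetric: just the flushed key and value.
final class FlushedMetric {
    final String name;
    final double value;

    FlushedMetric(String name, double value) {
        this.name = name;
        this.value = value;
    }
}

// Hypothetical in-memory listener; synchronized because metrics may flush from several threads.
class InMemoryMetricListener {
    private final List<FlushedMetric> received = new ArrayList<>();

    public synchronized void onMetricFlushed(List<FlushedMetric> metrics) {
        received.addAll(metrics);
    }

    // Returns a read-only copy so callers never observe the list while it is being modified.
    public synchronized List<FlushedMetric> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(received));
    }
}
```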

 

Eagle Metric List

  • eagle.event.count: #events that went into the alert executor
  • eagle.policy.eval.count: #policy evaluations, which equals #policies * #events
  • eagle.policy.eval.fail.count: #events that failed during evaluation
  • eagle.alert.count: #alerts generated
  • eagle.alert.fail.count: #alerts not successfully generated
  • eagle.kafka.message.consumer.lag: lag between the total Kafka offset and the consumer's offset
  • eagle.kafka.message.count: Kafka message count for a specific key, such as user=hadoop
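The consumer lag above can be computed by summing, over all partitions of a topic, the latest offset minus the consumer's offset. The sketch assumes both offset maps (partition id to offset) have already been fetched from Kafka; ConsumerLag and totalLag are hypothetical names, and treating an uncommitted partition as offset 0 is an assumption.

```java
import java.util.Map;

// Hypothetical helper: consumer lag summed over all partitions of a topic.
final class ConsumerLag {
    private ConsumerLag() {}

    // latestOffsets / consumerOffsets: partition id -> offset, assumed already fetched.
    static long totalLag(Map<Integer, Long> latestOffsets, Map<Integer, Long> consumerOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from offset 0.
            long consumed = consumerOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - consumed);
        }
        return lag;
    }
}
```

For example, latest offsets {0: 1000, 1: 500, 2: 300} and consumer offsets {0: 900, 1: 500} give a total lag of 100 + 0 + 300 = 400.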

 
