
...

public class Suppression {
    public enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    public static class IntermediateSuppression {

        public IntermediateSuppression();

        public static IntermediateSuppression emitAfter(final Duration timeToWaitForMoreEvents);

        public static IntermediateSuppression bufferKeys(final long numberOfKeysToRemember);

        public static IntermediateSuppression bufferBytes(final long bytesToUseForSuppressionStorage);

        public static IntermediateSuppression bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    
    }

    public Suppression();

    public static Suppression suppressLateEvents(final Duration maxAllowedLateness);

    public static Suppression suppressIntermediateEvents(final IntermediateSuppression intermediateSuppression);

}


Metrics

Along with the suppression operator, we will add several metrics. Note that suppress will not add to the skipped-records metrics: "skipped" records are records that are invalid for one reason or another, whereas "suppressed" records are intentionally dropped, just like filtered records.

Metrics we'll add:

  • late event suppression
    • average and max observed lateness: to help configure the maxAllowedLateness
      • (INFO) event-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of late event suppressions
      • (INFO) late-event-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (INFO) suppression-mem-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • current, average, and peak intermediate suppression disk buffer size (only present when using the "SPILL_TO_DISK" buffer-full strategy)
      • (INFO) suppression-disk-buffer-size-[current | avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • intermediate suppression buffer eviction rate and total: to show how often events are emitted early (only present when using the "EMIT" buffer-full strategy)
      • (INFO) suppression-mem-buffer-evict-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of intermediate event suppressions
      • (INFO) intermediate-event-suppression-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-event-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
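
As a reading aid for the list above, here is a minimal sketch of how the bracketed shorthand (for example event-lateness-[avg | max]) could be expanded into individual metric names, and of the overtime arithmetic described in the last bullet. The class and method names (SuppressionMetricNames, expand, overtime) are hypothetical and are not part of the proposed API. The sketch also assumes that the bracket notation simply lists alternative name suffixes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the proposal: illustrates the metric naming
// shorthand and the overtime calculation only.
public class SuppressionMetricNames {

    // Expands "prefix-[a | b | c]" into ["prefix-a", "prefix-b", "prefix-c"].
    // A name without a bracketed group is returned as a single-element list.
    public static List<String> expand(final String shorthand) {
        final int open = shorthand.indexOf('[');
        final int close = shorthand.indexOf(']');
        final List<String> names = new ArrayList<>();
        if (open < 0 || close < open) {
            names.add(shorthand);
            return names;
        }
        final String prefix = shorthand.substring(0, open);
        final String suffix = shorthand.substring(close + 1);
        for (final String variant : shorthand.substring(open + 1, close).split("\\|")) {
            names.add(prefix + variant.trim() + suffix);
        }
        return names;
    }

    // Overtime as described above: the configured wait minus the observed lateness.
    public static Duration overtime(final Duration timeToWaitForMoreEvents,
                                    final Duration observedLateness) {
        return timeToWaitForMoreEvents.minus(observedLateness);
    }

    public static void main(final String[] args) {
        System.out.println(expand("suppression-mem-buffer-size-[current | avg | max]"));
        // Example values only: a 5 minute emitAfter and 90 seconds of observed lateness.
        System.out.println(overtime(Duration.ofMinutes(5), Duration.ofSeconds(90)));
    }
}
```

With the example values in main, the overtime is 3 minutes 30 seconds. A consistently large overtime would suggest that the emitAfter setting could be lowered.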

Examples

Here are some examples of programs using the new operator to achieve various goals.

...