Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
linenumbers: true
package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

	    public <AGGstatic class MetricsAggregationConfig<AGG, V> void addMetricsAggregation( {

        public final String name,;
        public final String description;
        public final List<String> tagLabels;
        public final RecordingLevel recordingLevel;
        public final AGG initialAggregate;
      	  public final StringBiFunction<AGG, descriptionV, AGG> aggregationFunction;

        public MetricsAggregationConfig(final String name,
                                        final List<String>String tagLabelsdescription,
                                        final List<String> tagLabels,
       final RecordingLevel recordingLevel,
                                 final RecordingLevel recordingLevel,
                                 final Supplier<AGG> initialAggregateSupplier,
     final AGG initialAggregate,
                                        final BiFunction<AGG, V, AGG> aggregationFunction,);
    }

    public <AGG, V> void aggregateMetrics(final MetricsAggregationConfig<AGG, V> metricsAggregationConfig,
                                          final String groupOfMetricsToAggregate,
                                               final String nameOfMetricsToAggregate);
}

...

Code Block
language: java
linenumbers: true
// Example: sum the "size-all-mem-tables" metrics of group "stream-state-metrics",
// using the tag labels "thread-id" and "task-id".
kafkaStreams.aggregateMetrics(
    new MetricsAggregationConfig<BigInteger, BigInteger>(
        "size-all-mem-tables-aggregation",
        "records the aggregation of the sizes of all mem-tables grouped by stream threads and task",
        Arrays.asList("thread-id", "task-id"),
        RecordingLevel.INFO,
        // initial aggregate: the sum starts at zero
        BigInteger.valueOf(0),
        // BigInteger has no '+' operator, so the values are combined with add()
        BigInteger::add
    ),
    "stream-state-metrics",
    "size-all-mem-tables"
);

...