Status

Current state: Accepted

Discussion thread: here

...

I also propose adding the following two kinds of metrics, which measure the latency of emit final and the number of records emitted by emit final:

Each exposed metric will have the following tags:

  • thread-id = [thread ID]
  • task-id = [task ID]
  • processor-node-id = [node ID]

The following metrics will be exposed in Kafka Streams' metrics:

  • emit-final-latency-max (max latency to emit final records when they could be emitted)
  • emit-final-latency-avg (avg latency to emit final records when they could be emitted)
  • emit-final-records-rate (rate of records emitted when they could be emitted)
  • emit-final-records-total (total number of records emitted)

The recording level for all metrics will be DEBUG and their group will be stream-processor-node-metrics.
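
As a rough illustration of what the avg, max, and total values above summarize, the following is a minimal sketch in plain Java. It is a toy accumulator, not Kafka's Sensor machinery: the class name `EmitFinalStats` and its methods are hypothetical, the latency unit is left unspecified, and the windowed rate metric is omitted for brevity.

```java
/**
 * Toy accumulator for emit-final statistics (hypothetical, for illustration only).
 * Each call to record() represents one emit-final pass over closed windows.
 */
public class EmitFinalStats {

    private long emitCalls;     // number of emit-final passes recorded
    private long totalLatency;  // sum of latencies over all passes
    private long maxLatency;    // largest single-pass latency seen so far
    private long totalRecords;  // total number of records emitted

    /** Records one emit-final pass with its latency and the number of records it emitted. */
    public void record(final long latency, final long recordsEmitted) {
        emitCalls++;
        totalLatency += latency;
        maxLatency = Math.max(maxLatency, latency);
        totalRecords += recordsEmitted;
    }

    /** Average latency per pass; NaN if nothing has been recorded yet. */
    public double latencyAvg() {
        return emitCalls == 0 ? Double.NaN : (double) totalLatency / emitCalls;
    }

    /** Maximum latency of any single pass. */
    public long latencyMax() {
        return maxLatency;
    }

    /** Total number of records emitted across all passes. */
    public long recordsTotal() {
        return totalRecords;
    }
}
```

In this sketch, two passes with latencies 4 and 8 that emit 10 and 5 records yield an average latency of 6.0, a maximum of 8, and a total of 15 records.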


As for the implementation to support emit-final: for time windows (hopping, tumbling, sliding) we already have the API needed to range over the finalized, immutable windows and emit them. For session windows, however, we do not yet have a corresponding API. Hence, as part of this KIP, we also propose adding a new range API to the SessionStore to support this feature. Note that it will be added only to the SessionStore API, not the ReadOnlySessionStore API, so that it is not exposed for interactive query usage.

Code Block
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

    /**
     * Return all the session window entries that end within the specified range (both ends are inclusive).
     * This function would be used to retrieve all closed and immutable windows.
     *
     * @param earliestSessionEndTime earliest session end time to search from, inclusive
     * @param latestSessionEndTime latest session end time to search to, inclusive
     */
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime,
                                                            final long latestSessionEndTime) {
        throw new UnsupportedOperationException(
            "This API is not supported by this implementation of SessionStore.");
    }

    // other existing functions
}

Code Block
public class TaskMetrics {

    private static final String EMITTED_RECORDS = "emitted-records";
    private static final String EMITTED_RECORDS_DESCRIPTION = "emitted records";
    private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
    private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
        RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

    private static final String EMIT_FINAL_LATENCY = "emit-final" + LATENCY_SUFFIX;
    private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";
    private static final String EMIT_FINAL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;
    private static final String EMIT_FINAL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;

    public static Sensor emittedRecordsSensor(final String threadId,
                                              final String taskId,
                                              final StreamsMetricsImpl streamsMetrics) {
        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, EMITTED_RECORDS, RecordingLevel.INFO);
        addRateOfSumAndSumMetricsToSensor(
            sensor,
            TASK_LEVEL_GROUP,
            streamsMetrics.taskLevelTagMap(threadId, taskId),
            EMITTED_RECORDS,
            EMITTED_RECORDS_RATE_DESCRIPTION,
            EMITTED_RECORDS_TOTAL_DESCRIPTION
        );
        return sensor;
    }

    public static Sensor emitFinalLatencySensor(final String threadId,
                                                final String taskId,
                                                final StreamsMetricsImpl streamsMetrics) {
        return avgAndMaxSensor(
            threadId,
            taskId,
            EMIT_FINAL_LATENCY,
            EMIT_FINAL_AVG_LATENCY_DESCRIPTION,
            EMIT_FINAL_MAX_LATENCY_DESCRIPTION,
            RecordingLevel.DEBUG,
            streamsMetrics
        );
    }
}
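
To illustrate the intended semantics of a range lookup by session end time (both bounds inclusive), here is a minimal, self-contained sketch. It is a toy in-memory model, not an implementation of Kafka's SessionStore: the class `ToySessionStore`, its `Session` type, and the `put` method are hypothetical names introduced only for this example, and it returns a plain list rather than a KeyValueIterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory session index (hypothetical; not Kafka's SessionStore). */
public class ToySessionStore {

    /** One session window for a key, with start and end timestamps in milliseconds. */
    public static final class Session {
        public final String key;
        public final long startMs;
        public final long endMs;

        Session(final String key, final long startMs, final long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Sessions are indexed by end time so that a range over end times is a sub-map view.
    private final NavigableMap<Long, List<Session>> byEndTime = new TreeMap<>();

    /** Adds a session for the given key. */
    public void put(final String key, final long startMs, final long endMs) {
        byEndTime.computeIfAbsent(endMs, t -> new ArrayList<>())
                 .add(new Session(key, startMs, endMs));
    }

    /**
     * Returns all sessions whose end time lies in
     * [earliestSessionEndTime, latestSessionEndTime], both ends inclusive,
     * ordered by ascending end time.
     */
    public List<Session> findSessions(final long earliestSessionEndTime,
                                      final long latestSessionEndTime) {
        final List<Session> result = new ArrayList<>();
        if (earliestSessionEndTime > latestSessionEndTime) {
            return result; // empty range; subMap would throw on inverted bounds
        }
        for (final List<Session> sessions
                : byEndTime.subMap(earliestSessionEndTime, true, latestSessionEndTime, true).values()) {
            result.addAll(sessions);
        }
        return result;
    }
}
```

With sessions ending at 10, 20, and 30, a call to `findSessions(10, 20)` in this sketch returns the first two. An emit-final pass could use such a lookup to visit only sessions that closed within a given end-time range, rather than scanning every session in the store.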


Proposed Changes

We introduce the API options discussed above to support emitting only the final result of windowed aggregations. We also introduce two kinds of metrics that measure the emit-final latency and the number of records emitted.

...