Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public interface TimeWindowedKStream<K, V> {
    TimeWindowedKStream<K, V> emitStrategy(EmitStrategy strategy);
}

public interface SessionWindowedKStream<K, V> {
    SessionWindowedKStream<K, V> emitStrategy(EmitStrategy strategy);
}

public interface EmitStrategy {
    enum StrategyType {
        ON_WINDOW_CLOSE, // output final result
        ON_WINDOW_UPDATE;  // output for every record
    }      

	StrategyType type();

    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }

    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy();
    }
}

public class WindowCloseStrategy implements EmitStrategy {     
	WindowCloseStrategy() {}

    StrategyType type() {
        return ON_WINDOW_CLOSE;
    }
}


I also propose to add following two metrics measuring the latency to emit final and number of records emitted for emit final:

Code Block
public class TaskMetrics {

    private static final String EMITTED_RECORDS = "emitted-records";
    private static final String EMITTED_RECORDS_DESCRIPTION = "emitted records";
    private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
    private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
        RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

    private static final String EMIT_FINAL_LATENCY = "emit-final" + LATENCY_SUFFIX;
    private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";
    private static final String EMIT_FINAL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;
    private static final String EMIT_FINAL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;

    public static Sensor emittedRecordsSensor(final String threadId,
                                              final String taskId,
                                              final StreamsMetricsImpl streamsMetrics) {
        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, EMITTED_RECORDS, RecordingLevel.INFO);
        addRateOfSumAndSumMetricsToSensor(
            sensor,
            TASK_LEVEL_GROUP,
            streamsMetrics.taskLevelTagMap(threadId, taskId),
            EMITTED_RECORDS,
            EMITTED_RECORDS_RATE_DESCRIPTION,
            EMITTED_RECORDS_TOTAL_DESCRIPTION
        );
        return sensor;
    }

    public static Sensor emitFinalLatencySensor(final String threadId,
                                              final String taskId,
                                              final StreamsMetricsImpl streamsMetrics) {
        return avgAndMaxSensor(
            threadId,
            taskId,
            EMIT_FINAL_LATENCY,
            EMIT_FINAL_AVG_LATENCY_DESCRIPTION,
            EMIT_FINAL_MAX_LATENCY_DESCRIPTION,
            RecordingLevel.DEBUG,
            streamsMetrics
        );
    }
}


Proposed Changes

We introduce several options of API changes discussed above to support output final result for windowed aggregations.  We also introduce two metrics to measure the emit final latency as well as the number of records emitted.

Compatibility, Deprecation, and Migration Plan

...