Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
public interface TimeWindowedKStream<K, V> {
    TimeWindowedKStream<K, V> emitStrategy(EmitStrategy emitStrategy);
}

public interface SessionWindowedKStream<K, V> {
    SessionWindowedKStream<K, V> emitStrategy(EmitStrategy emitStrategy);
}

public interface EmitStrategy {
    enum StrategyType {
        ON_WINDOW_CLOSE,  // output only the final result, once the window closes
        ON_WINDOW_UPDATE; // output an updated result for every record
    }

    StrategyType type();

    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }

    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy();
    }
}

public class WindowCloseStrategy implements EmitStrategy {
    WindowCloseStrategy() {}

    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }
}

public class WindowUpdateStrategy implements EmitStrategy {
    WindowUpdateStrategy() {}

    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_UPDATE;
    }
}
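To make the difference between the two strategies concrete, here is a minimal, self-contained simulation of a single-key tumbling-window count. It does not use Kafka Streams classes: the class `EmitStrategySketch`, its nested `StrategyType` (which only mirrors the proposed enum), and the close rule (a window is treated as closed once its end plus the grace period is at or below the observed stream time) are assumptions made for illustration, not the actual processor implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the two emit strategies for a single-key
// tumbling-window count; it does not use any Kafka Streams classes.
class EmitStrategySketch {
    // Mirrors the proposed EmitStrategy.StrategyType values.
    enum StrategyType { ON_WINDOW_CLOSE, ON_WINDOW_UPDATE }

    private final StrategyType type;
    private final long windowSizeMs;
    private final long graceMs;
    // Window start -> running count for windows not yet emitted as final.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    EmitStrategySketch(StrategyType type, long windowSizeMs, long graceMs) {
        this.type = type;
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Assumes non-negative timestamps.
    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - (timestamp % windowSizeMs);
        long end = start + windowSizeMs;
        if (end + graceMs <= streamTime) {
            return; // assumed close rule: the window is closed, so the late record is dropped
        }
        long count = openWindows.merge(start, 1L, Long::sum);
        if (type == StrategyType.ON_WINDOW_UPDATE) {
            // Emit the updated result for every record (this sketch never evicts windows in this mode).
            emitted.add(format(start, count));
        } else {
            emitClosedWindows();
        }
    }

    // ON_WINDOW_CLOSE: emit each window once, after it closes, then forget it.
    private void emitClosedWindows() {
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + windowSizeMs + graceMs > streamTime) {
                break; // all windows share one size, so later starts close later too
            }
            emitted.add(format(oldest.getKey(), oldest.getValue()));
            openWindows.pollFirstEntry();
        }
    }

    private String format(long start, long count) {
        return "[" + start + "," + (start + windowSizeMs) + ")=" + count;
    }
}
```

Feeding timestamps 1, 3, 12 and 25 with a 10 ms window and no grace period, `ON_WINDOW_UPDATE` produces one output per record (`[0,10)=1`, `[0,10)=2`, `[10,20)=1`, `[20,30)=1`), while `ON_WINDOW_CLOSE` produces only `[0,10)=2` and `[10,20)=1`, because `[20,30)` is still open when the input ends.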


I also propose adding two kinds of metrics for emit-final: the latency to emit final results and the number of records emitted.

Each exposed metric will have the following tags:

  • thread-id = [thread ID]
  • task-id = [task ID]
  • processor-node-id = [node ID]

The following metrics will be exposed in Kafka Streams' metrics:

  • emit-final-latency-max (maximum latency to emit final records once they could be emitted)
  • emit-final-latency-avg (average latency to emit final records once they could be emitted)
  • emit-final-records-rate (rate of final records emitted)
  • emit-final-records-total (total number of final records emitted)

The recording level for all metrics will be DEBUG and their group will be stream-processor-node-metrics.
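The four statistics can be illustrated with a plain-Java accumulator. This is a hypothetical sketch, not Kafka Streams metrics code: `EmitFinalMetricsSketch` and its methods are invented names, the caller supplies the latency value (how that latency is measured is left to the implementation), and the rate is simplified to records per second since the accumulator was created, whereas metrics libraries usually compute rates over sampled time windows.

```java
// Hypothetical accumulator for the proposed emit-final statistics (not Kafka's metrics API).
class EmitFinalMetricsSketch {
    private final long createdMs; // reference time for the simplified rate
    private long latencySamples = 0;
    private long latencySumMs = 0;
    private long latencyMaxMs = 0; // assumes latencies are non-negative
    private long recordsTotal = 0;

    EmitFinalMetricsSketch(long createdMs) {
        this.createdMs = createdMs;
    }

    // Record one emit-final pass: its latency and how many final records it emitted.
    void recordEmit(long latencyMs, long recordsEmitted) {
        latencySamples++;
        latencySumMs += latencyMs;
        latencyMaxMs = Math.max(latencyMaxMs, latencyMs);
        recordsTotal += recordsEmitted;
    }

    // emit-final-latency-max; NaN until the first sample.
    double latencyMax() {
        return latencySamples == 0 ? Double.NaN : latencyMaxMs;
    }

    // emit-final-latency-avg; NaN until the first sample.
    double latencyAvg() {
        return latencySamples == 0 ? Double.NaN : (double) latencySumMs / latencySamples;
    }

    // emit-final-records-total
    long recordsTotal() {
        return recordsTotal;
    }

    // emit-final-records-rate, simplified to records per second since creation.
    double recordsRatePerSec(long nowMs) {
        long elapsedMs = nowMs - createdMs;
        return elapsedMs <= 0 ? 0.0 : recordsTotal * 1000.0 / elapsedMs;
    }
}
```

In Kafka Streams these values would be recorded at DEBUG level in the stream-processor-node-metrics group with the thread-id, task-id and processor-node-id tags listed above; the sketch leaves out registration and tagging entirely.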


As for the implementation of emit-final: for time windows (hopping, tumbling, and sliding), we already have the API needed to range over the finalized, immutable windows and emit them. For session windows, however, there is no corresponding API yet. Hence, as part of this KIP, we also propose adding a new range API to SessionStore to support this feature. Note that this API will be added only to SessionStore, not to ReadOnlySessionStore, so it will not be exposed for interactive queries.

Code Block
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

    /**
     * Return all session window entries whose end time falls within the specified range (both ends inclusive).
     * This method would be used to retrieve all closed and immutable session windows so their final results can be emitted.
     *
     * @param earliestSessionEndTime earliest session end time to search from, inclusive
     * @param latestSessionEndTime latest session end time to search to, inclusive
     */
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final long earliestSessionEndTime,
                                                            final long latestSessionEndTime) {
        throw new UnsupportedOperationException(
                "This API is not supported by this implementation of SessionStore.");
    }

    // other existing functions
}
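One way a store could serve the proposed `findSessions(earliestSessionEndTime, latestSessionEndTime)` is to index sessions by end time, so that the query becomes an inclusive range scan. The sketch below assumes a plain in-memory `TreeMap` and uses hypothetical names (`SessionEndIndexSketch`, `Session`); it returns a `List` instead of Kafka's `KeyValueIterator<Windowed<K>, AGG>` and makes no claim about how Kafka's own session stores implement the method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory index of sessions by end time (not a Kafka SessionStore).
class SessionEndIndexSketch {
    static final class Session {
        final String key;
        final long start;
        final long end;
        final long agg;

        Session(String key, long start, long end, long agg) {
            this.key = key;
            this.start = start;
            this.end = end;
            this.agg = agg;
        }

        @Override
        public String toString() {
            return key + "@[" + start + "," + end + "]=" + agg;
        }
    }

    // Session end time -> all sessions that end at that time.
    private final TreeMap<Long, List<Session>> byEnd = new TreeMap<>();

    void put(Session session) {
        byEnd.computeIfAbsent(session.end, end -> new ArrayList<>()).add(session);
    }

    // Mirrors the proposed findSessions: both bounds are inclusive.
    List<Session> findSessions(long earliestSessionEndTime, long latestSessionEndTime) {
        List<Session> result = new ArrayList<>();
        if (earliestSessionEndTime > latestSessionEndTime) {
            return result; // TreeMap.subMap rejects an inverted range
        }
        NavigableMap<Long, List<Session>> inRange =
            byEnd.subMap(earliestSessionEndTime, true, latestSessionEndTime, true);
        for (List<Session> sessions : inRange.values()) {
            result.addAll(sessions);
        }
        return result;
    }
}
```

An emit-final processor could, for example, remember the upper bound of its previous query and on each pass call `findSessions(previousBound + 1, closedBound)`, where `closedBound` is the latest session end time it treats as closed, so each closed session is returned by only one pass. How `closedBound` is derived from stream time, the inactivity gap, and the grace period is left open here.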


Proposed Changes

We introduce the API changes discussed above to support emitting final results for windowed aggregations. We also introduce metrics that measure the emit-final latency and the number of records emitted.

Compatibility, Deprecation, and Migration Plan

...