
Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently in Kafka Streams, windowed aggregations emit an updated aggregated result for each input record. The main problem with this approach is that users sometimes need only the final result when a window closes. For example, if a user defines a 1-day tumbling window, the existing output mechanism can produce many results for that window before the day ends. This not only increases the load on downstream systems, but also makes the results hard to interpret, since most of them are partial and the user cannot tell which result is final.

An existing solution is to apply the suppress() operator with Suppressed.untilWindowCloses(...) to buffer the upstream aggregated results and output only the final results when windows close. There are several issues with the existing suppress operator:

  • It is semantically undesirable: emitting final results when a window closes should be controlled by the windowing API itself.
  • The existing suppress operator uses a separate in-memory buffer to hold all results, plus an additional changelog topic for durability.
  • The information the suppress operator needs to emit final results is already maintained by the windowed aggregation operator. The suppress operator is therefore redundant and adds memory, CPU, disk, and network overhead. In addition, maintaining another changelog topic adds operational overhead.
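The difference between the two output behaviors can be seen in a small simulation. This is a hypothetical, self-contained Java sketch, not Kafka Streams code: it counts records per 1-day tumbling window, assumes non-negative millisecond timestamps and a grace period of zero, and treats stream time as the largest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (NOT Kafka Streams code) of a count over 1-day tumbling
// windows, contrasting eager and final emission. Assumes non-negative timestamps
// in milliseconds and a grace period of zero.
final class EmitSemanticsDemo {
    static final long WINDOW_SIZE_MS = 24L * 60 * 60 * 1000; // 1 day

    // Eager: every input record forwards the updated (partial) count downstream.
    static List<String> emitEager(long[] timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        List<String> out = new ArrayList<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % WINDOW_SIZE_MS);
            long count = counts.merge(windowStart, 1L, Long::sum);
            out.add(windowStart + "=" + count);
        }
        return out;
    }

    // Final: each window's count is forwarded exactly once, when stream time
    // (the largest timestamp seen so far) reaches the window end.
    static List<String> emitFinal(long[] timestamps) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - (ts % WINDOW_SIZE_MS);
            if (windowStart + WINDOW_SIZE_MS > streamTime) {
                counts.merge(windowStart, 1L, Long::sum); // window still open
            } // otherwise the window has already closed and the late record is dropped
            // Forward and discard every window whose end is at or before stream time.
            while (!counts.isEmpty() && counts.firstKey() + WINDOW_SIZE_MS <= streamTime) {
                Map.Entry<Long, Long> closed = counts.pollFirstEntry();
                out.add(closed.getKey() + "=" + closed.getValue());
            }
        }
        return out;
    }
}
```

For timestamps 0, 1000, 2000, and one day plus 5 ms, emitEager returns four partial counts, while emitFinal returns only the single final count for the first day; the second day's window is still open and produces nothing yet.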

To resolve these issues, we propose adding a new API to control the output behavior of windowed aggregations, which could later be extended to control other behavior as well.

Public Interfaces

There are three options for changing the public interface, listed in order of preference:

Option 1:

In the existing TimeWindowedKStream and SessionWindowedKStream interfaces:

public interface TimeWindowedKStream<K, V> {
    TimeWindowedKStream<K, V> emitFinal();
}

public interface SessionWindowedKStream<K, V> {
    SessionWindowedKStream<K, V> emitFinal();
}

A similar API will also be added to TimeWindowedCogroupedKStream and SessionWindowedCogroupedKStream.
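The builder style of Option 1 can be illustrated with a hypothetical, self-contained mock. SketchTimeWindowedStream and EmitMode are illustrative names, not the real Kafka Streams TimeWindowedKStream; the point is only that emitFinal() returns the same stream type, so it fits into the existing fluent chain without changing the existing windowedBy(...) overloads.

```java
import java.time.Duration;

// Hypothetical mock of the Option 1 builder style (NOT the real Kafka Streams
// TimeWindowedKStream interface).
enum EmitMode { EAGER, FINAL }

final class SketchTimeWindowedStream {
    private final Duration windowSize;
    private final EmitMode emitMode;

    SketchTimeWindowedStream(Duration windowSize) {
        this(windowSize, EmitMode.EAGER); // eager emission is today's behavior
    }

    private SketchTimeWindowedStream(Duration windowSize, EmitMode emitMode) {
        this.windowSize = windowSize;
        this.emitMode = emitMode;
    }

    // Mirrors the proposed emitFinal(): returns a new stream of the same type
    // configured to emit only final results, leaving this instance unchanged.
    SketchTimeWindowedStream emitFinal() {
        return new SketchTimeWindowedStream(windowSize, EmitMode.FINAL);
    }

    Duration windowSize() { return windowSize; }

    EmitMode emitMode() { return emitMode; }
}
```

With the proposed Option 1 signature, the call would sit between windowing and aggregation, for example groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1))).emitFinal().count(); the emitFinal() step in that chain is the proposal, not an existing Kafka Streams method.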

Option 2:

In the existing KGroupedStream and CogroupedKStream interfaces:

public interface KGroupedStream<K, V> {
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows); // Already exists
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);    // Already exists
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows); // Already exists

    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, final EmitConfig config); // New
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, final EmitConfig config);    // New
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, final EmitConfig config); // New
}

public interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL, // output final result
        EMIT_EAGER  // output for every record
    }

    ConfigType type();

    static EmitConfig emitFinal() {
        return new EmitFinalConfig();
    }

    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}

public class EmitFinalConfig implements EmitConfig {
    EmitFinalConfig() {}

    @Override
    public ConfigType type() {
        return ConfigType.EMIT_FINAL;
    }
}

A similar API will be added to CogroupedKStream.
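To show how a windowed aggregation might consume the Option 2 config, here is a hypothetical sketch. EmitConfig follows the interface above, except that its factories return lambdas instead of the EmitFinalConfig/EmitEagerConfig classes for brevity; WindowEmitDecision.shouldForward is an illustrative helper that is not part of the KIP.

```java
// Hypothetical sketch: EmitConfig mirrors Option 2, but the factories return
// lambdas instead of EmitFinalConfig/EmitEagerConfig classes for brevity.
interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL, // output final result
        EMIT_EAGER  // output for every record
    }

    ConfigType type();

    static EmitConfig emitFinal() {
        return () -> ConfigType.EMIT_FINAL;
    }

    static EmitConfig emitEager() {
        return () -> ConfigType.EMIT_EAGER;
    }
}

// Hypothetical helper (not part of the KIP): how a windowed aggregation processor
// could decide whether an updated window result is forwarded downstream.
final class WindowEmitDecision {
    static boolean shouldForward(EmitConfig config, boolean windowClosed) {
        switch (config.type()) {
            case EMIT_EAGER:
                return true;         // forward every update, as today
            case EMIT_FINAL:
                return windowClosed; // forward only once the window has closed
            default:
                throw new IllegalStateException("Unknown emit type: " + config.type());
        }
    }
}
```

Dispatching on type() keeps the processor logic in one place, so adding another emit behavior later would mean adding one enum constant and one switch branch.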


Option 3:

In the existing Windows, SlidingWindows, and SessionWindows classes:

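public abstract class Windows<W extends Window> {
    public abstract EmitConfig getEmitConfig(); // New
}

public final class SlidingWindows {
    private EmitConfig emitConfig; // illustrative field; how it is set is not specified here

    public EmitConfig getEmitConfig() { // New
        return emitConfig;
    }
}

public final class SessionWindows {
    private EmitConfig emitConfig; // illustrative field; how it is set is not specified here

    public EmitConfig getEmitConfig() { // New
        return emitConfig;
    }
}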

EmitConfig and EmitFinalConfig are the same as in Option 2.
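Option 3 attaches the emit behavior to the window definition itself. The hypothetical sketch below (SketchSlidingWindows, the of(...) factory, EmitType, and SketchAggregation are illustrative names; the KIP only specifies the getter, and here getEmitConfig() returns a plain enum for brevity) shows the consequence discussed under Rejected Alternatives: every aggregation that reuses a window definition inherits its emit behavior.

```java
import java.time.Duration;

// Hypothetical sketch of Option 3 (NOT Kafka Streams code): the emit behavior is
// stored on the window definition and read back through getEmitConfig().
enum EmitType { EMIT_FINAL, EMIT_EAGER }

final class SketchSlidingWindows {
    private final Duration timeDifference;
    private final EmitType emitType;

    private SketchSlidingWindows(Duration timeDifference, EmitType emitType) {
        this.timeDifference = timeDifference;
        this.emitType = emitType;
    }

    // Hypothetical factory; the KIP does not specify how the emit config is set.
    static SketchSlidingWindows of(Duration timeDifference, EmitType emitType) {
        return new SketchSlidingWindows(timeDifference, emitType);
    }

    Duration timeDifference() { return timeDifference; }

    // Mirrors the getter proposed in Option 3 (simplified to return an enum).
    EmitType getEmitConfig() { return emitType; }
}

final class SketchAggregation {
    // The aggregation has no say of its own: it reads the behavior from the window.
    static String describe(SketchSlidingWindows windows) {
        return windows.getEmitConfig() == EmitType.EMIT_FINAL ? "final results only" : "every update";
    }
}
```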


Proposed Changes

This KIP proposes the API options described above to support emitting final results for windowed aggregations.

Compatibility, Deprecation, and Migration Plan

We also plan to introduce a new default state store that supports more efficient time-range lookups for emit-final windows. This store will not be backward compatible with the existing state store and windowed aggregation types. However, users who continue to use their existing state stores should remain backward compatible, possibly with worse performance.
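Time-range lookup matters for emit-final because, whenever stream time advances, the operator must find every window that has just closed. The hypothetical sketch below indexes aggregates by window end time in a TreeMap (an illustrative stand-in, not the design of the proposed store), so closed windows are found with a single headMap range lookup instead of a scan over all keys. In this sketch a window counts as closed once its end is at or before stream time minus the grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch (NOT the proposed Kafka state store): aggregates indexed by
// window end time so that closed windows can be found with one range lookup.
final class TimeOrderedWindowIndex {
    // window end (ms) -> (record key -> aggregate)
    private final TreeMap<Long, Map<String, Long>> byWindowEnd = new TreeMap<>();

    void put(long windowEnd, String key, long aggregate) {
        byWindowEnd.computeIfAbsent(windowEnd, end -> new TreeMap<>()).put(key, aggregate);
    }

    // Returns and removes every aggregate whose window end is at or before
    // streamTime - gracePeriodMs, formatted as "key@windowEnd=aggregate".
    List<String> pollClosed(long streamTime, long gracePeriodMs) {
        NavigableMap<Long, Map<String, Long>> closed =
            byWindowEnd.headMap(streamTime - gracePeriodMs, true);
        List<String> results = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> window : closed.entrySet()) {
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                results.add(agg.getKey() + "@" + window.getKey() + "=" + agg.getValue());
            }
        }
        closed.clear(); // headMap is a view, so this also removes the entries from byWindowEnd
        return results;
    }
}
```

Because each emitted window is removed, repeated polls at the same stream time return nothing, which matches the emit-final requirement that each window result is output only once.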

Rejected Alternatives

The Public Interfaces section lists three API options. I prefer Option 1 for the following reasons:

  • It is concise, and its builder-style interface is more descriptive when applied to TimeWindowedKStream and SessionWindowedKStream.
  • In Option 2, I feel the emit config should already be part of the window definition. In Option 3, I feel EMIT_FINAL and EMIT_EAGER should be applied to the stream rather than to the window.

The drawbacks of Options 1 and 2 are:

  • Not every window type can support emit-final. For example, UnlimitedWindows should not support it, because an unlimited window never closes.