...

To resolve these issues, we propose to add a new API to control windowed aggregation output behavior and possibly other behavior later on.

Public Interfaces

There are three options to change the public interface in order of preference:

Option 1:

In the existing TimeWindowedKStream and SessionWindowedKStream interfaces.

Code Block
public interface TimeWindowedKStream<K, V> {
    TimeWindowedKStream<K, V> trigger(Trigger trigger);
}

public interface SessionWindowedKStream<K, V> {
    SessionWindowedKStream<K, V> emitFinal();
}

A similar API will also be added to TimeWindowedCogroupedKStream and SessionWindowedCogroupedKStream.

Option 2:

...

 trigger(Trigger trigger);
}

public interface Trigger {
    enum TriggerType {
        ON_WINDOW_CLOSE, // output the final result only
        ON_WINDOW_UPDATE; // output for every record
    }

    TriggerType type();

    static Trigger onWindowClose() {
        return new WindowCloseTrigger();
    }

    static Trigger onWindowUpdate() {
        return new WindowUpdateTrigger(); // WindowUpdateTrigger will be similar to WindowCloseTrigger
    }
}

public class WindowCloseTrigger implements Trigger {
    WindowCloseTrigger() {}

    @Override
    public TriggerType type() {
        return TriggerType.ON_WINDOW_CLOSE;
    }
}
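To make the difference between the two trigger types concrete, here is a minimal, self-contained sketch of a tumbling-window counter. It is not the Kafka Streams implementation: the class TriggerSketch, its string output format, and the zero grace period are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two trigger types; not part of Kafka Streams.
final class TriggerSketch {
    enum TriggerType { ON_WINDOW_CLOSE, ON_WINDOW_UPDATE }

    private final TriggerType type;
    private final long windowSizeMs;
    // Window start -> running count, sorted by window start.
    private final TreeMap<Long, Long> counts = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    TriggerSketch(TriggerType type, long windowSizeMs) {
        this.type = type;
        this.windowSizeMs = windowSizeMs;
    }

    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        if (start + windowSizeMs <= streamTime) {
            return; // late record for a closed window (zero grace period assumed)
        }
        long count = counts.merge(start, 1L, Long::sum);
        if (type == TriggerType.ON_WINDOW_UPDATE) {
            output.add(start + "=" + count); // emit on every update
            return;
        }
        // ON_WINDOW_CLOSE: emit and drop every window whose end has passed.
        while (!counts.isEmpty() && counts.firstKey() + windowSizeMs <= streamTime) {
            Map.Entry<Long, Long> closed = counts.pollFirstEntry();
            output.add(closed.getKey() + "=" + closed.getValue());
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch, ON_WINDOW_UPDATE produces one output per input record, while ON_WINDOW_CLOSE produces a single output per window once stream time moves past the window end.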

Proposed Changes

We introduce the API options discussed above to support emitting only the final result of windowed aggregations.

Compatibility, Deprecation, and Migration Plan

We also plan to introduce a new default state store that supports more efficient time-range lookups for emit-final windows. This store won't be backward compatible with the existing state store and windowed aggregation type. However, if users continue to use their existing state stores, the change should remain backward compatible, though possibly with worse performance.
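As an illustration of why a time-ordered layout helps here, the sketch below indexes windows by their end timestamp so that all closed windows can be fetched with one range scan. The proposal does not specify the store layout; the class TimeOrderedWindowStoreSketch, its methods, and the in-memory TreeMap are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical time-ordered window store; the layout is an assumption.
final class TimeOrderedWindowStoreSketch {
    // Window end timestamp -> (record key -> aggregate), sorted by window end.
    private final TreeMap<Long, Map<String, Long>> byWindowEnd = new TreeMap<>();

    void put(String key, long windowEndMs, long aggregate) {
        byWindowEnd.computeIfAbsent(windowEndMs, end -> new TreeMap<>()).put(key, aggregate);
    }

    // Removes and returns every window with end <= streamTimeMs, oldest first.
    List<String> pollClosed(long streamTimeMs) {
        // A single range view over the closed windows; no full scan over keys.
        NavigableMap<Long, Map<String, Long>> closed = byWindowEnd.headMap(streamTimeMs, true);
        List<String> results = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> window : closed.entrySet()) {
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                results.add(agg.getKey() + "@" + window.getKey() + "=" + agg.getValue());
            }
        }
        closed.clear(); // clearing the view also removes the entries from the backing map
        return results;
    }

    int openWindows() {
        return byWindowEnd.size();
    }
}
```

A store ordered by record key would instead have to visit every key to find the windows that have closed, which is the extra cost the existing stores would likely pay.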

Rejected Alternatives

First rejected option

Code Block
public interface KGroupedStream<K, V> {
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows); // Already exists
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);    // Already exists
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows); // Already exists

    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, EmitConfig config); // New
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, EmitConfig config);    // New
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, EmitConfig config); // New
}

public interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL, // output the final result only
        EMIT_EAGER; // output for every record
    }

    ConfigType type();

    static EmitConfig emitFinal() {
        return new EmitFinalConfig();
    }

    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}

public class EmitFinalConfig implements EmitConfig {
    EmitFinalConfig() {}

    @Override
    public ConfigType type() {
        return ConfigType.EMIT_FINAL;
    }
}

A similar API will be added to CogroupedKStream.

...

This option is rejected because:

  1. It adds more overloads of the windowedBy function.
  2. It operates on KGroupedStream and both turns it into a windowed stream and configures the stream's emit policy in a single call. I think this goes against the builder pattern.


Second rejected option

In the existing Windows, SlidingWindows and SessionWindows classes.

Code Block
public abstract class Windows<W extends Window> {
    public abstract EmitConfig getEmitConfig(); // New
}

public final class SlidingWindows {
    public EmitConfig getEmitConfig(); // New (a final class cannot declare abstract methods)
}

public final class SessionWindows {
    public EmitConfig getEmitConfig(); // New (a final class cannot declare abstract methods)
}

public interface EmitConfig {
    enum ConfigType {
        EMIT_FINAL, // output the final result only
        EMIT_EAGER; // output for every record
    }

    ConfigType type();

    static EmitConfig emitFinal() {
        return new EmitFinalConfig();
    }

    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}

public class EmitFinalConfig implements EmitConfig {
    EmitFinalConfig() {}

    @Override
    public ConfigType type() {
        return ConfigType.EMIT_FINAL;
    }
}

...

The Public Interfaces section lists three API change options. I prefer option 1 for the following reasons:

  • It is concise and follows a builder-style interface, which is more descriptive when applied to TimeWindowedKStream and SessionWindowedKStream.
  • In option 2, I feel WindowConfig should already be part of Window. In option 3, I feel EMIT_FINAL and EMIT_EAGER should be applied to Stream instead of Window.

The drawbacks of options 1 and 2 are:

...

This option is rejected because:

  1. The emit config should operate on the stream and configure how its results are output. The window definition shouldn't control that.