Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
/**
 * API sketch: an {@code emitFinal} step on a time-windowed stream.
 */
public interface TimeWindowedKStream<K, V> {
    /**
     * Returns a {@code TimeWindowedKStream} with the same key and value types.
     *
     * @param allowedLateness lateness allowance; its unit is not stated in this sketch (TODO confirm)
     * @return a time-windowed stream; presumably one that emits only final window results
     *         (inferred from the method name, confirm against the proposal text)
     */
    TimeWindowedKStream<K, V> emitFinal(final int allowedLateness);
}

/**
 * API sketch: an {@code emitFinal} step on a session-windowed stream.
 */
public interface SessionWindowedKStream<K, V> {
    /**
     * Returns a {@code SessionWindowedKStream} with the same key and value types.
     *
     * @param allowedLateness lateness allowance; its unit is not stated in this sketch (TODO confirm)
     * @return a session-windowed stream; presumably one that emits only final window results
     *         (inferred from the method name, confirm against the proposal text)
     */
    SessionWindowedKStream<K, V> emitFinal(final int allowedLateness);
}

A similar API will also be added to TimeWindowedCogroupedKStream and SessionWindowedCogroupedKStream.

...

Code Block
public interface KGroupedStream<K, V> {
    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows); // Already exist
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows);    // Already exist
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows); // Already exist

    <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, WindowConfig config); // new
    TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, WindowCongig config);    // New
    SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, WindowConfig config); // New
}

/**
 * Describes how results are emitted: only the final result, or one output per record.
 *
 * <p>Instances are obtained through the static factories {@link #emitFinal()} and
 * {@link #emitEager()}.
 */
public interface EmitConfig {
    /** The kinds of emit behaviour a configuration can represent. */
    enum ConfigType {
        EMIT_FINAL, // output final result
        EMIT_EAGER;  // output for every record
    }

    /**
     * @return the kind of this configuration
     */
    ConfigType type();

    /**
     * @return a configuration backed by {@code EmitFinalConfig}
     */
    static EmitConfig emitFinal() {
        // NOTE(review): the EmitFinalConfig sketch declares a constructor taking
        // `allowedLateness`, while this call passes no argument; confirm which is intended.
        return new EmitFinalConfig();
    }

    /**
     * @return a configuration backed by {@code EmitEagerConfig}
     */
    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}

public class EmitFinalConfig implements WindowConfigEmitConfig {
    EmitFinalConfig(final int allowedLateness);) {}

    ConfigType type() {
        return EMIT_FINAL;
    }
}

...

Code Block
/**
 * API sketch: the window-specification base class gains an accessor for its emit configuration.
 */
public abstract class Windows<W extends Window> {
    /**
     * @return the emit configuration of this window specification
     */
    public abstract EmitConfig getEmitConfig(); // New
}

public final class SlidingWindows {
     public abstract WindownConfigEmitConfig getConfiggetEmitConfig(); // New
}

public final class SessionWindows {
     public abstract WindownConfigEmitConfig getConfiggetEmitConfig(); // New
}  

/**
 * Describes how results are emitted: only the final result, or one output per record.
 *
 * <p>Instances are obtained through the static factories {@link #emitFinal()} and
 * {@link #emitEager()}.
 */
public interface EmitConfig {
    /** The kinds of emit behaviour a configuration can represent. */
    enum ConfigType {
        EMIT_FINAL, // output final result
        EMIT_EAGER;  // output for every record
    }

    /**
     * @return the kind of this configuration
     */
    ConfigType type();

    /**
     * @return a configuration backed by {@code EmitFinalConfig}
     */
    static EmitConfig emitFinal() {
        // NOTE(review): the EmitFinalConfig sketch declares a constructor taking
        // `allowedLateness`, while this call passes no argument; confirm which is intended.
        return new EmitFinalConfig();
    }

    /**
     * @return a configuration backed by {@code EmitEagerConfig}
     */
    static EmitConfig emitEager() {
        return new EmitEagerConfig(); // EmitEagerConfig will be similar to EmitFinalConfig
    }
}

public class EmitFinalConfig implements WindowConfigEmitConfig {
    EmitFinalConfig(final int allowedLateness);) {}

    ConfigType type() {
        return EMIT_FINAL;
    }
}

...