...

Code Block
languagejava
/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}
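
The firing condition in EndOfStreamTrigger can be illustrated without any Flink dependency. The sketch below is only an approximation under stated assumptions: it relies on Flink's TimeWindow.maxTimestamp() returning end - 1, so the single window above has a maximum timestamp of Long.MAX_VALUE - 1, and in practice a timer at that timestamp is reached only when the final Long.MAX_VALUE watermark is emitted at end of input. The class and method names (EndOfStreamTriggerSketch, firesOnTimer) are hypothetical and not part of the proposal.

```java
/** Stand-alone sketch; none of these names are Flink classes. */
final class EndOfStreamTriggerSketch {

    /** Mirrors the end of TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE) from the proposal. */
    static final long WINDOW_END = Long.MAX_VALUE;

    /** Assumption: same rule as Flink's TimeWindow.maxTimestamp(), which is end - 1. */
    static long maxTimestamp() {
        return WINDOW_END - 1;
    }

    /** Same comparison as EndOfStreamTrigger.onEventTime in the proposal. */
    static boolean firesOnTimer(long timerTime) {
        return timerTime == maxTimestamp();
    }

    public static void main(String[] args) {
        // A timer at an ordinary event time does not fire the window.
        System.out.println(firesOnTimer(1_000L));
        // A timer at the window's maximum timestamp does.
        System.out.println(firesOnTimer(Long.MAX_VALUE - 1));
    }
}
```

Because onElement and onProcessingTime always return CONTINUE, this comparison is the only path that can produce FIRE.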


2) Add the OperatorAttributes class, which provides information such as whether the operator should be run in batch mode.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {

    private boolean requireBatchMode;

    public OperatorAttributes() {
        requireBatchMode = false;
    }

    public OperatorAttributes setRequireBatchMode(boolean requireBatchMode) {...}

    /**
     * Returns true iff the operator should be run in batch mode.
     *
     * When an operator runs in batch mode:
     * - Its input and output edges are blocking.
     * - Its checkpoint is disabled.
     * - It uses BatchExecutionStateBackend.
     * - It emits results after all input streams have reached end.
     */
    public boolean requireBatchMode() {...}
}


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes();
    }
}
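
As a rough illustration of how the default method might be used, the sketch below shows one operator keeping the default attributes and another overriding getOperatorAttributes() to request batch mode. It is self-contained and does not use Flink: Operator, MapOperator, EndOfStreamAggregateOperator and anyRequiresBatchMode are hypothetical stand-ins, and the body of setRequireBatchMode (elided in the proposal) is assumed to set the field and return this.

```java
import java.util.List;

/** Stand-alone sketch; these types only imitate the proposed API. */
final class OperatorAttributesSketch {

    /** Minimal copy of the proposed class; the setter body is an assumption. */
    static final class OperatorAttributes {
        private boolean requireBatchMode = false;

        OperatorAttributes setRequireBatchMode(boolean requireBatchMode) {
            this.requireBatchMode = requireBatchMode;
            return this; // assumed fluent style, inferred from the return type
        }

        boolean requireBatchMode() {
            return requireBatchMode;
        }
    }

    /** Hypothetical stand-in for StreamOperator with the proposed default method. */
    interface Operator {
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributes();
        }
    }

    /** Hypothetical operator that keeps the default attributes. */
    static final class MapOperator implements Operator {}

    /** Hypothetical operator that emits only at end of input, so it requests batch mode. */
    static final class EndOfStreamAggregateOperator implements Operator {
        @Override
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributes().setRequireBatchMode(true);
        }
    }

    /** Hypothetical planner-side check: does any operator require batch mode? */
    static boolean anyRequiresBatchMode(List<Operator> operators) {
        return operators.stream()
                .anyMatch(op -> op.getOperatorAttributes().requireBatchMode());
    }

    public static void main(String[] args) {
        System.out.println(anyRequiresBatchMode(List.of(new MapOperator())));
        System.out.println(anyRequiresBatchMode(
                List.of(new MapOperator(), new EndOfStreamAggregateOperator())));
    }
}
```

Returning an attributes object rather than a single boolean leaves room for further attributes to be added later without changing the operator interfaces again.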


Proposed Changes

1) Add the requireBatchMode() API to the Transformation class.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    public boolean requireBatchMode() {
        return false;
    }
}


Benchmark Results

...