...

Code Block
languagejava
/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}
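
The firing condition above can be illustrated without Flink. The sketch below is a toy model, not the proposed implementation: the class and method names are hypothetical, a running sum stands in for the buffered window contents, and it assumes the watermark advances to Long.MAX_VALUE once all inputs have reached EOF.

```java
/** Toy, Flink-free model of a window that fires only at end of input (hypothetical names). */
final class ToyEndOfStreamWindow {
    // Mirrors new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE): the end is exclusive,
    // so the largest timestamp that belongs to the window is end - 1.
    static final long WINDOW_END = Long.MAX_VALUE;
    static final long MAX_TIMESTAMP = WINDOW_END - 1;

    private long sum = 0;          // stand-in for the buffered window contents
    private boolean fired = false; // the single window fires at most once

    /** Every element lands in the same window; nothing is emitted yet. */
    void onElement(long value) {
        sum += value;
    }

    /**
     * Called as the watermark advances. Returns the window result once the
     * watermark reaches the window's max timestamp, otherwise null.
     */
    Long onWatermark(long watermark) {
        if (!fired && watermark >= MAX_TIMESTAMP) {
            fired = true;
            return sum;
        }
        return null;
    }
}
```

Any watermark produced while the inputs are still active stays below MAX_TIMESTAMP, so in this model the result is emitted only at end of input.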


2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...

    public OperatorAttributesBuilder() {
        isOutputOnEOF = false;
        isOutputOnCheckpoint = false;
        hasInternalSorter = false;
    }

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    public OperatorAttributes build() {...}
}


Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its checkpoint is disabled.
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}
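
One way the builder and the attributes object could fit together is sketched below. The method names follow the signatures listed above, but the bodies are elided in this proposal, so the fields, the constructor of OperatorAttributes, and the chaining behavior of the setters are assumptions made for illustration only.

```java
/** Sketch only: the fields and the package-private constructor are assumptions. */
final class OperatorAttributes {
    private final boolean isOutputOnEOF;
    private final boolean isOutputOnCheckpoint;
    private final boolean hasInternalSorter;

    OperatorAttributes(
            boolean isOutputOnEOF, boolean isOutputOnCheckpoint, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.isOutputOnCheckpoint = isOutputOnCheckpoint;
        this.hasInternalSorter = hasInternalSorter;
    }

    public boolean getIsOutputOnEOF() { return isOutputOnEOF; }

    public boolean getIsOutputOnCheckpoint() { return isOutputOnCheckpoint; }

    public boolean getHasInternalSorter() { return hasInternalSorter; }
}

/** Sketch only: every attribute defaults to false, as in the builder constructor above. */
final class OperatorAttributesBuilder {
    private boolean isOutputOnEOF = false;
    private boolean isOutputOnCheckpoint = false;
    private boolean hasInternalSorter = false;

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {
        this.isOutputOnEOF = isOutputOnEOF;
        return this; // assumed: setters return the builder so calls can be chained
    }

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {
        this.isOutputOnCheckpoint = isOutputOnCheckpoint;
        return this;
    }

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {
        this.hasInternalSorter = hasInternalSorter;
        return this;
    }

    public OperatorAttributes build() {
        return new OperatorAttributes(isOutputOnEOF, isOutputOnCheckpoint, hasInternalSorter);
    }
}
```

Under these assumptions, an operator that buffers and sorts its inputs and emits only at end of input would declare itself with new OperatorAttributesBuilder().setIsOutputOnEOF(true).setHasInternalSorter(true).build().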


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the getIsOutputOnEOF() and getHasInternalSorter() APIs to the Transformation class.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
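
The delegation described in step 2 might look roughly like the following. All types here are hypothetical stand-ins prefixed with Sketch, not the real Flink classes, and the sketch assumes the transformation holds a direct reference to its operator.

```java
/** Hypothetical stand-in for OperatorAttributes, reduced to the two flags used here. */
final class SketchAttributes {
    private final boolean isOutputOnEOF;
    private final boolean hasInternalSorter;

    SketchAttributes(boolean isOutputOnEOF, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.hasInternalSorter = hasInternalSorter;
    }

    boolean getIsOutputOnEOF() { return isOutputOnEOF; }

    boolean getHasInternalSorter() { return hasInternalSorter; }
}

/** Hypothetical stand-in for StreamOperator: by default no attribute is set. */
interface SketchOperator {
    default SketchAttributes getOperatorAttributes() {
        return new SketchAttributes(false, false);
    }
}

/** Hypothetical stand-in for Transformation with the two new default methods. */
abstract class SketchTransformation {
    public boolean getIsOutputOnEOF() { return false; }

    public boolean getHasInternalSorter() { return false; }
}

/** Hypothetical stand-in for OneInputTransformation: answers come from the operator. */
final class SketchOneInputTransformation extends SketchTransformation {
    private final SketchOperator operator;

    SketchOneInputTransformation(SketchOperator operator) {
        this.operator = operator;
    }

    @Override
    public boolean getIsOutputOnEOF() {
        return operator.getOperatorAttributes().getIsOutputOnEOF();
    }

    @Override
    public boolean getHasInternalSorter() {
        return operator.getOperatorAttributes().getHasInternalSorter();
    }
}
```

With this shape, operators that do not override getOperatorAttributes() keep the current behavior, because both flags fall back to false.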


Benchmark Results

1) DataStream#CoGroup

...

  • Without the proposed change, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
  • With the proposed change, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 38 sec.

This shows that the same DataStream program in stream mode can be more than 20X faster with the proposed change.
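
As a rough check of the speedup claim, the two stream-mode runs can be compared by throughput, since they process different record counts. The sketch below takes 38 sec as the execution time with the proposed change; the class and method names are hypothetical.

```java
/** Back-of-the-envelope throughput comparison using the figures quoted above. */
final class CoGroupSpeedup {
    /** Records processed per second for one source. */
    static double throughput(double records, double seconds) {
        return records / seconds;
    }

    /** Stream-mode throughput with the change divided by throughput without it. */
    static double speedup() {
        double before = throughput(2e6, 56); // stream mode, without the change
        double after = throughput(5e7, 38);  // stream mode, with the change
        return after / before;
    }

    public static void main(String[] args) {
        System.out.printf("speedup ~ %.1fx%n", speedup());
    }
}
```

The ratio works out to 1400/38, or roughly 37X, which is consistent with the "more than 20X" statement.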

...