...
Code Block |
---|
|
/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    // Singleton instance returned by get().
    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    // The single window that every element is assigned to.
    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // Fire only for the timer at the window's max timestamp.
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
} |
2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.
Code Block |
---|
|
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...
    public OperatorAttributesBuilder() {
        isOutputOnEOF = false;
        isOutputOnCheckpoint = false;
        hasInternalSorter = false;
    }

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    public OperatorAttributes build() {...}
} |
Code Block |
---|
|
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its checkpoint is disabled.
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
} |
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
Code Block |
---|
|
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
} |
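To illustrate how an operator might use the API above, here is a minimal, self-contained sketch. The three types below are simplified stand-ins for the proposed classes, not the Flink classes themselves: the builder and getter bodies are assumptions (the proposal elides them with {...}), and SortingEndOfInputOperator and PassThroughOperator are hypothetical names introduced only for this example.

```java
// Stand-in for the proposed OperatorAttributes; the field layout is an assumption.
final class OperatorAttributes {
    private final boolean isOutputOnEOF;
    private final boolean isOutputOnCheckpoint;
    private final boolean hasInternalSorter;

    OperatorAttributes(
            boolean isOutputOnEOF, boolean isOutputOnCheckpoint, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.isOutputOnCheckpoint = isOutputOnCheckpoint;
        this.hasInternalSorter = hasInternalSorter;
    }

    boolean getIsOutputOnEOF() { return isOutputOnEOF; }
    boolean getIsOutputOnCheckpoint() { return isOutputOnCheckpoint; }
    boolean getHasInternalSorter() { return hasInternalSorter; }
}

// Stand-in for the proposed builder; all attributes default to false, as in the proposal.
final class OperatorAttributesBuilder {
    private boolean isOutputOnEOF = false;
    private boolean isOutputOnCheckpoint = false;
    private boolean hasInternalSorter = false;

    OperatorAttributesBuilder setIsOutputOnEOF(boolean value) {
        this.isOutputOnEOF = value;
        return this;
    }

    OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    OperatorAttributesBuilder setHasInternalSorter(boolean value) {
        this.hasInternalSorter = value;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(isOutputOnEOF, isOutputOnCheckpoint, hasInternalSorter);
    }
}

// Stand-in for StreamOperator, reduced to the new default method.
interface StreamOperator {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

// Hypothetical operator that sorts its input itself and emits only at end of input.
final class SortingEndOfInputOperator implements StreamOperator {
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder()
                .setIsOutputOnEOF(true)
                .setHasInternalSorter(true)
                .build();
    }
}

// Hypothetical operator that keeps the default attributes.
final class PassThroughOperator implements StreamOperator {}

final class OperatorAttributesDemo {
    public static void main(String[] args) {
        OperatorAttributes custom = new SortingEndOfInputOperator().getOperatorAttributes();
        OperatorAttributes defaults = new PassThroughOperator().getOperatorAttributes();
        System.out.println("custom outputOnEOF = " + custom.getIsOutputOnEOF());
        System.out.println("default outputOnEOF = " + defaults.getIsOutputOnEOF());
    }
}
```

Because the method is a default method returning all-false attributes, existing operators that do not override it would keep their current behavior under this sketch.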
Proposed Changes
1) Add the getIsOutputOnEOF() and getHasInternalSorter() APIs to the Transformation class.
Code Block |
---|
|
@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
} |
2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
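One possible shape of such an override is sketched below. This is an illustration under stated assumptions, not the actual Flink implementation: the types are reduced stand-ins that share only their names with the real classes, the two-argument OperatorAttributes constructor is hypothetical, and the real OneInputTransformation carries type parameters and many more members.

```java
// Stand-in for the proposed OperatorAttributes, reduced to the two getters used here.
final class OperatorAttributes {
    private final boolean isOutputOnEOF;
    private final boolean hasInternalSorter;

    // Hypothetical constructor, used only to keep the sketch short.
    OperatorAttributes(boolean isOutputOnEOF, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.hasInternalSorter = hasInternalSorter;
    }

    boolean getIsOutputOnEOF() { return isOutputOnEOF; }
    boolean getHasInternalSorter() { return hasInternalSorter; }
}

// Stand-in for StreamOperatorFactory, reduced to the attribute accessor.
interface StreamOperatorFactory {
    OperatorAttributes getOperatorAttributes();
}

// Mirrors the defaults added to Transformation in step 1.
abstract class Transformation {
    public boolean getIsOutputOnEOF() { return false; }
    public boolean getHasInternalSorter() { return false; }
}

// Assumed override: delegate both getters to the underlying operator's attributes.
final class OneInputTransformation extends Transformation {
    private final StreamOperatorFactory operatorFactory;

    OneInputTransformation(StreamOperatorFactory operatorFactory) {
        this.operatorFactory = operatorFactory;
    }

    @Override
    public boolean getIsOutputOnEOF() {
        return operatorFactory.getOperatorAttributes().getIsOutputOnEOF();
    }

    @Override
    public boolean getHasInternalSorter() {
        return operatorFactory.getOperatorAttributes().getHasInternalSorter();
    }
}

final class TransformationDemo {
    public static void main(String[] args) {
        // A lambda works because the stand-in factory has a single abstract method.
        Transformation t = new OneInputTransformation(() -> new OperatorAttributes(true, false));
        System.out.println("outputOnEOF = " + t.getIsOutputOnEOF());
        System.out.println("hasInternalSorter = " + t.getHasInternalSorter());
    }
}
```

Transformation subclasses that do not wrap an operator would simply inherit the false defaults in this sketch.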
Benchmark Results
1) DataStream#CoGroup
...
- Without the proposed change, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
- Without the proposed change, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
- With the proposed change, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 38 sec.
This shows that the same DataStream program in stream mode can be more than 20X faster with the proposed change.
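The stream-mode baseline processes fewer records than the other runs, so the comparison is easiest to check on a per-source throughput basis. The following sketch does that arithmetic, taking 38 sec as the measured time with the proposed change; the class and method names are made up for this example, and it assumes that throughput is a fair basis for comparing runs with different record counts.

```java
final class BenchmarkSpeedup {
    // Records processed per second for one source.
    static double throughput(double records, double seconds) {
        return records / seconds;
    }

    // Ratio of the new throughput to the baseline throughput.
    static double speedup(
            double baselineRecords, double baselineSeconds,
            double newRecords, double newSeconds) {
        return throughput(newRecords, newSeconds) / throughput(baselineRecords, baselineSeconds);
    }

    public static void main(String[] args) {
        // Stream mode without the change: 2*10^6 records in 56 sec.
        double vsStream = speedup(2e6, 56, 5e7, 38);
        // Batch mode without the change: 5*10^7 records in 118 sec.
        double vsBatch = speedup(5e7, 118, 5e7, 38);
        System.out.printf("vs. stream mode: %.1fX%n", vsStream);
        System.out.printf("vs. batch mode: %.1fX%n", vsBatch);
    }
}
```

On these numbers the throughput ratio against the stream-mode baseline is 1400/38, roughly 37X, which is consistent with the "more than 20X" statement; against the batch-mode baseline it is 118/38, roughly 3X.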
...