...
Code Block |
---|
|
/**
 * This WindowAssigner assigns all elements to the same window, which is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
    // Singleton returned by get().
    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
    // The single window that every element is assigned to.
    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
    private EndOfStreamWindows() {}
    public static EndOfStreamWindows get() {
        return INSTANCE;
    }
    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }
    @Override
    public boolean isEventTime() {
        return true;
    }
    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // Fire only for the timer at the window's maximum timestamp.
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
} |
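The trigger logic in the listing above can be exercised without a Flink runtime. The following is a minimal sketch, not Flink code: `SketchTriggerResult`, `EndOfStreamWindowSketch` and `EndOfStreamWindowDemo` are hypothetical stand-ins, and the sketch assumes that the runtime delivers an event-time timer at `window.maxTimestamp()` once the final watermark arrives. It only illustrates that buffered elements are emitted once, at end of input, and never earlier.

```java
import java.util.ArrayList;
import java.util.List;

class EndOfStreamWindowDemo {
    /** Feeds all values, then delivers the timers in order; collects one sum per FIRE. */
    static List<Integer> run(int[] values, long[] timerTimes) {
        EndOfStreamWindowSketch window = new EndOfStreamWindowSketch();
        List<Integer> emitted = new ArrayList<>();
        for (int v : values) {
            if (window.onElement(v) == SketchTriggerResult.FIRE) {
                emitted.add(window.sum());
            }
        }
        for (long t : timerTimes) {
            if (window.onEventTime(t) == SketchTriggerResult.FIRE) {
                emitted.add(window.sum());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // No timer at the maximum timestamp: nothing is emitted.
        System.out.println(run(new int[] {1, 2, 3}, new long[] {1_000L}));
        // Timer at the maximum timestamp (assumed end of input): one result.
        System.out.println(run(new int[] {1, 2, 3}, new long[] {1_000L, Long.MAX_VALUE - 1}));
    }
}

/** Hypothetical stand-in for Flink's TriggerResult, reduced to the two values used here. */
enum SketchTriggerResult { CONTINUE, FIRE }

/** Models the single window [Long.MIN_VALUE, Long.MAX_VALUE) and its trigger. */
class EndOfStreamWindowSketch {
    // Mirrors TimeWindow#maxTimestamp(): the exclusive window end minus one.
    static final long MAX_TIMESTAMP = Long.MAX_VALUE - 1;

    private final List<Integer> buffer = new ArrayList<>();

    /** Buffers the element; like EndOfStreamTrigger#onElement, it never fires. */
    SketchTriggerResult onElement(int value) {
        buffer.add(value);
        return SketchTriggerResult.CONTINUE;
    }

    /** Same comparison as EndOfStreamTrigger#onEventTime. */
    SketchTriggerResult onEventTime(long time) {
        return time == MAX_TIMESTAMP ? SketchTriggerResult.FIRE : SketchTriggerResult.CONTINUE;
    }

    /** Sum of everything buffered so far. */
    int sum() {
        int total = 0;
        for (int v : buffer) {
            total += v;
        }
        return total;
    }
}
```

In an actual job the assigner would presumably be passed to `window(...)` on a keyed stream like any other WindowAssigner, e.g. `window(EndOfStreamWindows.get())`.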
2) Add the OperatorAttributes class to provide information such as whether the operator should be run in batch mode.
Code Block |
---|
|
package org.apache.flink.streaming.api.operators;
/**
 * OperatorAttributes provides the Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    private boolean requireBatchMode;
    public OperatorAttributes() {
        requireBatchMode = false;
    }
    public OperatorAttributes setRequireBatchMode(boolean requireBatchMode) {...}
    /**
     * Returns true iff the operator should be run in batch mode.
     *
     * When an operator runs in batch mode:
     * - Its input and output edges are blocking.
     * - Its checkpointing is disabled.
     * - It uses BatchExecutionStateBackend.
     * - It emits results after all input streams have reached their end.
     */
    public boolean requireBatchMode() {...}
} |
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
Code Block |
---|
|
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes();
    }
}
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes();
    }
} |
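To illustrate how an operator might opt in to batch mode through the default method, here is a minimal self-contained sketch. It does not use Flink classes: `OperatorAttributesSketch` and `StreamOperatorSketch` are hypothetical stand-ins for the proposed types, the method bodies (elided in the listings above) are filled in as assumptions, and `PassThroughOperator` and `EndOfInputAggregator` are invented for the example.

```java
class OperatorAttributesDemo {
    public static void main(String[] args) {
        // An operator that does not override the default keeps requireBatchMode = false.
        System.out.println(new PassThroughOperator().getOperatorAttributes().requireBatchMode());
        // An operator that emits results only at end of input opts in explicitly.
        System.out.println(new EndOfInputAggregator().getOperatorAttributes().requireBatchMode());
    }
}

/** Hypothetical stand-in for the proposed OperatorAttributes; bodies are assumptions. */
class OperatorAttributesSketch {
    private boolean requireBatchMode = false;

    /** Assumed to return this so that calls can be chained. */
    OperatorAttributesSketch setRequireBatchMode(boolean requireBatchMode) {
        this.requireBatchMode = requireBatchMode;
        return this;
    }

    boolean requireBatchMode() {
        return requireBatchMode;
    }
}

/** Hypothetical stand-in for StreamOperator, reduced to the proposed default method. */
interface StreamOperatorSketch {
    default OperatorAttributesSketch getOperatorAttributes() {
        return new OperatorAttributesSketch();
    }
}

/** Inherits the default attributes, so it does not require batch mode. */
class PassThroughOperator implements StreamOperatorSketch {}

/** Overrides the default to declare that it should be run in batch mode. */
class EndOfInputAggregator implements StreamOperatorSketch {
    @Override
    public OperatorAttributesSketch getOperatorAttributes() {
        return new OperatorAttributesSketch().setRequireBatchMode(true);
    }
}
```

Because the method has a default implementation, existing operators would keep compiling and keep their current behavior; only operators that override it change how they are scheduled.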
Proposed Changes
1) Add the requireBatchMode() API to the Transformation abstract class.
Code Block |
---|
|
@Internal
public abstract class Transformation<T> {
    public boolean requireBatchMode() {
        return false;
    }
} |
Benchmark Results
...