Discussion thread: -
Vote thread: -
JIRA: -
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Let's consider a use-case where we need to aggregate records from a bounded stream using OperatorA in a "batch manner" (i.e., emitting the aggregation result at the end of the input stream), followed by another two-input OperatorB to process both the aggregation result and the records from an unbounded stream in a "stream manner" (i.e., continuously emitting results without waiting for the end of the input streams). Currently, supporting such use-cases requires the use of the STREAMING execution mode, where all operators must be deployed at the start of the job, and operators cannot apply typical "batch-mode" optimizations like sort join.

Although we can obtain the correct result, the performance and cost of this DataStream job may be significantly worse than what can be practically achieved. For instance, we could save deployment costs by scheduling OperatorB only after OperatorA has finished running. And we might be able to significantly reduce the execution time for OperatorA by applying some "batch-mode" optimizations. For example, benchmark results show that the DataStream CoGroup operation in BATCH mode can be more than 10 times faster than in STREAMING mode.

In this FLIP, we propose adding public APIs for users and operator developers to specify whether the corresponding DataStream operations and operators should be executed in batch mode. Furthermore, we will update Flink's job graph generation logic to support running a combination of stream-mode and batch-mode operators within the same job. By enabling Flink to deliver optimal performance for use-cases that require both stream and batch mode operations in the same job, we can further enhance Flink as a stream-batch unified processing engine.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

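    // Singleton returned by get(); the assigner itself is stateless.
    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    // The single global window that every element is assigned to.
    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);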

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}
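
The trigger decision above can be exercised outside Flink with a small stand-alone model. This is only a sketch: EndOfStreamTriggerModel and its Result enum are hypothetical stand-ins, not Flink classes. It assumes, as in Flink's TimeWindow, that maxTimestamp() is end - 1, so the window fires only for an event-time timer at Long.MAX_VALUE - 1, which is reached once the final Long.MAX_VALUE watermark arrives at the end of the input.

```java
/** Stand-alone model of the trigger decision. None of these types are Flink classes. */
final class EndOfStreamTriggerModel {

    enum Result { CONTINUE, FIRE }

    // Mirrors TIME_WINDOW_INSTANCE: one window spanning all of event time.
    static final long WINDOW_START = Long.MIN_VALUE;
    static final long WINDOW_END = Long.MAX_VALUE;

    // Assumption: as in Flink's TimeWindow, the last timestamp inside the window is end - 1.
    static long maxTimestamp() {
        return WINDOW_END - 1;
    }

    // Arriving elements never fire the window on their own.
    static Result onElement() {
        return Result.CONTINUE;
    }

    // Only the event-time timer at the window's max timestamp fires the window.
    static Result onEventTime(long time) {
        return time == maxTimestamp() ? Result.FIRE : Result.CONTINUE;
    }

    // Processing time is ignored entirely.
    static Result onProcessingTime(long time) {
        return Result.CONTINUE;
    }
}
```

In this model, onEventTime(Long.MAX_VALUE - 1) is the only call that returns FIRE; every element and every processing-time callback returns CONTINUE.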


2) Add the isBatchForced() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    default boolean isBatchForced() {
        return false;
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    default boolean isBatchForced() {
        return false;
    }
}
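
To illustrate how an operator developer might use the proposed default method, here is a minimal sketch with plain Java stand-ins. BatchForcible, PassThroughOperator and EndOfInputAggregateOperator are hypothetical names introduced only for this example; the real StreamOperator interface has many more methods, and which operators should actually override the method is not specified here.

```java
/** Hypothetical stand-in for the proposed default method. Not the real Flink interface. */
interface BatchForcible {
    default boolean isBatchForced() {
        return false;
    }
}

/** Keeps the default, so it does not ask to be executed in batch mode. */
final class PassThroughOperator implements BatchForcible {}

/**
 * Assumed example: an operator that only emits its result at the end of its input
 * and therefore opts in to batch-mode execution.
 */
final class EndOfInputAggregateOperator implements BatchForcible {
    @Override
    public boolean isBatchForced() {
        return true;
    }
}
```

Because the method has a default implementation, existing operators compile unchanged and report false; only operators that opt in need to override it.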


Proposed Changes


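1) Add the isBatchForced() API to the Transformation class.

@Internal
public abstract class Transformation<T> {
    // Transformation is an abstract class, so this is a regular overridable method
    // rather than an interface "default" method.
    public boolean isBatchForced() {
        return false;
    }
}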


Benchmark Results

1) DataStream#coGroup

We ran the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-SNAPSHOT and parallelism=1. RocksDB is used as the state backend in stream mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
  • With the proposed change, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 100 sec.

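This shows that the same DataStream program in stream mode can be more than 10X faster with the proposed change: throughput rises from roughly 3.6*10^4 records per second per source (2*10^6 records in 56 sec) to 5*10^5 records per second per source (5*10^7 records in 100 sec), which is about 14X.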
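
Since the stream-mode run used a smaller input than the other two runs, the comparison has to be made on throughput rather than on execution time. The arithmetic can be checked with a short sketch; CoGroupBenchmarkMath is a hypothetical helper written for this page, and it uses only the record counts and execution times listed above.

```java
/** Hypothetical helper that turns the reported figures into throughput and speedup. */
final class CoGroupBenchmarkMath {

    // Records processed per second for one source.
    static double recordsPerSecond(double recordsPerSource, double seconds) {
        return recordsPerSource / seconds;
    }

    // Ratio of the improved throughput to the baseline throughput.
    static double speedup(double baselineThroughput, double improvedThroughput) {
        return improvedThroughput / baselineThroughput;
    }
}
```

With the figures above, speedup(recordsPerSecond(2e6, 56), recordsPerSecond(5e7, 100)) is about 14 for stream mode, and speedup(recordsPerSecond(5e7, 118), recordsPerSecond(5e7, 100)) is about 1.18 for batch mode.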


Example Usages


data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);
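
The semantics of the pipeline above (group both inputs by key, then call the co-group function once per key after all input has been read) can be modelled in plain Java. This is a sketch of the expected behaviour only, not of how Flink executes it: EndOfStreamCoGroupModel is a hypothetical class, both inputs are assumed to be bounded and to fit in memory, and keys are visited in sorted order purely to make the output deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java model of a co-group over a single end-of-stream window. Not Flink code. */
final class EndOfStreamCoGroupModel {

    /** Calls fn once per key with all left and right records for that key. */
    static <A, B, K extends Comparable<K>, R> List<R> coGroup(
            List<A> left,
            List<B> right,
            Function<A, K> leftKey,
            Function<B, K> rightKey,
            BiFunction<List<A>, List<B>, R> fn) {
        // Buffer every record; nothing is emitted until both inputs are fully read.
        Map<K, List<A>> leftGroups = new TreeMap<>();
        for (A a : left) {
            leftGroups.computeIfAbsent(leftKey.apply(a), k -> new ArrayList<>()).add(a);
        }
        Map<K, List<B>> rightGroups = new TreeMap<>();
        for (B b : right) {
            rightGroups.computeIfAbsent(rightKey.apply(b), k -> new ArrayList<>()).add(b);
        }

        // A key present on only one side still produces a call, with an empty list
        // for the other side.
        TreeSet<K> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<R> out = new ArrayList<>();
        for (K key : keys) {
            List<A> as = leftGroups.getOrDefault(key, Collections.emptyList());
            List<B> bs = rightGroups.getOrDefault(key, Collections.emptyList());
            out.add(fn.apply(as, bs));
        }
        return out;
    }
}
```

The function passed as fn plays the role of CustomCoGroupFunction in the example above, and the two key extractors correspond to where(...) and equalTo(...).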


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.



