You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Next »

Discussion thread-
Vote thread-
JIRA

-

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Let's consider a use-case that requires a job graph shown in the figure below. pipelinedRegionA will aggregate records from the two bounded inputs using aggregate/join/cogroup operations and emit records only after all its bounded inputs have ended. And operatorB can start to process records from the input_3 only after having received inputs from pipelinedRegionA, and will need to run continuously since input_3 is unbounded.

Currently, supporting such use-case requires the use of the STREAMING execution mode, where all operators must be deployed at the start of the job, and operators cannot apply typical "batch-mode" optimizations like sort join. This approach has a couple drawbacks: 1) operatorB has to be deployed while not being able to do real work; 2) operatorB might waste a lot of memory and disk IO on buffering the records from the unbounded input until it receives records from the pipelinedRegionA; and 3) the throughput of aggregation operations (e.g. co-group) can be 10X worse in stream mode than in batch mode.=

In this FLIP, we propose adding public APIs to support running a mixture of batch-mode and stream-mode operators in the same job, with blocking edges added between these operators. We hope to enable Flink to deliver optimal performance for use-cases that require both stream and batch mode operations, and further enhance Flink as a stream-batch unified processing engine.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}


2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...

    public OperatorAttributesBuilder() {
        isOutputOnEOF = false;
        isOutputOnCheckpoint = false;
        hasInternalSorter = false;
    }

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    public OperatorAttributes build() {...}
}
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {     
   /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its checkpoint is disabled.
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the getIsOutputOnEOF() and getHasInternalSorter() to the Transformation interface.

@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


Benchmark Results

1) DataStream#CoGroup

We run the CoGroupDataStream benchmark on a mackbook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of sources generating 2*10^6 records, average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of sources generating 5*10^7 records, average execution time is 118 sec.
  • With the proposed change, in both the stream and batch mode, with each of sources generating 5*10^7 records,  average execution time is 38 sec.

This show that the same DataStream program in stream mode can be more than 20X faster with proposed change.

Example Usages


data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.




  • No labels