You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »

Discussion thread-
Vote thread-
JIRA

-

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Let's consider a use-case that requires a job graph shown in the figure below. pipelinedRegionA will aggregate records from the two bounded inputs using aggregate/join/cogroup operations and emit records only after all its bounded inputs have ended. And operatorB can start to process records from the input_3 only after having received inputs from pipelinedRegionA, and will need to run continuously since input_3 is unbounded.

Currently, supporting such use-case requires the use of the STREAMING execution mode, where all operators must be deployed at the start of the job, and operators cannot apply typical "batch-mode" optimizations like sort join. This approach has a couple drawbacks: 1) operatorB has to be deployed while not being able to do any useful work; 2) operatorB might waste a lot of memory and disk IO on buffering the records from the unbounded input until it receives records from the pipelinedRegionA; and 3) the throughput of aggregation operations (e.g. co-group) can be 10X worse in stream mode than in batch mode.

In this FLIP, we propose adding public APIs to support running a mixture of batch-mode and stream-mode operators in the same job where the stream part of the use-case can be executed after batch-part of the use-case, with both parts using the mode the best fits their needs. This capability can further enhance Flink as a stream-batch unified processing engine.



Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}


2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...

    public OperatorAttributesBuilder() {
        isOutputOnEOF = false;
        isOutputOnCheckpoint = false;
        hasInternalSorter = false;
    }

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    public OperatorAttributes build() {...}
}
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {     
   /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}


Note that an operator with internal sorter does not necessarily mean that it only emits data at the end of input.  For example, we might have an operator that sorted data when it is still reading from an input with isBacklog=true. When all the inputs (it is still reading from) have isBacklog=false, the operator can stop sorting and start to emit records continuously in the streaming fashion.


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the getIsOutputOnEOF() and getHasInternalSorter() to the Transformation interface.

@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


3) Update the the CheckpointCoordinator

If any ExecutionVertex is reading from or writing to a blocking edge, then the checkpoint is disabled during this period.

If any operator reports getHasInternalSorter == true, then the checkpoint is disabled when this operator is running from a source with isBacklog=true.


4) A blocking input edge with pending records is same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.



Benchmark Results

1) DataStream#CoGroup

We run the CoGroupDataStream benchmark on a mackbook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of sources generating 2*10^6 records, average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of sources generating 5*10^7 records, average execution time is 118 sec.
  • With the proposed change, in both the stream and batch mode, with each of sources generating 5*10^7 records,  average execution time is 46 sec.

This show that the same DataStream program in stream mode can be more than 20X faster with proposed change.

Example Usages

data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.




  • No labels