...

Extend the unified Sink interface by allowing the connector to optionally create a sink coordinator, similar to the `SourceEnumerator` on the source side [4]. Because the creation is optional, the committables are only forwarded to the Job Manager if the connector specifies a coordinator. In general, the behaviour of sending the committables does not change (they are still sent on preSnapshot), and the sink coordinator needs to write them into the checkpoint.
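
As a rough illustration, the optional coordinator creation could be exposed as a default factory method on the unified Sink interface. The names `SinkCoordinator` and `createSinkCoordinator` below are assumptions for this sketch, not the proposed API, and the existing Sink factory methods are elided.

Code Block
languagejava
titleSink coordinator creation (sketch)
import java.util.List;
import java.util.Optional;

/** Illustrative coordinator running on the Job Manager; name and contract are assumptions. */
interface SinkCoordinator<CommT> {

    /**
     * Receives the committables forwarded by the sink writers on preSnapshot. The coordinator
     * is responsible for writing them into the checkpoint.
     */
    void handleCommittables(List<CommT> committables);
}

/** Hypothetical addition to the unified Sink interface; all existing factory methods are elided. */
public interface Sink<InputT, CommT> {

    /**
     * Optionally creates a sink coordinator, similar to the SourceEnumerator on the source side.
     * If empty, no coordinator is started and committables are not forwarded to the Job Manager.
     */
    default Optional<SinkCoordinator<CommT>> createSinkCoordinator() {
        return Optional.empty();
    }
}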

...

This approach is comparable to the current implementation in Flink’s Table API for solving the small-file-compaction problem [5], but we make it more extensible.

...

Code Block
languagejava
firstline1
titleCommittableAggregator
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.functions.KeySelector;

public interface CommittableAggregator<CommT> {

    /**
     * Returns a key selector that determines how the committables from the {@link SinkWriter}s
     * are distributed across the downstream aggregators.
     *
     * @return the key selector to apply to the committables, or empty to use the default distribution
     */
    default <KEY> Optional<KeySelector<CommT, KEY>> getPreAggregateSelector() {
        return Optional.empty();
    }

    /**
     * Returns a key selector that determines how the aggregated committables are distributed
     * across the downstream committers.
     *
     * @return the key selector to apply to the committables, or empty to use the default distribution
     */
    default <KEY> Optional<KeySelector<CommT, KEY>> getPostAggregateSelector() {
        return Optional.empty();
    }

    /**
     * Returns the parallelism of the committable aggregator.
     *
     * @return the parallelism to use, or empty to default to the overall job parallelism
     */
    default OptionalInt getParallelism() {
        return OptionalInt.empty();
    }

    /**
     * Aggregates the committables emitted by the {@link SinkWriter}s and emits a new set of
     * committables to the downstream committers.
     *
     * @param committables committables sent by the {@link SinkWriter}s.
     * @return committables forwarded to the committers
     */
    List<CommT> aggregate(List<CommT> committables);
}

In the case of the small-file-compaction problem, the parallelism of the committable aggregator will likely be set to 1 so that compaction across multiple subtasks is possible. In addition, the approach also supports compaction across multiple checkpoints if the committables in one checkpoint do not meet the required size.
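
To make this concrete, the sketch below shows what a compaction-oriented aggregator could look like. `FileCommittable`, its `compact` helper, and the size threshold are purely illustrative assumptions; only the `CommittableAggregator` contract above comes from the proposal.

Code Block
languagejava
titleFileCompactingAggregator (sketch)
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;

/** Minimal stand-in committable describing a written file; purely illustrative. */
class FileCommittable {
    final String path;
    final long sizeInBytes;

    FileCommittable(String path, long sizeInBytes) {
        this.path = path;
        this.sizeInBytes = sizeInBytes;
    }

    long getSizeInBytes() {
        return sizeInBytes;
    }

    /** Stand-in for real compaction, which would merge the underlying files. */
    static FileCommittable compact(List<FileCommittable> files) {
        long total = files.stream().mapToLong(FileCommittable::getSizeInBytes).sum();
        return new FileCommittable("compacted-" + files.size() + "-files", total);
    }
}

/** Aggregates small files across subtasks and checkpoints until they reach a minimum size. */
public class FileCompactingAggregator implements CommittableAggregator<FileCommittable> {

    private final long minCompactionSize;
    private final List<FileCommittable> pending = new ArrayList<>();

    public FileCompactingAggregator(long minCompactionSize) {
        this.minCompactionSize = minCompactionSize;
    }

    @Override
    public OptionalInt getParallelism() {
        // A single aggregator subtask sees the committables of all sink writers,
        // which enables compaction across subtasks.
        return OptionalInt.of(1);
    }

    @Override
    public List<FileCommittable> aggregate(List<FileCommittable> committables) {
        pending.addAll(committables);
        long totalSize = pending.stream().mapToLong(FileCommittable::getSizeInBytes).sum();
        if (totalSize < minCompactionSize) {
            // Not enough data yet: hold the files back and try again on a later checkpoint.
            return Collections.emptyList();
        }
        FileCommittable compacted = FileCommittable.compact(pending);
        pending.clear();
        return Collections.singletonList(compacted);
    }
}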

...

Another problem is that flink-core currently does not know the DataStream classes, which live in flink-streaming-java. To circumvent this issue, we could keep the implementation in flink-streaming-java and only provide a base interface in flink-core. The downside is that this adds unnecessary complexity and might confuse users.
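
One possible shape of that split is sketched below; the interface names and the single DataStream-aware method are assumptions made only to illustrate where the code would live.

Code Block
languagejava
titleModule split (sketch)
// File in flink-core: a generic base interface without any dependency on DataStream classes.
// The name SinkPostTopology is an assumption for this sketch.
public interface SinkPostTopology<CommT> extends java.io.Serializable {
    // Marker/base contract only; DataStream-aware variants live in flink-streaming-java.
}

// File in flink-streaming-java: the specialization that knows DataStream and can insert the
// aggregator between the sink writers and the committers.
public interface DataStreamSinkPostTopology<CommT> extends SinkPostTopology<CommT> {

    org.apache.flink.streaming.api.datastream.DataStream<CommT> addPostTopology(
            org.apache.flink.streaming.api.datastream.DataStream<CommT> committables);
}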

...