Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The WithPreCommitTopology will be replaced by the SupportsPreCommitTopology mixin interface. The addPreCommitTopology method is changed so the input stream and the output stream could contain different types which is needed for the original goal of this FLIP. Also we need a different serializer for the WriteResultT objects if they are rebalanced between the steps. The interface is inherited from SupportsCommitter, as the interface is not usable without a committer:

Code Block
languagejava
titleSupportsPreCommitTopology
public interface SupportsPreCommitTopology<WriterResultT, CommittableT>
        extends SupportsCommitter<CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriterResultT>> committables);

    /** Returns the serializer of the WriteResult type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}

...

The WithPostCommitTopology will be replaced by the SupportsPostCommitTopology mixin interface. The interface is inherited from SupportsCommitter, as the interface is not usable without a committer mixin interface:

Code Block
languagejava
titleSupportsPostCommitTopology
public interface SupportsPostCommitTopology<CommittableT> extends SupportsCommitter<CommittableT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
}

...