
Status

...



Discussion thread

...

...

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Watermark-propagation-with-Sink-API-td51029.html
JIRA

FLINK-22700

...

Release

1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Watermark propagation in multi-stage flows. By persisting the watermark information that arrives at the sink, subsequent stages can construct source watermarks with higher fidelity than current approaches allow. An example of this scenario in the Flink codebase is FlinkKafkaShuffle.java.
  2. Sophisticated sinks that flush internal buffers, mutate state, or otherwise take action at certain points in event time. 
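The second use case can be illustrated with a minimal, self-contained sketch. It is not Flink code: `Watermark` and `Writer` are simplified stand-ins for `org.apache.flink.api.common.eventtime.Watermark` and `SinkWriter` (so the example compiles without Flink), and `BufferingWriter` with its in-memory `flushed` list is hypothetical. The writer buffers elements and, when a watermark with timestamp t arrives, flushes every buffered element with timestamp <= t in timestamp order, because the watermark declares that no further elements with such timestamps are expected.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class BufferingSinkSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Simplified stand-in for SinkWriter with the proposed default method. */
    interface Writer<T> {
        void write(T element, long timestamp) throws IOException;
        default void writeWatermark(Watermark watermark) throws IOException {}
    }

    /** Hypothetical writer that holds elements until event time has passed them. */
    static final class BufferingWriter implements Writer<String> {
        private static final class Entry {
            final String value;
            final long timestamp;
            Entry(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
        }

        private final List<Entry> buffer = new ArrayList<>();
        /** Hypothetical flush target; a real sink would write to external storage. */
        final List<String> flushed = new ArrayList<>();

        @Override
        public void write(String element, long timestamp) {
            buffer.add(new Entry(element, timestamp));
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            long wm = watermark.getTimestamp();
            // Sort so that elements are flushed in event-time order.
            buffer.sort(Comparator.comparingLong(e -> e.timestamp));
            Iterator<Entry> it = buffer.iterator();
            while (it.hasNext()) {
                Entry e = it.next();
                if (e.timestamp > wm) {
                    break; // this and all later entries are still open in event time
                }
                flushed.add(e.value);
                it.remove();
            }
        }
    }
}
```

In this sketch a late element (one whose timestamp is already covered by a previous watermark) is simply flushed at the next watermark; a real sink would need an explicit policy for late data.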

Note that support for event-time timers is out-of-scope of this proposal.

Public Interfaces

New methods are proposed on the sink interfaces (SinkFunction and SinkWriter).

...

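package org.apache.flink.api.connector.sink;

import java.io.IOException;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.Watermark;

@Experimental
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Adds a watermark to the writer.
     *
     * <p>The default implementation does nothing, so existing writers are unaffected.
     *
     * @param watermark The watermark.
     * @throws IOException if the watermark could not be written.
     */
    default void writeWatermark(Watermark watermark) throws IOException {}

    // Existing method, unchanged. Context is the nested SinkWriter.Context interface.
    void write(InputT element, Context context) throws IOException;

    // ... other existing methods omitted
}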
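For the first use case, the sketch below shows the idea of persisting watermarks at the sink of one stage and reconstructing a watermark at the source of the next, loosely in the spirit of FlinkKafkaShuffle but not its actual code. Everything here is hypothetical and self-contained: an in-memory list stands in for the external log (for example a Kafka topic), `LogWriter.writeWatermark` plays the role of the proposed `SinkWriter#writeWatermark`, and `LogReader` derives the downstream watermark as the minimum of the latest watermark reported by each upstream writer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WatermarkPropagationSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Hypothetical log entry: a data record (value != null) or a watermark marker. */
    static final class LogRecord {
        final int producerId;
        final String value;
        final long watermark;

        private LogRecord(int producerId, String value, long watermark) {
            this.producerId = producerId;
            this.value = value;
            this.watermark = watermark;
        }
    }

    /** Hypothetical first-stage sink writer appending to a shared log. */
    static final class LogWriter {
        private final int producerId;
        private final List<LogRecord> log;

        LogWriter(int producerId, List<LogRecord> log) {
            this.producerId = producerId;
            this.log = log;
        }

        void write(String element) {
            log.add(new LogRecord(producerId, element, Long.MIN_VALUE));
        }

        // Plays the role of the proposed SinkWriter#writeWatermark: persist the watermark in-band.
        void writeWatermark(Watermark watermark) {
            log.add(new LogRecord(producerId, null, watermark.getTimestamp()));
        }
    }

    /** Hypothetical second-stage source that reconstructs its watermark from the markers. */
    static final class LogReader {
        private final int numProducers;
        private final Map<Integer, Long> latestPerProducer = new HashMap<>();

        LogReader(int numProducers) { this.numProducers = numProducers; }

        /** Returns the reconstructed watermark, or Long.MIN_VALUE until every producer has reported one. */
        long consume(LogRecord record) {
            if (record.value == null) {
                latestPerProducer.merge(record.producerId, record.watermark, Long::max);
            }
            if (latestPerProducer.size() < numProducers) {
                return Long.MIN_VALUE;
            }
            return latestPerProducer.values().stream().mapToLong(Long::longValue).min().getAsLong();
        }
    }
}
```

Taking the minimum mirrors how Flink combines watermarks from multiple inputs: the downstream watermark can only advance once every upstream writer has advanced.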

...

  1. Update the public sink interfaces as described above.
  2. Update the legacy streaming sink operator (StreamSink) and the new unified sink operator (AbstractSinkWriterOperator) to invoke the new interface methods.
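Step 2 essentially means forwarding each watermark that an operator receives to the user's writer. The sketch below models this with hypothetical stand-ins: `StreamWatermark` represents the runtime watermark element (`org.apache.flink.streaming.api.watermark.Watermark` in Flink), `Watermark` represents the public `org.apache.flink.api.common.eventtime.Watermark`, and `SinkOperator` is a simplified placeholder for StreamSink / AbstractSinkWriterOperator, not their actual code.

```java
import java.io.IOException;

final class SinkOperatorSketch {

    /** Stand-in for the runtime element org.apache.flink.streaming.api.watermark.Watermark. */
    static final class StreamWatermark {
        private final long timestamp;
        StreamWatermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Stand-in for the public org.apache.flink.api.common.eventtime.Watermark. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Simplified stand-in for SinkWriter with the proposed default method. */
    interface Writer<T> {
        void write(T element) throws IOException;
        default void writeWatermark(Watermark watermark) throws IOException {}
    }

    /** Hypothetical placeholder for StreamSink / AbstractSinkWriterOperator. */
    static final class SinkOperator<T> {
        private final Writer<T> writer;
        private long currentWatermark = Long.MIN_VALUE;

        SinkOperator(Writer<T> writer) { this.writer = writer; }

        void processElement(T element) throws IOException {
            writer.write(element);
        }

        void processWatermark(StreamWatermark mark) throws IOException {
            currentWatermark = mark.getTimestamp();
            // Convert the runtime watermark to the public API type before calling user code.
            writer.writeWatermark(new Watermark(mark.getTimestamp()));
        }

        long currentWatermark() { return currentWatermark; }
    }
}
```

Converting at the operator boundary keeps the runtime watermark type out of the public sink interfaces, which only see the `api.common.eventtime` class.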

Compatibility, Deprecation, and Migration Plan

No compatibility issues are anticipated. Regarding backward compatibility:

  1. The interface methods have a default implementation.
  2. The proposed method name (writeWatermark) is new to the Flink APIs, to avoid possible collisions with methods in existing implementations.
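The effect of the default implementation can be illustrated with a small sketch using hypothetical stand-in types (not the Flink interfaces themselves): `LegacyWriter` is written without any knowledge of `writeWatermark`, yet it still compiles against the updated interface, and calls to the new method fall through to the empty default body.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class DefaultMethodCompatSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Stand-in for SinkWriter after the change: the new method ships with an empty default body. */
    interface Writer<T> {
        void write(T element) throws IOException;
        default void writeWatermark(Watermark watermark) throws IOException {}
    }

    /** Hypothetical writer written before writeWatermark existed; it compiles unchanged. */
    static final class LegacyWriter implements Writer<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void write(String element) {
            written.add(element);
        }
    }
}
```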

Regarding binary compatibility of API signatures (japicmp):

  1. SinkFunction is already excluded from the checks that are performed.
  2. SinkWriter is annotated @Experimental (as of Flink 1.13), and interfaces marked @Experimental are excluded from the checks that are performed.

Test Plan

The Flink codebase has an integration test (FileReadingWatermarkITCase) that counts the watermarks arriving at a test sink. We will update the test code to use the new functionality.
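A test sink of this kind might look like the following hypothetical sketch (stand-in types again, not the actual FileReadingWatermarkITCase code). The counter is static because in a real job the sink is serialized and instantiated on the task side, so the test can only observe it through shared state; this assumes the job runs in the same JVM as the test, as with a local mini cluster.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

final class WatermarkCountingSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark. */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Simplified stand-in for SinkWriter with the proposed default method. */
    interface Writer<T> {
        void write(T element) throws IOException;
        default void writeWatermark(Watermark watermark) throws IOException {}
    }

    /** Hypothetical test sink that counts the watermarks it receives. */
    static final class CountingWriter<T> implements Writer<T> {
        // Static rather than per-instance so that the test can read it after the job finishes.
        static final AtomicInteger WATERMARKS = new AtomicInteger();

        @Override
        public void write(T element) {
            // Data records are ignored; only watermarks are counted.
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            WATERMARKS.incrementAndGet();
        }
    }
}
```

Because the counter is shared, a test should reset it before each run.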

The functionality will be tested alongside an experimental version of Apache Pulsar that supports watermarks in its producer and consumer APIs. Testing will focus on watermark propagation under various conditions (e.g. parallelism, transactions, task recovery).

Rejected Alternatives

...

  1. Combine a process function and a sink function to achieve a similar effect, at the cost of increased complexity.
  2. Introduce event-time timers to sink functions. This approach seems too element-centric for these use cases. Also, event-time timers are generally keyed.

Future Enhancements

  1. Add support for idleness. A sink operator may become idle due to upstream status changes. A future enhancement would be to notify the sink function in this situation.