
Status

Current state: Discussion

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/202105.mbox/%3CCAGkx0%3DR7-vmOqDW-PkHZ75J8Rz1PiNu_f5f1wDeRe30EUXBpGA%40mail.gmail.com%3E


Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Streaming storage systems are evolving to better support event-time processing, and their sink implementations need to receive watermarks as they arrive from upstream operators.  Today, a sink can only read the current watermark while it processes an element; this FLIP proposes to enhance the Sink API so that sinks can process watermarks directly.

Some specific scenarios:

  1. Watermark propagation in multi-stage flows.  By persisting watermark information that arrives at the sink, subsequent stages can construct source watermarks with higher fidelity than current approaches allow.  An example of this scenario exists in the Flink codebase; see FlinkKafkaShuffle.java.
  2. Sophisticated sinks that flush internal buffers, mutate state, or otherwise take action at certain points in event time. 

Public Interfaces

New methods are proposed on the various sink interfaces.

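Note that the new methods use the public Watermark class from flink-core (org.apache.flink.api.common.eventtime.Watermark), as opposed to the Watermark class from flink-streaming-java (org.apache.flink.streaming.api.watermark.Watermark, which extends StreamElement).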

SinkFunction

A new method to write or process watermarks from upstream operators.

package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;

@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Write the given watermark to the sink. This function is called for every watermark.
     *
     * @param watermark The watermark.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    default void writeWatermark(Watermark watermark) throws Exception {}

    // Existing method, shown for context. Context is the nested SinkFunction.Context interface.
    default void invoke(IN value, Context context) throws Exception {}
}
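
To illustrate the second motivating scenario (a sink that acts at certain points in event time), here is a minimal, self-contained sketch of a sink that buffers elements and flushes them once a watermark passes their timestamps. It does not depend on Flink: Watermark, SinkFunction, and Context are simplified stand-ins that mirror the proposed method shapes, and Event, BufferingSink, and the flushed list are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class BufferingSinkSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark (assumption). */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Empty stand-in for SinkFunction.Context; unused in this sketch. */
    interface Context {}

    /** Stand-in mirroring the proposed SinkFunction methods. */
    interface SinkFunction<IN> {
        default void writeWatermark(Watermark watermark) throws Exception {}
        default void invoke(IN value, Context context) throws Exception {}
    }

    /** Hypothetical record that carries its own event timestamp. */
    static final class Event {
        final long timestamp;
        final String payload;
        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Buffers events and releases them in timestamp order when a watermark passes them. */
    static final class BufferingSink implements SinkFunction<Event> {
        private final PriorityQueue<Event> buffer =
                new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
        final List<String> flushed = new ArrayList<>();

        @Override
        public void invoke(Event value, Context context) {
            buffer.add(value);
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            // Everything at or below the watermark is complete in event time.
            while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark.getTimestamp()) {
                flushed.add(buffer.poll().payload);
            }
        }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.invoke(new Event(5L, "b"), null);
        sink.invoke(new Event(3L, "a"), null);
        sink.invoke(new Event(9L, "c"), null);
        sink.writeWatermark(new Watermark(5L));
        System.out.println(sink.flushed); // prints [a, b]
    }
}
```

A real sink would also need to checkpoint its buffer to survive task recovery; that is left out here to keep the sketch focused on the watermark callback.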

SinkWriter

A new method to write watermarks from upstream operators.

package org.apache.flink.api.connector.sink;

import org.apache.flink.api.common.eventtime.Watermark;

import java.io.IOException;

public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Add a watermark to the writer.
     *
     * @param watermark The watermark.
     * @throws IOException if the watermark cannot be added.
     */
    default void writeWatermark(Watermark watermark) throws IOException {}

    // Existing method, shown for context. Context is the nested SinkWriter.Context interface.
    void write(InputT element, Context context) throws IOException;
}
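
As a rough illustration of the first motivating scenario (persisting watermarks for a later stage), the sketch below shows a writer that interleaves watermark markers with records in its output. It is self-contained and does not use Flink: Watermark and SinkWriter are reduced stand-ins for the proposed shapes (type parameters and Context are dropped), and LogWriter, the in-memory log, and the "R:"/"W:" prefixes are hypothetical. Skipping watermarks that do not advance is a choice made for this example, not something the FLIP prescribes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class WatermarkPersistingWriterSketch {

    /** Stand-in for org.apache.flink.api.common.eventtime.Watermark (assumption). */
    static final class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Reduced stand-in for the proposed SinkWriter methods. */
    interface SinkWriter<InputT> {
        default void writeWatermark(Watermark watermark) throws IOException {}
        void write(InputT element) throws IOException;
    }

    /** Appends records and watermark markers to one ordered log. */
    static final class LogWriter implements SinkWriter<String> {
        final List<String> log = new ArrayList<>();
        private long lastWatermark = Long.MIN_VALUE;

        @Override
        public void write(String element) {
            log.add("R:" + element);
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            // Persist only watermarks that advance event time (example-specific choice).
            if (watermark.getTimestamp() > lastWatermark) {
                lastWatermark = watermark.getTimestamp();
                log.add("W:" + lastWatermark);
            }
        }
    }

    public static void main(String[] args) {
        LogWriter writer = new LogWriter();
        writer.write("a");
        writer.writeWatermark(new Watermark(10L));
        writer.writeWatermark(new Watermark(10L)); // no progress, skipped
        writer.write("b");
        writer.writeWatermark(new Watermark(20L));
        System.out.println(writer.log); // prints [R:a, W:10, R:b, W:20]
    }
}
```

A downstream source reading this log could, in principle, re-emit the "W:" markers as its own watermarks rather than re-deriving them from record timestamps.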

Proposed Changes

  1. Update the public sink interfaces as described above.
  2. Update the legacy streaming sink operator (StreamSink) and the new unified sink operator (AbstractSinkWriterOperator) to invoke the new interface methods.
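
The operator-side change in step 2 could look roughly like the following sketch: the operator receives the streaming-layer watermark, converts it to the public watermark type, and hands it to the user's sink. This is not the actual StreamSink or AbstractSinkWriterOperator code. StreamWatermark and CoreWatermark are stand-ins for the two Flink Watermark classes, SinkFunction is a reduced stand-in, and SinkOperator and RecordingSink are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

final class SinkOperatorSketch {

    /** Stand-in for the flink-streaming-java Watermark (a StreamElement). */
    static final class StreamWatermark {
        private final long timestamp;
        StreamWatermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Stand-in for the public flink-core Watermark. */
    static final class CoreWatermark {
        private final long timestamp;
        CoreWatermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    /** Reduced stand-in for the proposed SinkFunction methods (Context omitted). */
    interface SinkFunction<IN> {
        default void writeWatermark(CoreWatermark watermark) throws Exception {}
        default void invoke(IN value) throws Exception {}
    }

    /** Hypothetical operator that forwards elements and watermarks to the sink. */
    static final class SinkOperator<IN> {
        private final SinkFunction<IN> userFunction;

        SinkOperator(SinkFunction<IN> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(IN value) throws Exception {
            userFunction.invoke(value);
        }

        void processWatermark(StreamWatermark mark) throws Exception {
            // Convert the internal watermark to the public type before calling the sink.
            userFunction.writeWatermark(new CoreWatermark(mark.getTimestamp()));
        }
    }

    /** Test sink that records what it receives. */
    static final class RecordingSink implements SinkFunction<String> {
        final List<String> elements = new ArrayList<>();
        final List<Long> watermarks = new ArrayList<>();

        @Override
        public void invoke(String value) { elements.add(value); }

        @Override
        public void writeWatermark(CoreWatermark watermark) {
            watermarks.add(watermark.getTimestamp());
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingSink sink = new RecordingSink();
        SinkOperator<String> operator = new SinkOperator<>(sink);
        operator.processElement("a");
        operator.processWatermark(new StreamWatermark(7L));
        operator.processWatermark(new StreamWatermark(12L));
        System.out.println(sink.watermarks); // prints [7, 12]
    }
}
```

Because writeWatermark has an empty default body, existing sinks that do not override it are unaffected by the operator calling it.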

Compatibility, Deprecation, and Migration Plan

No compatibility issues are anticipated.   Specifically:

  1. The interface methods have a default implementation.
  2. The proposed method name (writeWatermark) is not used elsewhere in the Flink APIs, which further reduces the risk of clashing with methods in existing sink implementations.

Test Plan

The Flink codebase has an integration test (FileReadingWatermarkITCase) which involves counting watermarks that arrive at a test sink.   We will update the test code to use the new functionality.

The functionality will be tested alongside an experimental version of Apache Pulsar that supports watermarking in its producer and consumer APIs.  The testing will focus on watermark propagation under various conditions (e.g. parallelism, transactions, task recovery).

Rejected Alternatives

  1. Combine a process function and a sink function.  This achieves similar effects, but with increased complexity.
  2. Introduce event-time timers to sink functions.  This approach seems too element-centric for the use cases.   Also, event-time timers are generally keyed.


