Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Sink interface introduced in FLIP-143 has some limitations when implementing advanced sinks. The goal of this FLIP is to facilitate more use cases in two ways: by declaring additional exceptions on SinkWriter#write, and by adding fields to Sink#InitContext. Most importantly, the new fields provide the information necessary to port existing SinkFunctions to the Sink interface and to support asynchronous communication patterns (FLIP-171).
Public Interfaces
- We want to adjust the SinkWriter interface to allow interruptible writes.
- We also extend Sink#InitContext to expose more runtime information and services to the user.
Proposed Changes
For asynchronous calls to the external system, the sink needs to propagate backpressure upstream by blocking in write. However, write does not declare InterruptedException, so a blocked writer has no clean way to react to an interruption. In the worst case, cancellation then requires a hard kill. We propose to allow write to throw InterruptedException. Similarly, ProcessingTimeService#onProcessingTime may throw InterruptedException.
```java
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

    /**
     * Adds an element to the writer.
     *
     * @param element The input record
     * @param context The additional information about the input record
     * @throws IOException if the element could not be added
     * @throws InterruptedException when interrupted
     */
    void write(InputT element, Context context) throws IOException, InterruptedException;

    // ... other methods
}
```
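The following minimal, self-contained sketch illustrates why declaring InterruptedException on write matters. It does not use Flink. SimpleWriter and BoundedWriter are hypothetical stand-ins for SinkWriter, and backpressure is modelled with a java.util.concurrent.Semaphore that caps the number of in-flight requests. Because write may throw InterruptedException, a write that is blocked in Semaphore#acquire can let the exception propagate when the task thread is interrupted during cancellation. It does not need to swallow the interruption or busy-wait.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;

/** Hypothetical, simplified stand-in for SinkWriter (not Flink's interface). */
interface SimpleWriter<T> {
    void write(T element) throws IOException, InterruptedException;
}

/** Hypothetical writer that caps in-flight requests; write blocks while the cap is reached. */
class BoundedWriter implements SimpleWriter<String> {
    private final Semaphore permits;

    BoundedWriter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    @Override
    public void write(String element) throws InterruptedException {
        // Blocks (backpressure) until a permit is free. If the task thread is
        // interrupted, acquire() throws InterruptedException and we propagate it.
        permits.acquire();
        // A real sink would now send the element asynchronously and release the
        // permit once the external system acknowledges it.
    }

    /** Called when the response for one in-flight element has arrived. */
    void onResponse() {
        permits.release();
    }
}
```

Cancellation then needs no special handling: interrupting the task thread unblocks acquire() and ends the write with an InterruptedException.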
For initializing the SinkWriter, Sink#createWriter gets an InitContext that contains auxiliary information and services. We propose to add the following to the existing InitContext:
```java
/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
interface InitContext {

    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in the system's classpath,
     * but are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /** @return number of parallel Sink tasks. */
    int getNumberOfParallelSubtasks();

    /**
     * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
     * thread in between record processing.
     *
     * <p>Note that this method should not be used per record, for performance reasons, in the
     * same way that records should not be sent to the external system individually. Rather,
     * implementers are expected to batch records and only enqueue a single {@link Runnable} per
     * batch to handle the result.
     */
    MailboxExecutor getMailboxExecutor();

    // ... existing methods
}
```
The UserCodeClassLoader is necessary whenever a serialization schema needs to be created from a class name. Providing it to sinks also keeps them symmetric with the FLIP-27 source, where it is available in SourceReaderContext.
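The following sketch shows the kind of reflective instantiation that the user-code class loader enables. StringSchema, UpperCaseSchema and SchemaLoader are hypothetical types. They stand in for a serialization schema that a user configures by class name. In a real sink, the loader would presumably be obtained via InitContext#getUserCodeClassLoader().asClassLoader(). The sketch simply accepts any java.lang.ClassLoader.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Hypothetical serialization schema interface for this sketch. */
interface StringSchema {
    byte[] serialize(String element);
}

/** Hypothetical schema that a user might reference by its class name. */
class UpperCaseSchema implements StringSchema {
    public UpperCaseSchema() {}

    @Override
    public byte[] serialize(String element) {
        return element.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }
}

/** Hypothetical helper that instantiates a schema from a class name. */
final class SchemaLoader {
    private SchemaLoader() {}

    /**
     * Loads and instantiates a schema by name. Resolving the name through the user-code
     * class loader (rather than the system class loader) is what makes classes that only
     * exist in the job jar visible.
     */
    static StringSchema load(String className, ClassLoader userCodeClassLoader)
            throws ReflectiveOperationException {
        Class<? extends StringSchema> clazz =
                Class.forName(className, true, userCodeClassLoader).asSubclass(StringSchema.class);
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```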
Similarly, getNumberOfParallelSubtasks can be used together with the already existing getSubtaskId to distribute work among the parallel sink instances. Again, sources already provide this information through SplitEnumeratorContext#currentParallelism.
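As a sketch of such work distribution, assume a sink that must write to a fixed list of external resources, such as output partitions. The resource list and the WorkAssignment helper are hypothetical. With the values of getSubtaskId() and getNumberOfParallelSubtasks(), each subtask can deterministically select a disjoint share of the resources without further coordination.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: round-robin assignment of resources to parallel subtasks. */
final class WorkAssignment {
    private WorkAssignment() {}

    /**
     * Subtask i handles every resource whose index modulo the parallelism equals i,
     * so each resource is owned by exactly one subtask.
     */
    static <T> List<T> assign(List<T> resources, int subtaskId, int numberOfParallelSubtasks) {
        if (subtaskId < 0 || subtaskId >= numberOfParallelSubtasks) {
            throw new IllegalArgumentException("subtaskId out of range: " + subtaskId);
        }
        List<T> mine = new ArrayList<>();
        for (int i = 0; i < resources.size(); i++) {
            if (i % numberOfParallelSubtasks == subtaskId) {
                mine.add(resources.get(i));
            }
        }
        return mine;
    }
}
```

For example, take the resources [r0, r1, r2, r3, r4] with a parallelism of 2. Subtask 0 gets [r0, r2, r4] and subtask 1 gets [r1, r3].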
Finally, asynchronous sinks currently have no way to process asynchronous responses without resorting to complicated and slow threading models. To address this, we want to move MailboxExecutor from flink-streaming to flink-core, into the package org.apache.flink.api.common.operators, and expose it in InitContext, similar to ProcessingTimeService. We expect most sinks not to use it directly but through a more user-friendly abstraction from FLIP-171. See Appendix 1 for a usage example.
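Here is a deliberately simplified, single-file model of a mailbox that makes the threading model concrete. ToyMailbox is hypothetical and is not Flink's MailboxExecutor. It only mirrors the two operations that the appendix example relies on:

- execute(...), which any thread may call to enqueue work for the task thread.
- yield(), which the task thread calls to run one pending action, blocking until one is available.

Because every enqueued action runs on the task thread, actions can update writer state without locks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-threaded mailbox; a simplified model, not Flink's implementation. */
final class ToyMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    /** Enqueues an action for the task thread; safe to call from any thread. */
    void execute(Runnable action) {
        mails.add(action);
    }

    /**
     * Runs one pending action in the calling (task) thread, blocking until one is
     * available. Must always be invoked qualified, e.g. mailbox.yield().
     */
    void yield() throws InterruptedException {
        mails.take().run();
    }
}
```

In this model, an external callback never touches writer state directly. It only calls execute(...), and the state change happens when the task thread yields.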
Compatibility, Deprecation, and Migration Plan
- Most changes are simple additions with no impact on existing code. Moving the MailboxExecutor would break existing code that relies on the experimental YieldingOperatorFactory; we are not aware of external users. If we want to maintain backward compatibility, we can add a simple MailboxExecutor at the old location that extends the moved interface and keep the old reference in the operator factory. That interface should be marked @Deprecated immediately.
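The backward-compatibility shim described above can be sketched in a single file. A single runnable file cannot show two packages, so NewLocation and OldLocation are hypothetical nesting classes. NewLocation stands in for org.apache.flink.api.common.operators, and OldLocation stands in for the old flink-streaming package. The single execute method is a placeholder for the full interface. TaskMailboxExecutor is likewise hypothetical. It shows that if the runtime implements the deprecated type, one instance can be handed to legacy code such as the YieldingOperatorFactory and also to new APIs such as InitContext.

```java
/** Stand-in for the new location (org.apache.flink.api.common.operators). */
final class NewLocation {
    private NewLocation() {}

    interface MailboxExecutor {
        void execute(Runnable command);
    }
}

/** Stand-in for the old location in flink-streaming. */
final class OldLocation {
    private OldLocation() {}

    /**
     * Kept only for backward compatibility.
     *
     * @deprecated use {@link NewLocation.MailboxExecutor} instead.
     */
    @Deprecated
    interface MailboxExecutor extends NewLocation.MailboxExecutor {}

    /** Existing code that still refers to the old type keeps compiling. */
    static void legacyUser(MailboxExecutor executor, Runnable action) {
        executor.execute(action);
    }
}

/** Hypothetical runtime implementation: implementing the old type satisfies both. */
class TaskMailboxExecutor implements OldLocation.MailboxExecutor {
    @Override
    public void execute(Runnable command) {
        // Toy behavior: run immediately; a real executor would enqueue into the mailbox.
        command.run();
    }
}
```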
Test Plan
These simple API changes will be covered by extending and adding unit and integration tests.
Rejected Alternatives
No alternatives yet.
Appendix
Appendix 1 - Usage example of the MailboxExecutor
```java
import java.io.Serializable;
import java.util.function.Consumer;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.SinkWriter;

/** Example of an asynchronous {@link SinkWriter} that bounds the number of in-flight requests. */
class AsyncSinkWriter implements SinkWriter<Integer, String, String>, Serializable {
    private final MailboxExecutor mailboxExecutor;
    private final int capacity = 10;
    private int activeElements = 0;

    AsyncSinkWriter(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    @Override
    public void write(Integer element, Context context) throws InterruptedException {
        activeElements++;
        // In this example, records are sent directly; however, this pattern is
        // recommended for batches only.
        sendAsync(element, r -> processResult(element, r));
        while (activeElements > capacity) {
            // Yielding is necessary so that processResultInTaskThread can be executed.
            mailboxExecutor.yield();
        }
    }

    /** Processes the result in the external thread. Concurrent calls to write are possible. */
    private void processResult(Integer element, Result result) {
        mailboxExecutor.execute(
                () -> processResultInTaskThread(element, result),
                "Process result of %s",
                element);
    }

    /** Processes the result in the task thread. No concurrent calls to write are possible. */
    private void processResultInTaskThread(Integer element, Result result) {
        if (result.isGood()) {
            // No synchronization is needed to access fields.
            activeElements--;
        } else {
            // Resend the failed element.
            sendAsync(element, r -> processResult(element, r));
        }
    }

    // Placeholder for some external call.
    private void sendAsync(Integer element, Consumer<Result> resultListener) {
        // In an external thread: send the element, then call resultListener.
    }

    // The remaining SinkWriter methods (prepareCommit, snapshotState, close) are omitted.

    /** Placeholder for the response of the external system. */
    interface Result {
        boolean isGood();
    }
}
```