Status

...

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

...

...

JIRA: tbd

...

JIRA: FLINK-24041

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

...

The generic sink has two user-facing aspects. The first is an abstract class that sink developers extend to implement a new sink for a concrete destination. The second is the interface that end users work with when they use an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Streams sink.

...

AsyncSinkWriter Internals (Java)
    /**
     * The ElementConverter provides a mapping from the elements of a
     * stream to request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. E.g., for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, i.e., the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that complete (successfully or unsuccessfully)
     * automatically decrement the counter. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking the
     * value of this counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered request entries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            // use a strict bound so that a batch never exceeds MAX_BATCH_SIZE entries
            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                try {
                    batch.add(bufferedRequestEntries.remove());
                } catch (NoSuchElementException e) {
                    // if there are not enough elements, just create a smaller batch
                    break;
                }
            }

            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed requestEntries back to the
     * internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // Prepending the entries one by one while iterating through failedRequestEntries
        // reverses their order. This makes no difference for kinesis:putRecords, as the API
        // does not make any ordering guarantees, but it may cause avoidable reorderings for
        // other destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests will be retried if the sink is still healthy. But if
     * in-flight requests fail after a checkpoint has been triggered and Flink
     * needs to recover from the checkpoint, the (failed) in-flight requests are
     * gone and cannot be retried. Hence, there cannot be any outstanding
     * in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight requests need to be completed before
     * proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
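
The comment in completeRequest notes that prepending failed entries one by one reverses their order. For destinations where ordering matters, one possible remedy is to walk the failed entries from last to first before prepending them. The following is a minimal sketch under stated assumptions and not part of the proposal: the class RequeueOrderSketch and both helper methods are hypothetical names, the failed entries are assumed to arrive as a List (the listing above uses a Collection), and only java.util types are involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration; not part of the AsyncSinkWriter proposal.
public class RequeueOrderSketch {

    // Prepends failed entries in iteration order, as completeRequest does.
    // Every addFirst call pushes the previously added entry one position back,
    // so the failed entries end up in reverse order at the head of the buffer.
    static <T> void requeueReversed(List<T> failed, Deque<T> buffer) {
        failed.forEach(buffer::addFirst);
    }

    // Prepends failed entries while keeping their original relative order
    // by walking the list from its last element to its first.
    static <T> void requeueInOrder(List<T> failed, Deque<T> buffer) {
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> failed = Arrays.asList("a", "b", "c");

        Deque<String> reversed = new ArrayDeque<>(Arrays.asList("x", "y"));
        requeueReversed(failed, reversed);
        System.out.println(new ArrayList<>(reversed)); // [c, b, a, x, y]

        Deque<String> inOrder = new ArrayDeque<>(Arrays.asList("x", "y"));
        requeueInOrder(failed, inOrder);
        System.out.println(new ArrayList<>(inOrder)); // [a, b, c, x, y]
    }
}
```

In both variants the failed entries are placed ahead of the entries that were already buffered, so they are retried first. Only the order among the failed entries themselves differs.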

...