
Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

...

...

JIRA: FLINK-24041

...

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

...

There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end-users, who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Stream sink. 

The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types so that it remains extensible and agnostic to the destination.
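As a rough illustration of how the generic types keep the sink agnostic to the destination, the sketch below converts a `String` event (the InputT) into a destination-specific request entry (the RequestEntryT). `SimpleElementConverter`, `KeyedPayload` and `ConverterExample` are hypothetical names used only for this example. The converter omits the `Context` argument that the writer's `ElementConverter` receives, and using the first comma-separated field as partition key is an arbitrary choice.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified converter from a stream element (InputT) to a
// destination-specific request entry (RequestEntryT). Unlike the converter
// used by the AsyncSinkWriter, it does not receive a SinkWriter.Context.
@FunctionalInterface
interface SimpleElementConverter<InputT, RequestEntryT extends Serializable> {
    RequestEntryT apply(InputT element);
}

// Hypothetical request entry, loosely modeled on a Kinesis record: it carries
// everything needed to make the call to the destination.
final class KeyedPayload implements Serializable {
    private static final long serialVersionUID = 1L;

    final byte[] payload;
    final String partitionKey;

    KeyedPayload(byte[] payload, String partitionKey) {
        this.payload = payload;
        this.partitionKey = partitionKey;
    }
}

final class ConverterExample {
    // Uses the UTF-8 bytes of the event as payload and the first
    // comma-separated field as partition key (an arbitrary choice).
    static final SimpleElementConverter<String, KeyedPayload> CONVERTER =
            event -> new KeyedPayload(
                    event.getBytes(StandardCharsets.UTF_8),
                    event.split(",", 2)[0]);

    public static void main(String[] args) {
        KeyedPayload entry = CONVERTER.apply("user-42,clicked");
        System.out.println(entry.partitionKey + " " + entry.payload.length); // user-42 15
    }
}
```

A new destination only needs its own request entry type and converter; the buffering and retry logic never looks inside the entries.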

...

Code block (Java): AsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and hand them back for a retry by completing
     * {@code requestResult} with them. Only request entries that failed with
     * transient errors (eg, network issues or service-side throttling) should
     * be handed back. Request entries that fail in a reproducible manner (eg,
     * ill-formed request entries) must be handled by this method instead;
     * otherwise they are retried indefinitely, always causing the same error.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued; it is completed with the request entries
     *                       that need to be retried
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}
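The following is a minimal, self-contained sketch of the contract of `submitRequestEntries`: send the whole batch in one asynchronous call, then report back exactly the entries that have to be retried. It uses neither Flink nor an AWS client. `FakeDestination` and `SubmitSketch` are hypothetical, the result callback is modeled as a `java.util.function.Consumer` instead of `ResultFuture`, and re-queueing every entry when the entire request fails is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical asynchronous destination: accepts a batch and reports, per
// entry and in request order, whether it was persisted. Every entry that
// contains "throttle" fails, simulating a transient per-entry failure.
final class FakeDestination {
    CompletableFuture<List<Boolean>> putBatch(List<String> entries) {
        return CompletableFuture.supplyAsync(() -> {
            List<Boolean> persisted = new ArrayList<>(entries.size());
            for (String entry : entries) {
                persisted.add(!entry.contains("throttle"));
            }
            return persisted;
        });
    }
}

final class SubmitSketch {
    private final FakeDestination client = new FakeDestination();

    // Same contract as submitRequestEntries: one call for the whole batch, then
    // hand back exactly the entries that must be retried (empty if none).
    // The callback is a Consumer here, not Flink's ResultFuture (assumption).
    void submitRequestEntries(List<String> requestEntries, Consumer<List<String>> requestResult) {
        client.putBatch(requestEntries).whenComplete((persisted, err) -> {
            if (err != null) {
                // the entire request failed: retry every entry (assumption of this sketch)
                requestResult.accept(requestEntries);
                return;
            }
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < persisted.size(); i++) {
                if (!persisted.get(i)) {
                    failed.add(requestEntries.get(i)); // results are index-aligned with the batch
                }
            }
            requestResult.accept(failed);
        });
    }

    // Submits one batch and blocks until the callback reports the entries to retry.
    static List<String> demo() {
        CompletableFuture<List<String>> toRetry = new CompletableFuture<>();
        new SubmitSketch().submitRequestEntries(List.of("a", "throttle-b", "c"), toRetry::complete);
        return toRetry.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [throttle-b]
    }
}
```

Completing the callback in every branch matters: a request whose result is never reported stays in flight forever.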

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, ie, calls to the destination that have been sent but have not completed yet. During a commit, the sink enforces that all in-flight requests have completed and that the RequestEntryTs that are still buffered are persisted in the application state snapshot.
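The bookkeeping described above can be simulated on a single thread. In this sketch (all names hypothetical), the Flink mailbox is modeled as a plain queue of `Runnable`s that `yieldOnce()` drains one at a time, and the fake destination fails every odd element on its first attempt only. It shows batching, the limit on in-flight requests, re-queueing failed entries at the front of the buffer, and waiting for all in-flight requests before the snapshot is taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Single-threaded simulation of the AsyncSinkWriter bookkeeping (hypothetical names).
// Assumptions: the Flink mailbox is a plain queue of Runnables, and the fake
// destination fails every odd element on its first attempt and accepts it on retry.
final class MiniAsyncWriter {
    static final int MAX_BATCH_SIZE = 3;
    static final int MAX_IN_FLIGHT_REQUESTS = 2;

    final Deque<Integer> buffered = new ArrayDeque<>();   // request entries not yet sent
    final Deque<Runnable> mailbox = new ArrayDeque<>();   // completions waiting to be processed
    final List<Integer> persisted = new ArrayList<>();    // what the fake destination accepted
    private final Set<Integer> attempted = new HashSet<>();
    int inFlight;

    void write(int element) {
        buffered.add(element);
        flush();
    }

    // Sends full batches only; leftovers stay buffered until the next write or snapshot.
    void flush() {
        while (buffered.size() >= MAX_BATCH_SIZE) {
            List<Integer> batch = new ArrayList<>(MAX_BATCH_SIZE);
            while (batch.size() < MAX_BATCH_SIZE && !buffered.isEmpty()) {
                batch.add(buffered.remove());
            }
            // back-pressure: process completions until an in-flight slot frees up
            while (inFlight >= MAX_IN_FLIGHT_REQUESTS) {
                yieldOnce();
            }
            inFlight++;
            // completions are not handled directly but enqueued as mail
            submit(batch, failed -> mailbox.add(() -> complete(failed)));
        }
    }

    // Stand-in for submitRequestEntries: reports the entries that need a retry.
    private void submit(List<Integer> batch, Consumer<List<Integer>> requestResult) {
        List<Integer> failed = new ArrayList<>();
        for (int entry : batch) {
            if (entry % 2 != 0 && attempted.add(entry)) {
                failed.add(entry); // first attempt of an odd entry fails
            } else {
                persisted.add(entry);
            }
        }
        requestResult.accept(failed);
    }

    // Mirrors completeRequest: free the slot and prepend the failed entries.
    private void complete(List<Integer> failed) {
        inFlight--;
        failed.forEach(buffered::addFirst);
    }

    // Runs at most one pending mail, like a single mailbox yield.
    void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail != null) {
            mail.run();
        }
    }

    // Mirrors prepareCommit followed by snapshotState: no request may still be in flight.
    List<Integer> prepareCommitAndSnapshot(boolean flush) {
        if (flush) {
            flush();
        }
        while (inFlight > 0) {
            yieldOnce();
        }
        return new ArrayList<>(buffered);
    }

    public static void main(String[] args) {
        MiniAsyncWriter writer = new MiniAsyncWriter();
        for (int i = 1; i <= 9; i++) {
            writer.write(i);
        }
        List<Integer> snapshot = writer.prepareCommitAndSnapshot(true);
        System.out.println("persisted=" + writer.persisted + " snapshot=" + snapshot);
    }
}
```

Running `main` prints `persisted=[2, 4, 6, 8] snapshot=[9, 7, 5, 3, 1]`. The third batch is only sent after the first request has been completed through the mailbox, and the failed entries end up in the snapshot instead of being lost.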

Code block (Java): AsyncSinkWriter Internals
    /**
     * The ElementConverter maps the elements of a stream to request entries
     * that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>(...);


    /**
     * Tracks the number of pending async calls that have been issued since the
     * last checkpoint. Calls that complete (successfully or unsuccessfully)
     * decrement the counter. Any request entry that was not successfully
     * persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered RequestEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            // take at most MAX_BATCH_SIZE entries; if the buffer runs empty, just create a smaller batch
            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.remove());
            }

            // once the request completes, hand the completion back to the mailbox thread,
            // which marks the request as completed and re-queues the failed request entries
            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            // blocks if too many async requests are in flight
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            // call the destination specific code that actually persists the request entries
            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed requestEntries
     * back to the internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // Iterating through failedRequestEntries and prepending each one reverses
        // their order. This doesn't make a difference for kinesis:putRecords, as the
        // API does not make any order guarantees, but it may cause avoidable
        // reorderings for other destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests will be retried if the sink is still healthy. But if
     * in-flight requests fail after a checkpoint has been triggered and Flink
     * needs to recover from the checkpoint, the (failed) in-flight requests are
     * gone and cannot be retried. Hence, there cannot be any outstanding
     * in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight requests need to be completed before
     * proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
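The comment in `completeRequest` points out that calling `addFirst` for each failed entry in iteration order reverses those entries. The sketch below (hypothetical class `RequeueOrder`) shows the effect and one order-preserving alternative that walks the failed entries from last to first. It takes a `List` rather than a `Collection` so that it can iterate backwards; a general `Collection` would first have to be copied into a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

final class RequeueOrder {
    // Same approach as completeRequest: forward iteration plus addFirst reverses the entries.
    static <T> void requeueReversing(Deque<T> buffer, List<T> failed) {
        failed.forEach(buffer::addFirst);
    }

    // Order-preserving variant: prepend from last to first, so the first
    // failed entry ends up at the head of the buffer.
    static <T> void requeuePreservingOrder(Deque<T> buffer, List<T> failed) {
        ListIterator<T> it = failed.listIterator(failed.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
    }

    static List<String> run(boolean preserveOrder) {
        Deque<String> buffer = new ArrayDeque<>(List.of("d", "e")); // not yet sent
        List<String> failed = List.of("a", "b", "c");               // handed back for retry
        if (preserveOrder) {
            requeuePreservingOrder(buffer, failed);
        } else {
            requeueReversing(buffer, failed);
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [c, b, a, d, e]
        System.out.println(run(true));  // [a, b, c, d, e]
    }
}
```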

...

Code block (Java): AmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (err != null) {
                //TODO: handle errors of the entire request...
                // requestResult must eventually be completed, otherwise the
                // in-flight request is never marked as completed
                return;
            }

            if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                // the i-th result entry describes the i-th request entry
                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }
        });
    }

    ...
}
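The Kinesis writer matches the i-th result entry of the response to the i-th request entry by index. The sketch below isolates that step without the AWS SDK: `ResultEntry` and `FailedEntries` are hypothetical, `ResultEntry` stands in for `PutRecordsResultEntry` with a non-null error code marking a failed record, the size check is an extra safeguard of the sketch, and the error code strings are only sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a per-record result such as PutRecordsResultEntry:
// errorCode is null when the record was persisted successfully.
final class ResultEntry {
    final String errorCode;

    ResultEntry(String errorCode) {
        this.errorCode = errorCode;
    }
}

final class FailedEntries {
    // Collects the request entries whose result carries an error code. This relies
    // on the response listing its results in the same order as the request entries.
    static <T> List<T> collectFailed(List<T> requestEntries, List<ResultEntry> results) {
        if (requestEntries.size() != results.size()) {
            throw new IllegalArgumentException("results must be index-aligned with the request entries");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).errorCode != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("r0", "r1", "r2", "r3");
        List<ResultEntry> results = Arrays.asList(
                new ResultEntry(null),
                new ResultEntry("ProvisionedThroughputExceededException"),
                new ResultEntry(null),
                new ResultEntry("InternalFailure"));
        System.out.println(collectFailed(sent, results)); // [r1, r3]
    }
}
```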


Rejected Alternatives

...