
Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https:

...

...



...

JIRA: tbd

Released: tbd

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

Motivation

Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in how they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.

...

There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end-users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Streams sink.

The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types to remain extensible and agnostic to the destination.

...

Code Block
languagejava
titleAsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them by passing them to {@code
     * requestResult}.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}
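To make the contract of submitRequestEntries more concrete, here is a minimal, dependency-free sketch. It assumes a hypothetical in-memory destination (InMemoryDestinationWriter with a hypothetical capacityPerCall limit) that accepts only a limited number of entries per call, and a java.util.function.Consumer stands in for the ResultFuture parameter. The call is synchronous for brevity; a real implementation would invoke the callback from the client's asynchronous response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical destination-specific writer. It stores at most capacityPerCall entries
// per request, the way a throttled service might, and reports the rest as failed.
class InMemoryDestinationWriter {
    private final int capacityPerCall;
    final List<String> stored = new ArrayList<>();

    InMemoryDestinationWriter(int capacityPerCall) {
        this.capacityPerCall = capacityPerCall;
    }

    // Follows the submitRequestEntries contract: persist what the destination accepts,
    // then invoke the callback exactly once with the entries that must be re-queued
    // (an empty list if everything was persisted). The Consumer is a stand-in for ResultFuture.
    void submitRequestEntries(List<String> requestEntries, Consumer<List<String>> requestResult) {
        List<String> failed = new ArrayList<>();
        int accepted = 0;
        for (String entry : requestEntries) {
            if (accepted < capacityPerCall) {
                stored.add(entry);
                accepted++;
            } else {
                failed.add(entry);
            }
        }
        requestResult.accept(failed);
    }
}
```

Passing the returned entries into a second call persists them, which mirrors how the AsyncSinkWriter re-queues failed entries and retries them later.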

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, i.e., calls to the API that have been sent but not yet completed. During a commit, the sink ensures that all in-flight requests have completed and that the currently buffered RequestEntryTs are persisted in the application state snapshot.
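The interplay of buffering, batching, and in-flight tracking can be modelled in a few lines. The following single-threaded sketch is not the FLIP's code: Flink's mailbox is approximated by a plain queue of Runnables, BufferingWriterModel, MAX_BATCH_SIZE = 3, and MAX_IN_FLIGHT_REQUESTS = 2 are hypothetical, and the fake destination persists every entry. Completions are only posted as mails, so the in-flight counter changes only when the writer yields.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class BufferingWriterModel {
    static final int MAX_BATCH_SIZE = 3;         // hypothetical buffering hint
    static final int MAX_IN_FLIGHT_REQUESTS = 2; // hypothetical concurrency limit

    final Deque<String> bufferedRequestEntries = new ArrayDeque<>();
    final Deque<Runnable> mailbox = new ArrayDeque<>(); // stand-in for Flink's mailbox
    final List<String> persisted = new ArrayList<>();   // what the fake destination stored
    int inFlightRequestsCount;
    int maxObservedInFlight;

    void write(String element) {
        bufferedRequestEntries.add(element);
        flush();
    }

    // Sends full batches only; yields while too many requests are in flight.
    void flush() {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
            List<String> batch = new ArrayList<>(MAX_BATCH_SIZE);
            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.removeFirst());
            }
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                yieldOnce(); // back pressure: process a completion before sending more
            }
            inFlightRequestsCount++;
            maxObservedInFlight = Math.max(maxObservedInFlight, inFlightRequestsCount);
            // the completion is not applied directly but posted to the mailbox
            submitRequestEntries(batch, failed -> mailbox.add(() -> completeRequest(failed)));
        }
    }

    // Fake destination: persists everything and reports no failed entries.
    void submitRequestEntries(List<String> batch, Consumer<List<String>> requestResult) {
        persisted.addAll(batch);
        requestResult.accept(Collections.emptyList());
    }

    void completeRequest(List<String> failedRequestEntries) {
        inFlightRequestsCount--;
        // prepend from last to first so the failed entries keep their relative order
        for (int i = failedRequestEntries.size() - 1; i >= 0; i--) {
            bufferedRequestEntries.addFirst(failedRequestEntries.get(i));
        }
    }

    // Runs one pending mail; a real mailbox would block until a mail arrives.
    void yieldOnce() {
        Runnable mail = mailbox.pollFirst();
        if (mail == null) {
            throw new IllegalStateException("no completion pending");
        }
        mail.run();
    }

    // Waits until every in-flight request has completed.
    void prepareCommit() {
        while (inFlightRequestsCount > 0) {
            yieldOnce();
        }
    }

    // Entries that have not filled a batch yet end up in the snapshot.
    List<String> snapshotState() {
        return new ArrayList<>(bufferedRequestEntries);
    }
}
```

Writing ten elements sends three full batches, and the third batch has to yield once because two requests are already in flight. The tenth element does not fill a batch, so prepareCommit only drains the in-flight requests and the element ends up in the snapshot.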

Code Block
languagejava
titleAsyncSinkWriter Internals
    /**
     * The ElementConverter provides a mapping from the elements of a stream to
     * request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. E.g., for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, i.e., the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that complete (successfully or unsuccessfully)
     * automatically decrement the counter. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking the
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // for now, flush only sends full batches; it blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered RequestEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination;
            // if there are not enough elements, a smaller batch is created
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.remove());
            }

            // the callback only enqueues a mail, so completeRequest runs in the mailbox thread
            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            // blocks if too many async requests are in flight
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed requestEntries
     * back to the internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // Prepending the failedRequestEntries one by one reverses their order. This makes no
        // difference for kinesis:putRecords, as the API does not make any ordering guarantees,
        // but it may cause avoidable reorderings for other destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests may fail, but they will be retried if the sink is
     * still healthy. But if in-flight requests fail after a checkpoint has been
     * triggered and Flink needs to recover from the checkpoint, the (failed)
     * in-flight requests are gone and cannot be retried. Hence, there cannot be
     * any outstanding in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight requests need to be completed before
     * proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        // copy the buffer so that later writes do not modify the snapshot
        return Collections.singletonList(new ArrayList<>(bufferedRequestEntries));
    }

...
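The completeRequest comment notes that prepending failed entries one at a time reverses their order. The following sketch (a hypothetical helper class, RequeueOrder, not part of the FLIP) contrasts that behavior with a variant that walks the failed entries from last to first, which keeps their original relative order at the head of the buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

class RequeueOrder {
    // Same strategy as failedRequestEntries.forEach(bufferedRequestEntries::addFirst):
    // the failed entries end up in reverse order at the head of the buffer.
    static <T> void requeueNaive(Deque<T> buffer, List<T> failed) {
        failed.forEach(buffer::addFirst);
    }

    // Prepends the failed entries from last to first, so they keep their
    // original relative order ahead of the entries that are already buffered.
    static <T> void requeuePreservingOrder(Deque<T> buffer, List<T> failed) {
        ListIterator<T> it = failed.listIterator(failed.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
    }

    // Requeues the failed batch [a, b, c] in front of a buffer holding [x].
    static List<String> demo(boolean preserveOrder) {
        Deque<String> buffer = new ArrayDeque<>(List.of("x"));
        List<String> failed = List.of("a", "b", "c");
        if (preserveOrder) {
            requeuePreservingOrder(buffer, failed);
        } else {
            requeueNaive(buffer, failed);
        }
        return new ArrayList<>(buffer);
    }
}
```

demo(false) yields [c, b, a, x], while demo(true) yields [a, b, c, x]. For Kinesis PutRecords the difference does not matter, but a destination that respects write order could use the second variant at the cost of iterating backwards.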

Code Block
languagejava
titleAmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (err != null) {
                //TODO: handle errors of the entire request; for now, re-queue all request entries
                requestResult.complete(requestEntries);
                return;
            }

            if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                // the i-th result corresponds to the i-th request entry
                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }
        });
    }

    ...
}
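The Kinesis writer relies on the PutRecords response listing one result per request entry in request order, so failed entries can be found by index. The dependency-free sketch below isolates that mapping. ResultEntry is a hypothetical stand-in for PutRecordsResultEntry, a Consumer replaces ResultFuture, and re-queueing the whole batch when the entire call fails is an assumption, since the FLIP leaves that case as a TODO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for PutRecordsResultEntry: errorCode is null on success.
final class ResultEntry {
    private final String errorCode;

    ResultEntry(String errorCode) {
        this.errorCode = errorCode;
    }

    String errorCode() {
        return errorCode;
    }
}

final class PartialFailureHandling {
    // Invoked with the outcome of one batch call; hands back the entries to retry.
    static <T> void handle(List<T> requestEntries, List<ResultEntry> results, Throwable err,
                           Consumer<List<T>> requestResult) {
        if (err != null) {
            // assumption: treat a failed call as transient and retry the whole batch
            requestResult.accept(new ArrayList<>(requestEntries));
            return;
        }
        List<T> failed = new ArrayList<>();
        // results.get(i) describes requestEntries.get(i)
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).errorCode() != null) {
                failed.add(requestEntries.get(i));
            }
        }
        requestResult.accept(failed);
    }
}
```

Completing the callback on every path matters: if a whole-request failure never completed it, the in-flight counter would never be decremented and prepareCommit would wait indefinitely.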


Rejected Alternatives

The sink needs a way to build back pressure, e.g., if the throughput limit of the destination is exceeded. Initially we were planning to adopt the isAvailable pattern from the source interface. But the benefits are too vague at this point, and it would require substantial changes to the sink API. We'll hence start with a blocking implementation of the write function and see how far we get.
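To illustrate the two back-pressure styles discussed above, the following sketch uses a hypothetical BoundedBuffer that offers both a blocking put (the approach chosen here) and an availability future in the spirit of the isAvailable pattern (the rejected alternative). It uses plain Java monitors and CompletableFuture rather than any Flink API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical bounded buffer offering both back-pressure styles.
class BoundedBuffer<T> {
    private final int capacity;
    private final Deque<T> entries = new ArrayDeque<>();
    // completed while there is room; replaced by a pending future once the buffer is full
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocking style: the calling thread waits until a slot is free.
    synchronized void putBlocking(T entry) throws InterruptedException {
        while (entries.size() >= capacity) {
            wait();
        }
        add(entry);
    }

    // Availability style: callers should only call add() once this future is done.
    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    // Does not check capacity itself; the availability style leaves that to the caller.
    synchronized void add(T entry) {
        entries.addLast(entry);
        if (entries.size() >= capacity && available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    // Called by the consumer side, e.g. when a request to the destination completes.
    synchronized T take() {
        T entry = entries.pollFirst();
        if (entries.size() < capacity) {
            notifyAll();              // wake up writers blocked in putBlocking
            available.complete(null); // signal writers waiting on the future
        }
        return entry;
    }
}
```

With the blocking style the writer thread simply stops until space frees up. With the future-based style every caller has to check isAvailable() before writing and schedule a continuation on the returned future, which hints at why adopting it would touch the sink API.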

...