Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion threadhttps:

...

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

JIRA: tbd

Released: tbd

Motivation

Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in how they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.

...

There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end-users, who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Stream sink. 

The async sink builds on FLIP-143 and FLIP-177. It uses the following generic types to stay extensible and agnostic to the destination.

...

Code Block
languagejava
titleAsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and report them through {@code requestResult} so
     * that they are re-queued.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, ie, calls to the API that have been sent but have not yet completed. During a commit, the sink enforces that all in-flight requests have completed and that currently buffered RequestEntryTs are persisted in the application state snapshot.
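
As a rough illustration of the buffering behaviour described above, the following self-contained sketch drains full batches from a buffer and leaves the remainder for a later flush. The class `BatchingSketch`, the method `drainFullBatches`, and the `maxBatchSize` parameter are hypothetical names chosen for this example; they are not part of the proposed API, and the sketch ignores in-flight tracking entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class BatchingSketch {

    /**
     * Removes as many full batches as possible from the head of the buffer.
     * Entries that do not fill a complete batch stay in the buffer.
     */
    static <T> List<List<T>> drainFullBatches(Deque<T> buffer, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        while (buffer.size() >= maxBatchSize) {
            List<T> batch = new ArrayList<>(maxBatchSize);
            while (batch.size() < maxBatchSize) {
                batch.add(buffer.remove());
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>(Arrays.asList("a", "b", "c", "d", "e"));
        List<List<String>> batches = drainFullBatches(buffer, 2);
        // Two full batches are drained; "e" stays buffered until more entries arrive.
        System.out.println(batches + " remaining=" + buffer);
    }
}
```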

Code Block
languagejava
titleAsyncSinkWriter Internals
    /**
     * The ElementConverter provides a mapping from the elements of a
     * stream to request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that completed (successfully or unsuccessfully)
     * automatically decrement the counter. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered request entries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            // strictly less than, so that a batch never exceeds MAX_BATCH_SIZE entries
            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                try {
                    batch.add(bufferedRequestEntries.remove());
                } catch (NoSuchElementException e) {
                    // if there are not enough elements, just create a smaller batch
                    break;
                }
            }

            // completing the result future marks the request as done and
            // re-queues the failed request entries on the mailbox thread
            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            // blocks if too many async requests are in flight
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed request
     * entries back to the internal request entry buffer for later retry.
     *
     * @param failedRequestEntries request entries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // Prepending while iterating forward through failedRequestEntries reverses
        // their order. It doesn't make a difference for kinesis:putRecords, as the api
        // does not make any order guarantees, but may cause avoidable reorderings for
        // other destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests may fail, but they will be retried if the sink is
     * still healthy. But if in-flight requests fail after a checkpoint has been
     * triggered and Flink needs to recover from the checkpoint, the (failed)
     * in-flight requests are gone and cannot be retried. Hence, there cannot be
     * any outstanding in-flight requests when a commit is initialized.
     * <p>
     * To this end, all in-flight requests need to be completed before
     * proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
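
The comment in completeRequest notes that prepending failed entries while iterating forward reverses their order. The following sketch shows that effect on a plain `Deque` and one possible way to keep the original order by iterating from the last failed entry to the first. The class and method names (`RequeueOrderSketch`, `requeueReversed`, `requeuePreservingOrder`) are hypothetical and only serve this illustration; the proposal itself does not prescribe an order-preserving variant.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

final class RequeueOrderSketch {

    /** Prepends entries in iteration order; the failed entries end up reversed. */
    static <T> void requeueReversed(Deque<T> buffer, List<T> failed) {
        failed.forEach(buffer::addFirst);
    }

    /** Prepends entries from last to first, so their relative order is preserved. */
    static <T> void requeuePreservingOrder(Deque<T> buffer, List<T> failed) {
        ListIterator<T> it = failed.listIterator(failed.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
    }

    public static void main(String[] args) {
        Deque<String> reversed = new ArrayDeque<>(Arrays.asList("x"));
        requeueReversed(reversed, Arrays.asList("f1", "f2"));
        System.out.println(reversed); // prints [f2, f1, x]

        Deque<String> preserved = new ArrayDeque<>(Arrays.asList("x"));
        requeuePreservingOrder(preserved, Arrays.asList("f1", "f2"));
        System.out.println(preserved); // prints [f1, f2, x]
    }
}
```

Whether the extra care is worthwhile depends on the destination: it is irrelevant for an API without ordering guarantees, but may matter for destinations that preserve the order of entries within a request.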

...

Code Block
languagejava
titleAmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
                if (response.failedRecordCount() > 0) {
                    ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();

                    // the result entries are in the same order as the request entries
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntries.get(i));
                        }
                    }

                    requestResult.complete(failedRequestEntries);
                } else {
                    requestResult.complete(Collections.emptyList());
                }

                //TODO: handle errors of the entire request (err != null, in which case response is null)...
            });
    }
    }

    ...
}
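
To make the callback contract of submitRequestEntries easier to follow without the AWS SDK, here is a minimal sketch against a made-up destination. Everything in it is an assumption for illustration: `ResultFuture` is a local stand-in with a single `complete` method (mirroring how it is used in the sample above), `fakePut` is a hypothetical client call that rejects entries starting with "throttled", and retrying the whole batch when the entire request fails is just one possible policy, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ResultFutureSketch {

    /** Local stand-in for the callback type passed to submitRequestEntries. */
    interface ResultFuture<T> {
        void complete(Collection<T> failedRequestEntries);
    }

    /** Hypothetical client call: answers with one success flag per entry. */
    static CompletableFuture<List<Boolean>> fakePut(List<String> entries) {
        List<Boolean> succeeded = new ArrayList<>();
        for (String entry : entries) {
            succeeded.add(!entry.startsWith("throttled"));
        }
        return CompletableFuture.completedFuture(succeeded);
    }

    static void submitRequestEntries(List<String> entries, ResultFuture<String> result) {
        fakePut(entries).whenComplete((response, err) -> {
            if (err != null) {
                // Assumption: treat a failure of the whole request as transient
                // and hand every entry back for a later retry.
                result.complete(entries);
                return;
            }
            // Collect only the entries that the destination did not accept.
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < response.size(); i++) {
                if (!response.get(i)) {
                    failed.add(entries.get(i));
                }
            }
            result.complete(failed);
        });
    }

    public static void main(String[] args) {
        submitRequestEntries(
                Arrays.asList("a", "throttled-b", "c"),
                failed -> System.out.println("to retry: " + failed));
    }
}
```

The important property is that the result future is completed exactly once per invocation, with an empty collection when nothing needs to be retried, so that the writer can decrement its in-flight counter.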


Rejected Alternatives

The sink needs a way to apply back pressure, eg, if the throughput limit of the destination is exceeded. Initially we were planning to adopt the isAvailable pattern from the source interface. But the benefits are too vague at this point and it would require substantial changes to the sink API. We'll hence start with a blocking implementation of the write function and see how far we get.

Code Block
languagejava
titleisAvailable pattern
    /**
     * Signals if enough RequestEntryTs have been buffered according to the user
     * specified buffering hints to make a request against the destination. This
     * functionality will be added to the sink interface by means of an
     * additional FLIP.
     *
     * @return a future that will be completed once a request against the
     * destination can be made
     */
    public CompletableFuture<Void> isAvailable() {
        ...
    }
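
For contrast, the blocking approach that the proposal starts with can be pictured with a small single-threaded simulation. This is only a sketch under simplifying assumptions: the `mailbox` queue and `yieldOnce` are hypothetical stand-ins that run one queued completion action at a time and do not model Flink's actual mailbox executor, each element is treated as its own request, and `MAX_BUFFERED` is an arbitrary limit chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BlockingWriteSketch {

    static final int MAX_BUFFERED = 2;

    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final Deque<String> buffer = new ArrayDeque<>();

    int maxObservedBufferSize = 0;
    int persisted = 0;

    /** Runs one pending completion action; stands in for yielding to the mailbox. */
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("buffer is full but nothing is in flight");
        }
        mail.run();
    }

    /** Blocks the caller (by yielding) until the buffer has room for the element. */
    void write(String element) {
        while (buffer.size() >= MAX_BUFFERED) {
            yieldOnce();
        }
        buffer.add(element);
        maxObservedBufferSize = Math.max(maxObservedBufferSize, buffer.size());

        // Pretend that the request for the oldest buffered element completes later.
        mailbox.add(() -> {
            buffer.remove();
            persisted++;
        });
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BlockingWriteSketch sink = new BlockingWriteSketch();
        for (String element : new String[] {"a", "b", "c", "d", "e"}) {
            sink.write(element);
        }
        System.out.println("persisted=" + sink.persisted + " buffered=" + sink.buffered());
    }
}
```

The writer never holds more than `MAX_BUFFERED` elements, because each additional write first waits for an earlier request to complete. That is the back pressure effect the blocking write function relies on.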

...