Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

...

...

JIRA: FLINK-24041

...

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

...

There are two user-facing aspects of the generic sink. First, an abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation of a Kinesis Data Streams sink. 

The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types so that it remains extensible and agnostic to the destination.  

...

```java
// AsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and hand them back for retry by completing the
     * {@code requestResult} with them.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}
```
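To make the `submitRequestEntries` contract more concrete, here is a minimal, self-contained sketch of destination-side logic. It assumes a hypothetical single-method `ResultFuture` interface, modeled on how this page uses `ResultFuture` (it is not meant to be Flink's async I/O `ResultFuture`), and a hypothetical `InMemoryDestination` whose `rejects` predicate simulates entries that fail, eg, because of throttling. The point it illustrates: the future is completed exactly once, with exactly the entries that must be retried, after every entry has either been persisted or reported as failed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical stand-in for the ResultFuture on this page: the destination
// completes it with the request entries that need to be re-queued.
interface ResultFuture<T> {
    void complete(Collection<T> failedRequestEntries);
}

// Hypothetical destination used only for illustration; in the sink, this logic
// would live in submitRequestEntries of a concrete AsyncSinkWriter subclass.
class InMemoryDestination {
    final List<String> persisted = new CopyOnWriteArrayList<>();
    private final Predicate<String> rejects; // simulates failing entries, eg, throttling

    InMemoryDestination(Predicate<String> rejects) {
        this.rejects = rejects;
    }

    void submitRequestEntries(List<String> requestEntries, ResultFuture<String> requestResult) {
        // the "request" runs asynchronously, like a call through an async client
        CompletableFuture.runAsync(() -> {
            List<String> failed = new ArrayList<>();
            for (String entry : requestEntries) {
                if (rejects.test(entry)) {
                    failed.add(entry);
                } else {
                    persisted.add(entry);
                }
            }
            // complete once, after each entry is either persisted or handed back for retry
            requestResult.complete(failed);
        });
    }
}
```

Completing with an empty collection signals that the whole batch was persisted; either way, the completion marks the end of one in-flight request from the writer's point of view.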

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, ie, calls to the API that have been sent but have not completed yet. During a commit, the sink waits until all in-flight requests have completed, and the RequestEntryTs that are still buffered are persisted in the application state snapshot.

```java
// AsyncSinkWriter Internals
    /**
     * The ElementConverter provides a mapping from the elements of a stream to
     * request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>(...);


    /**
     * Tracks the number of pending async calls that have been executed since
     * the last checkpoint. Calls that complete (successfully or
     * unsuccessfully) automatically decrement the counter. Any request entry
     * that was not successfully persisted needs to be identified by the logic
     * in {@code submitRequestEntries} and handed back for retry.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered RequestEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user-specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination;
            // a batch never exceeds MAX_BATCH_SIZE and is only smaller if the buffer runs out
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.remove());
            }

            // once the destination completes the request, mark it as completed and
            // re-queue the failed entries on the mailbox thread
            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            // blocks if too many async requests are in flight
            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;

            // call the destination specific code that actually persists the request entries
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed requestEntries back to the
     * internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // By just iterating through failedRequestEntries, it reverses the order of the
        // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the api
        // does not make any order guarantees, but may cause avoidable reorderings for other
        // destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests will be retried if the sink is still healthy. But if in-flight requests
     * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
     * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
     * outstanding in-flight requests when a commit is initialized.
     *
     * <p>To this end, all in-flight requests need to be completed before proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
```
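The buffering, batching, in-flight limit, and re-queue logic described above can be exercised in isolation with a simplified, single-threaded model. This is a sketch under several assumptions: the mailbox is modeled as a plain queue of `Runnable`s, and a hypothetical `yieldToMailbox()` runs one of them in place of `mailboxExecutor.yield()`; the destination is a hypothetical `BiConsumer` that receives a batch plus a callback for the entries to retry; the buffer-size limit in `write` is omitted. Unlike `flush()` on this page, which only sends full batches, the hypothetical `flushAll` flag also sends a final, smaller batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified, single-threaded model of the AsyncSinkWriter internals (illustrative only).
class MiniAsyncWriter<T> {
    private final Deque<T> bufferedRequestEntries = new ArrayDeque<>();
    // stand-in for the task mailbox: completions are queued here and run by yieldToMailbox()
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    // hypothetical destination: receives a batch and a callback for the entries to retry
    private final BiConsumer<List<T>, Consumer<Collection<T>>> destination;
    private int inFlightRequestsCount;

    MiniAsyncWriter(int maxBatchSize, int maxInFlightRequests,
                    BiConsumer<List<T>, Consumer<Collection<T>>> destination) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.destination = destination;
    }

    void write(T requestEntry) {
        bufferedRequestEntries.add(requestEntry);
        flush(false);
    }

    void flush(boolean flushAll) {
        while (bufferedRequestEntries.size() >= maxBatchSize
                || (flushAll && !bufferedRequestEntries.isEmpty())) {
            List<T> batch = new ArrayList<>(maxBatchSize);
            while (batch.size() < maxBatchSize && !bufferedRequestEntries.isEmpty()) {
                batch.add(bufferedRequestEntries.poll());
            }
            // "block" by running mailbox tasks while too many requests are in flight
            while (inFlightRequestsCount >= maxInFlightRequests) {
                yieldToMailbox();
            }
            inFlightRequestsCount++;
            // the completion is enqueued rather than applied directly, like mailboxExecutor.execute(...)
            destination.accept(batch, failed -> mailbox.add(() -> completeRequest(failed)));
        }
    }

    void prepareCommit(boolean flush) {
        if (flush) {
            flush(true);
        }
        // wait until all in-flight requests have completed
        while (inFlightRequestsCount > 0) {
            yieldToMailbox();
        }
    }

    List<T> snapshotState() {
        return new ArrayList<>(bufferedRequestEntries);
    }

    int inFlightRequestsCount() {
        return inFlightRequestsCount;
    }

    private void completeRequest(Collection<T> failedRequestEntries) {
        inFlightRequestsCount--;
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }

    private void yieldToMailbox() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            // in this single-threaded model nothing else could ever complete a request
            throw new IllegalStateException("waiting for a completion that was never enqueued");
        }
        mail.run();
    }
}
```

With a batch size of 2, one request in flight at a time, and a destination that rejects `x` on its first attempt, writing `a, x, b, c` and then calling `prepareCommit(true)` sends the batches `[a, x]`, `[b, c]`, `[x]`. The retried `x` is persisted after `b` and `c`, which is the kind of reordering listed under Limitations.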

Limitations

  • The sink is designed for destinations that provide an async client. Destinations that cannot ingest events in an async fashion cannot be supported by the sink.
  • The sink usually persists InputTs in the order they are added to the sink, but reorderings may occur, eg, when RequestEntryTs need to be retried.
  • We are not focusing on support for exactly-once semantics beyond simple upsert-capable and idempotent destinations at this point.
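The retry-related reordering can be illustrated in isolation. In `completeRequest`, failed entries are prepended one by one with `addFirst`, which reverses their relative order (as the code comment there notes). The sketch below contrasts that with a hypothetical variant that prepends from the last failed entry to the first, which keeps the failed entries in their original order. Even the order-preserving variant does not restore global ordering: entries sent while the failed request was in flight may already have been persisted ahead of the retried ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

final class RequeueOrder {
    private RequeueOrder() {}

    // Same strategy as completeRequest: prepend each failed entry in iteration order.
    // The last failed entry ends up at the head, so the failed entries are reversed.
    static List<String> requeueByIteration(List<String> buffered, List<String> failed) {
        Deque<String> buffer = new ArrayDeque<>(buffered);
        failed.forEach(buffer::addFirst);
        return new ArrayList<>(buffer);
    }

    // Hypothetical alternative: prepend from the last failed entry to the first,
    // so the failed entries keep their original relative order at the head.
    static List<String> requeuePreservingOrder(List<String> buffered, List<String> failed) {
        Deque<String> buffer = new ArrayDeque<>(buffered);
        ListIterator<String> it = failed.listIterator(failed.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
        return new ArrayList<>(buffer);
    }
}
```

In the writer, `failedRequestEntries` is typed as a `Collection`, so an order-preserving variant would first need an ordered view of it, such as a `List`.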

Appendix A: Simplified example implementation of a sink for Kinesis Data Streams

Example ElementConverter for Kinesis Data Streams

This is the functionality that needs to be implemented by end users of the sink. It specifies how an element of a DataStream is mapped to a PutRecordsRequestEntry that can be submitted to the Kinesis Data Streams API.

```java
// ElementConverter for Kinesis Data Streams
new ElementConverter<InputT, PutRecordsRequestEntry>() {
    @Override
    public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
        return PutRecordsRequestEntry
                .builder()
                .data(SdkBytes.fromUtf8String(element.toString()))
                .partitionKey(String.valueOf(element.hashCode()))
                .build();
    }
}
```
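To show what such a converter produces without depending on the AWS SDK, the sketch below models it with hypothetical stand-ins: an `ElementConverter` interface without the `SinkWriter.Context` parameter, and a `RequestEntry` class holding the two fields the Kinesis converter sets (a UTF-8 payload and a partition key). Because the partition key is derived from `hashCode()`, equal elements always get the same partition key, and so do distinct elements whose hash codes collide.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, SDK-free stand-in for PutRecordsRequestEntry: payload plus partition key.
final class RequestEntry {
    final byte[] data;
    final String partitionKey;

    RequestEntry(byte[] data, String partitionKey) {
        this.data = data;
        this.partitionKey = partitionKey;
    }
}

// Hypothetical simplified converter interface (the SinkWriter.Context argument is omitted).
@FunctionalInterface
interface ElementConverter<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element);
}

final class KinesisLikeConverter {
    private KinesisLikeConverter() {}

    // Same mapping as the converter above: toString() as UTF-8 payload, hashCode() as partition key.
    static <InputT> ElementConverter<InputT, RequestEntry> create() {
        return element -> new RequestEntry(
                element.toString().getBytes(StandardCharsets.UTF_8),
                String.valueOf(element.hashCode()));
    }
}
```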

Simplified AsyncSinkWriter for Kinesis Data Streams

This is a simplified sample implementation of the AsyncSinkWriter for Kinesis Data Streams. Given a set of buffered PutRecordsRequestEntries, it creates and submits a batch request against the Kinesis Data Streams API using the KinesisAsyncClient. The response of the API call is then checked for events that were not persisted successfully (eg, because of throttling or network failures), and those events are handed back through the requestResult so that they are added back to the internal buffer of the AsyncSinkWriter.

```java
// AmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (err != null) {
                // the entire request failed and there is no response; re-queue the whole batch
                // so that the in-flight request still completes
                //TODO: distinguish transient from fatal errors; entries that fail in a
                // reproducible manner would otherwise be retried indefinitely
                requestResult.complete(requestEntries);
            } else if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                // result records are in the same order as the request entries
                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }
        });
    }

    ...
}
```
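The per-record failure handling relies on the PutRecords response containing one result per request record, in the same order as the request, with an error code set only for records that failed. The sketch below isolates that logic so it can be tested without the AWS SDK. `ResultEntry` and `PartialFailures` are hypothetical names (`ResultEntry` stands in for `PutRecordsResultEntry`), and re-queuing the whole batch when the entire request fails is an assumption that only makes sense for transient errors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the SDK's PutRecordsResultEntry: only the error code matters here.
final class ResultEntry {
    private final String errorCode; // null means the record was persisted

    ResultEntry(String errorCode) {
        this.errorCode = errorCode;
    }

    String errorCode() {
        return errorCode;
    }
}

final class PartialFailures {
    private PartialFailures() {}

    // Returns the request entries that should be handed back for retry.
    static <E> List<E> entriesToRetry(List<E> requestEntries, List<ResultEntry> results, Throwable err) {
        if (err != null) {
            // the entire request failed: re-queue the whole batch (assumes a transient failure)
            return new ArrayList<>(requestEntries);
        }
        if (results.size() != requestEntries.size()) {
            throw new IllegalStateException("expected exactly one result per request entry");
        }
        List<E> failed = new ArrayList<>();
        // results are positionally aligned with the request entries
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).errorCode() != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```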


Rejected Alternatives

...