Status

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https:

...

JIRA: tbd

Released: tbd

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

...

Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar: they batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.

...

The async sink builds on FLIP-143 and FLIP-177. It uses the following generic types to be extensible and to remain agnostic to the destination.

...

Code Block
languagejava
titleAsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them using the {@code requestResult}
     * callback.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}
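
To make the contract concrete, here is a minimal sketch of a destination-specific implementation. The client, its sendBatch method, and the BatchResponse type are hypothetical stand-ins for a destination SDK; they are assumptions for illustration and not part of this FLIP.

Code Block
languagejava
titleHypothetical submitRequestEntries implementation (sketch)
    // Sketch only: client, sendBatch, and the response type are made-up
    // stand-ins for a destination SDK, not part of this FLIP.
    @Override
    protected void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult) {
        client.sendBatch(requestEntries).whenComplete((response, err) -> {
            if (err != null) {
                // the entire request failed, eg, because of a network issue;
                // re-queue all entries so they are retried later
                requestResult.complete(requestEntries);
            } else {
                // only re-queue the entries the destination rejected; an empty
                // collection marks the request as fully successful
                requestResult.complete(response.rejectedEntries());
            }
        });
    }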

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, ie, calls to the API that have been sent but not completed. During a commit, the sink enforces that all in-flight requests have completed and that currently buffered RequestEntryTs are persisted in the application state snapshot.

Code Block
languagejava
titleAsyncSinkWriter Internals
    /**
     * The ElementConverter provides a mapping between the elements of a
     * stream and the request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that completed (successfully or unsuccessfully) are
     * automatically decrementing the counter. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered RequestEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                try {
                    batch.add(bufferedRequestEntries.remove());
                } catch (NoSuchElementException e) {
                    // if there are not enough elements, just create a smaller batch
                    break;
                }
            }

            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }


    /**
     * Marks an in-flight request as completed and prepends failed requestEntries back to the
     * internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // By just iterating through failedRequestEntries, it reverses the order of the
        // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the api
        // does not make any order guarantees, but may cause avoidable reorderings for other
        // destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests will be retried if the sink is still healthy. But if in-flight requests
     * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
     * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
     * outstanding in-flight requests when a commit is initialized.
     *
     * <p>To this end, all in-flight requests need to be completed before proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }
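
Note that the completion callback is relayed through the mailboxExecutor and therefore runs on the writer's mailbox thread. Under that threading model, bufferedRequestEntries and inFlightRequestsCount are only ever accessed from a single thread, which is presumably why a plain ArrayDeque and a plain int counter suffice and no concurrent data structures are required.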

Limitations

  • The sink is designed for destinations that provide an async client. Destinations that cannot ingest events in an async fashion cannot be supported by the sink.
  • The sink usually persists InputTs in the order they are added to the sink, but reorderings may occur, eg, when RequestEntryTs need to be retried.
  • We are not focusing on support for exactly-once semantics beyond simple upsert-capable and idempotent destinations at this point.

Appendix A – Simplified example implementation of a sink for Kinesis Data Streams

Example ElementConverter for Kinesis Data Streams

This is the functionality that needs to be implemented by end users of the sink. It specifies how an element of a DataStream is mapped to a PutRecordsRequestEntry that can be submitted to the Kinesis Data Streams API.

Code Block
languagejava
titleElementConverter for Kinesis Data Streams

new ElementConverter<InputT, PutRecordsRequestEntry>() {
    @Override
    public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
        return PutRecordsRequestEntry
                .builder()
                .data(SdkBytes.fromUtf8String(element.toString()))
                .partitionKey(String.valueOf(element.hashCode()))
                .build();
    }
}
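
For illustration, the converter could be wired into a job as follows. The AmazonKinesisDataStreamSink constructor shown here is an assumption for this sketch (the FLIP does not fix the public API of the concrete Kinesis sink), and the lambda assumes ElementConverter stays a single-method interface; DataStream#sinkTo is the FLIP-143 way of attaching a sink.

Code Block
languagejava
titleUsing the ElementConverter (sketch)
// Sketch only: the sink constructor and its "stream name" parameter are
// hypothetical; only sinkTo and the converter contract come from the FLIPs.
DataStream<String> events = ...;

events.sinkTo(new AmazonKinesisDataStreamSink<>(
        "my-example-stream",            // hypothetical stream name parameter
        (element, context) -> PutRecordsRequestEntry
                .builder()
                .data(SdkBytes.fromUtf8String(element))
                .partitionKey(String.valueOf(element.hashCode()))
                .build()));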

Simplified AsyncSinkWriter for Kinesis Data Streams

This is a simplified sample implementation of the AsyncSinkWriter for Kinesis Data Streams. Given a set of buffered PutRecordsRequestEntries, it creates and submits a batch request against the Kinesis Data Streams API using the KinesisAsyncClient. The response of the API call is then checked for events that were not persisted successfully (eg, because of throttling or network failures), and those events are added back to the internal buffer of the AsyncSinkWriter.

Code Block
languagejava
titleAmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
                if (response.failedRecordCount() > 0) {
                    ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();
    
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntries.get(i));
                        }
                    }
    
                    requestResult.complete(failedRequestEntries);
                } else {
                    requestResult.complete(Collections.emptyList());
                }

                //TODO: handle errors of the entire request...
            });
    }

    ...
}
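
The snippet above references a client and a streamName that are not shown. The following is a minimal sketch of the assumed surrounding state; the field names and constructor shape are assumptions for illustration, not part of this FLIP.

Code Block
languagejava
titleAssumed writer state (sketch)
    // Assumed supporting fields for the writer sketch above; the names and the
    // constructor shape are illustrative only.
    private final KinesisAsyncClient client;  // AWS SDK v2 async client
    private final String streamName;          // destination stream

    private AmazonKinesisDataStreamWriter(KinesisAsyncClient client, String streamName) {
        this.client = client;
        this.streamName = streamName;
    }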


Rejected Alternatives

The sink needs a way to build back pressure, eg, if the throughput limit of the destination is exceeded. Initially we were planning to adopt the isAvailable pattern from the source interface. But the benefits are too vague at this point and it would require substantial changes to the sink API. We'll hence start with a blocking implementation of the write function and see how far we get.

Code Block
languagejava
titleisAvailable pattern
    /**
     * Signals if enough RequestEntryTs have been buffered according to the user
     * specified buffering hints to make a request against the destination. This
     * functionality will be added to the sink interface by means of an
     * additional FLIP.
     *
     * @return a future that will be completed once a request against the
     * destination can be made
     */
    public CompletableFuture<Void> isAvailable() {
        ...
    }