Status

Current state: Under Discussion

...


Discussion thread: mod_mbox/flink-dev/202106.mbox/%3C860A1499-0166-4BCF-B24D-FBE9C823D46E%40amazon.de%3E
JIRA: FLINK-24041

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

...

Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar: they batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently.

...

There are two user-facing aspects of the generic sink. First, the abstract class that is used to implement a new sink for a concrete destination. Second, the interface that is used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation for a Kinesis Data Streams sink. A sketch of the end-user side is shown below.
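
For illustration, a minimal sketch of how an end user might wire an existing async sink into a job; the sink class name and its constructor arguments are hypothetical, and only the sinkTo method of the unified Sink API (FLIP-143) is assumed:

Code Block
languagejava
titleEnd-user wiring (hypothetical sketch)
// Hypothetical sink class and constructor arguments, for illustration only.
AmazonKinesisDataStreamSink<String> sink =
        new AmazonKinesisDataStreamSink<>("my-stream", elementConverter);

// Attach the sink to a DataStream via the unified Sink API (FLIP-143).
stream.sinkTo(sink);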

The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types to be extensible and to remain agnostic to the destination.
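
For illustration, the Kinesis writer from Appendix A binds these type parameters as follows; RequestEntryT becomes the service's record type while InputT remains the type of the stream elements:

Code Block
languagejava
titleGeneric types, illustrated with the Kinesis writer from Appendix A
// InputT:        type of the elements of the DataStream that should be persisted
// RequestEntryT: type of the payload that is sent to the destination
private class AmazonKinesisDataStreamWriter
        extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
    // destination-specific logic, see Appendix A
}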

...

Code Block
languagejava
titleAsyncSinkWriter
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them using the {@code requestResult}
     * callback.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}

Internally, the AsyncSinkWriter buffers RequestEntryTs and invokes the submitRequestEntries method with a set of RequestEntryTs according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, ie, calls to the API that have been sent but not completed. During a commit, the sink enforces that all in-flight requests have completed and currently buffered RequestEntryTs are persisted in the application state snapshot.

Code Block
languagejava
titleAsyncSinkWriter Internals
    /**
     * The ElementConverter provides a mapping from the elements of a
     * stream to request entries that can be sent to the destination.
     * <p>
     * The resulting request entry is buffered by the AsyncSinkWriter and sent
     * to the destination when the {@code submitRequestEntries} method is
     * invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;


    /**
     * Buffer to hold request entries that should be persisted into the
     * destination.
     * <p>
     * A request entry contains all relevant details to make a call to the
     * destination. Eg, for Kinesis Data Streams a request entry contains the
     * payload and partition key.
     * <p>
     * It seems more natural to buffer InputT, ie, the events that should be
     * persisted, rather than RequestEntryT. However, in practice, the response
     * of a failed request call can make it very hard, if not impossible, to
     * reconstruct the original event. It is much easier to just construct a
     * new (retry) request entry from the response and add that back to the
     * queue for later retry.
     */
    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();


    /**
     * Tracks all pending async calls that have been executed since the last
     * checkpoint. Calls that completed (successfully or unsuccessfully)
     * automatically decrement the counter. Any request entry that was not
     * successfully persisted needs to be handled and retried by the logic in
     * {@code submitRequestEntries}.
     * <p>
     * There is a limit on the number of concurrent (async) requests that can be
     * handled by the client library. This limit is enforced by checking this
     * counter before issuing new requests.
     * <p>
     * To complete a checkpoint, we need to make sure that no requests are in
     * flight, as they may fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;


    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // blocks if too many elements have been buffered
        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
            mailboxExecutor.yield();
        }

        bufferedRequestEntries.add(elementConverter.apply(element, context));

        // blocks if too many async requests are in flight
        flush();
    }


    /**
     * Persists buffered RequestEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user specified
     * buffering hints.
     * <p>
     * The method blocks if too many async requests are in flight.
     */
    private void flush() throws InterruptedException {
        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

            // create a batch of request entries that should be persisted in the destination
            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
                try {
                    batch.add(bufferedRequestEntries.remove());
                } catch (NoSuchElementException e) {
                    // if there are not enough elements, just create a smaller batch
                    break;
                }
            }

            ResultFuture<RequestEntryT> requestResult =
                    failedRequestEntries -> mailboxExecutor.execute(
                            () -> completeRequest(failedRequestEntries),
                            "Mark in-flight request as completed and requeue %d request entries",
                            failedRequestEntries.size());

            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
                mailboxExecutor.yield();
            }

            inFlightRequestsCount++;
            submitRequestEntries(batch, requestResult);
        }
    }

    /**
     * Marks an in-flight request as completed and prepends failed requestEntries back to the
     * internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;

        // By just iterating through failedRequestEntries, it reverses the order of the
        // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the api
        // does not make any order guarantees, but may cause avoidable reorderings for other
        // destinations.
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }


    /**
     * In-flight requests will be retried if the sink is still healthy. But if in-flight requests
     * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
     * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
     * outstanding in-flight requests when a commit is initialized.
     *
     * <p>To this end, all in-flight requests need to be completed before proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        if (flush) {
            flush();
        }

        // wait until all in-flight requests completed
        while (inFlightRequestsCount > 0) {
            mailboxExecutor.yield();
        }

        return Collections.emptyList();
    }


    /**
     * All in-flight requests have been completed, but there may still be
     * request entries in the internal buffer that are yet to be sent to the
     * endpoint. These request entries are stored in the snapshot state so that
     * they don't get lost in case of a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
        return Collections.singletonList(bufferedRequestEntries);
    }

Limitations

  • The sink is designed for destinations that provide an async client. Destinations that cannot ingest events in an async fashion cannot be supported by the sink.
  • The sink usually persists InputTs in the order they are added to the sink, but reorderings may occur, eg, when RequestEntryTs need to be retried.

  • We are not focusing on support for exactly-once semantics beyond simple upsert-capable and idempotent destinations at this point.

Appendix A – Simplified example implementation of a sink for Kinesis Data Streams

Example ElementConverter for Kinesis Data Streams

This is the functionality that needs to be implemented by end users of the sink. It specifies how an element of a DataStream is mapped to a PutRecordsRequestEntry that can be submitted to the Kinesis Data Streams API.

Code Block
languagejava
titleElementConverter for Kinesis Data Streams

new ElementConverter<InputT, PutRecordsRequestEntry>() {
    @Override
    public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
        return PutRecordsRequestEntry
                .builder()
                .data(SdkBytes.fromUtf8String(element.toString()))
                .partitionKey(String.valueOf(element.hashCode()))
                .build();
    }
}

Simplified AsyncSinkWriter for Kinesis Data Streams

This is a simplified sample implementation of the AsyncSinkWriter for Kinesis Data Streams. Given a set of buffered PutRecordsRequestEntries, it creates and submits a batch request against the Kinesis Data Streams API using the KinesisAsyncClient. The response of the API call is then checked for events that were not persisted successfully (eg, because of throttling or network failures), and those events are added back to the internal buffer of the AsyncSinkWriter.

Code Block
languagejava
titleAmazonKinesisDataStreamWriter
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }

            //TODO: handle errors of the entire request...
        });
    }

    ...
}


Rejected Alternatives

The sink needs a way to build back pressure, eg, if the throughput limit of the destination is exceeded. Initially we were planning to adopt the isAvailable pattern from the source interface. But the benefits are too vague at this point and it would require substantial changes to the sink API. We'll hence start with a blocking implementation of the write function and see how far we get.

Code Block
languagejava
titleisAvailable pattern

    /**
     * Signals if enough RequestEntryTs have been buffered according to the user
     * specified buffering hints to make a request against the destination. This
     * functionality will be added to the sink interface by means of an
     * additional FLIP.
     *
     * @return a future that will be completed once a request against the
     * destination can be made
     */
    public CompletableFuture<Void> isAvailable() {
        ...
    }