Status
...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
...
...
...
JIRA: tbd
Released: tbd
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in the way they interface with the destination. Yet all the above-mentioned sinks are developed and maintained independently.
...
The generic sink has two user-facing aspects. The first is an abstract class that is used to implement a new sink for a concrete destination. The second is the interface used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation of a Kinesis Data Streams sink.
The async sink builds on FLIP-143 and FLIP-177. It uses the following generic types so that it remains extensible and agnostic to the destination.
...
```java
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them using the {@code requestResult}
     * callback.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been re-queued
     */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultFuture<RequestEntryT> requestResult);

    ...
}
```
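To make the contract of `submitRequestEntries` more concrete, here is a minimal sketch that does not depend on Flink. It assumes a simplified `ResultFuture` with a single `complete(Collection)` method (a stand-in, not Flink's actual type) and a hypothetical `ThrottledDestination` that accepts only `capacityPerCall` entries per request. Entries that do not fit are handed back through `requestResult` and resubmitted, which roughly mirrors what re-queuing achieves in the real writer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Simplified stand-in for the ResultFuture callback (assumption, not Flink's type). */
interface ResultFuture<RequestEntryT> {
    /** Completes the request; the argument holds the entries that must be retried. */
    void complete(Collection<RequestEntryT> failedRequestEntries);
}

/** Hypothetical destination that accepts at most capacityPerCall entries per request. */
class ThrottledDestination {
    final List<String> persisted = new ArrayList<>();
    private final int capacityPerCall;

    ThrottledDestination(int capacityPerCall) {
        this.capacityPerCall = capacityPerCall;
    }

    /** Same shape as submitRequestEntries: persist what fits, hand the rest back for retry. */
    void submitRequestEntries(List<String> requestEntries, ResultFuture<String> requestResult) {
        List<String> failed = new ArrayList<>();
        int accepted = 0;
        for (String entry : requestEntries) {
            if (accepted < capacityPerCall) {
                persisted.add(entry);
                accepted++;
            } else {
                failed.add(entry); // simulated throttling: transient, so retrying is safe
            }
        }
        requestResult.complete(failed);
    }

    /** Resubmits failed entries until none are left; returns the number of calls made. */
    static int persistAll(ThrottledDestination destination, List<String> entries) {
        List<String> pending = new ArrayList<>(entries);
        int calls = 0;
        while (!pending.isEmpty()) {
            List<String> retry = new ArrayList<>();
            destination.submitRequestEntries(pending, retry::addAll);
            pending = retry;
            calls++;
        }
        return calls;
    }
}
```

With `capacityPerCall = 2`, persisting five entries takes three calls. This sketch only works for transient failures; entries that fail reproducibly would loop forever and have to be handled inside `submitRequestEntries` instead.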
Internally, the `AsyncSinkWriter` buffers `RequestEntryT`s and invokes the `submitRequestEntries` method with a set of `RequestEntryT`s according to user-specified buffering hints. The `AsyncSinkWriter` also tracks in-flight requests, i.e., calls to the API that have been sent but have not completed yet. During a commit, the sink enforces that all in-flight requests have completed and that the currently buffered `RequestEntryT`s are persisted in the application state snapshot.
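The bookkeeping described in this paragraph can be modeled in a single thread. The sketch below is illustrative only: `SinkWriterModel`, the `"fail:"` prefix that simulates rejected entries, and the queue of `Runnable`s standing in for Flink's mailbox are all assumptions. It sends only full batches, counts in-flight requests, and before snapshotting drains the mailbox until nothing is in flight, so re-queued failures end up in the snapshot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/** Hypothetical single-threaded model of the AsyncSinkWriter bookkeeping. */
class SinkWriterModel {
    private final int maxBatchSize;                             // assumed buffering hint
    private final Deque<Runnable> mailbox = new ArrayDeque<>(); // stands in for Flink's mailbox
    private final Deque<String> bufferedRequestEntries = new ArrayDeque<>();
    private int inFlightRequestsCount;
    final List<String> persisted = new ArrayList<>();           // what the fake destination stored

    SinkWriterModel(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    int inFlightRequestsCount() {
        return inFlightRequestsCount;
    }

    void write(String requestEntry) {
        bufferedRequestEntries.addLast(requestEntry);
        // send only full batches; a partial batch stays in the buffer
        while (bufferedRequestEntries.size() >= maxBatchSize) {
            List<String> batch = new ArrayList<>(maxBatchSize);
            while (batch.size() < maxBatchSize) {
                batch.add(bufferedRequestEntries.removeFirst());
            }
            inFlightRequestsCount++;
            submitRequestEntries(batch);
        }
    }

    /** Fake async call: the response arrives later as a mail; "fail:" entries are rejected. */
    private void submitRequestEntries(List<String> batch) {
        mailbox.addLast(() -> {
            List<String> failed = new ArrayList<>();
            for (String entry : batch) {
                if (entry.startsWith("fail:")) {
                    failed.add(entry);
                } else {
                    persisted.add(entry);
                }
            }
            completeRequest(failed);
        });
    }

    private void completeRequest(Collection<String> failedRequestEntries) {
        inFlightRequestsCount--;
        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
    }

    /** Like prepareCommit followed by snapshotState: wait for in-flight requests, then copy the buffer. */
    List<String> prepareCommitAndSnapshot() {
        while (inFlightRequestsCount > 0) {
            mailbox.removeFirst().run(); // stands in for mailboxExecutor.yield()
        }
        return new ArrayList<>(bufferedRequestEntries);
    }
}
```

With a batch size of 2, writing `a`, `fail:b`, `c` sends one batch and leaves `c` buffered; after waiting for the in-flight request, the snapshot contains `[fail:b, c]`.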
```java
/**
 * The ElementConverter provides a mapping from the elements of a
 * stream to request entries that can be sent to the destination.
 * <p>
 * The resulting request entry is buffered by the AsyncSinkWriter and sent
 * to the destination when the {@code submitRequestEntries} method is
 * invoked.
 */
private final ElementConverter<InputT, RequestEntryT> elementConverter;
```
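As an illustration of what an element converter might do, the following sketch maps a hypothetical `OrderEvent` to a request entry holding a partition key and a payload, similar to a Kinesis record. `Context`, `ElementConverter`, `OrderEvent`, and `PartitionedEntry` are simplified stand-ins defined here for the example; Flink's own `SinkWriter.Context`, for instance, also exposes the current watermark.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the writer context. */
interface Context {
    Long timestamp(); // may be null if the element carries no timestamp
}

/** Simplified stand-in for the ElementConverter used by the AsyncSinkWriter. */
@FunctionalInterface
interface ElementConverter<InputT, RequestEntryT> {
    RequestEntryT apply(InputT element, Context context);
}

/** Hypothetical input event. */
final class OrderEvent {
    final String orderId;
    final String json;

    OrderEvent(String orderId, String json) {
        this.orderId = orderId;
        this.json = json;
    }
}

/** Hypothetical request entry: partition key plus serialized payload. */
final class PartitionedEntry {
    final String partitionKey;
    final byte[] data;

    PartitionedEntry(String partitionKey, byte[] data) {
        this.partitionKey = partitionKey;
        this.data = data;
    }
}

final class Converters {
    private Converters() {}

    // Using the order id as partition key keeps all events of one order in the same shard.
    static final ElementConverter<OrderEvent, PartitionedEntry> ORDER_TO_ENTRY =
            (event, context) -> new PartitionedEntry(
                    event.orderId, event.json.getBytes(StandardCharsets.UTF_8));
}
```

Keeping the conversion in a separate, stateless function means the writer only ever buffers destination-ready entries, which is what makes re-queuing failed entries straightforward.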
```java
/**
 * Buffer to hold request entries that should be persisted into the
 * destination.
 * <p>
 * A request entry contains all relevant details to make a call to the
 * destination. E.g., for Kinesis Data Streams a request entry contains the
 * payload and partition key.
 * <p>
 * It seems more natural to buffer InputT, i.e., the events that should be
 * persisted, rather than RequestEntryT. However, in practice, the response
 * of a failed request call can make it very hard, if not impossible, to
 * reconstruct the original event. It is much easier to just construct a
 * new (retry) request entry from the response and add that back to the
 * queue for later retry.
 */
private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();

/**
 * Tracks all pending async calls that have been executed since the last
 * checkpoint. Calls that complete (successfully or unsuccessfully)
 * automatically decrement the counter. Any request entry that was not
 * successfully persisted needs to be handled and retried by the logic in
 * {@code submitRequestEntries}.
 * <p>
 * There is a limit on the number of concurrent (async) requests that can be
 * handled by the client library. This limit is enforced by checking this
 * counter before issuing new requests.
 * <p>
 * To complete a checkpoint, we need to make sure that no requests are in
 * flight, as they may fail, which could then lead to data loss.
 */
private int inFlightRequestsCount;

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
    // blocks if too many elements have been buffered
    while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
        mailboxExecutor.yield();
    }

    bufferedRequestEntries.add(elementConverter.apply(element, context));

    // blocks if too many async requests are in flight
    flush();
}

/**
 * Persists buffered RequestEntries into the destination by invoking {@code
 * submitRequestEntries} with batches according to the user specified
 * buffering hints.
 * <p>
 * The method blocks if too many async requests are in flight.
 */
private void flush() throws InterruptedException {
    while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

        // create a batch of at most MAX_BATCH_SIZE request entries that should be
        // persisted in the destination
        ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);
        while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
            batch.add(bufferedRequestEntries.remove());
        }

        // once the destination has handled the batch, hand control back to the
        // mailbox thread to update the in-flight counter and re-queue failures
        ResultFuture<RequestEntryT> requestResult =
                failedRequestEntries -> mailboxExecutor.execute(
                        () -> completeRequest(failedRequestEntries),
                        "Mark in-flight request as completed and requeue %d request entries",
                        failedRequestEntries.size());

        while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
            mailboxExecutor.yield();
        }

        inFlightRequestsCount++;
        submitRequestEntries(batch, requestResult);
    }
}

/**
 * Marks an in-flight request as completed and prepends failed request entries
 * back to the internal request entry buffer for later retry.
 *
 * @param failedRequestEntries request entries that need to be retried
 */
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
    inFlightRequestsCount--;

    // By just iterating through failedRequestEntries, it reverses the order of the
    // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the
    // API does not make any order guarantees, but it may cause avoidable reorderings
    // for other destinations.
    failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
}

/**
 * In-flight requests may fail, but they will be retried if the sink is
 * still healthy. But if in-flight requests fail after a checkpoint has been
 * triggered and Flink needs to recover from the checkpoint, the (failed)
 * in-flight requests are gone and cannot be retried. Hence, there cannot be
 * any outstanding in-flight requests when a commit is initialized.
 * <p>
 * To this end, all in-flight requests need to be completed before proceeding
 * with the commit.
 */
@Override
public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
    if (flush) {
        flush();
    }

    // wait until all in-flight requests have completed
    while (inFlightRequestsCount > 0) {
        mailboxExecutor.yield();
    }

    return Collections.emptyList();
}

/**
 * All in-flight requests have been completed, but there may still be
 * request entries in the internal buffer that are yet to be sent to the
 * endpoint. These request entries are stored in the snapshot state so that
 * they don't get lost in case of a failure/restart of the application.
 */
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
    return Collections.singletonList(bufferedRequestEntries);
}
```
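The comment in `completeRequest` notes that prepending failed entries one by one reverses their order. The small sketch below, with hypothetical helper names, shows the effect on an `ArrayDeque`, together with one possible alternative that is not part of the proposal: prepending the failed entries back to front, which keeps their original order ahead of the entries that were still buffered.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

final class RequeueOrder {
    private RequeueOrder() {}

    /** As in completeRequest above: the failed entries end up in reverse order. */
    static void requeueReversing(Deque<String> buffer, Collection<String> failed) {
        failed.forEach(buffer::addFirst);
    }

    /** Hypothetical alternative: prepend back to front so the original order is kept. */
    static void requeuePreservingOrder(Deque<String> buffer, Collection<String> failed) {
        List<String> copy = new ArrayList<>(failed);
        for (int i = copy.size() - 1; i >= 0; i--) {
            buffer.addFirst(copy.get(i));
        }
    }
}
```

For a buffer `[x]` and failed entries `[a, b, c]`, the first method yields `[c, b, a, x]` and the second `[a, b, c, x]`.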
...
```java
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call API with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (err != null) {
                //TODO: handle errors of the entire request...
                // Until then, re-queue the whole batch so that the in-flight request is
                // still marked as completed (response is null here). This treats every
                // error of the entire request as transient.
                requestResult.complete(requestEntries);
                return;
            }

            if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries =
                        new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                // result entries are returned in the same order as the request entries
                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }
        });
    }

    ...
}
```
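The loop above relies on the PutRecords response listing one result per request entry, in request order, with an error code set only for records that failed. A Flink-free sketch of that positional matching follows; the method name is hypothetical, and plain error-code strings (where `null` means success) stand in for `PutRecordsResultEntry`.

```java
import java.util.ArrayList;
import java.util.List;

final class FailedEntries {
    private FailedEntries() {}

    /**
     * Returns the request entries whose positionally matching result carries an
     * error code. errorCodes.get(i) == null means entry i was persisted.
     */
    static <T> List<T> failedEntries(List<T> requestEntries, List<String> errorCodes) {
        if (requestEntries.size() != errorCodes.size()) {
            throw new IllegalArgumentException("expected one result per request entry");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requestEntries.size(); i++) {
            if (errorCodes.get(i) != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```

For entries `[a, b, c]` and error codes `[null, "ProvisionedThroughputExceededException", null]`, only `b` is returned for retry. The size check guards the assumption that results and entries line up one to one.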
Rejected Alternatives
The sink needs a way to build back pressure, e.g., if the throughput limit of the destination is exceeded. Initially, we were planning to adopt the `isAvailable` pattern from the source interface. But the benefits are too vague at this point, and it would require substantial changes to the sink API. We'll hence start with a blocking implementation of the `write` function and see how far we get.
```java
/**
 * Signals if enough RequestEntryTs have been buffered according to the user
 * specified buffering hints to make a request against the destination. This
 * functionality will be added to the sink interface by means of an
 * additional FLIP.
 *
 * @return a future that will be completed once a request against the
 *         destination can be made
 */
public CompletableFuture<Void> isAvailable() {
    ...
}
```
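For comparison, here is a sketch of how the rejected `isAvailable` pattern might behave, assuming a hypothetical `AvailabilityBuffer` whose only buffering hint is a fixed batch size. Instead of blocking in `write`, the writer hands out a `CompletableFuture` that completes once a full batch is buffered; after a batch is taken, a fresh future is created only if the old one had already completed, so callers still waiting on a pending future are not stranded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical non-blocking buffer exposing an availability future. */
class AvailabilityBuffer<RequestEntryT> {
    private final int batchSize; // assumed buffering hint
    private final List<RequestEntryT> buffer = new ArrayList<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    AvailabilityBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(RequestEntryT entry) {
        buffer.add(entry);
        if (buffer.size() >= batchSize) {
            available.complete(null); // signal that a request can be made
        }
    }

    /** Completes once enough entries are buffered to make a request against the destination. */
    CompletableFuture<Void> isAvailable() {
        return available;
    }

    /** Removes up to batchSize entries and re-arms the future if no full batch remains. */
    List<RequestEntryT> takeBatch() {
        List<RequestEntryT> batch =
                new ArrayList<>(buffer.subList(0, Math.min(batchSize, buffer.size())));
        buffer.subList(0, batch.size()).clear();
        if (buffer.size() < batchSize && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return batch;
    }
}
```

The re-arming logic is the kind of additional state a non-blocking sink API has to manage, compared to a `write` method that simply blocks.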
...