Status
...
Page properties
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
...
...
JIRA: tbd
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
There are two user-facing aspects of the generic sink. The first is an abstract class that is used to implement a new sink for a concrete destination. The second is the interface used by end users who want to leverage an existing sink to persist events in a destination. Appendix A contains a simplified sample implementation of a Kinesis Data Streams sink.
The async sink is based on FLIP-143 and FLIP-177. It uses the following generic types so that it remains extensible and agnostic to the destination.
...
```java
import java.io.Serializable;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink.SinkWriter;

public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /**
     * This method specifies how to persist buffered request entries into the
     * destination. It is implemented when support for a new destination is
     * added.
     * <p>
     * The method is invoked with a set of request entries according to the
     * buffering hints (and the valid limits of the destination). The logic then
     * needs to create and execute the request against the destination (ideally
     * by batching together multiple request entries to increase efficiency).
     * The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them by passing them to {@code
     * requestResult}.
     * <p>
     * During checkpointing, the sink needs to ensure that there are no
     * outstanding in-flight requests.
     *
     * @param requestEntries a set of request entries that should be sent to the
     *                       destination
     * @param requestResult  a ResultFuture that needs to be completed once all
     *                       request entries that have been passed to the method
     *                       on invocation have either been successfully
     *                       persisted in the destination or have been
     *                       re-queued
     */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);

    ...
}
```
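The contract of submitRequestEntries can be illustrated with a minimal, self-contained sketch that is not part of the proposed API. It assumes a hypothetical FakeDestinationWriter whose destination rejects every entry equal to "bad", and it replaces ResultFuture with a plain java.util.function.Consumer. The method also returns a CompletableFuture, purely so that a caller of the sketch can wait for the asynchronous callback. What it shows is the essential rule: the callback is completed exactly once, with only the entries that need to be re-queued.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical writer used only to illustrate the submitRequestEntries contract.
class FakeDestinationWriter {

    // Hypothetical destination rule: an entry equal to "bad" is rejected.
    private static boolean rejected(String entry) {
        return entry.equals("bad");
    }

    // Like submitRequestEntries, but with a Consumer in place of ResultFuture and a returned
    // future so that callers of this sketch can wait for the asynchronous callback.
    CompletableFuture<Void> submitRequestEntries(
            List<String> requestEntries, Consumer<Collection<String>> requestResult) {
        return CompletableFuture
                // simulate the asynchronous call; the result flags each entry as failed or not
                .supplyAsync(() -> {
                    List<Boolean> failed = new ArrayList<>();
                    for (String entry : requestEntries) {
                        failed.add(rejected(entry));
                    }
                    return failed;
                })
                // complete the callback exactly once, with only the entries to re-queue
                .thenAccept(failed -> {
                    List<String> toRetry = new ArrayList<>();
                    for (int i = 0; i < failed.size(); i++) {
                        if (failed.get(i)) {
                            toRetry.add(requestEntries.get(i));
                        }
                    }
                    requestResult.accept(toRetry);
                });
    }
}
```

A request in which everything succeeds still completes the callback, with an empty collection; otherwise the writer could never mark the request as no longer in flight.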
Internally, the AsyncSinkWriter buffers RequestEntryT instances and invokes the submitRequestEntries method with a set of them according to user-specified buffering hints. The AsyncSinkWriter also tracks in-flight requests, i.e., calls to the API that have been sent but have not completed yet. During a commit, the sink ensures that all in-flight requests have completed and that the currently buffered RequestEntryT instances are persisted in the application state snapshot.
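To make the buffering and in-flight tracking concrete, here is a deterministic, single-threaded model. All names are hypothetical. Completion of an in-flight request is simulated by taking the oldest outstanding batch, which stands in for yielding to the mailbox; failures and the limit on the total number of buffered entries are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of AsyncSinkWriter buffering; failures are not modelled.
class BufferingModel {
    final int maxBatchSize;
    final int maxInFlight;
    // entries waiting to be sent
    final Deque<String> buffer = new ArrayDeque<>();
    // batches that were sent but whose completion has not been processed yet
    final Deque<List<String>> inFlight = new ArrayDeque<>();
    // batches whose completion has been processed
    final List<List<String>> persisted = new ArrayList<>();

    BufferingModel(int maxBatchSize, int maxInFlight) {
        if (maxBatchSize < 1 || maxInFlight < 1) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxInFlight = maxInFlight;
    }

    void write(String entry) {
        buffer.addLast(entry);
        flush();
    }

    // Sends full batches only; a partial batch stays in the buffer.
    void flush() {
        while (buffer.size() >= maxBatchSize) {
            List<String> batch = new ArrayList<>(maxBatchSize);
            while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
                batch.add(buffer.removeFirst());
            }
            // stand-in for yielding to the mailbox until a request completes
            while (inFlight.size() >= maxInFlight) {
                completeOldest();
            }
            inFlight.addLast(batch);
        }
    }

    void completeOldest() {
        persisted.add(inFlight.removeFirst());
    }

    // Commit: drain all in-flight requests, then return the buffered entries for the snapshot.
    List<String> prepareCommitAndSnapshot() {
        while (!inFlight.isEmpty()) {
            completeOldest();
        }
        return new ArrayList<>(buffer);
    }
}
```

With a batch size of 2 and at most one request in flight, writing the five entries a to e leaves [a, b] completed, [c, d] in flight, and e buffered. The commit step then waits for [c, d] and hands e to the snapshot instead of sending it as a partial batch.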
```java
/**
 * The ElementConverter provides a mapping from the elements of a stream to
 * request entries that can be sent to the destination.
 * <p>
 * The resulting request entry is buffered by the AsyncSinkWriter and sent
 * to the destination when the {@code submitRequestEntries} method is
 * invoked.
 */
private final ElementConverter<InputT, RequestEntryT> elementConverter;

/**
 * Buffer to hold request entries that should be persisted into the
 * destination.
 * <p>
 * A request entry contains all relevant details to make a call to the
 * destination. E.g., for Kinesis Data Streams a request entry contains the
 * payload and partition key.
 * <p>
 * It seems more natural to buffer InputT, i.e., the events that should be
 * persisted, rather than RequestEntryT. However, in practice, the response
 * of a failed request call can make it very hard, if not impossible, to
 * reconstruct the original event. It is much easier to just construct a
 * new (retry) request entry from the response and add that back to the
 * queue for later retry.
 */
private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>(...);

/**
 * Tracks the number of pending async calls that have been executed since the
 * last checkpoint. Calls that have completed (successfully or unsuccessfully)
 * automatically decrement the counter. Any request entry that was not
 * successfully persisted needs to be handled and retried by the logic in
 * {@code submitRequestEntries}.
 * <p>
 * There is a limit on the number of concurrent (async) requests that can be
 * handled by the client library. This limit is enforced by checking this
 * counter before issuing new requests.
 * <p>
 * To complete a checkpoint, we need to make sure that no requests are in
 * flight, as they may fail, which could then lead to data loss.
 */
private int inFlightRequestsCount;

@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
    // blocks if too many elements have been buffered
    while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
        mailboxExecutor.yield();
    }

    bufferedRequestEntries.add(elementConverter.apply(element, context));

    // blocks if too many async requests are in flight
    flush();
}

/**
 * Persists buffered RequestEntries into the destination by invoking {@code
 * submitRequestEntries} with batches according to the user-specified
 * buffering hints.
 * <p>
 * The method blocks if too many async requests are in flight.
 */
private void flush() throws InterruptedException {
    while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {

        // create a batch of at most MAX_BATCH_SIZE request entries
        ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);

        while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
            try {
                batch.add(bufferedRequestEntries.remove());
            } catch (NoSuchElementException e) {
                // if there are not enough elements, just create a smaller batch
                break;
            }
        }

        // when the request completes, mark it as completed and re-queue the failed
        // entries from within the mailbox thread
        ResultFuture<RequestEntryT> requestResult =
                failedRequestEntries -> mailboxExecutor.execute(
                        () -> completeRequest(failedRequestEntries),
                        "Mark in-flight request as completed and requeue %d request entries",
                        failedRequestEntries.size());

        while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
            mailboxExecutor.yield();
        }

        inFlightRequestsCount++;
        submitRequestEntries(batch, requestResult);
    }
}

/**
 * Marks an in-flight request as completed and prepends failed requestEntries back to the
 * internal requestEntry buffer for later retry.
 *
 * @param failedRequestEntries requestEntries that need to be retried
 */
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
    inFlightRequestsCount--;

    // By just iterating through failedRequestEntries, it reverses the order of the
    // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the api
    // does not make any order guarantees, but may cause avoidable reorderings for other
    // destinations.
    failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
}

/**
 * In-flight requests will be retried if the sink is still healthy. But if in-flight requests
 * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
 * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
 * outstanding in-flight requests when a commit is initialized.
 * <p>
 * To this end, all in-flight requests need to complete before proceeding with the commit.
 */
@Override
public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
    if (flush) {
        flush();
    }

    // wait until all in-flight requests have completed
    while (inFlightRequestsCount > 0) {
        mailboxExecutor.yield();
    }

    return Collections.emptyList();
}

/**
 * All in-flight requests have been completed, but there may still be
 * request entries in the internal buffer that are yet to be sent to the
 * endpoint. These request entries are stored in the snapshot state so that
 * they don't get lost in case of a failure/restart of the application.
 */
@Override
public List<Collection<RequestEntryT>> snapshotState() throws IOException {
    return Collections.singletonList(bufferedRequestEntries);
}
```
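The writer notes that prepending failed entries one at a time reverses their order, which matters for destinations that, unlike Kinesis PutRecords, preserve the order of a batch. A sketch of the effect, using a hypothetical Requeue helper together with an order-preserving alternative that prepends from the last failed entry back to the first, could look like this:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;

// Hypothetical helper contrasting two ways of re-queuing failed entries at the head of the buffer.
class Requeue {

    // Prepending entries one by one reverses their order.
    static <T> void prependReversing(Deque<T> buffer, Collection<T> failed) {
        failed.forEach(buffer::addFirst);
    }

    // Order-preserving variant: prepend from the last failed entry back to the first.
    static <T> void prependPreservingOrder(Deque<T> buffer, Collection<T> failed) {
        List<T> copy = new ArrayList<>(failed);
        ListIterator<T> it = copy.listIterator(copy.size());
        while (it.hasPrevious()) {
            buffer.addFirst(it.previous());
        }
    }
}
```

Both variants put the failed entries ahead of the entries that were never sent, so retries are attempted first; only the relative order among the failed entries differs.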
...
```java
private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {

    @Override
    protected void submitRequestEntries(
            List<PutRecordsRequestEntry> requestEntries,
            ResultFuture<PutRecordsRequestEntry> requestResult) {

        // create a batch request
        PutRecordsRequest batchRequest = PutRecordsRequest
                .builder()
                .records(requestEntries)
                .streamName(streamName)
                .build();

        // call api with batch request
        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);

        // re-queue elements of failed requests
        future.whenComplete((response, err) -> {
            if (err != null) {
                //TODO: handle errors of the entire request...
                // placeholder: re-queue all entries so the request is still marked as completed
                requestResult.complete(requestEntries);
            } else if (response.failedRecordCount() > 0) {
                ArrayList<PutRecordsRequestEntry> failedRequestEntries =
                        new ArrayList<>(response.failedRecordCount());
                List<PutRecordsResultEntry> records = response.records();

                // result i describes request entry i; a non-null error code marks a failure
                for (int i = 0; i < records.size(); i++) {
                    if (records.get(i).errorCode() != null) {
                        failedRequestEntries.add(requestEntries.get(i));
                    }
                }

                requestResult.complete(failedRequestEntries);
            } else {
                requestResult.complete(Collections.emptyList());
            }
        });
    }

    ...
}
```
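The Kinesis writer relies on the PutRecords response listing one result per request record, in request order, so a failed record can be identified by its index. The sketch below isolates that step. ResultEntry is a hypothetical stand-in for PutRecordsResultEntry that keeps only the error code, FailedEntries is a hypothetical helper, and the error code used in the test is only a placeholder string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PutRecordsResultEntry: only the error code matters here.
class ResultEntry {
    final String errorCode; // null means the record was persisted

    ResultEntry(String errorCode) {
        this.errorCode = errorCode;
    }
}

// Hypothetical helper that selects the request entries to re-queue.
class FailedEntries {

    // Results are assumed to be index-aligned with the request entries.
    static <T> List<T> select(List<T> requestEntries, List<ResultEntry> results) {
        if (requestEntries.size() != results.size()) {
            throw new IllegalArgumentException("request and result sizes differ");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).errorCode != null) {
                failed.add(requestEntries.get(i));
            }
        }
        return failed;
    }
}
```

The size check makes a violated alignment assumption fail loudly instead of silently re-queuing the wrong records.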
Rejected Alternatives
...